Function job_worker 

pub async fn job_worker<AppState: AS>(
    app_state: AppState,
    registry: impl JobRegistry<AppState>,
    sleep_duration: Duration,
    max_retries: i32,
    shutdown_token: CancellationToken,
    lock_timeout: Duration,
) -> Result<()>

Start a job worker that processes jobs from the queue.

The worker polls the queue continuously and executes each job through the provided registry. Failed jobs are retried automatically.

§Arguments

  • app_state - The application state containing database connection and configuration
  • registry - The job registry that maps job names to their implementations
  • sleep_duration - How long to sleep when no jobs are available
  • max_retries - Maximum number of times to retry a failed job before moving to the dead letter queue (default: 20)
  • shutdown_token - Cancellation token for graceful shutdown. When cancelled, the worker will stop accepting new jobs and release database locks before exiting.
  • lock_timeout - How long a job can be locked before it’s considered abandoned and becomes available for other workers (default: 2 hours)

§Retry Behavior

When a job fails:

  • The error count is incremented
  • The error message and timestamp are recorded
  • The job is requeued with exponential backoff: delay = 2^(error_count + 1) seconds, where error_count is the value before this failure is counted (first retry: 2s, second: 4s, third: 8s, fourth: 16s, etc.)
  • If error_count >= max_retries, the job is moved to the dead letter queue
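
The retry rules above can be sketched as a small pure function. This is an illustration, not the crate's implementation: RetryDecision and decide_retry are hypothetical names, and the sketch assumes the dead letter check is made against the incremented error count, which the text does not state explicitly.

```rust
use std::time::Duration;

/// Hypothetical outcome of a failed attempt (not a type from the crate).
#[derive(Debug, PartialEq)]
enum RetryDecision {
    /// Requeue the job and make it runnable again after this delay.
    RetryAfter(Duration),
    /// Stop retrying and move the job to the dead letter queue.
    DeadLetter,
}

/// `prior_errors` is the error count stored before this failure is recorded.
fn decide_retry(prior_errors: i32, max_retries: i32) -> RetryDecision {
    // Step 1: increment the error count.
    let error_count = prior_errors.saturating_add(1);

    // Assumption: the limit is compared against the incremented count.
    if error_count >= max_retries {
        return RetryDecision::DeadLetter;
    }

    // delay = 2^(prior_errors + 1) seconds, i.e. 2s, 4s, 8s, 16s, ...
    // The exponent is clamped so the shift cannot overflow a u64.
    let exponent = error_count.clamp(0, 62) as u32;
    RetryDecision::RetryAfter(Duration::from_secs(1u64 << exponent))
}
```

Under these assumptions, decide_retry(0, 20) yields a 2 second delay, decide_retry(3, 20) yields 16 seconds, and decide_retry(19, 20) yields DeadLetter.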

§Graceful Shutdown

When the shutdown_token is cancelled:

  • The worker stops polling for new jobs
  • Any currently executing job is allowed to complete
  • Database locks are released immediately (instead of waiting for the lock timeout)
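
The shutdown sequence can be illustrated with a standard-library-only sketch. It swaps the CancellationToken for an AtomicBool flag and the database queue for an in-memory VecDeque, so it shows the ordering of steps only; run_until_shutdown is a hypothetical name, and the real worker is async and sleeps when the queue is empty rather than returning.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};

/// Runs queued jobs until shutdown is requested or the queue is empty.
/// Returns the number of jobs that ran to completion.
fn run_until_shutdown(
    queue: &mut VecDeque<u32>,
    shutdown: &AtomicBool,
    mut run_job: impl FnMut(u32),
) -> usize {
    let mut completed = 0;

    // The flag is checked only between jobs, so a job that has already
    // started is always allowed to finish.
    while !shutdown.load(Ordering::SeqCst) {
        let Some(job_id) = queue.pop_front() else {
            // Stand-in: the real worker would sleep for `sleep_duration`
            // and poll again instead of leaving the loop.
            break;
        };
        run_job(job_id);
        completed += 1;
    }

    // At this point the real worker would release its database locks
    // rather than leaving them to expire via `lock_timeout`.
    completed
}
```

If the flag is set while job 2 of a three-job queue is running, job 2 still completes, job 3 stays in the queue, and the function returns 2.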

§Lock Timeout

If a worker crashes or becomes unresponsive while processing a job, the job will remain locked in the database. The lock_timeout parameter controls how long to wait before considering such jobs abandoned. After the timeout expires, any worker can pick up the job and retry it.
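
As a rough sketch of the abandonment rule, the check below treats a job as available when it is unlocked or when its lock is at least lock_timeout old. JobLock, its locked_at field, and is_available are hypothetical names for illustration; the actual worker performs this check against the database, and its schema is not shown here.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical in-memory view of a job's lock state.
struct JobLock {
    /// When a worker locked the job, or `None` if it is unlocked.
    locked_at: Option<SystemTime>,
}

/// Returns true if a worker may pick up the job at time `now`.
fn is_available(job: &JobLock, now: SystemTime, lock_timeout: Duration) -> bool {
    match job.locked_at {
        // Never locked, or the lock was released.
        None => true,
        Some(locked_at) => match now.duration_since(locked_at) {
            // Locked: available only once the lock has outlived the timeout.
            Ok(held_for) => held_for >= lock_timeout,
            // Lock timestamp is in the future (clock skew): treat as held.
            Err(_) => false,
        },
    }
}
```

With a 2 hour timeout, a job locked one minute ago is skipped, while a job locked three hours ago is considered abandoned and can be retried by any worker.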

§Example

use std::time::Duration;
use tokio_util::sync::CancellationToken;

let shutdown_token = CancellationToken::new();
let worker_token = shutdown_token.clone();

// Start worker with graceful shutdown support and lock timeout
tokio::spawn(async move {
    cja::jobs::worker::job_worker(
        app_state,
        registry,
        Duration::from_secs(60),      // poll every 60s when idle
        20,                            // max 20 retries
        worker_token,                  // for graceful shutdown
        Duration::from_secs(2 * 3600), // 2 hour lock timeout
    ).await.unwrap();
});

// Later, trigger shutdown
shutdown_token.cancel();