pub async fn job_worker<AppState: AS>(
    app_state: AppState,
    registry: impl JobRegistry<AppState>,
    sleep_duration: Duration,
    max_retries: i32,
    shutdown_token: CancellationToken,
    lock_timeout: Duration,
) -> Result<()>
Start a job worker that processes jobs from the queue.
The worker will continuously poll for jobs and execute them using the provided registry. Jobs are executed with automatic retry logic on failure.
§Arguments
- app_state - The application state containing database connection and configuration
- registry - The job registry that maps job names to their implementations
- sleep_duration - How long to sleep when no jobs are available
- max_retries - Maximum number of times to retry a failed job before moving to the dead letter queue (default: 20)
- shutdown_token - Cancellation token for graceful shutdown. When cancelled, the worker will stop accepting new jobs and release database locks before exiting.
- lock_timeout - How long a job can be locked before it’s considered abandoned and becomes available for other workers (default: 2 hours)
§Retry Behavior
When a job fails:
- The error count is incremented
- The error message and timestamp are recorded
- The job is requeued with exponential backoff: delay = 2^(error_count + 1) seconds (first retry: 2s, second: 4s, third: 8s, fourth: 16s, etc.)
- If error_count >= max_retries, the job is moved to the dead letter queue
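The backoff arithmetic above can be sketched with the standard library alone. This is an illustration, not the crate's implementation: `backoff_delay` and `should_dead_letter` are hypothetical helper names, and the sketch assumes that `error_count` in the formula is the number of failures recorded before the current one (0 on the first failure), since that reading reproduces the documented 2s, 4s, 8s, 16s sequence.

```rust
use std::time::Duration;

/// Hypothetical helper: delay before the next attempt, computed as
/// 2^(error_count + 1) seconds.
/// Assumption: `error_count` is 0 on the first failure.
fn backoff_delay(error_count: u32) -> Duration {
    // Saturating arithmetic keeps very large counts from overflowing.
    let exponent = error_count.saturating_add(1);
    Duration::from_secs(2u64.saturating_pow(exponent))
}

/// Hypothetical helper: true once the job has used up its retries and
/// should go to the dead letter queue instead of being requeued.
fn should_dead_letter(error_count: i32, max_retries: i32) -> bool {
    error_count >= max_retries
}
```

With the default of 20 retries, the last delay under this reading would be 2^20 seconds, roughly 12 days, so a lower max_retries may be worth considering for jobs that should fail fast.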
§Graceful Shutdown
When the shutdown_token is cancelled:
- The worker stops polling for new jobs
- Any currently executing job is allowed to complete
- Database locks are released immediately (instead of waiting for the lock timeout)
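The first two points can be illustrated with a synchronous, standard-library analogue. This is a sketch under stated assumptions, not the worker's actual loop: an `AtomicBool` stands in for the `CancellationToken`, an in-memory `VecDeque` stands in for the database queue, `run_until_shutdown` is a hypothetical name, and lock release and idle sleeping are left out.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical loop: runs jobs until the queue is empty or `shutdown`
/// is set, and returns the jobs that ran to completion.
fn run_until_shutdown(
    queue: &mut VecDeque<&'static str>,
    shutdown: &AtomicBool,
    mut execute: impl FnMut(&str),
) -> Vec<&'static str> {
    let mut completed = Vec::new();
    // The flag is checked only between jobs, so a job that has already
    // started is never interrupted.
    while !shutdown.load(Ordering::SeqCst) {
        let job = match queue.pop_front() {
            Some(job) => job,
            // The real worker would sleep and poll again; the sketch stops.
            None => break,
        };
        execute(job);
        completed.push(job);
    }
    completed
}
```

If shutdown is requested while a job is running, that job still finishes, and the jobs behind it stay in the queue for another worker.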
§Lock Timeout
If a worker crashes or becomes unresponsive while processing a job, the job will remain
locked in the database. The lock_timeout parameter controls how long to wait before
considering such jobs abandoned. After the timeout expires, any worker can pick up the
job and retry it.
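The abandonment check described here amounts to comparing how long a lock has been held against lock_timeout. The helper below is a hypothetical sketch: the name `lock_is_abandoned`, the use of `SystemTime`, and the strict `>` comparison are assumptions, and the real check presumably runs inside the database query that selects jobs.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical helper: true when a lock taken at `locked_at` has been
/// held for longer than `lock_timeout` as of `now`.
fn lock_is_abandoned(locked_at: SystemTime, now: SystemTime, lock_timeout: Duration) -> bool {
    match now.duration_since(locked_at) {
        Ok(held_for) => held_for > lock_timeout,
        // `locked_at` lies in the future (clock skew): treat the lock
        // as still held rather than abandoned.
        Err(_) => false,
    }
}
```

A lock_timeout shorter than the longest legitimate job would let a second worker pick up a job that is still running, so the value should comfortably exceed the expected job duration.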
§Example
use std::time::Duration;
use tokio_util::sync::CancellationToken;

let shutdown_token = CancellationToken::new();
let worker_token = shutdown_token.clone();

// Start worker with graceful shutdown support and lock timeout
tokio::spawn(async move {
    cja::jobs::worker::job_worker(
        app_state,
        registry,
        Duration::from_secs(60),       // poll every 60s when idle
        20,                            // max 20 retries
        worker_token,                  // for graceful shutdown
        Duration::from_secs(2 * 3600), // 2 hour lock timeout
    ).await.unwrap();
});

// Later, trigger shutdown
shutdown_token.cancel();