cja/jobs/
worker.rs

1use std::time::Duration;
2
3use thiserror::Error;
4use tokio_util::sync::CancellationToken;
5use tracing::Span;
6
7use crate::app_state::AppState as AS;
8
9use super::registry::JobRegistry;
10
/// Default maximum number of retry attempts before a job is moved to the dead letter queue.
///
/// With exponential backoff (`2^(error_count+1)` seconds), 20 retries means:
/// - First retry after 2 seconds (`2^1`)
/// - Last retry after ~12 days (`2^20` seconds ≈ 1,048,576 s)
/// - Total retry window of ~24 days (sum of `2^1 .. 2^20` seconds)
pub const DEFAULT_MAX_RETRIES: i32 = 20;

/// Default lock timeout duration (2 hours).
///
/// Jobs locked for longer than this duration will be considered abandoned and
/// made available for other workers to pick up. This handles cases where a worker
/// crashes or becomes unresponsive while processing a job.
pub const DEFAULT_LOCK_TIMEOUT: Duration = Duration::from_secs(2 * 60 * 60);
25
/// Outcome of attempting one claimed job: `Ok` when the job ran to completion,
/// `Err` when the job itself failed. Database-level failures are reported
/// separately through the outer `color_eyre::Result` in `run_next_job`.
pub(super) type RunJobResult = Result<RunJobSuccess, JobError>;

/// Marker wrapper around the job row that was executed successfully.
#[derive(Debug)]
pub(super) struct RunJobSuccess(JobFromDB);
30
/// A job row as loaded from the `jobs` table (see `fetch_next_job`'s RETURNING list).
#[derive(Debug, sqlx::FromRow)]
pub struct JobFromDB {
    // Primary key of the job row.
    pub job_id: uuid::Uuid,
    // Registry name used to dispatch to the matching job implementation.
    pub name: String,
    // JSON payload handed to the job implementation via the registry.
    pub payload: serde_json::Value,
    // Higher values are fetched first (`ORDER BY priority DESC`).
    pub priority: i32,
    // Earliest time the job may run; pushed forward by retry backoff.
    pub run_at: chrono::DateTime<chrono::Utc>,
    // Creation time; used as the fetch tie-breaker (`created_at ASC`).
    pub created_at: chrono::DateTime<chrono::Utc>,
    // Free-form context string — presumably recorded at enqueue time; the
    // enqueue side is not visible in this file.
    pub context: String,
    // Failed-attempt count; drives the backoff delay and dead-lettering.
    pub error_count: i32,
    // Message from the most recent failure, if any.
    pub last_error_message: Option<String>,
    // Timestamp of the most recent failure, if any.
    pub last_failed_at: Option<chrono::DateTime<chrono::Utc>>,
}
44
45#[derive(Debug, Error)]
46#[error("JobError(id:${}) ${1}", self.0.job_id)]
47pub(crate) struct JobError(JobFromDB, color_eyre::Report);
48
/// A single polling worker that claims jobs from the `jobs` table and runs them.
struct Worker<AppState: AS, R: JobRegistry<AppState>> {
    // Random per-instance id; written into `jobs.locked_by` when claiming rows.
    id: uuid::Uuid,
    // Application state; provides the database pool via `db()`.
    state: AppState,
    // Maps job names to their implementations.
    registry: R,
    // How long `tick` sleeps when no job is ready to run.
    sleep_duration: Duration,
    // Failures allowed before a job is moved to the dead letter queue.
    max_retries: i32,
    // Token passed to jobs so they can observe graceful shutdown.
    cancellation_token: CancellationToken,
    // Age after which another worker's lock is treated as abandoned.
    lock_timeout: Duration,
}
58
59impl<AppState: AS, R: JobRegistry<AppState>> Worker<AppState, R> {
60    fn new(
61        state: AppState,
62        registry: R,
63        sleep_duration: Duration,
64        max_retries: i32,
65        cancellation_token: CancellationToken,
66        lock_timeout: Duration,
67    ) -> Self {
68        Self {
69            id: uuid::Uuid::new_v4(),
70            state,
71            registry,
72            sleep_duration,
73            max_retries,
74            cancellation_token,
75            lock_timeout,
76        }
77    }
78
79    #[tracing::instrument(
80        name = "worker.run_job",
81        skip(self, job),
82        fields(
83            job.id = %job.job_id,
84            job.name = job.name,
85            job.priority = job.priority,
86            job.run_at = %job.run_at,
87            job.created_at = %job.created_at,
88            job.context = job.context,
89            job.error_count = job.error_count,
90            worker.id = %self.id,
91        )
92        err,
93    )]
94    async fn run_job(&self, job: &JobFromDB) -> color_eyre::Result<()> {
95        self.registry
96            .run_job(job, self.state.clone(), self.cancellation_token.clone())
97            .await
98    }
99
100    pub(crate) async fn run_next_job(&self, job: JobFromDB) -> color_eyre::Result<RunJobResult> {
101        let job_result = self.run_job(&job).await;
102
103        if let Err(e) = job_result {
104            // Extract error message from color_eyre::Report
105            let error_message = format!("{e}");
106
107            // Check if job has exceeded max retries
108            if job.error_count >= self.max_retries {
109                // Move job to dead letter queue
110                tracing::error!(
111                    worker.id = %self.id,
112                    job_id = %job.job_id,
113                    error_count = job.error_count,
114                    max_retries = self.max_retries,
115                    "Job permanently failed - moved to dead letter queue"
116                );
117
118                let mut tx = self.state.db().begin().await?;
119
120                sqlx::query(
121                    "INSERT INTO dead_letter_jobs (original_job_id, name, payload, context, priority, error_count, last_error_message, created_at)
122                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
123                )
124                .bind(job.job_id)
125                .bind(&job.name)
126                .bind(&job.payload)
127                .bind(&job.context)
128                .bind(job.priority)
129                .bind(job.error_count)
130                .bind(&error_message)
131                .bind(job.created_at)
132                .execute(&mut *tx)
133                .await?;
134
135                sqlx::query("DELETE FROM jobs WHERE job_id = $1 AND locked_by = $2")
136                    .bind(job.job_id)
137                    .bind(self.id.to_string())
138                    .execute(&mut *tx)
139                    .await?;
140
141                tx.commit().await?;
142
143                return Ok(Err(JobError(job, e)));
144            }
145
146            // Job is under max retries - requeue with exponential backoff
147            tracing::warn!(
148                worker.id = %self.id,
149                job_id = %job.job_id,
150                error_count = job.error_count,
151                retry_attempt = job.error_count + 1,
152                "Job failed, retry #{}",
153                job.error_count + 1
154            );
155
156            sqlx::query(
157                "
158                UPDATE jobs
159                SET locked_by = NULL,
160                    locked_at = NULL,
161                    error_count = error_count + 1,
162                    last_error_message = $3,
163                    last_failed_at = NOW(),
164                    run_at = NOW() + (POWER(2, error_count + 1)) * interval '1 second'
165                WHERE job_id = $1 AND locked_by = $2
166                ",
167            )
168            .bind(job.job_id)
169            .bind(self.id.to_string())
170            .bind(error_message)
171            .execute(self.state.db())
172            .await?;
173
174            return Ok(Err(JobError(job, e)));
175        }
176
177        sqlx::query(
178            "
179                DELETE FROM jobs
180                WHERE job_id = $1 AND locked_by = $2
181                ",
182        )
183        .bind(job.job_id)
184        .bind(self.id.to_string())
185        .execute(self.state.db())
186        .await?;
187
188        Ok(Ok(RunJobSuccess(job)))
189    }
190
191    #[tracing::instrument(
192        name = "worker.fetch_next_job",
193        skip(self),
194        fields(
195            worker.id = %self.id,
196            job.id,
197            job.name,
198            lock_timeout_secs = self.lock_timeout.as_secs(),
199        ),
200        err,
201    )]
202    #[allow(clippy::cast_possible_wrap)]
203    async fn fetch_next_job(&self) -> color_eyre::Result<Option<JobFromDB>> {
204        // Cast is safe: lock timeouts are typically hours, not approaching i64::MAX seconds
205        let lock_timeout_secs = self.lock_timeout.as_secs() as i64;
206
207        let job = sqlx::query_as::<_, JobFromDB>(
208            "
209            UPDATE jobs
210            SET LOCKED_BY = $1, LOCKED_AT = NOW()
211            WHERE job_id = (
212                SELECT job_id
213                FROM jobs
214                WHERE run_at <= NOW()
215                  AND (
216                    locked_by IS NULL
217                    OR locked_at < NOW() - ($2 || ' seconds')::interval
218                  )
219                ORDER BY priority DESC, created_at ASC
220                LIMIT 1
221                FOR UPDATE SKIP LOCKED
222            )
223            RETURNING job_id, name, payload, priority, run_at, created_at, context, error_count, last_error_message, last_failed_at
224            ",
225        )
226        .bind(self.id.to_string())
227        .bind(lock_timeout_secs.to_string())
228        .fetch_optional(self.state.db())
229        .await?;
230
231        if let Some(job) = &job {
232            let span = Span::current();
233            span.record("job.id", job.job_id.to_string());
234            span.record("job.name", &job.name);
235        }
236
237        Ok(job)
238    }
239
240    #[tracing::instrument(
241        name = "worker.tick",
242        skip(self),
243        fields(
244            worker.id = %self.id,
245        ),
246    )]
247    async fn tick(&self) -> color_eyre::Result<()> {
248        let job = self.fetch_next_job().await?;
249
250        let Some(job) = job else {
251            let duration = self.sleep_duration;
252            tracing::debug!(worker.id =% self.id, ?duration, "No Job to Run, sleeping for requested duration");
253
254            tokio::time::sleep(duration).await;
255
256            return Ok(());
257        };
258
259        let result = self.run_next_job(job).await?;
260
261        match result {
262            Ok(RunJobSuccess(job)) => {
263                tracing::info!(worker.id =% self.id, job_id =% job.job_id, "Job Ran");
264            }
265            Err(job_error) => {
266                tracing::error!(
267                    worker.id =% self.id,
268                    job_id =% job_error.0.job_id,
269                    error_count =% job_error.0.error_count,
270                    error_msg =% job_error.1,
271                    "Job Errored"
272                );
273            }
274        }
275
276        Ok(())
277    }
278}
279
280/// Release database locks held by a worker.
281///
282/// This should be called during graceful shutdown to immediately release any job locks
283/// held by this worker, rather than waiting for the 2-hour lock timeout.
284async fn cleanup_worker_locks<AppState: AS, R: JobRegistry<AppState>>(
285    worker: &Worker<AppState, R>,
286) -> color_eyre::Result<()> {
287    tracing::info!(worker_id = %worker.id, "Releasing database locks");
288
289    let result = sqlx::query(
290        "UPDATE jobs
291         SET locked_by = NULL, locked_at = NULL
292         WHERE locked_by = $1",
293    )
294    .bind(worker.id.to_string())
295    .execute(worker.state.db())
296    .await?;
297
298    tracing::info!(
299        worker_id = %worker.id,
300        locks_released = result.rows_affected(),
301        "Database locks released"
302    );
303
304    Ok(())
305}
306
307/// Start a job worker that processes jobs from the queue.
308///
309/// The worker will continuously poll for jobs and execute them using the provided registry.
310/// Jobs are executed with automatic retry logic on failure.
311///
312/// # Arguments
313///
314/// * `app_state` - The application state containing database connection and configuration
315/// * `registry` - The job registry that maps job names to their implementations
316/// * `sleep_duration` - How long to sleep when no jobs are available
317/// * `max_retries` - Maximum number of times to retry a failed job before moving to the dead letter queue (default: 20)
318/// * `shutdown_token` - Cancellation token for graceful shutdown. When cancelled, the worker
319///   will stop accepting new jobs and release database locks before exiting.
320/// * `lock_timeout` - How long a job can be locked before it's considered abandoned and
321///   becomes available for other workers (default: 2 hours)
322///
323/// # Retry Behavior
324///
325/// When a job fails:
326/// - The error count is incremented
327/// - The error message and timestamp are recorded
328/// - The job is requeued with exponential backoff: delay = `2^(error_count + 1)` seconds
329///   (first retry: 2s, second: 4s, third: 8s, fourth: 16s, etc.)
330/// - If `error_count` >= `max_retries`, the job is moved to the dead letter queue
331///
332/// # Graceful Shutdown
333///
334/// When the `shutdown_token` is cancelled:
335/// - The worker stops polling for new jobs
336/// - Any currently executing job is allowed to complete
337/// - Database locks are released immediately (instead of waiting for the lock timeout)
338///
339/// # Lock Timeout
340///
341/// If a worker crashes or becomes unresponsive while processing a job, the job will remain
342/// locked in the database. The `lock_timeout` parameter controls how long to wait before
343/// considering such jobs abandoned. After the timeout expires, any worker can pick up the
344/// job and retry it.
345///
346/// # Example
347///
348/// ```ignore
349/// use std::time::Duration;
350/// use tokio_util::sync::CancellationToken;
351///
352/// let shutdown_token = CancellationToken::new();
353/// let worker_token = shutdown_token.clone();
354///
355/// // Start worker with graceful shutdown support and lock timeout
356/// tokio::spawn(async move {
357///     cja::jobs::worker::job_worker(
358///         app_state,
359///         registry,
360///         Duration::from_secs(60),      // poll every 60s when idle
361///         20,                            // max 20 retries
362///         worker_token,                  // for graceful shutdown
363///         Duration::from_secs(2 * 3600), // 2 hour lock timeout
364///     ).await.unwrap();
365/// });
366///
367/// // Later, trigger shutdown
368/// shutdown_token.cancel();
369/// ```
370pub async fn job_worker<AppState: AS>(
371    app_state: AppState,
372    registry: impl JobRegistry<AppState>,
373    sleep_duration: Duration,
374    max_retries: i32,
375    shutdown_token: CancellationToken,
376    lock_timeout: Duration,
377) -> color_eyre::Result<()> {
378    let worker = Worker::new(
379        app_state,
380        registry,
381        sleep_duration,
382        max_retries,
383        shutdown_token.clone(),
384        lock_timeout,
385    );
386
387    loop {
388        tokio::select! {
389            result = worker.tick() => {
390                result?;
391            }
392            () = shutdown_token.cancelled() => {
393                tracing::info!(worker_id = %worker.id, "Job worker shutdown requested");
394                break;
395            }
396        }
397    }
398
399    cleanup_worker_locks(&worker).await?;
400    tracing::info!(worker_id = %worker.id, "Job worker shutdown complete");
401    Ok(())
402}
403
#[cfg(test)]
mod tests {
    use super::*;
    use crate::app_state::AppState;
    use crate::impl_job_registry;
    use crate::jobs::Job;
    use crate::server::cookies::CookieKey;

    // Minimal AppState implementation backed by the sqlx test pool.
    #[derive(Clone)]
    struct TestAppState {
        db: sqlx::PgPool,
        cookie_key: CookieKey,
    }

    impl AppState for TestAppState {
        fn db(&self) -> &sqlx::PgPool {
            &self.db
        }

        fn version(&self) -> &'static str {
            "test"
        }

        fn cookie_key(&self) -> &CookieKey {
            &self.cookie_key
        }
    }

    // No-op job; these tests only exercise fetch/lock behavior, not execution.
    #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
    struct TestJob {
        id: String,
    }

    #[async_trait::async_trait]
    impl Job<TestAppState> for TestJob {
        const NAME: &'static str = "TestJob";

        async fn run(&self, _app_state: TestAppState) -> color_eyre::Result<()> {
            Ok(())
        }
    }

    // Generates the `Jobs` registry type used to construct workers below.
    impl_job_registry!(TestAppState, TestJob);

    /// Test that `fetch_next_job` picks up a job with a stale lock (lock older than timeout)
    #[sqlx::test]
    async fn test_fetch_next_job_picks_up_stale_locked_job(db: sqlx::PgPool) {
        let app_state = TestAppState {
            db: db.clone(),
            cookie_key: CookieKey::generate(),
        };

        let job_id = uuid::Uuid::new_v4();
        let stale_worker_id = "crashed-worker";

        // Insert a job locked 120 seconds ago
        sqlx::query(
            "INSERT INTO jobs (job_id, name, payload, priority, run_at, created_at, context, error_count, locked_by, locked_at)
             VALUES ($1, $2, $3, $4, NOW(), NOW(), $5, $6, $7, NOW() - interval '120 seconds')",
        )
        .bind(job_id)
        .bind("TestJob")
        .bind(serde_json::json!({"id": "stale-lock-test"}))
        .bind(0)
        .bind("test-stale-lock")
        .bind(0)
        .bind(stale_worker_id)
        .execute(&db)
        .await
        .unwrap();

        // Create a worker with 60 second lock timeout
        let worker = Worker::new(
            app_state,
            Jobs,
            Duration::from_secs(1),
            20,
            CancellationToken::new(),
            Duration::from_secs(60), // 60 second timeout
        );

        // fetch_next_job should pick up the stale locked job
        // (120s-old lock > 60s timeout, so the lock is considered abandoned).
        let fetched = worker.fetch_next_job().await.unwrap();
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().job_id, job_id);
    }

    /// Test that `fetch_next_job` does NOT pick up a job with a fresh lock
    #[sqlx::test]
    async fn test_fetch_next_job_skips_recently_locked_job(db: sqlx::PgPool) {
        let app_state = TestAppState {
            db: db.clone(),
            cookie_key: CookieKey::generate(),
        };

        let job_id = uuid::Uuid::new_v4();
        let active_worker_id = "active-worker";

        // Insert a job locked only 10 seconds ago
        sqlx::query(
            "INSERT INTO jobs (job_id, name, payload, priority, run_at, created_at, context, error_count, locked_by, locked_at)
             VALUES ($1, $2, $3, $4, NOW(), NOW(), $5, $6, $7, NOW() - interval '10 seconds')",
        )
        .bind(job_id)
        .bind("TestJob")
        .bind(serde_json::json!({"id": "recent-lock-test"}))
        .bind(0)
        .bind("test-recent-lock")
        .bind(0)
        .bind(active_worker_id)
        .execute(&db)
        .await
        .unwrap();

        // Create a worker with 1 hour lock timeout
        let worker = Worker::new(
            app_state,
            Jobs,
            Duration::from_secs(1),
            20,
            CancellationToken::new(),
            Duration::from_secs(3600), // 1 hour timeout
        );

        // fetch_next_job should NOT pick up the recently locked job
        // (10s-old lock < 1h timeout, so the other worker still owns it).
        let fetched = worker.fetch_next_job().await.unwrap();
        assert!(fetched.is_none());
    }

    /// Test that unlocked jobs are picked up before stale locked jobs (by priority)
    #[sqlx::test]
    async fn test_fetch_next_job_prefers_unlocked_by_priority(db: sqlx::PgPool) {
        let app_state = TestAppState {
            db: db.clone(),
            cookie_key: CookieKey::generate(),
        };

        let unlocked_job_id = uuid::Uuid::new_v4();
        let stale_locked_job_id = uuid::Uuid::new_v4();

        // Insert unlocked job with higher priority
        sqlx::query(
            "INSERT INTO jobs (job_id, name, payload, priority, run_at, created_at, context, error_count)
             VALUES ($1, $2, $3, $4, NOW(), NOW(), $5, $6)",
        )
        .bind(unlocked_job_id)
        .bind("TestJob")
        .bind(serde_json::json!({"id": "unlocked"}))
        .bind(10) // Higher priority
        .bind("test-unlocked")
        .bind(0)
        .execute(&db)
        .await
        .unwrap();

        // Insert stale locked job with lower priority
        sqlx::query(
            "INSERT INTO jobs (job_id, name, payload, priority, run_at, created_at, context, error_count, locked_by, locked_at)
             VALUES ($1, $2, $3, $4, NOW(), NOW(), $5, $6, $7, NOW() - interval '120 seconds')",
        )
        .bind(stale_locked_job_id)
        .bind("TestJob")
        .bind(serde_json::json!({"id": "stale-locked"}))
        .bind(5) // Lower priority
        .bind("test-stale")
        .bind(0)
        .bind("crashed-worker")
        .execute(&db)
        .await
        .unwrap();

        // Create a worker with 60 second lock timeout
        let worker = Worker::new(
            app_state,
            Jobs,
            Duration::from_secs(1),
            20,
            CancellationToken::new(),
            Duration::from_secs(60),
        );

        // Should pick the higher priority unlocked job first
        // (both jobs are eligible; ORDER BY priority DESC decides).
        let fetched = worker.fetch_next_job().await.unwrap();
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().job_id, unlocked_job_id);
    }
}