1use std::time::Duration;
2
3use thiserror::Error;
4use tokio_util::sync::CancellationToken;
5use tracing::Span;
6
7use crate::app_state::AppState as AS;
8
9use super::registry::JobRegistry;
10
/// Default retry limit for a failing job.
///
/// Presumably intended as the `max_retries` argument of [`job_worker`]; the
/// visible code never applies it automatically — confirm against callers.
pub const DEFAULT_MAX_RETRIES: i32 = 20;

/// Default lock timeout: two hours (`2 * 60 * 60` seconds).
///
/// Presumably intended as the `lock_timeout` argument of [`job_worker`]; the
/// visible code never applies it automatically — confirm against callers.
pub const DEFAULT_LOCK_TIMEOUT: Duration = Duration::from_secs(2 * 60 * 60);
25
/// Outcome of executing a single job: the job row on success, or the job row
/// paired with the error that made it fail.
pub(super) type RunJobResult = Result<RunJobSuccess, JobError>;

/// Wraps the row of a job whose handler returned without error.
#[derive(Debug)]
pub(super) struct RunJobSuccess(JobFromDB);
30
/// A row of the `jobs` table as returned by the worker's fetch query.
#[derive(Debug, sqlx::FromRow)]
pub struct JobFromDB {
    /// Unique identifier of the job row.
    pub job_id: uuid::Uuid,
    /// Job name. Presumably used by the registry to select the handler —
    /// confirm against the `JobRegistry` implementation.
    pub name: String,
    /// JSON payload stored with the job.
    pub payload: serde_json::Value,
    /// Scheduling priority; the fetch query orders by `priority DESC`, so
    /// larger values are picked first.
    pub priority: i32,
    /// Earliest time the job may be picked up (the fetch query requires
    /// `run_at <= NOW()`).
    pub run_at: chrono::DateTime<chrono::Utc>,
    /// Creation time; among equal priorities the oldest job is picked first.
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Free-form context string; its meaning is not visible in this file.
    pub context: String,
    /// Number of failures recorded so far; the worker increments it each time
    /// it reschedules the job after an error.
    pub error_count: i32,
    /// Message of the most recent recorded failure, if any.
    pub last_error_message: Option<String>,
    /// Time of the most recent recorded failure, if any.
    pub last_failed_at: Option<chrono::DateTime<chrono::Utc>>,
}
44
45#[derive(Debug, Error)]
46#[error("JobError(id:${}) ${1}", self.0.job_id)]
47pub(crate) struct JobError(JobFromDB, color_eyre::Report);
48
/// A single job-queue worker: claims jobs from the `jobs` table, runs them
/// through the registry and records the outcome.
struct Worker<AppState: AS, R: JobRegistry<AppState>> {
    /// Random identifier generated in `new`; its string form is written to
    /// `jobs.locked_by` to mark which worker holds a job.
    id: uuid::Uuid,
    /// Application state; supplies the database pool and is cloned into every
    /// job run.
    state: AppState,
    /// Registry that executes a fetched job row.
    registry: R,
    /// How long `tick` sleeps when no job is available.
    sleep_duration: Duration,
    /// A failing job whose `error_count` has reached this value is moved to
    /// `dead_letter_jobs` instead of being rescheduled.
    max_retries: i32,
    /// Token cloned into every job run.
    cancellation_token: CancellationToken,
    /// Age after which a lock held on a job is treated as stale and the job
    /// may be claimed again.
    lock_timeout: Duration,
}
58
59impl<AppState: AS, R: JobRegistry<AppState>> Worker<AppState, R> {
60 fn new(
61 state: AppState,
62 registry: R,
63 sleep_duration: Duration,
64 max_retries: i32,
65 cancellation_token: CancellationToken,
66 lock_timeout: Duration,
67 ) -> Self {
68 Self {
69 id: uuid::Uuid::new_v4(),
70 state,
71 registry,
72 sleep_duration,
73 max_retries,
74 cancellation_token,
75 lock_timeout,
76 }
77 }
78
79 #[tracing::instrument(
80 name = "worker.run_job",
81 skip(self, job),
82 fields(
83 job.id = %job.job_id,
84 job.name = job.name,
85 job.priority = job.priority,
86 job.run_at = %job.run_at,
87 job.created_at = %job.created_at,
88 job.context = job.context,
89 job.error_count = job.error_count,
90 worker.id = %self.id,
91 )
92 err,
93 )]
94 async fn run_job(&self, job: &JobFromDB) -> color_eyre::Result<()> {
95 self.registry
96 .run_job(job, self.state.clone(), self.cancellation_token.clone())
97 .await
98 }
99
100 pub(crate) async fn run_next_job(&self, job: JobFromDB) -> color_eyre::Result<RunJobResult> {
101 let job_result = self.run_job(&job).await;
102
103 if let Err(e) = job_result {
104 let error_message = format!("{e}");
106
107 if job.error_count >= self.max_retries {
109 tracing::error!(
111 worker.id = %self.id,
112 job_id = %job.job_id,
113 error_count = job.error_count,
114 max_retries = self.max_retries,
115 "Job permanently failed - moved to dead letter queue"
116 );
117
118 let mut tx = self.state.db().begin().await?;
119
120 sqlx::query(
121 "INSERT INTO dead_letter_jobs (original_job_id, name, payload, context, priority, error_count, last_error_message, created_at)
122 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
123 )
124 .bind(job.job_id)
125 .bind(&job.name)
126 .bind(&job.payload)
127 .bind(&job.context)
128 .bind(job.priority)
129 .bind(job.error_count)
130 .bind(&error_message)
131 .bind(job.created_at)
132 .execute(&mut *tx)
133 .await?;
134
135 sqlx::query("DELETE FROM jobs WHERE job_id = $1 AND locked_by = $2")
136 .bind(job.job_id)
137 .bind(self.id.to_string())
138 .execute(&mut *tx)
139 .await?;
140
141 tx.commit().await?;
142
143 return Ok(Err(JobError(job, e)));
144 }
145
146 tracing::warn!(
148 worker.id = %self.id,
149 job_id = %job.job_id,
150 error_count = job.error_count,
151 retry_attempt = job.error_count + 1,
152 "Job failed, retry #{}",
153 job.error_count + 1
154 );
155
156 sqlx::query(
157 "
158 UPDATE jobs
159 SET locked_by = NULL,
160 locked_at = NULL,
161 error_count = error_count + 1,
162 last_error_message = $3,
163 last_failed_at = NOW(),
164 run_at = NOW() + (POWER(2, error_count + 1)) * interval '1 second'
165 WHERE job_id = $1 AND locked_by = $2
166 ",
167 )
168 .bind(job.job_id)
169 .bind(self.id.to_string())
170 .bind(error_message)
171 .execute(self.state.db())
172 .await?;
173
174 return Ok(Err(JobError(job, e)));
175 }
176
177 sqlx::query(
178 "
179 DELETE FROM jobs
180 WHERE job_id = $1 AND locked_by = $2
181 ",
182 )
183 .bind(job.job_id)
184 .bind(self.id.to_string())
185 .execute(self.state.db())
186 .await?;
187
188 Ok(Ok(RunJobSuccess(job)))
189 }
190
191 #[tracing::instrument(
192 name = "worker.fetch_next_job",
193 skip(self),
194 fields(
195 worker.id = %self.id,
196 job.id,
197 job.name,
198 lock_timeout_secs = self.lock_timeout.as_secs(),
199 ),
200 err,
201 )]
202 #[allow(clippy::cast_possible_wrap)]
203 async fn fetch_next_job(&self) -> color_eyre::Result<Option<JobFromDB>> {
204 let lock_timeout_secs = self.lock_timeout.as_secs() as i64;
206
207 let job = sqlx::query_as::<_, JobFromDB>(
208 "
209 UPDATE jobs
210 SET LOCKED_BY = $1, LOCKED_AT = NOW()
211 WHERE job_id = (
212 SELECT job_id
213 FROM jobs
214 WHERE run_at <= NOW()
215 AND (
216 locked_by IS NULL
217 OR locked_at < NOW() - ($2 || ' seconds')::interval
218 )
219 ORDER BY priority DESC, created_at ASC
220 LIMIT 1
221 FOR UPDATE SKIP LOCKED
222 )
223 RETURNING job_id, name, payload, priority, run_at, created_at, context, error_count, last_error_message, last_failed_at
224 ",
225 )
226 .bind(self.id.to_string())
227 .bind(lock_timeout_secs.to_string())
228 .fetch_optional(self.state.db())
229 .await?;
230
231 if let Some(job) = &job {
232 let span = Span::current();
233 span.record("job.id", job.job_id.to_string());
234 span.record("job.name", &job.name);
235 }
236
237 Ok(job)
238 }
239
240 #[tracing::instrument(
241 name = "worker.tick",
242 skip(self),
243 fields(
244 worker.id = %self.id,
245 ),
246 )]
247 async fn tick(&self) -> color_eyre::Result<()> {
248 let job = self.fetch_next_job().await?;
249
250 let Some(job) = job else {
251 let duration = self.sleep_duration;
252 tracing::debug!(worker.id =% self.id, ?duration, "No Job to Run, sleeping for requested duration");
253
254 tokio::time::sleep(duration).await;
255
256 return Ok(());
257 };
258
259 let result = self.run_next_job(job).await?;
260
261 match result {
262 Ok(RunJobSuccess(job)) => {
263 tracing::info!(worker.id =% self.id, job_id =% job.job_id, "Job Ran");
264 }
265 Err(job_error) => {
266 tracing::error!(
267 worker.id =% self.id,
268 job_id =% job_error.0.job_id,
269 error_count =% job_error.0.error_count,
270 error_msg =% job_error.1,
271 "Job Errored"
272 );
273 }
274 }
275
276 Ok(())
277 }
278}
279
280async fn cleanup_worker_locks<AppState: AS, R: JobRegistry<AppState>>(
285 worker: &Worker<AppState, R>,
286) -> color_eyre::Result<()> {
287 tracing::info!(worker_id = %worker.id, "Releasing database locks");
288
289 let result = sqlx::query(
290 "UPDATE jobs
291 SET locked_by = NULL, locked_at = NULL
292 WHERE locked_by = $1",
293 )
294 .bind(worker.id.to_string())
295 .execute(worker.state.db())
296 .await?;
297
298 tracing::info!(
299 worker_id = %worker.id,
300 locks_released = result.rows_affected(),
301 "Database locks released"
302 );
303
304 Ok(())
305}
306
307pub async fn job_worker<AppState: AS>(
371 app_state: AppState,
372 registry: impl JobRegistry<AppState>,
373 sleep_duration: Duration,
374 max_retries: i32,
375 shutdown_token: CancellationToken,
376 lock_timeout: Duration,
377) -> color_eyre::Result<()> {
378 let worker = Worker::new(
379 app_state,
380 registry,
381 sleep_duration,
382 max_retries,
383 shutdown_token.clone(),
384 lock_timeout,
385 );
386
387 loop {
388 tokio::select! {
389 result = worker.tick() => {
390 result?;
391 }
392 () = shutdown_token.cancelled() => {
393 tracing::info!(worker_id = %worker.id, "Job worker shutdown requested");
394 break;
395 }
396 }
397 }
398
399 cleanup_worker_locks(&worker).await?;
400 tracing::info!(worker_id = %worker.id, "Job worker shutdown complete");
401 Ok(())
402}
403
404#[cfg(test)]
405mod tests {
406 use super::*;
407 use crate::app_state::AppState;
408 use crate::impl_job_registry;
409 use crate::jobs::Job;
410 use crate::server::cookies::CookieKey;
411
/// Minimal `AppState` implementation used by the worker tests.
#[derive(Clone)]
struct TestAppState {
    // Pool handed to the worker under test.
    db: sqlx::PgPool,
    // Needed only to satisfy the trait; the visible tests never read it.
    cookie_key: CookieKey,
}

impl AppState for TestAppState {
    fn db(&self) -> &sqlx::PgPool {
        &self.db
    }

    // Fixed placeholder version string.
    fn version(&self) -> &'static str {
        "test"
    }

    fn cookie_key(&self) -> &CookieKey {
        &self.cookie_key
    }
}
431
/// Trivial job type used to populate the test registry.
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
struct TestJob {
    id: String,
}

#[async_trait::async_trait]
impl Job<TestAppState> for TestJob {
    const NAME: &'static str = "TestJob";

    // Always succeeds and ignores the application state.
    async fn run(&self, _app_state: TestAppState) -> color_eyre::Result<()> {
        Ok(())
    }
}

// Presumably generates the `Jobs` registry type that the tests pass to
// `Worker::new`; the macro is defined elsewhere — confirm there.
impl_job_registry!(TestAppState, TestJob);
447
448 #[sqlx::test]
450 async fn test_fetch_next_job_picks_up_stale_locked_job(db: sqlx::PgPool) {
451 let app_state = TestAppState {
452 db: db.clone(),
453 cookie_key: CookieKey::generate(),
454 };
455
456 let job_id = uuid::Uuid::new_v4();
457 let stale_worker_id = "crashed-worker";
458
459 sqlx::query(
461 "INSERT INTO jobs (job_id, name, payload, priority, run_at, created_at, context, error_count, locked_by, locked_at)
462 VALUES ($1, $2, $3, $4, NOW(), NOW(), $5, $6, $7, NOW() - interval '120 seconds')",
463 )
464 .bind(job_id)
465 .bind("TestJob")
466 .bind(serde_json::json!({"id": "stale-lock-test"}))
467 .bind(0)
468 .bind("test-stale-lock")
469 .bind(0)
470 .bind(stale_worker_id)
471 .execute(&db)
472 .await
473 .unwrap();
474
475 let worker = Worker::new(
477 app_state,
478 Jobs,
479 Duration::from_secs(1),
480 20,
481 CancellationToken::new(),
482 Duration::from_secs(60), );
484
485 let fetched = worker.fetch_next_job().await.unwrap();
487 assert!(fetched.is_some());
488 assert_eq!(fetched.unwrap().job_id, job_id);
489 }
490
491 #[sqlx::test]
493 async fn test_fetch_next_job_skips_recently_locked_job(db: sqlx::PgPool) {
494 let app_state = TestAppState {
495 db: db.clone(),
496 cookie_key: CookieKey::generate(),
497 };
498
499 let job_id = uuid::Uuid::new_v4();
500 let active_worker_id = "active-worker";
501
502 sqlx::query(
504 "INSERT INTO jobs (job_id, name, payload, priority, run_at, created_at, context, error_count, locked_by, locked_at)
505 VALUES ($1, $2, $3, $4, NOW(), NOW(), $5, $6, $7, NOW() - interval '10 seconds')",
506 )
507 .bind(job_id)
508 .bind("TestJob")
509 .bind(serde_json::json!({"id": "recent-lock-test"}))
510 .bind(0)
511 .bind("test-recent-lock")
512 .bind(0)
513 .bind(active_worker_id)
514 .execute(&db)
515 .await
516 .unwrap();
517
518 let worker = Worker::new(
520 app_state,
521 Jobs,
522 Duration::from_secs(1),
523 20,
524 CancellationToken::new(),
525 Duration::from_secs(3600), );
527
528 let fetched = worker.fetch_next_job().await.unwrap();
530 assert!(fetched.is_none());
531 }
532
533 #[sqlx::test]
535 async fn test_fetch_next_job_prefers_unlocked_by_priority(db: sqlx::PgPool) {
536 let app_state = TestAppState {
537 db: db.clone(),
538 cookie_key: CookieKey::generate(),
539 };
540
541 let unlocked_job_id = uuid::Uuid::new_v4();
542 let stale_locked_job_id = uuid::Uuid::new_v4();
543
544 sqlx::query(
546 "INSERT INTO jobs (job_id, name, payload, priority, run_at, created_at, context, error_count)
547 VALUES ($1, $2, $3, $4, NOW(), NOW(), $5, $6)",
548 )
549 .bind(unlocked_job_id)
550 .bind("TestJob")
551 .bind(serde_json::json!({"id": "unlocked"}))
552 .bind(10) .bind("test-unlocked")
554 .bind(0)
555 .execute(&db)
556 .await
557 .unwrap();
558
559 sqlx::query(
561 "INSERT INTO jobs (job_id, name, payload, priority, run_at, created_at, context, error_count, locked_by, locked_at)
562 VALUES ($1, $2, $3, $4, NOW(), NOW(), $5, $6, $7, NOW() - interval '120 seconds')",
563 )
564 .bind(stale_locked_job_id)
565 .bind("TestJob")
566 .bind(serde_json::json!({"id": "stale-locked"}))
567 .bind(5) .bind("test-stale")
569 .bind(0)
570 .bind("crashed-worker")
571 .execute(&db)
572 .await
573 .unwrap();
574
575 let worker = Worker::new(
577 app_state,
578 Jobs,
579 Duration::from_secs(1),
580 20,
581 CancellationToken::new(),
582 Duration::from_secs(60),
583 );
584
585 let fetched = worker.fetch_next_job().await.unwrap();
587 assert!(fetched.is_some());
588 assert_eq!(fetched.unwrap().job_id, unlocked_job_id);
589 }
590}