cja/jobs/mod.rs
1//! Background job processing with `PostgreSQL` persistence.
2//!
3//! The job system provides a reliable, database-backed work queue with automatic retries,
4//! exponential backoff, priority scheduling, and a dead letter queue for permanently
5//! failed jobs.
6//!
7//! # Defining a Job
8//!
9//! Implement the [`Job`] trait for your job type:
10//!
11//! ```rust,ignore
12//! use cja::jobs::Job;
13//! use serde::{Serialize, Deserialize};
14//!
15//! #[derive(Debug, Serialize, Deserialize, Clone)]
16//! struct SendEmailJob {
17//! to: String,
18//! subject: String,
19//! }
20//!
21//! #[async_trait::async_trait]
22//! impl Job<MyAppState> for SendEmailJob {
23//! const NAME: &'static str = "SendEmailJob";
24//!
25//! async fn run(&self, app_state: MyAppState) -> cja::Result<()> {
26//! // Send email using app_state.db() for templates, etc.
27//! Ok(())
28//! }
29//! }
30//! ```
31//!
32//! # Registering Jobs
33//!
34//! All job types must be registered via the [`impl_job_registry!`](crate::impl_job_registry) macro:
35//!
36//! ```rust,ignore
37//! cja::impl_job_registry!(MyAppState, SendEmailJob, CleanupJob, ReportJob);
38//! ```
39//!
40//! This generates a `Jobs` struct that routes jobs by their `NAME` constant.
41//! Forgetting to register a job type means it won't be processed.
42//!
43//! # Enqueuing Jobs
44//!
45//! ```rust,ignore
46//! let job = SendEmailJob {
47//! to: "user@example.com".into(),
48//! subject: "Welcome!".into(),
49//! };
50//!
51//! // Default priority (0)
52//! job.clone().enqueue(app_state.clone(), "user-signup".into(), None).await?;
53//!
54//! // High priority — higher values run first (ORDER BY priority DESC)
55//! job.enqueue(app_state, "urgent".into(), Some(10)).await?;
56//! ```
57//!
58//! **Priority**: Higher values run first. Priority 10 runs before 0, and -10
59//! runs last. This is per-enqueue, not per-job-type.
60//!
61//! # Retry Behavior
62//!
63//! Failed jobs are automatically retried with exponential backoff
64//! (delay = `2^(error_count + 1)` seconds):
65//!
66//! | Retry # | Delay |
67//! |---------|-------|
68//! | 1 | 4 seconds |
69//! | 2 | 8 seconds |
70//! | 5 | ~1 minute |
//! | 10 | ~34 minutes |
//! | 20 | ~24 days |
73//!
74//! After [`DEFAULT_MAX_RETRIES`] (20) attempts,
75//! jobs are moved to the `dead_letter_jobs` table.
76//!
77//! # Idempotency
78//!
79//! **Jobs must be idempotent.** The system retries on timeouts, worker crashes,
80//! and network failures. Guard against double-application with:
81//!
82//! - `ON CONFLICT DO NOTHING` for inserts
83//! - Early-exit checks at job start
84//! - Idempotency keys in your domain logic
85//!
86//! # Concurrent Workers
87//!
88//! Multiple [`worker::job_worker`] instances can run safely — the worker SQL
89//! uses `FOR UPDATE SKIP LOCKED` to prevent duplicate processing at the
90//! database level.
91//!
92//! # Cancellation Support
93//!
94//! Long-running jobs can override [`Job::run_with_cancellation`] to exit
95//! gracefully during shutdown by checking the cancellation token periodically.
96//!
97//! # Database Schema
98//!
99//! ## `jobs` table
100//!
101//! | Column | Type | Description |
102//! |--------|------|-------------|
103//! | `job_id` | UUID | Primary key |
104//! | `name` | TEXT | Job type name (matches `Job::NAME`) |
105//! | `payload` | JSONB | Serialized job data |
106//! | `priority` | INT | Higher = runs first (`ORDER BY priority DESC`) |
107//! | `run_at` | TIMESTAMPTZ | When job can next be executed |
108//! | `created_at` | TIMESTAMPTZ | When job was enqueued |
109//! | `locked_at` | TIMESTAMPTZ | When a worker locked this job |
110//! | `locked_by` | TEXT | Worker UUID that holds the lock |
111//! | `context` | TEXT | Debug info (e.g., "user-signup") |
112//! | `error_count` | INT | Number of failures |
113//! | `last_error_message` | TEXT | Most recent error |
114//! | `last_failed_at` | TIMESTAMPTZ | Timestamp of last failure |
115//!
116//! ## `dead_letter_jobs` table
117//!
118//! Jobs that exceeded max retries are moved here for manual investigation.
119//!
120//! | Column | Type | Description |
121//! |--------|------|-------------|
122//! | `id` | UUID | Primary key |
123//! | `original_job_id` | UUID | Original `job_id` |
124//! | `name` | TEXT | Job type name |
125//! | `payload` | JSONB | Serialized job data |
126//! | `context` | TEXT | Debug info |
127//! | `priority` | INT | Original priority |
128//! | `error_count` | INT | Total failure count |
129//! | `last_error_message` | TEXT | Final error |
130//! | `created_at` | TIMESTAMPTZ | Original creation time |
131//! | `failed_at` | TIMESTAMPTZ | When moved to dead letter |
132
133use crate::app_state::AppState as AS;
134use serde::{Serialize, de::DeserializeOwned};
135use thiserror::Error;
136use tracing::instrument;
137
138pub mod registry;
139
140pub use tokio_util::sync::CancellationToken;
141pub use worker::{DEFAULT_LOCK_TIMEOUT, DEFAULT_MAX_RETRIES};
142
/// Errors that can occur while persisting a job to the `jobs` table.
#[derive(Debug, Error)]
pub enum EnqueueError {
    /// The `INSERT` into the `jobs` table failed (connection, constraint, etc.).
    #[error("SqlxError: {0}")]
    SqlxError(#[from] sqlx::Error),
    /// The job payload could not be serialized to JSON for the `payload` column.
    #[error("SerdeJsonError: {0}")]
    SerdeJsonError(#[from] serde_json::Error),
}
150
151/// A trait for defining background jobs that can be enqueued and processed asynchronously.
152///
153/// Jobs must be serializable and provide a unique name identifier. The job system
154/// handles persistence, retries, and concurrent execution automatically.
155///
156/// # Automatic Retry Behavior
157///
158/// All jobs are automatically retried on failure with exponential backoff:
/// - Failed jobs are requeued with increasing delays: 4, 8, 16, 32, 64... seconds
160/// - Error messages and failure timestamps are tracked in the database
161/// - Jobs are moved to a dead letter queue after exceeding the configured max retries (default: 20)
162/// - No manual intervention required for transient failures
163///
164/// # Lock Timeout (Abandoned Job Recovery)
165///
166/// Jobs are locked while being processed to prevent multiple workers from running the same
167/// job. If a worker crashes or becomes unresponsive, the lock remains but the job is never
168/// completed. The lock timeout mechanism handles this:
169/// - Jobs locked longer than the timeout (default: 2 hours) are considered abandoned
170/// - Any worker can pick up abandoned jobs and retry them
171/// - This ensures jobs are eventually processed even after worker failures
172///
173/// # Example
174///
175/// ```rust
176/// use cja::jobs::Job;
177/// use serde::{Serialize, Deserialize};
178///
179/// # #[derive(Debug, Clone)]
180/// # struct MyAppState {
181/// # db: sqlx::PgPool,
182/// # }
183/// # impl cja::app_state::AppState for MyAppState {
184/// # fn db(&self) -> &sqlx::PgPool { &self.db }
185/// # fn version(&self) -> &str { "1.0.0" }
186/// # fn cookie_key(&self) -> &cja::server::cookies::CookieKey { todo!() }
187/// # }
188///
189/// #[derive(Debug, Serialize, Deserialize, Clone)]
190/// struct EmailJob {
191/// to: String,
192/// subject: String,
193/// body: String,
194/// }
195///
196/// #[async_trait::async_trait]
197/// impl Job<MyAppState> for EmailJob {
198/// const NAME: &'static str = "EmailJob";
199///
200/// async fn run(&self, app_state: MyAppState) -> color_eyre::Result<()> {
201/// // Send email logic here
202/// println!("Sending email to {} with subject: {}", self.to, self.subject);
203/// // You can access the database through app_state.db()
204/// Ok(())
205/// }
206/// }
207/// ```
208///
209/// # Enqueuing Jobs
210///
211/// ```rust
212/// # use cja::jobs::Job;
213/// # use serde::{Serialize, Deserialize};
214/// # #[derive(Debug, Serialize, Deserialize, Clone)]
215/// # struct EmailJob { to: String, subject: String, body: String }
216/// # #[derive(Debug, Clone)]
217/// # struct MyAppState { db: sqlx::PgPool }
218/// # impl cja::app_state::AppState for MyAppState {
219/// # fn db(&self) -> &sqlx::PgPool { &self.db }
220/// # fn version(&self) -> &str { "1.0.0" }
221/// # fn cookie_key(&self) -> &cja::server::cookies::CookieKey { todo!() }
222/// # }
223/// # #[async_trait::async_trait]
224/// # impl Job<MyAppState> for EmailJob {
225/// # const NAME: &'static str = "EmailJob";
226/// # async fn run(&self, _: MyAppState) -> color_eyre::Result<()> { Ok(()) }
227/// # }
228/// # async fn example(app_state: MyAppState) -> Result<(), cja::jobs::EnqueueError> {
229/// let job = EmailJob {
230/// to: "user@example.com".to_string(),
231/// subject: "Welcome!".to_string(),
232/// body: "Thank you for signing up!".to_string(),
233/// };
234///
235/// // Enqueue the job with a context string for debugging
236/// job.enqueue(app_state, "user-signup".to_string(), None).await?;
237/// # Ok(())
238/// # }
239/// ```
240///
241/// # Job with Database Access
242///
243/// ```rust
244/// use cja::jobs::Job;
245/// use serde::{Serialize, Deserialize};
246///
247/// # #[derive(Debug, Clone)]
248/// # struct MyAppState {
249/// # db: sqlx::PgPool,
250/// # }
251/// # impl cja::app_state::AppState for MyAppState {
252/// # fn db(&self) -> &sqlx::PgPool { &self.db }
253/// # fn version(&self) -> &str { "1.0.0" }
254/// # fn cookie_key(&self) -> &cja::server::cookies::CookieKey { todo!() }
255/// # }
256///
257/// #[derive(Debug, Serialize, Deserialize, Clone)]
258/// struct ProcessPaymentJob {
259/// user_id: i32,
260/// amount_cents: i64,
261/// }
262///
263/// #[async_trait::async_trait]
264/// impl Job<MyAppState> for ProcessPaymentJob {
265/// const NAME: &'static str = "ProcessPaymentJob";
266///
267/// async fn run(&self, app_state: MyAppState) -> color_eyre::Result<()> {
///         use cja::app_state::AppState;
269/// use sqlx::Row;
270///
271/// // Access the database through app_state
272/// let user = sqlx::query("SELECT name FROM users WHERE id = $1")
273/// .bind(self.user_id)
274/// .fetch_one(app_state.db())
275/// .await?;
276///
277/// println!("Processing payment of {} cents for user {} #{}",
278/// self.amount_cents, user.get::<String, _>("name"), self.user_id);
279///
280/// sqlx::query("INSERT INTO payments (user_id, amount_cents) VALUES ($1, $2)")
281/// .bind(self.user_id)
282/// .bind(self.amount_cents)
283/// .execute(app_state.db())
284/// .await?;
285///
286/// Ok(())
287/// }
288/// }
289/// ```
290#[async_trait::async_trait]
291pub trait Job<AppState: AS>:
292 Serialize + DeserializeOwned + Send + Sync + std::fmt::Debug + Clone + 'static
293{
294 /// The unique name identifier for this job type.
295 /// This is used for routing jobs to their handlers.
296 const NAME: &'static str;
297
298 /// Execute the job logic.
299 ///
300 /// This method has access to the full application state,
301 /// including the database connection pool.
302 ///
303 /// If this method returns an error, the job will be automatically retried
304 /// with exponential backoff until it succeeds or exceeds max retries.
305 async fn run(&self, app_state: AppState) -> color_eyre::Result<()>;
306
307 /// Execute the job logic with an optional cancellation token for graceful shutdown.
308 ///
309 /// Long-running jobs can override this method to check the cancellation token
310 /// periodically and exit early during shutdown. The default implementation
311 /// ignores the token and calls `run()`, so existing jobs continue to work
312 /// without changes.
313 ///
314 /// # Example
315 ///
316 /// ```rust
317 /// # use cja::jobs::{Job, CancellationToken};
318 /// # use serde::{Serialize, Deserialize};
319 /// # #[derive(Debug, Serialize, Deserialize, Clone)]
320 /// # struct LongJob { iterations: u32 }
321 /// # #[derive(Debug, Clone)]
322 /// # struct MyAppState { db: sqlx::PgPool }
323 /// # impl cja::app_state::AppState for MyAppState {
324 /// # fn db(&self) -> &sqlx::PgPool { &self.db }
325 /// # fn version(&self) -> &str { "1.0.0" }
326 /// # fn cookie_key(&self) -> &cja::server::cookies::CookieKey { todo!() }
327 /// # }
328 /// #[async_trait::async_trait]
329 /// impl Job<MyAppState> for LongJob {
330 /// const NAME: &'static str = "LongJob";
331 ///
332 /// async fn run(&self, _app_state: MyAppState) -> color_eyre::Result<()> {
333 /// // Simple implementation without cancellation support
334 /// Ok(())
335 /// }
336 ///
337 /// async fn run_with_cancellation(
338 /// &self,
339 /// app_state: MyAppState,
340 /// cancellation_token: CancellationToken,
341 /// ) -> color_eyre::Result<()> {
342 /// for i in 0..self.iterations {
343 /// // Check if shutdown was requested
344 /// if cancellation_token.is_cancelled() {
345 /// tracing::info!("Job cancelled after {} iterations", i);
346 /// return Err(color_eyre::eyre::eyre!("Job cancelled during shutdown"));
347 /// }
348 ///
349 /// // Do some work
350 /// process_iteration(i, app_state.clone()).await?;
351 /// }
352 /// Ok(())
353 /// }
354 /// }
355 /// # async fn process_iteration(_i: u32, _app_state: MyAppState) -> color_eyre::Result<()> { Ok(()) }
356 /// ```
357 async fn run_with_cancellation(
358 &self,
359 app_state: AppState,
360 _cancellation_token: CancellationToken,
361 ) -> color_eyre::Result<()> {
362 // Default implementation ignores cancellation token
363 self.run(app_state).await
364 }
365
366 /// Internal method used by the job system to deserialize and run jobs.
367 ///
368 /// You typically won't call this directly - it's used by the job worker.
369 #[instrument(name = "jobs.run_from_value", skip(app_state, cancellation_token), fields(job.name = Self::NAME), err)]
370 async fn run_from_value(
371 value: serde_json::Value,
372 app_state: AppState,
373 cancellation_token: CancellationToken,
374 ) -> color_eyre::Result<()> {
375 let job: Self = serde_json::from_value(value)?;
376
377 job.run_with_cancellation(app_state, cancellation_token)
378 .await
379 }
380
381 /// Enqueue this job for asynchronous execution.
382 ///
383 /// The job will be persisted to the database and picked up by a worker process.
384 /// Jobs are executed with at-least-once semantics and automatic retries on failure.
385 ///
386 /// # Arguments
387 /// * `app_state` - The application state containing the database connection
388 /// * `context` - A string describing why this job was enqueued (useful for debugging)
389 /// * `priority` - Optional priority for this job instance. Higher values run first.
390 /// Defaults to 0 if `None`. Use negative values for lower-priority background work.
391 ///
392 /// # Example
393 ///
394 /// ```rust
395 /// # use cja::jobs::Job;
396 /// # use serde::{Serialize, Deserialize};
397 /// # #[derive(Debug, Serialize, Deserialize, Clone)]
398 /// # struct MyJob { data: String }
399 /// # #[derive(Debug, Clone)]
400 /// # struct MyAppState { db: sqlx::PgPool }
401 /// # impl cja::app_state::AppState for MyAppState {
402 /// # fn db(&self) -> &sqlx::PgPool { &self.db }
403 /// # fn version(&self) -> &str { "1.0.0" }
404 /// # fn cookie_key(&self) -> &cja::server::cookies::CookieKey { todo!() }
405 /// # }
406 /// # #[async_trait::async_trait]
407 /// # impl Job<MyAppState> for MyJob {
408 /// # const NAME: &'static str = "MyJob";
409 /// # async fn run(&self, _: MyAppState) -> color_eyre::Result<()> { Ok(()) }
410 /// # }
411 /// # async fn example(app_state: MyAppState) -> Result<(), cja::jobs::EnqueueError> {
412 /// let job = MyJob { data: "important work".to_string() };
413 /// // Default priority
414 /// job.clone().enqueue(app_state.clone(), "user-requested".to_string(), None).await?;
415 /// // Low priority background work
416 /// job.enqueue(app_state, "background-cleanup".to_string(), Some(-10)).await?;
417 /// # Ok(())
418 /// # }
419 /// ```
420 #[instrument(name = "jobs.enqueue", skip(app_state), fields(job.name = Self::NAME), err)]
421 async fn enqueue(
422 self,
423 app_state: AppState,
424 context: String,
425 priority: Option<i32>,
426 ) -> Result<(), EnqueueError> {
427 sqlx::query(
428 "
429 INSERT INTO jobs (job_id, name, payload, priority, run_at, created_at, context)
430 VALUES ($1, $2, $3, $4, $5, $6, $7)",
431 )
432 .bind(uuid::Uuid::new_v4())
433 .bind(Self::NAME)
434 .bind(serde_json::to_value(self)?)
435 .bind(priority.unwrap_or(0))
436 .bind(chrono::Utc::now())
437 .bind(chrono::Utc::now())
438 .bind(context)
439 .execute(app_state.db())
440 .await?;
441
442 Ok(())
443 }
444}
445
446pub mod worker;