cja/jobs/
mod.rs

1//! Background job processing with `PostgreSQL` persistence.
2//!
3//! The job system provides a reliable, database-backed work queue with automatic retries,
4//! exponential backoff, priority scheduling, and a dead letter queue for permanently
5//! failed jobs.
6//!
7//! # Defining a Job
8//!
9//! Implement the [`Job`] trait for your job type:
10//!
11//! ```rust,ignore
12//! use cja::jobs::Job;
13//! use serde::{Serialize, Deserialize};
14//!
15//! #[derive(Debug, Serialize, Deserialize, Clone)]
16//! struct SendEmailJob {
17//!     to: String,
18//!     subject: String,
19//! }
20//!
21//! #[async_trait::async_trait]
22//! impl Job<MyAppState> for SendEmailJob {
23//!     const NAME: &'static str = "SendEmailJob";
24//!
25//!     async fn run(&self, app_state: MyAppState) -> cja::Result<()> {
26//!         // Send email using app_state.db() for templates, etc.
27//!         Ok(())
28//!     }
29//! }
30//! ```
31//!
32//! # Registering Jobs
33//!
34//! All job types must be registered via the [`impl_job_registry!`](crate::impl_job_registry) macro:
35//!
36//! ```rust,ignore
37//! cja::impl_job_registry!(MyAppState, SendEmailJob, CleanupJob, ReportJob);
38//! ```
39//!
40//! This generates a `Jobs` struct that routes jobs by their `NAME` constant.
41//! Forgetting to register a job type means it won't be processed.
42//!
43//! # Enqueuing Jobs
44//!
45//! ```rust,ignore
46//! let job = SendEmailJob {
47//!     to: "user@example.com".into(),
48//!     subject: "Welcome!".into(),
49//! };
50//!
51//! // Default priority (0)
52//! job.clone().enqueue(app_state.clone(), "user-signup".into(), None).await?;
53//!
54//! // High priority — higher values run first (ORDER BY priority DESC)
55//! job.enqueue(app_state, "urgent".into(), Some(10)).await?;
56//! ```
57//!
58//! **Priority**: Higher values run first. Priority 10 runs before 0, and -10
59//! runs last. This is per-enqueue, not per-job-type.
60//!
61//! # Retry Behavior
62//!
63//! Failed jobs are automatically retried with exponential backoff
64//! (delay = `2^(error_count + 1)` seconds):
65//!
66//! | Retry # | Delay |
67//! |---------|-------|
68//! | 1 | 4 seconds |
69//! | 2 | 8 seconds |
70//! | 5 | ~1 minute |
//! | 10 | ~34 minutes |
//! | 20 | ~24 days |
73//!
74//! After [`DEFAULT_MAX_RETRIES`] (20) attempts,
75//! jobs are moved to the `dead_letter_jobs` table.
76//!
77//! # Idempotency
78//!
79//! **Jobs must be idempotent.** The system retries on timeouts, worker crashes,
80//! and network failures. Guard against double-application with:
81//!
82//! - `ON CONFLICT DO NOTHING` for inserts
83//! - Early-exit checks at job start
84//! - Idempotency keys in your domain logic
85//!
86//! # Concurrent Workers
87//!
88//! Multiple [`worker::job_worker`] instances can run safely — the worker SQL
89//! uses `FOR UPDATE SKIP LOCKED` to prevent duplicate processing at the
90//! database level.
91//!
92//! # Cancellation Support
93//!
94//! Long-running jobs can override [`Job::run_with_cancellation`] to exit
95//! gracefully during shutdown by checking the cancellation token periodically.
96//!
97//! # Database Schema
98//!
99//! ## `jobs` table
100//!
101//! | Column | Type | Description |
102//! |--------|------|-------------|
103//! | `job_id` | UUID | Primary key |
104//! | `name` | TEXT | Job type name (matches `Job::NAME`) |
105//! | `payload` | JSONB | Serialized job data |
106//! | `priority` | INT | Higher = runs first (`ORDER BY priority DESC`) |
107//! | `run_at` | TIMESTAMPTZ | When job can next be executed |
108//! | `created_at` | TIMESTAMPTZ | When job was enqueued |
109//! | `locked_at` | TIMESTAMPTZ | When a worker locked this job |
110//! | `locked_by` | TEXT | Worker UUID that holds the lock |
111//! | `context` | TEXT | Debug info (e.g., "user-signup") |
112//! | `error_count` | INT | Number of failures |
113//! | `last_error_message` | TEXT | Most recent error |
114//! | `last_failed_at` | TIMESTAMPTZ | Timestamp of last failure |
115//!
116//! ## `dead_letter_jobs` table
117//!
118//! Jobs that exceeded max retries are moved here for manual investigation.
119//!
120//! | Column | Type | Description |
121//! |--------|------|-------------|
122//! | `id` | UUID | Primary key |
123//! | `original_job_id` | UUID | Original `job_id` |
124//! | `name` | TEXT | Job type name |
125//! | `payload` | JSONB | Serialized job data |
126//! | `context` | TEXT | Debug info |
127//! | `priority` | INT | Original priority |
128//! | `error_count` | INT | Total failure count |
129//! | `last_error_message` | TEXT | Final error |
130//! | `created_at` | TIMESTAMPTZ | Original creation time |
131//! | `failed_at` | TIMESTAMPTZ | When moved to dead letter |
132
133use crate::app_state::AppState as AS;
134use serde::{Serialize, de::DeserializeOwned};
135use thiserror::Error;
136use tracing::instrument;
137
138pub mod registry;
139
140pub use tokio_util::sync::CancellationToken;
141pub use worker::{DEFAULT_LOCK_TIMEOUT, DEFAULT_MAX_RETRIES};
142
/// Errors that can occur while enqueuing a job for background processing.
///
/// Returned by [`Job::enqueue`] when the job payload cannot be serialized
/// to JSON or when the database insert fails.
#[derive(Debug, Error)]
pub enum EnqueueError {
    /// The `INSERT INTO jobs ...` query failed (connection error, constraint
    /// violation, etc.).
    #[error("SqlxError: {0}")]
    SqlxError(#[from] sqlx::Error),
    /// The job struct could not be serialized into the JSONB `payload` column.
    #[error("SerdeJsonError: {0}")]
    SerdeJsonError(#[from] serde_json::Error),
}
150
151/// A trait for defining background jobs that can be enqueued and processed asynchronously.
152///
153/// Jobs must be serializable and provide a unique name identifier. The job system
154/// handles persistence, retries, and concurrent execution automatically.
155///
156/// # Automatic Retry Behavior
157///
158/// All jobs are automatically retried on failure with exponential backoff:
/// - Failed jobs are requeued with increasing delays: 4, 8, 16, 32, 64... seconds
160/// - Error messages and failure timestamps are tracked in the database
161/// - Jobs are moved to a dead letter queue after exceeding the configured max retries (default: 20)
162/// - No manual intervention required for transient failures
163///
164/// # Lock Timeout (Abandoned Job Recovery)
165///
166/// Jobs are locked while being processed to prevent multiple workers from running the same
167/// job. If a worker crashes or becomes unresponsive, the lock remains but the job is never
168/// completed. The lock timeout mechanism handles this:
169/// - Jobs locked longer than the timeout (default: 2 hours) are considered abandoned
170/// - Any worker can pick up abandoned jobs and retry them
171/// - This ensures jobs are eventually processed even after worker failures
172///
173/// # Example
174///
175/// ```rust
176/// use cja::jobs::Job;
177/// use serde::{Serialize, Deserialize};
178///
179/// # #[derive(Debug, Clone)]
180/// # struct MyAppState {
181/// #     db: sqlx::PgPool,
182/// # }
183/// # impl cja::app_state::AppState for MyAppState {
184/// #     fn db(&self) -> &sqlx::PgPool { &self.db }
185/// #     fn version(&self) -> &str { "1.0.0" }
186/// #     fn cookie_key(&self) -> &cja::server::cookies::CookieKey { todo!() }
187/// # }
188///
189/// #[derive(Debug, Serialize, Deserialize, Clone)]
190/// struct EmailJob {
191///     to: String,
192///     subject: String,
193///     body: String,
194/// }
195///
196/// #[async_trait::async_trait]
197/// impl Job<MyAppState> for EmailJob {
198///     const NAME: &'static str = "EmailJob";
199///
200///     async fn run(&self, app_state: MyAppState) -> color_eyre::Result<()> {
201///         // Send email logic here
202///         println!("Sending email to {} with subject: {}", self.to, self.subject);
203///         // You can access the database through app_state.db()
204///         Ok(())
205///     }
206/// }
207/// ```
208///
209/// # Enqueuing Jobs
210///
211/// ```rust
212/// # use cja::jobs::Job;
213/// # use serde::{Serialize, Deserialize};
214/// # #[derive(Debug, Serialize, Deserialize, Clone)]
215/// # struct EmailJob { to: String, subject: String, body: String }
216/// # #[derive(Debug, Clone)]
217/// # struct MyAppState { db: sqlx::PgPool }
218/// # impl cja::app_state::AppState for MyAppState {
219/// #     fn db(&self) -> &sqlx::PgPool { &self.db }
220/// #     fn version(&self) -> &str { "1.0.0" }
221/// #     fn cookie_key(&self) -> &cja::server::cookies::CookieKey { todo!() }
222/// # }
223/// # #[async_trait::async_trait]
224/// # impl Job<MyAppState> for EmailJob {
225/// #     const NAME: &'static str = "EmailJob";
226/// #     async fn run(&self, _: MyAppState) -> color_eyre::Result<()> { Ok(()) }
227/// # }
228/// # async fn example(app_state: MyAppState) -> Result<(), cja::jobs::EnqueueError> {
229/// let job = EmailJob {
230///     to: "user@example.com".to_string(),
231///     subject: "Welcome!".to_string(),
232///     body: "Thank you for signing up!".to_string(),
233/// };
234///
235/// // Enqueue the job with a context string for debugging
236/// job.enqueue(app_state, "user-signup".to_string(), None).await?;
237/// # Ok(())
238/// # }
239/// ```
240///
241/// # Job with Database Access
242///
243/// ```rust
244/// use cja::jobs::Job;
245/// use serde::{Serialize, Deserialize};
246///
247/// # #[derive(Debug, Clone)]
248/// # struct MyAppState {
249/// #     db: sqlx::PgPool,
250/// # }
251/// # impl cja::app_state::AppState for MyAppState {
252/// #     fn db(&self) -> &sqlx::PgPool { &self.db }
253/// #     fn version(&self) -> &str { "1.0.0" }
254/// #     fn cookie_key(&self) -> &cja::server::cookies::CookieKey { todo!() }
255/// # }
256///
257/// #[derive(Debug, Serialize, Deserialize, Clone)]
258/// struct ProcessPaymentJob {
259///     user_id: i32,
260///     amount_cents: i64,
261/// }
262///
263/// #[async_trait::async_trait]
264/// impl Job<MyAppState> for ProcessPaymentJob {
265///     const NAME: &'static str = "ProcessPaymentJob";
266///
267///     async fn run(&self, app_state: MyAppState) -> color_eyre::Result<()> {
268///         use crate::cja::app_state::AppState;
269///         use sqlx::Row;
270///
271///         // Access the database through app_state
272///         let user = sqlx::query("SELECT name FROM users WHERE id = $1")
273///             .bind(self.user_id)
274///             .fetch_one(app_state.db())
275///             .await?;
276///
277///         println!("Processing payment of {} cents for user {} #{}",
278///                  self.amount_cents, user.get::<String, _>("name"), self.user_id);
279///
280///         sqlx::query("INSERT INTO payments (user_id, amount_cents) VALUES ($1, $2)")
281///             .bind(self.user_id)
282///             .bind(self.amount_cents)
283///             .execute(app_state.db())
284///             .await?;
285///
286///         Ok(())
287///     }
288/// }
289/// ```
290#[async_trait::async_trait]
291pub trait Job<AppState: AS>:
292    Serialize + DeserializeOwned + Send + Sync + std::fmt::Debug + Clone + 'static
293{
294    /// The unique name identifier for this job type.
295    /// This is used for routing jobs to their handlers.
296    const NAME: &'static str;
297
298    /// Execute the job logic.
299    ///
300    /// This method has access to the full application state,
301    /// including the database connection pool.
302    ///
303    /// If this method returns an error, the job will be automatically retried
304    /// with exponential backoff until it succeeds or exceeds max retries.
305    async fn run(&self, app_state: AppState) -> color_eyre::Result<()>;
306
307    /// Execute the job logic with an optional cancellation token for graceful shutdown.
308    ///
309    /// Long-running jobs can override this method to check the cancellation token
310    /// periodically and exit early during shutdown. The default implementation
311    /// ignores the token and calls `run()`, so existing jobs continue to work
312    /// without changes.
313    ///
314    /// # Example
315    ///
316    /// ```rust
317    /// # use cja::jobs::{Job, CancellationToken};
318    /// # use serde::{Serialize, Deserialize};
319    /// # #[derive(Debug, Serialize, Deserialize, Clone)]
320    /// # struct LongJob { iterations: u32 }
321    /// # #[derive(Debug, Clone)]
322    /// # struct MyAppState { db: sqlx::PgPool }
323    /// # impl cja::app_state::AppState for MyAppState {
324    /// #     fn db(&self) -> &sqlx::PgPool { &self.db }
325    /// #     fn version(&self) -> &str { "1.0.0" }
326    /// #     fn cookie_key(&self) -> &cja::server::cookies::CookieKey { todo!() }
327    /// # }
328    /// #[async_trait::async_trait]
329    /// impl Job<MyAppState> for LongJob {
330    ///     const NAME: &'static str = "LongJob";
331    ///
332    ///     async fn run(&self, _app_state: MyAppState) -> color_eyre::Result<()> {
333    ///         // Simple implementation without cancellation support
334    ///         Ok(())
335    ///     }
336    ///
337    ///     async fn run_with_cancellation(
338    ///         &self,
339    ///         app_state: MyAppState,
340    ///         cancellation_token: CancellationToken,
341    ///     ) -> color_eyre::Result<()> {
342    ///         for i in 0..self.iterations {
343    ///             // Check if shutdown was requested
344    ///             if cancellation_token.is_cancelled() {
345    ///                 tracing::info!("Job cancelled after {} iterations", i);
346    ///                 return Err(color_eyre::eyre::eyre!("Job cancelled during shutdown"));
347    ///             }
348    ///
349    ///             // Do some work
350    ///             process_iteration(i, app_state.clone()).await?;
351    ///         }
352    ///         Ok(())
353    ///     }
354    /// }
355    /// # async fn process_iteration(_i: u32, _app_state: MyAppState) -> color_eyre::Result<()> { Ok(()) }
356    /// ```
357    async fn run_with_cancellation(
358        &self,
359        app_state: AppState,
360        _cancellation_token: CancellationToken,
361    ) -> color_eyre::Result<()> {
362        // Default implementation ignores cancellation token
363        self.run(app_state).await
364    }
365
366    /// Internal method used by the job system to deserialize and run jobs.
367    ///
368    /// You typically won't call this directly - it's used by the job worker.
369    #[instrument(name = "jobs.run_from_value", skip(app_state, cancellation_token), fields(job.name = Self::NAME), err)]
370    async fn run_from_value(
371        value: serde_json::Value,
372        app_state: AppState,
373        cancellation_token: CancellationToken,
374    ) -> color_eyre::Result<()> {
375        let job: Self = serde_json::from_value(value)?;
376
377        job.run_with_cancellation(app_state, cancellation_token)
378            .await
379    }
380
381    /// Enqueue this job for asynchronous execution.
382    ///
383    /// The job will be persisted to the database and picked up by a worker process.
384    /// Jobs are executed with at-least-once semantics and automatic retries on failure.
385    ///
386    /// # Arguments
387    /// * `app_state` - The application state containing the database connection
388    /// * `context` - A string describing why this job was enqueued (useful for debugging)
389    /// * `priority` - Optional priority for this job instance. Higher values run first.
390    ///   Defaults to 0 if `None`. Use negative values for lower-priority background work.
391    ///
392    /// # Example
393    ///
394    /// ```rust
395    /// # use cja::jobs::Job;
396    /// # use serde::{Serialize, Deserialize};
397    /// # #[derive(Debug, Serialize, Deserialize, Clone)]
398    /// # struct MyJob { data: String }
399    /// # #[derive(Debug, Clone)]
400    /// # struct MyAppState { db: sqlx::PgPool }
401    /// # impl cja::app_state::AppState for MyAppState {
402    /// #     fn db(&self) -> &sqlx::PgPool { &self.db }
403    /// #     fn version(&self) -> &str { "1.0.0" }
404    /// #     fn cookie_key(&self) -> &cja::server::cookies::CookieKey { todo!() }
405    /// # }
406    /// # #[async_trait::async_trait]
407    /// # impl Job<MyAppState> for MyJob {
408    /// #     const NAME: &'static str = "MyJob";
409    /// #     async fn run(&self, _: MyAppState) -> color_eyre::Result<()> { Ok(()) }
410    /// # }
411    /// # async fn example(app_state: MyAppState) -> Result<(), cja::jobs::EnqueueError> {
412    /// let job = MyJob { data: "important work".to_string() };
413    /// // Default priority
414    /// job.clone().enqueue(app_state.clone(), "user-requested".to_string(), None).await?;
415    /// // Low priority background work
416    /// job.enqueue(app_state, "background-cleanup".to_string(), Some(-10)).await?;
417    /// # Ok(())
418    /// # }
419    /// ```
420    #[instrument(name = "jobs.enqueue", skip(app_state), fields(job.name = Self::NAME), err)]
421    async fn enqueue(
422        self,
423        app_state: AppState,
424        context: String,
425        priority: Option<i32>,
426    ) -> Result<(), EnqueueError> {
427        sqlx::query(
428            "
429        INSERT INTO jobs (job_id, name, payload, priority, run_at, created_at, context)
430        VALUES ($1, $2, $3, $4, $5, $6, $7)",
431        )
432        .bind(uuid::Uuid::new_v4())
433        .bind(Self::NAME)
434        .bind(serde_json::to_value(self)?)
435        .bind(priority.unwrap_or(0))
436        .bind(chrono::Utc::now())
437        .bind(chrono::Utc::now())
438        .bind(context)
439        .execute(app_state.db())
440        .await?;
441
442        Ok(())
443    }
444}
445
446pub mod worker;