Background job processing with PostgreSQL persistence.
The job system provides a reliable, database-backed work queue with automatic retries, exponential backoff, priority scheduling, and a dead letter queue for permanently failed jobs.
§Defining a Job
Implement the Job trait for your job type:

```rust
use cja::jobs::Job;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
struct SendEmailJob {
    to: String,
    subject: String,
}

#[async_trait::async_trait]
impl Job<MyAppState> for SendEmailJob {
    const NAME: &'static str = "SendEmailJob";

    async fn run(&self, app_state: MyAppState) -> cja::Result<()> {
        // Send email using app_state.db() for templates, etc.
        Ok(())
    }
}
```

§Registering Jobs
All job types must be registered via the impl_job_registry! macro:
```rust
cja::impl_job_registry!(MyAppState, SendEmailJob, CleanupJob, ReportJob);
```

This generates a Jobs struct that routes jobs by their NAME constant.
Forgetting to register a job type means it won’t be processed.
§Enqueuing Jobs
```rust
let job = SendEmailJob {
    to: "user@example.com".into(),
    subject: "Welcome!".into(),
};

// Default priority (0)
job.clone().enqueue(app_state.clone(), "user-signup".into(), None).await?;

// High priority — higher values run first (ORDER BY priority DESC)
job.enqueue(app_state, "urgent".into(), Some(10)).await?;
```

Priority is per-enqueue, not per-job-type: higher values run first, so priority 10 runs before 0, and -10 runs last.
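The priority ordering can be illustrated with a small, self-contained sketch. The `QueuedJob` struct and `claim_order` function below are invented for illustration and are not part of the cja API; the sort simply mirrors the documented ORDER BY priority DESC.

```rust
// Illustrative only: mirrors the documented ORDER BY priority DESC.
// `QueuedJob` and `claim_order` are not part of the cja API.
#[derive(Debug)]
struct QueuedJob {
    name: &'static str,
    priority: i32,
}

fn claim_order(mut jobs: Vec<QueuedJob>) -> Vec<QueuedJob> {
    // Higher priority values are claimed first.
    jobs.sort_by(|a, b| b.priority.cmp(&a.priority));
    jobs
}

fn main() {
    let jobs = vec![
        QueuedJob { name: "cleanup", priority: 0 },
        QueuedJob { name: "urgent-email", priority: 10 },
        QueuedJob { name: "backfill", priority: -10 },
    ];
    for job in claim_order(jobs) {
        println!("{} (priority {})", job.name, job.priority);
    }
    // Claimed in order: urgent-email, cleanup, backfill
}
```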
§Retry Behavior
Failed jobs are automatically retried with exponential backoff
(delay = 2^(error_count + 1) seconds):
| Retry # | Delay |
|---|---|
| 1 | 4 seconds |
| 2 | 8 seconds |
| 5 | ~1 minute |
| 10 | ~34 minutes |
| 20 | ~24 days |
After DEFAULT_MAX_RETRIES (20) attempts,
jobs are moved to the dead_letter_jobs table.
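The delays in the table follow directly from the stated formula. A minimal sketch, computing delay = 2^(error_count + 1) seconds (`backoff_secs` is an illustrative helper, not a cja function):

```rust
// Sketch of the documented backoff formula; `backoff_secs` is
// illustrative, not part of the cja API.
fn backoff_secs(error_count: u32) -> u64 {
    2u64.pow(error_count + 1)
}

fn main() {
    for retry in [1u32, 2, 5, 10, 20] {
        println!("retry {:>2}: {} seconds", retry, backoff_secs(retry));
    }
}
```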
§Idempotency
Jobs must be idempotent. The system retries on timeouts, worker crashes, and network failures. Guard against double-application with:

- ON CONFLICT DO NOTHING for inserts
- Early-exit checks at job start
- Idempotency keys in your domain logic
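The early-exit pattern can be sketched without a database. Here a `HashSet` stands in for a processed-keys table, and `run_once` is an invented helper, not part of the cja API:

```rust
use std::collections::HashSet;

// Illustrative early-exit sketch: a HashSet stands in for a database
// table of processed idempotency keys. `run_once` is not a cja API.
fn run_once(processed: &mut HashSet<String>, idempotency_key: &str) -> bool {
    if processed.contains(idempotency_key) {
        // A retried delivery of the same job exits without
        // re-applying its side effects.
        return false;
    }
    // ... perform the side effect here ...
    processed.insert(idempotency_key.to_string());
    true
}

fn main() {
    let mut processed = HashSet::new();
    assert!(run_once(&mut processed, "email:user-42")); // first run applies
    assert!(!run_once(&mut processed, "email:user-42")); // retry is a no-op
    println!("retry was a no-op");
}
```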
§Concurrent Workers
Multiple worker::job_worker instances can run safely — the worker SQL
uses FOR UPDATE SKIP LOCKED to prevent duplicate processing at the
database level.
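A claim query in this style, using the column names from the schema below, might look like the following. This is a hedged sketch only; the crate's actual worker SQL may differ in its conditions and ordering.

```sql
-- Illustrative claim query; the crate's actual worker SQL may differ.
UPDATE jobs
SET locked_at = now(), locked_by = $1
WHERE job_id = (
    SELECT job_id FROM jobs
    WHERE run_at <= now() AND locked_at IS NULL
    ORDER BY priority DESC
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING job_id, name, payload;
```

FOR UPDATE SKIP LOCKED makes concurrent workers skip rows another transaction has already locked instead of blocking on them, so no two workers claim the same job.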
§Cancellation Support
Long-running jobs can override Job::run_with_cancellation to exit
gracefully during shutdown by checking the cancellation token periodically.
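The periodic-check pattern can be illustrated with std only. Here an `AtomicBool` stands in for the real CancellationToken, and the loop body, chunk count, and `run_with_flag` helper are all invented for illustration:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Illustrative only: an AtomicBool stands in for CancellationToken,
// and `run_with_flag` is not part of the cja API.
fn run_with_flag(cancelled: &AtomicBool) -> usize {
    let mut chunks_done = 0;
    for _chunk in 0..100 {
        if cancelled.load(Ordering::Relaxed) {
            // Exit gracefully between units of work, as a job checking
            // its cancellation token would during shutdown.
            break;
        }
        chunks_done += 1;
        if chunks_done == 3 {
            // Simulate a shutdown signal arriving mid-run.
            cancelled.store(true, Ordering::Relaxed);
        }
    }
    chunks_done
}

fn main() {
    let cancelled = AtomicBool::new(false);
    println!("completed {} chunks before stopping", run_with_flag(&cancelled));
}
```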
§Database Schema
§jobs table
| Column | Type | Description |
|---|---|---|
| job_id | UUID | Primary key |
| name | TEXT | Job type name (matches Job::NAME) |
| payload | JSONB | Serialized job data |
| priority | INT | Higher = runs first (ORDER BY priority DESC) |
| run_at | TIMESTAMPTZ | When job can next be executed |
| created_at | TIMESTAMPTZ | When job was enqueued |
| locked_at | TIMESTAMPTZ | When a worker locked this job |
| locked_by | TEXT | Worker UUID that holds the lock |
| context | TEXT | Debug info (e.g., "user-signup") |
| error_count | INT | Number of failures |
| last_error_message | TEXT | Most recent error |
| last_failed_at | TIMESTAMPTZ | Timestamp of last failure |
§dead_letter_jobs table
Jobs that exceeded max retries are moved here for manual investigation.
| Column | Type | Description |
|---|---|---|
| id | UUID | Primary key |
| original_job_id | UUID | Original job_id |
| name | TEXT | Job type name |
| payload | JSONB | Serialized job data |
| context | TEXT | Debug info |
| priority | INT | Original priority |
| error_count | INT | Total failure count |
| last_error_message | TEXT | Final error |
| created_at | TIMESTAMPTZ | Original creation time |
| failed_at | TIMESTAMPTZ | When moved to dead letter |
§Re-exports

- pub use worker::DEFAULT_LOCK_TIMEOUT;
- pub use worker::DEFAULT_MAX_RETRIES;

§Structs

- CancellationToken: A token which can be used to signal a cancellation request to one or more tasks.

§Traits

- Job: A trait for defining background jobs that can be enqueued and processed asynchronously.