Module jobs


Background job processing with PostgreSQL persistence.

The job system provides a reliable, database-backed work queue with automatic retries, exponential backoff, priority scheduling, and a dead letter queue for permanently failed jobs.

§Defining a Job

Implement the Job trait for your job type:

use cja::jobs::Job;
use serde::{Serialize, Deserialize};

#[derive(Debug, Serialize, Deserialize, Clone)]
struct SendEmailJob {
    to: String,
    subject: String,
}

#[async_trait::async_trait]
impl Job<MyAppState> for SendEmailJob {
    const NAME: &'static str = "SendEmailJob";

    async fn run(&self, app_state: MyAppState) -> cja::Result<()> {
        // Send email using app_state.db() for templates, etc.
        Ok(())
    }
}

§Registering Jobs

All job types must be registered via the impl_job_registry! macro:

cja::impl_job_registry!(MyAppState, SendEmailJob, CleanupJob, ReportJob);

This generates a Jobs struct that routes jobs by their NAME constant. Forgetting to register a job type means it won’t be processed.
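To see why an unregistered job type is never processed, it can help to picture routing by name as a plain match on the stored name string. The sketch below is not the code that impl_job_registry! generates; the Routed enum and route function are hypothetical names used only for illustration.

```rust
/// Hypothetical outcome of routing a stored job row by its name.
#[derive(Debug, PartialEq)]
enum Routed {
    /// A registered job type matched and would be deserialized and run.
    Ran(&'static str),
    /// No registered type has this name, so nothing can run it.
    Unregistered(String),
}

/// Routes a job by name, as a registry built from
/// (SendEmailJob, CleanupJob) might do.
fn route(name: &str) -> Routed {
    match name {
        "SendEmailJob" => Routed::Ran("SendEmailJob"),
        "CleanupJob" => Routed::Ran("CleanupJob"),
        // ReportJob was left out of the registry in this sketch.
        other => Routed::Unregistered(other.to_owned()),
    }
}
```

In this model, a row named "ReportJob" can still be enqueued, but it falls through to the Unregistered arm because no registered type claims that name.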

§Enqueuing Jobs

let job = SendEmailJob {
    to: "user@example.com".into(),
    subject: "Welcome!".into(),
};

// Default priority (0)
job.clone().enqueue(app_state.clone(), "user-signup".into(), None).await?;

// High priority — higher values run first (ORDER BY priority DESC)
job.enqueue(app_state, "urgent".into(), Some(10)).await?;

Priority: Higher values run first. Priority 10 runs before 0, and -10 runs last. This is per-enqueue, not per-job-type.
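The ordering rule can be modeled in memory as a descending sort on priority. This is a minimal sketch, not the worker's query: the Queued struct and pickup_order function are hypothetical, and the tie-breaking between equal priorities (insertion order here) is an assumption the text above does not specify.

```rust
/// Hypothetical in-memory view of a queued job.
#[derive(Debug, Clone, PartialEq)]
struct Queued {
    context: &'static str,
    priority: i32,
}

/// Orders jobs the way `ORDER BY priority DESC` would: highest priority first.
fn pickup_order(mut jobs: Vec<Queued>) -> Vec<Queued> {
    // Stable sort, so jobs with equal priority keep their insertion order
    // (an assumption of this sketch, not documented behavior).
    jobs.sort_by(|a, b| b.priority.cmp(&a.priority));
    jobs
}
```

Given jobs enqueued with priorities 0, -10, and 10, this returns them in the order 10, 0, -10.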

§Retry Behavior

Failed jobs are automatically retried with exponential backoff (delay = 2^(error_count + 1) seconds):

Retry #    Delay
1          4 seconds
2          8 seconds
5          ~1 minute
10         ~34 minutes
20         ~24 days

After DEFAULT_MAX_RETRIES (20) attempts, jobs are moved to the dead_letter_jobs table.
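The backoff formula can be written out as a small function. This is a sketch of the arithmetic only, not the worker's implementation. It assumes that error_count has already been incremented for the failure being handled, and that a job is dead once error_count reaches the maximum. MAX_RETRIES, retry_delay, and is_dead are illustrative names.

```rust
use std::time::Duration;

/// Mirrors the documented default of 20; the real constant is
/// `cja::jobs::DEFAULT_MAX_RETRIES`.
const MAX_RETRIES: u32 = 20;

/// Delay before the next attempt: 2^(error_count + 1) seconds.
fn retry_delay(error_count: u32) -> Duration {
    // Saturate instead of overflowing for very large failure counts.
    let exponent = error_count.saturating_add(1);
    let secs = 2u64.checked_pow(exponent).unwrap_or(u64::MAX);
    Duration::from_secs(secs)
}

/// Whether a job with this many failures would move to the dead letter
/// table (assumes the comparison is `>=`, which the text does not state).
fn is_dead(error_count: u32) -> bool {
    error_count >= MAX_RETRIES
}
```

Because the delay doubles on every failure, the last few retries account for almost all of the total waiting time.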

§Idempotency

Jobs must be idempotent. The system retries on timeouts, worker crashes, and network failures. Guard against double-application with:

  • ON CONFLICT DO NOTHING for inserts
  • Early-exit checks at job start
  • Idempotency keys in your domain logic
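The guards listed above can be combined into one pattern: derive a key for the unit of work, record it once, and exit early if it was already recorded. The sketch below models this in memory. SentLog stands in for a table with a unique constraint on the key, and all names in it are hypothetical.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a table with a unique constraint on the key.
#[derive(Default)]
struct SentLog {
    keys: HashSet<String>,
}

impl SentLog {
    /// Returns true only the first time a key is recorded, similar to
    /// `INSERT ... ON CONFLICT DO NOTHING` reporting one affected row.
    fn record_once(&mut self, key: &str) -> bool {
        self.keys.insert(key.to_owned())
    }
}

/// Sends the welcome email at most once per user; returns whether it sent.
fn send_welcome_email(log: &mut SentLog, user_id: u64, outbox: &mut Vec<String>) -> bool {
    let key = format!("welcome-email:{user_id}");
    if !log.record_once(&key) {
        // Early exit: an earlier attempt already did this work.
        return false;
    }
    outbox.push(format!("Welcome, user {user_id}!"));
    true
}
```

In a real job, recording the key and performing the side effect would need to happen in the same transaction where possible. Otherwise a crash between the two steps could leave the key recorded with the work undone.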

§Concurrent Workers

Multiple worker::job_worker instances can run safely — the worker SQL uses FOR UPDATE SKIP LOCKED to prevent duplicate processing at the database level.
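The effect of SKIP LOCKED can be modeled in memory: a worker looking for work passes over rows that another worker has already locked instead of waiting on them. This is a simplified model, not the worker's SQL. The Row struct and claim_next function are hypothetical.

```rust
/// Hypothetical in-memory model of a row in the jobs table.
struct Row {
    id: u32,
    locked_by: Option<&'static str>,
}

/// Claims the first unlocked row for `worker`, skipping locked rows
/// rather than blocking on them. Returns the claimed row's id, if any.
fn claim_next(rows: &mut [Row], worker: &'static str) -> Option<u32> {
    let row = rows.iter_mut().find(|r| r.locked_by.is_none())?;
    row.locked_by = Some(worker);
    Some(row.id)
}
```

Two workers calling claim_next in turn receive different rows, and a third worker finds nothing to claim once every row is locked.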

§Cancellation Support

Long-running jobs can override Job::run_with_cancellation to exit gracefully during shutdown by checking the cancellation token periodically.
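The periodic check might look like the loop below. The exact signature of Job::run_with_cancellation is not shown here, so this sketch uses only the standard library and takes the cancellation check as a closure. If the token exposes a check such as is_cancelled(), the closure would wrap that call. The process_items function and its parameters are hypothetical.

```rust
/// Processes items in order, checking for cancellation before each one.
/// Returns how many items were fully processed.
fn process_items(
    items: &[u32],
    is_cancelled: impl Fn() -> bool,
    mut handle: impl FnMut(u32),
) -> usize {
    let mut done = 0;
    for &item in items {
        if is_cancelled() {
            // Stop between items so no unit of work is left half-finished.
            break;
        }
        handle(item);
        done += 1;
    }
    done
}
```

Checking between units of work, rather than in the middle of one, lets the job exit with its state consistent. That fits with the idempotency requirement, because the remaining items can be picked up by a later attempt.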

§Database Schema

§jobs table

Column              Type         Description
job_id              UUID         Primary key
name                TEXT         Job type name (matches Job::NAME)
payload             JSONB        Serialized job data
priority            INT          Higher = runs first (ORDER BY priority DESC)
run_at              TIMESTAMPTZ  When job can next be executed
created_at          TIMESTAMPTZ  When job was enqueued
locked_at           TIMESTAMPTZ  When a worker locked this job
locked_by           TEXT         Worker UUID that holds the lock
context             TEXT         Debug info (e.g., "user-signup")
error_count         INT          Number of failures
last_error_message  TEXT         Most recent error
last_failed_at      TIMESTAMPTZ  Timestamp of last failure

§dead_letter_jobs table

Jobs that exceeded max retries are moved here for manual investigation.

Column              Type         Description
id                  UUID         Primary key
original_job_id     UUID         Original job_id
name                TEXT         Job type name
payload             JSONB        Serialized job data
context             TEXT         Debug info
priority            INT          Original priority
error_count         INT          Total failure count
last_error_message  TEXT         Final error
created_at          TIMESTAMPTZ  Original creation time
failed_at           TIMESTAMPTZ  When moved to dead letter

Re-exports§

pub use worker::DEFAULT_LOCK_TIMEOUT;
pub use worker::DEFAULT_MAX_RETRIES;

Modules§

registry
worker

Structs§

CancellationToken
A token which can be used to signal a cancellation request to one or more tasks.

Enums§

EnqueueError

Traits§

Job
A trait for defining background jobs that can be enqueued and processed asynchronously.