Job

Trait Job 

pub trait Job<AppState: AS>:
    Serialize
    + DeserializeOwned
    + Send
    + Sync
    + Debug
    + Clone
    + 'static {
    const NAME: &'static str;

    // Required method
    fn run<'life0, 'async_trait>(
        &'life0 self,
        app_state: AppState,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    // Provided methods
    fn run_with_cancellation<'life0, 'async_trait>(
        &'life0 self,
        app_state: AppState,
        _cancellation_token: CancellationToken,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait { ... }
    fn run_from_value<'async_trait>(
        value: Value,
        app_state: AppState,
        cancellation_token: CancellationToken,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait { ... }
    fn enqueue<'async_trait>(
        self,
        app_state: AppState,
        context: String,
        priority: Option<i32>,
    ) -> Pin<Box<dyn Future<Output = Result<(), EnqueueError>> + Send + 'async_trait>>
       where Self: 'async_trait { ... }
}

A trait for defining background jobs that can be enqueued and processed asynchronously.

Jobs must be serializable and provide a unique name identifier. The job system handles persistence, retries, and concurrent execution automatically.

§Automatic Retry Behavior

All jobs are automatically retried on failure with exponential backoff:

  • Failed jobs are requeued with increasing delays: 2, 4, 8, 16, 32… seconds
  • Error messages and failure timestamps are tracked in the database
  • Jobs are moved to a dead letter queue after exceeding the configured max retries (default: 20)
  • No manual intervention required for transient failures
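
The schedule above can be sketched as follows. This is only an illustration of the documented 2, 4, 8, 16, 32… second progression and the max-retries cutoff. The helper names `backoff_delay` and `should_dead_letter` are hypothetical and are not part of cja, and the formula the job system actually uses internally may differ.

```rust
use std::time::Duration;

/// Hypothetical helper (not a cja API): delay before retry number `attempt`
/// (1-based), following the documented 2, 4, 8, 16, 32... second progression.
fn backoff_delay(attempt: u32) -> Duration {
    // 2^attempt seconds; saturate instead of overflowing for very large attempts.
    let secs = 2u64.checked_pow(attempt).unwrap_or(u64::MAX);
    Duration::from_secs(secs)
}

/// Hypothetical helper (not a cja API): whether a job that has failed
/// `failures` times has exceeded `max_retries` and belongs in the dead
/// letter queue instead of being requeued.
fn should_dead_letter(failures: u32, max_retries: u32) -> bool {
    failures > max_retries
}
```

Under these assumptions, `backoff_delay(3)` is 8 seconds, and with the default of 20 max retries a job is dead-lettered on its 21st failure.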

§Lock Timeout (Abandoned Job Recovery)

Jobs are locked while being processed to prevent multiple workers from running the same job. If a worker crashes or becomes unresponsive, the lock remains but the job is never completed. The lock timeout mechanism handles this:

  • Jobs locked longer than the timeout (default: 2 hours) are considered abandoned
  • Any worker can pick up abandoned jobs and retry them
  • This ensures jobs are eventually processed even after worker failures
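
The abandonment rule above amounts to a timestamp comparison. The sketch below assumes each job records when it was locked. The `is_claimable` helper and the `DEFAULT_LOCK_TIMEOUT` constant are hypothetical names used for illustration, not cja APIs, and the real check presumably happens in the database query that workers use to claim jobs.

```rust
use std::time::{Duration, SystemTime};

/// The documented default lock timeout of 2 hours (constant name is hypothetical).
const DEFAULT_LOCK_TIMEOUT: Duration = Duration::from_secs(2 * 60 * 60);

/// Hypothetical helper (not a cja API): a job can be claimed if it is
/// unlocked, or if its lock has been held for longer than `timeout`.
fn is_claimable(locked_at: Option<SystemTime>, now: SystemTime, timeout: Duration) -> bool {
    match locked_at {
        // Never locked, or lock already released.
        None => true,
        Some(locked_at) => match now.duration_since(locked_at) {
            // Locked longer than the timeout: treat the job as abandoned.
            Ok(held_for) => held_for > timeout,
            // Lock timestamp is in the future (clock skew): treat it as still held.
            Err(_) => false,
        },
    }
}
```

Because an abandoned job may have partially run before its worker died, job logic should tolerate being executed again from the start.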

§Example

use cja::jobs::Job;
use serde::{Serialize, Deserialize};


#[derive(Debug, Serialize, Deserialize, Clone)]
struct EmailJob {
    to: String,
    subject: String,
    body: String,
}

#[async_trait::async_trait]
impl Job<MyAppState> for EmailJob {
    const NAME: &'static str = "EmailJob";

    async fn run(&self, app_state: MyAppState) -> color_eyre::Result<()> {
        // Send email logic here
        println!("Sending email to {} with subject: {}", self.to, self.subject);
        // You can access the database through app_state.db()
        Ok(())
    }
}

§Enqueuing Jobs

let job = EmailJob {
    to: "user@example.com".to_string(),
    subject: "Welcome!".to_string(),
    body: "Thank you for signing up!".to_string(),
};

// Enqueue the job with a context string for debugging
job.enqueue(app_state, "user-signup".to_string(), None).await?;

§Job with Database Access

use cja::jobs::Job;
use serde::{Serialize, Deserialize};


#[derive(Debug, Serialize, Deserialize, Clone)]
struct ProcessPaymentJob {
    user_id: i32,
    amount_cents: i64,
}

#[async_trait::async_trait]
impl Job<MyAppState> for ProcessPaymentJob {
    const NAME: &'static str = "ProcessPaymentJob";

    async fn run(&self, app_state: MyAppState) -> color_eyre::Result<()> {
        // Bring the AppState trait into scope so that app_state.db() resolves
        use cja::app_state::AppState;
        use sqlx::Row;

        // Access the database through app_state
        let user = sqlx::query("SELECT name FROM users WHERE id = $1")
            .bind(self.user_id)
            .fetch_one(app_state.db())
            .await?;

        println!("Processing payment of {} cents for user {} #{}",
                 self.amount_cents, user.get::<String, _>("name"), self.user_id);

        sqlx::query("INSERT INTO payments (user_id, amount_cents) VALUES ($1, $2)")
            .bind(self.user_id)
            .bind(self.amount_cents)
            .execute(app_state.db())
            .await?;

        Ok(())
    }
}

Required Associated Constants§


const NAME: &'static str

The unique name identifier for this job type. This is used for routing jobs to their handlers.
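
To illustrate why the name must be unique, here is a minimal sketch of name-based routing. The `Registry` type, the `Handler` signature, and `send_email` are all hypothetical and simplified (string payloads, synchronous handlers). They do not reflect how cja registers or dispatches jobs internally.

```rust
use std::collections::HashMap;

/// Hypothetical handler signature: receives the serialized payload as a string.
type Handler = fn(&str) -> Result<(), String>;

/// Hypothetical registry keyed by each job type's `NAME`.
struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    fn new() -> Self {
        Self { handlers: HashMap::new() }
    }

    /// Rejects duplicate names, since two job types sharing a name
    /// would make routing ambiguous.
    fn register(&mut self, name: &'static str, handler: Handler) -> Result<(), String> {
        if self.handlers.contains_key(name) {
            return Err(format!("duplicate job name: {name}"));
        }
        self.handlers.insert(name, handler);
        Ok(())
    }

    /// Looks up the handler by the stored job name and runs it on the payload.
    fn dispatch(&self, name: &str, payload: &str) -> Result<(), String> {
        match self.handlers.get(name) {
            Some(handler) => handler(payload),
            None => Err(format!("no handler registered for job: {name}")),
        }
    }
}

/// Placeholder handler used only for illustration.
fn send_email(_payload: &str) -> Result<(), String> {
    Ok(())
}
```

In this sketch, renaming a job type after jobs have been persisted would leave stored jobs without a matching handler, which is one reason to treat `NAME` as a stable identifier.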

Required Methods§


fn run<'life0, 'async_trait>(
    &'life0 self,
    app_state: AppState,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,

Execute the job logic.

This method has access to the full application state, including the database connection pool.

If this method returns an error, the job will be automatically retried with exponential backoff until it succeeds or exceeds max retries.

Provided Methods§


fn run_with_cancellation<'life0, 'async_trait>(
    &'life0 self,
    app_state: AppState,
    _cancellation_token: CancellationToken,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,

Execute the job logic with a cancellation token for graceful shutdown. Checking the token is optional.

Long-running jobs can override this method to check the cancellation token periodically and exit early during shutdown. The default implementation ignores the token and calls run(), so existing jobs continue to work without changes.

§Example

#[async_trait::async_trait]
impl Job<MyAppState> for LongJob {
    const NAME: &'static str = "LongJob";

    async fn run(&self, _app_state: MyAppState) -> color_eyre::Result<()> {
        // Simple implementation without cancellation support
        Ok(())
    }

    async fn run_with_cancellation(
        &self,
        app_state: MyAppState,
        cancellation_token: CancellationToken,
    ) -> color_eyre::Result<()> {
        for i in 0..self.iterations {
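            // Check whether shutdown was requested before starting more work
            if cancellation_token.is_cancelled() {
                tracing::info!("Job cancelled after {} iterations", i);
                // Returning an error means the job is retried later, like any other failure
                return Err(color_eyre::eyre::eyre!("Job cancelled during shutdown"));
            }

            // Do one unit of work (`process_iteration` stands in for your own logic)
            process_iteration(i, app_state.clone()).await?;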
        }
        Ok(())
    }
}

fn run_from_value<'async_trait>(
    value: Value,
    app_state: AppState,
    cancellation_token: CancellationToken,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where
    Self: 'async_trait,

Internal method used by the job system to deserialize and run jobs.

You typically won’t call this directly; the job worker calls it for you.


fn enqueue<'async_trait>(
    self,
    app_state: AppState,
    context: String,
    priority: Option<i32>,
) -> Pin<Box<dyn Future<Output = Result<(), EnqueueError>> + Send + 'async_trait>>
where
    Self: 'async_trait,

Enqueue this job for asynchronous execution.

The job will be persisted to the database and picked up by a worker process. Jobs are executed with at-least-once semantics and automatic retries on failure.
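
At-least-once semantics mean that a job may run more than once, for example after a retry or after recovery from an abandoned lock, so job logic is safest when it is idempotent. The sketch below shows one common way to get there: recording an idempotency key before applying a side effect. `PaymentLedger` and its methods are hypothetical, and the in-memory set stands in for something durable such as a unique constraint in the database.

```rust
use std::collections::HashSet;

/// Hypothetical in-memory stand-in for a durable uniqueness check
/// (for example, a unique database constraint on the key).
struct PaymentLedger {
    processed: HashSet<String>,
    total_cents: i64,
}

impl PaymentLedger {
    fn new() -> Self {
        Self { processed: HashSet::new(), total_cents: 0 }
    }

    /// Applies the payment at most once per idempotency key.
    /// Returns `true` if the payment was applied, `false` if it was a duplicate.
    fn apply(&mut self, idempotency_key: &str, amount_cents: i64) -> bool {
        // `insert` returns false when the key was already present.
        if !self.processed.insert(idempotency_key.to_string()) {
            return false;
        }
        self.total_cents += amount_cents;
        true
    }
}
```

With this pattern, a second execution of the same job with the same key leaves the total unchanged instead of charging twice.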

§Arguments

  • app_state - The application state containing the database connection
  • context - A string describing why this job was enqueued (useful for debugging)
  • priority - Optional priority for this job instance. Higher values run first. Defaults to 0 if None. Use negative values for lower-priority background work.

§Example

let job = MyJob { data: "important work".to_string() };
// Default priority
job.clone().enqueue(app_state.clone(), "user-requested".to_string(), None).await?;
// Low priority background work
job.enqueue(app_state, "background-cleanup".to_string(), Some(-10)).await?;
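
The priority rule described for this method (higher values first, `None` treated as 0) can be sketched as a sort. The `Queued` struct and both helpers are hypothetical. In particular, keeping enqueue order among jobs of equal priority is an assumption of this sketch, not documented behavior.

```rust
/// Hypothetical queued entry, used only for illustration.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Queued {
    context: String,
    priority: i32,
}

/// `None` maps to the documented default priority of 0.
fn effective_priority(priority: Option<i32>) -> i32 {
    priority.unwrap_or(0)
}

/// Orders jobs so that higher priorities come first.
fn run_order(mut jobs: Vec<Queued>) -> Vec<Queued> {
    // `sort_by` is stable, so ties keep their enqueue order (an assumption here).
    jobs.sort_by(|a, b| b.priority.cmp(&a.priority));
    jobs
}
```

Under these assumptions, jobs enqueued with `Some(-10)`, `None`, and `Some(5)` would run in the order 5, 0, -10.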

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§