pub trait Job<AppState: AS>:
Serialize
+ DeserializeOwned
+ Send
+ Sync
+ Debug
+ Clone
+ 'static {
const NAME: &'static str;
// Required method
fn run<'life0, 'async_trait>(
&'life0 self,
app_state: AppState,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
// Provided methods
fn run_with_cancellation<'life0, 'async_trait>(
&'life0 self,
app_state: AppState,
_cancellation_token: CancellationToken,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn run_from_value<'async_trait>(
value: Value,
app_state: AppState,
cancellation_token: CancellationToken,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait { ... }
fn enqueue<'async_trait>(
self,
app_state: AppState,
context: String,
priority: Option<i32>,
) -> Pin<Box<dyn Future<Output = Result<(), EnqueueError>> + Send + 'async_trait>>
where Self: 'async_trait { ... }
}Expand description
A trait for defining background jobs that can be enqueued and processed asynchronously.
Jobs must be serializable and provide a unique name identifier. The job system handles persistence, retries, and concurrent execution automatically.
§Automatic Retry Behavior
All jobs are automatically retried on failure with exponential backoff:
- Failed jobs are requeued with increasing delays: 2, 4, 8, 16, 32… seconds
- Error messages and failure timestamps are tracked in the database
- Jobs are moved to a dead letter queue after exceeding the configured max retries (default: 20)
- No manual intervention required for transient failures
§Lock Timeout (Abandoned Job Recovery)
Jobs are locked while being processed to prevent multiple workers from running the same job. If a worker crashes or becomes unresponsive, the lock remains but the job is never completed. The lock timeout mechanism handles this:
- Jobs locked longer than the timeout (default: 2 hours) are considered abandoned
- Any worker can pick up abandoned jobs and retry them
- This ensures jobs are eventually processed even after worker failures
§Example
use cja::jobs::Job;
use serde::{Serialize, Deserialize};
#[derive(Debug, Serialize, Deserialize, Clone)]
struct EmailJob {
to: String,
subject: String,
body: String,
}
#[async_trait::async_trait]
impl Job<MyAppState> for EmailJob {
const NAME: &'static str = "EmailJob";
async fn run(&self, app_state: MyAppState) -> color_eyre::Result<()> {
// Send email logic here
println!("Sending email to {} with subject: {}", self.to, self.subject);
// You can access the database through app_state.db()
Ok(())
}
}§Enqueuing Jobs
let job = EmailJob {
to: "user@example.com".to_string(),
subject: "Welcome!".to_string(),
body: "Thank you for signing up!".to_string(),
};
// Enqueue the job with a context string for debugging
job.enqueue(app_state, "user-signup".to_string(), None).await?;§Job with Database Access
use cja::jobs::Job;
use serde::{Serialize, Deserialize};
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ProcessPaymentJob {
user_id: i32,
amount_cents: i64,
}
#[async_trait::async_trait]
impl Job<MyAppState> for ProcessPaymentJob {
const NAME: &'static str = "ProcessPaymentJob";
async fn run(&self, app_state: MyAppState) -> color_eyre::Result<()> {
use crate::cja::app_state::AppState;
use sqlx::Row;
// Access the database through app_state
let user = sqlx::query("SELECT name FROM users WHERE id = $1")
.bind(self.user_id)
.fetch_one(app_state.db())
.await?;
println!("Processing payment of {} cents for user {} #{}",
self.amount_cents, user.get::<String, _>("name"), self.user_id);
sqlx::query("INSERT INTO payments (user_id, amount_cents) VALUES ($1, $2)")
.bind(self.user_id)
.bind(self.amount_cents)
.execute(app_state.db())
.await?;
Ok(())
}
}Required Associated Constants§
Required Methods§
Sourcefn run<'life0, 'async_trait>(
&'life0 self,
app_state: AppState,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn run<'life0, 'async_trait>(
&'life0 self,
app_state: AppState,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Execute the job logic.
This method has access to the full application state, including the database connection pool.
If this method returns an error, the job will be automatically retried with exponential backoff until it succeeds or exceeds max retries.
Provided Methods§
Sourcefn run_with_cancellation<'life0, 'async_trait>(
&'life0 self,
app_state: AppState,
_cancellation_token: CancellationToken,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn run_with_cancellation<'life0, 'async_trait>(
&'life0 self,
app_state: AppState,
_cancellation_token: CancellationToken,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Execute the job logic with an optional cancellation token for graceful shutdown.
Long-running jobs can override this method to check the cancellation token
periodically and exit early during shutdown. The default implementation
ignores the token and calls run(), so existing jobs continue to work
without changes.
§Example
#[async_trait::async_trait]
impl Job<MyAppState> for LongJob {
const NAME: &'static str = "LongJob";
async fn run(&self, _app_state: MyAppState) -> color_eyre::Result<()> {
// Simple implementation without cancellation support
Ok(())
}
async fn run_with_cancellation(
&self,
app_state: MyAppState,
cancellation_token: CancellationToken,
) -> color_eyre::Result<()> {
for i in 0..self.iterations {
// Check if shutdown was requested
if cancellation_token.is_cancelled() {
tracing::info!("Job cancelled after {} iterations", i);
return Err(color_eyre::eyre::eyre!("Job cancelled during shutdown"));
}
// Do some work
process_iteration(i, app_state.clone()).await?;
}
Ok(())
}
}Sourcefn run_from_value<'async_trait>(
value: Value,
app_state: AppState,
cancellation_token: CancellationToken,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
fn run_from_value<'async_trait>(
value: Value,
app_state: AppState,
cancellation_token: CancellationToken,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
Internal method used by the job system to deserialize and run jobs.
You typically won’t call this directly - it’s used by the job worker.
Sourcefn enqueue<'async_trait>(
self,
app_state: AppState,
context: String,
priority: Option<i32>,
) -> Pin<Box<dyn Future<Output = Result<(), EnqueueError>> + Send + 'async_trait>>where
Self: 'async_trait,
fn enqueue<'async_trait>(
self,
app_state: AppState,
context: String,
priority: Option<i32>,
) -> Pin<Box<dyn Future<Output = Result<(), EnqueueError>> + Send + 'async_trait>>where
Self: 'async_trait,
Enqueue this job for asynchronous execution.
The job will be persisted to the database and picked up by a worker process. Jobs are executed with at-least-once semantics and automatic retries on failure.
§Arguments
app_state- The application state containing the database connectioncontext- A string describing why this job was enqueued (useful for debugging)priority- Optional priority for this job instance. Higher values run first. Defaults to 0 ifNone. Use negative values for lower-priority background work.
§Example
let job = MyJob { data: "important work".to_string() };
// Default priority
job.clone().enqueue(app_state.clone(), "user-requested".to_string(), None).await?;
// Low priority background work
job.enqueue(app_state, "background-cleanup".to_string(), Some(-10)).await?;Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.