cja/cron/
worker.rs

1use std::{collections::HashMap, time::Duration};
2
3use chrono::{DateTime, Utc};
4use chrono_tz::Tz;
5use tokio_util::sync::CancellationToken;
6
7use crate::app_state::AppState as AS;
8
9use super::registry::{CronRegistry, TickError};
10
11/// Worker that executes cron jobs on a schedule
12///
13/// The worker runs cron jobs based on their configured schedules. For cron expressions,
14/// the timezone parameter determines when the cron expression is evaluated. For example,
15/// a cron expression "0 9 * * *" (9 AM daily) will run at 9 AM in the configured timezone.
16///
17/// Interval-based jobs ignore the timezone and run based on elapsed time since last run.
18pub struct Worker<AppState: AS> {
19    id: uuid::Uuid,
20    state: AppState,
21    registry: CronRegistry<AppState>,
22    pub(crate) started_at: DateTime<Utc>,
23    timezone: Tz,
24    sleep_duration: Duration,
25}
26
27impl<AppState: AS> Worker<AppState> {
28    /// Create a new Worker with UTC as the default timezone
29    pub fn new(state: AppState, registry: CronRegistry<AppState>) -> Self {
30        Self::new_with_timezone(state, registry, chrono_tz::UTC, Duration::from_secs(60))
31    }
32
33    /// Create a new Worker with a specific timezone
34    pub fn new_with_timezone(
35        state: AppState,
36        registry: CronRegistry<AppState>,
37        timezone: Tz,
38        sleep_duration: Duration,
39    ) -> Self {
40        Self {
41            id: uuid::Uuid::new_v4(),
42            state,
43            registry,
44            started_at: Utc::now(),
45            timezone,
46            sleep_duration,
47        }
48    }
49
50    /// Run the cron worker until the shutdown token is cancelled.
51    ///
52    /// The worker will execute cron jobs on their configured schedules until shutdown
53    /// is requested. When the `shutdown_token` is cancelled:
54    /// - The worker stops scheduling new cron job executions
55    /// - Any currently running tick completes before shutdown
56    /// - The sleep between ticks can be interrupted for faster shutdown
57    ///
58    /// # Arguments
59    ///
60    /// * `shutdown_token` - Cancellation token for graceful shutdown
61    ///
62    /// # Example
63    ///
64    /// ```ignore
65    /// use tokio_util::sync::CancellationToken;
66    ///
67    /// let shutdown_token = CancellationToken::new();
68    /// let worker_token = shutdown_token.clone();
69    ///
70    /// tokio::spawn(async move {
71    ///     worker.run(worker_token).await.unwrap();
72    /// });
73    ///
74    /// // Later, trigger shutdown
75    /// shutdown_token.cancel();
76    /// ```
77    pub async fn run(self, shutdown_token: CancellationToken) -> Result<(), TickError> {
78        tracing::debug!(cron_worker_id = %self.id, "Starting Cron loop");
79        loop {
80            tokio::select! {
81                result = self.tick() => {
82                    result?;
83                }
84                () = shutdown_token.cancelled() => {
85                    tracing::info!(cron_worker_id = %self.id, "Cron worker shutdown requested");
86                    break;
87                }
88            }
89
90            tokio::select! {
91                () = tokio::time::sleep(self.sleep_duration) => {}
92                () = shutdown_token.cancelled() => {
93                    tracing::info!(cron_worker_id = %self.id, "Cron worker shutdown during sleep");
94                    break;
95                }
96            }
97        }
98
99        tracing::info!(cron_worker_id = %self.id, "Cron worker shutdown complete");
100        Ok(())
101    }
102
103    async fn last_enqueue_map(&self) -> Result<HashMap<String, DateTime<Utc>>, TickError> {
104        let last_runs = sqlx::query!("SELECT name, last_run_at FROM Crons")
105            .fetch_all(self.state.db())
106            .await
107            .map_err(TickError::SqlxError)?;
108
109        let last_run_map: HashMap<String, DateTime<Utc>> = last_runs
110            .iter()
111            .map(|row| (row.name.clone(), row.last_run_at))
112            .collect();
113
114        Ok(last_run_map)
115    }
116
117    #[tracing::instrument(name = "cron.tick", skip_all, fields(cron_worker.id = %self.id))]
118    pub(crate) async fn tick(&self) -> Result<(), TickError> {
119        let last_enqueue_map = self.last_enqueue_map().await?;
120        for job in self.registry.jobs.values() {
121            job.tick(
122                self.state.clone(),
123                &last_enqueue_map,
124                self.started_at,
125                self.timezone,
126            )
127            .await?;
128        }
129
130        Ok(())
131    }
132}