1use std::{collections::HashMap, time::Duration};
2
3use chrono::{DateTime, Utc};
4use chrono_tz::Tz;
5use tokio_util::sync::CancellationToken;
6
7use crate::app_state::AppState as AS;
8
9use super::registry::{CronRegistry, TickError};
10
11pub struct Worker<AppState: AS> {
19 id: uuid::Uuid,
20 state: AppState,
21 registry: CronRegistry<AppState>,
22 pub(crate) started_at: DateTime<Utc>,
23 timezone: Tz,
24 sleep_duration: Duration,
25}
26
27impl<AppState: AS> Worker<AppState> {
28 pub fn new(state: AppState, registry: CronRegistry<AppState>) -> Self {
30 Self::new_with_timezone(state, registry, chrono_tz::UTC, Duration::from_secs(60))
31 }
32
33 pub fn new_with_timezone(
35 state: AppState,
36 registry: CronRegistry<AppState>,
37 timezone: Tz,
38 sleep_duration: Duration,
39 ) -> Self {
40 Self {
41 id: uuid::Uuid::new_v4(),
42 state,
43 registry,
44 started_at: Utc::now(),
45 timezone,
46 sleep_duration,
47 }
48 }
49
50 pub async fn run(self, shutdown_token: CancellationToken) -> Result<(), TickError> {
78 tracing::debug!(cron_worker_id = %self.id, "Starting Cron loop");
79 loop {
80 tokio::select! {
81 result = self.tick() => {
82 result?;
83 }
84 () = shutdown_token.cancelled() => {
85 tracing::info!(cron_worker_id = %self.id, "Cron worker shutdown requested");
86 break;
87 }
88 }
89
90 tokio::select! {
91 () = tokio::time::sleep(self.sleep_duration) => {}
92 () = shutdown_token.cancelled() => {
93 tracing::info!(cron_worker_id = %self.id, "Cron worker shutdown during sleep");
94 break;
95 }
96 }
97 }
98
99 tracing::info!(cron_worker_id = %self.id, "Cron worker shutdown complete");
100 Ok(())
101 }
102
103 async fn last_enqueue_map(&self) -> Result<HashMap<String, DateTime<Utc>>, TickError> {
104 let last_runs = sqlx::query!("SELECT name, last_run_at FROM Crons")
105 .fetch_all(self.state.db())
106 .await
107 .map_err(TickError::SqlxError)?;
108
109 let last_run_map: HashMap<String, DateTime<Utc>> = last_runs
110 .iter()
111 .map(|row| (row.name.clone(), row.last_run_at))
112 .collect();
113
114 Ok(last_run_map)
115 }
116
117 #[tracing::instrument(name = "cron.tick", skip_all, fields(cron_worker.id = %self.id))]
118 pub(crate) async fn tick(&self) -> Result<(), TickError> {
119 let last_enqueue_map = self.last_enqueue_map().await?;
120 for job in self.registry.jobs.values() {
121 job.tick(
122 self.state.clone(),
123 &last_enqueue_map,
124 self.started_at,
125 self.timezone,
126 )
127 .await?;
128 }
129
130 Ok(())
131 }
132}