cja/cron/
registry.rs

1use std::{collections::HashMap, error::Error, future::Future, pin::Pin, time::Duration};
2
3use chrono::Utc;
4use chrono_tz::Tz;
5
6use crate::app_state::AppState as AS;
7#[cfg(feature = "jobs")]
8use crate::jobs::Job;
9
10pub struct CronRegistry<AppState: AS> {
11    pub(super) jobs: HashMap<&'static str, CronJob<AppState>>,
12}
13
14#[async_trait::async_trait]
15pub trait CronFn<AppState: AS> {
16    // This collapses the error type to a string, right now thats because thats
17    // what the only consumer really needs. As we add more error debugging we'll
18    // need to change this.
19    async fn run(&self, app_state: AppState, context: String) -> Result<(), String>;
20}
21
22pub struct CronFnClosure<
23    AppState: AS,
24    FnError: Error + Send + Sync + 'static,
25    F: Fn(AppState, String) -> Pin<Box<dyn Future<Output = Result<(), FnError>> + Send>>
26        + Send
27        + Sync
28        + 'static,
29> {
30    pub(super) func: F,
31    _marker: std::marker::PhantomData<AppState>,
32}
33
34#[async_trait::async_trait]
35impl<
36    AppState: AS,
37    FnError: Error + Send + Sync + 'static,
38    F: Fn(AppState, String) -> Pin<Box<dyn Future<Output = Result<(), FnError>> + Send>>
39        + Send
40        + Sync
41        + 'static,
42> CronFn<AppState> for CronFnClosure<AppState, FnError, F>
43{
44    async fn run(&self, app_state: AppState, context: String) -> Result<(), String> {
45        (self.func)(app_state, context)
46            .await
47            .map_err(|err| format!("{err:?}"))
48    }
49}
50
51#[derive(Clone, Debug)]
52pub struct IntervalSchedule(pub Duration);
53
54impl IntervalSchedule {
55    fn should_run(
56        &self,
57        last_run: Option<&chrono::DateTime<Utc>>,
58        now: chrono::DateTime<Utc>,
59        _worker_started_at: chrono::DateTime<Utc>,
60        _timezone: Tz,
61    ) -> bool {
62        if let Some(last_run) = last_run {
63            let elapsed = now - last_run;
64            // If elapsed is negative, return false (don't run)
65            if elapsed < chrono::Duration::zero() {
66                return false;
67            }
68            // Safe to convert since we checked it's non-negative
69            let elapsed = elapsed.to_std().unwrap_or(Duration::ZERO);
70            elapsed > self.0
71        } else {
72            true
73        }
74    }
75
76    pub fn next_run(
77        &self,
78        last_run: Option<&chrono::DateTime<Utc>>,
79        now: chrono::DateTime<Utc>,
80        timezone: Tz,
81    ) -> chrono::DateTime<Tz> {
82        let last_run = last_run.unwrap_or(&now);
83        let last_run_tz = last_run.with_timezone(&timezone);
84        let duration: chrono::Duration = chrono::Duration::from_std(self.0).unwrap();
85        let next_run = last_run_tz.checked_add_signed(duration).unwrap();
86        next_run.with_timezone(&timezone)
87    }
88}
89
90#[derive(Clone, Debug)]
91pub struct CronSchedule(pub Box<cron::Schedule>);
92
93impl CronSchedule {
94    fn should_run(
95        &self,
96        last_run: Option<&chrono::DateTime<Utc>>,
97        now: chrono::DateTime<Utc>,
98        worker_started_at: chrono::DateTime<Utc>,
99        timezone: Tz,
100    ) -> bool {
101        // Use last run time if available, otherwise use worker start time
102        let last_run = last_run.unwrap_or(&worker_started_at);
103        let last_run_tz = last_run.with_timezone(&timezone);
104
105        if let Some(next_run) = self.0.after(&last_run_tz).next() {
106            let now_tz = now.with_timezone(&timezone);
107            now_tz >= next_run
108        } else {
109            false
110        }
111    }
112
113    pub fn next_run(
114        &self,
115        last_run: Option<&chrono::DateTime<Utc>>,
116        now: chrono::DateTime<Utc>,
117        timezone: Tz,
118    ) -> chrono::DateTime<Tz> {
119        let last_run = last_run.unwrap_or(&now);
120        let last_run_tz = last_run.with_timezone(&timezone);
121        self.0.after(&last_run_tz).next().unwrap()
122    }
123}
124
125#[derive(Clone, Debug)]
126pub enum Schedule {
127    Interval(IntervalSchedule),
128    Cron(CronSchedule),
129}
130
131impl Schedule {
132    pub fn should_run(
133        &self,
134        last_run: Option<&chrono::DateTime<Utc>>,
135        now: chrono::DateTime<Utc>,
136        worker_started_at: chrono::DateTime<Utc>,
137        timezone: Tz,
138    ) -> bool {
139        match self {
140            Schedule::Interval(interval) => {
141                interval.should_run(last_run, now, worker_started_at, timezone)
142            }
143            Schedule::Cron(cron) => cron.should_run(last_run, now, worker_started_at, timezone),
144        }
145    }
146
147    pub fn next_run(
148        &self,
149        last_run: Option<&chrono::DateTime<Utc>>,
150        now: chrono::DateTime<Utc>,
151        timezone: Tz,
152    ) -> chrono::DateTime<Tz> {
153        match self {
154            Schedule::Interval(interval) => interval.next_run(last_run, now, timezone),
155            Schedule::Cron(cron) => cron.next_run(last_run, now, timezone),
156        }
157    }
158}
159
160#[allow(clippy::type_complexity)]
161pub struct CronJob<AppState: AS> {
162    pub name: &'static str,
163    pub description: Option<&'static str>,
164    func: Box<dyn CronFn<AppState> + Send + Sync + 'static>,
165    pub schedule: Schedule,
166}
167
168#[derive(Debug, thiserror::Error)]
169#[error("TickError: {0}")]
170pub enum TickError {
171    JobError(String),
172    SqlxError(sqlx::Error),
173}
174
175impl<AppState: AS> CronJob<AppState> {
176    #[tracing::instrument(
177        name = "cron_job.tick",
178        skip_all,
179        fields(
180            cron_job.name = self.name,
181            cron_job.schedule = ?self.schedule
182        )
183    )]
184    pub(crate) async fn tick(
185        &self,
186        app_state: AppState,
187        last_enqueue_map: &HashMap<String, chrono::DateTime<Utc>>,
188        worker_started_at: chrono::DateTime<Utc>,
189        timezone: Tz,
190    ) -> Result<(), TickError> {
191        let last_enqueue = last_enqueue_map.get(self.name);
192        let context = format!("Cron@{}", app_state.version());
193        let now = Utc::now();
194
195        let should_run = self
196            .schedule
197            .should_run(last_enqueue, now, worker_started_at, timezone);
198
199        if should_run {
200            tracing::info!(
201                task_name = self.name,
202                last_run = ?last_enqueue,
203                "Enqueuing Task"
204            );
205            (self.func)
206                .run(app_state.clone(), context)
207                .await
208                .map_err(TickError::JobError)?;
209
210            sqlx::query!(
211                "INSERT INTO Crons (cron_id, name, last_run_at, created_at, updated_at)
212                VALUES ($1, $2, $3, $4, $5)
213                ON CONFLICT (name)
214                DO UPDATE SET
215                last_run_at = $3",
216                uuid::Uuid::new_v4(),
217                self.name,
218                now,
219                now,
220                now
221            )
222            .execute(app_state.db())
223            .await
224            .map_err(TickError::SqlxError)?;
225        }
226
227        Ok(())
228    }
229
230    pub async fn run(&self, app_state: AppState, context: String) -> Result<(), String> {
231        (self.func).run(app_state, context).await
232    }
233}
234
235impl<AppState: AS> CronRegistry<AppState> {
236    pub fn new() -> Self {
237        Self {
238            jobs: HashMap::new(),
239        }
240    }
241
242    #[tracing::instrument(name = "cron.register", skip_all, fields(cron_job.name = name, cron_job.interval = ?interval))]
243    pub fn register<FnError: Error + Send + Sync + 'static>(
244        &mut self,
245        name: &'static str,
246        description: Option<&'static str>,
247        interval: Duration,
248        job: impl Fn(AppState, String) -> Pin<Box<dyn Future<Output = Result<(), FnError>> + Send>>
249        + Send
250        + Sync
251        + 'static,
252    ) {
253        let cron_job = CronJob {
254            name,
255            description,
256            func: Box::new(CronFnClosure {
257                func: job,
258                _marker: std::marker::PhantomData,
259            }),
260            schedule: Schedule::Interval(IntervalSchedule(interval)),
261        };
262        self.jobs.insert(name, cron_job);
263    }
264
265    #[tracing::instrument(name = "cron.register_with_cron", skip_all, fields(cron_job.name = name, cron_job.cron = cron_expr))]
266    pub fn register_with_cron<FnError: Error + Send + Sync + 'static>(
267        &mut self,
268        name: &'static str,
269        description: Option<&'static str>,
270        cron_expr: &str,
271        job: impl Fn(AppState, String) -> Pin<Box<dyn Future<Output = Result<(), FnError>> + Send>>
272        + Send
273        + Sync
274        + 'static,
275    ) -> Result<(), cron::error::Error> {
276        let cron_schedule = cron_expr.parse::<cron::Schedule>()?;
277        let cron_job = CronJob {
278            name,
279            description,
280            func: Box::new(CronFnClosure {
281                func: job,
282                _marker: std::marker::PhantomData,
283            }),
284            schedule: Schedule::Cron(CronSchedule(Box::new(cron_schedule))),
285        };
286        self.jobs.insert(name, cron_job);
287        Ok(())
288    }
289
290    #[cfg(feature = "jobs")]
291    #[tracing::instrument(name = "cron.register_job", skip_all, fields(cron_job.name = J::NAME, cron_job.interval = ?interval))]
292    pub fn register_job<J: Job<AppState>>(
293        &mut self,
294        job: J,
295        description: Option<&'static str>,
296        interval: Duration,
297    ) {
298        self.register(J::NAME, description, interval, move |app_state, context| {
299            J::enqueue(job.clone(), app_state, context, None)
300        });
301    }
302
303    #[cfg(feature = "jobs")]
304    #[tracing::instrument(name = "cron.register_job_with_cron", skip_all, fields(cron_job.name = J::NAME, cron_job.cron = cron_expr))]
305    pub fn register_job_with_cron<J: Job<AppState>>(
306        &mut self,
307        job: J,
308        description: Option<&'static str>,
309        cron_expr: &str,
310    ) -> Result<(), cron::error::Error> {
311        self.register_with_cron(
312            J::NAME,
313            description,
314            cron_expr,
315            move |app_state, context| J::enqueue(job.clone(), app_state, context, None),
316        )
317    }
318
319    #[cfg(feature = "jobs")]
320    #[tracing::instrument(name = "cron.get", skip_all, fields(cron_job.name = name))]
321    pub fn get(&self, name: &str) -> Option<&CronJob<AppState>> {
322        self.jobs.get(name)
323    }
324
325    /// Returns a reference to all registered cron jobs.
326    pub fn jobs(&self) -> &HashMap<&'static str, CronJob<AppState>> {
327        &self.jobs
328    }
329}
330
331impl<AppState: AS> Default for CronRegistry<AppState> {
332    fn default() -> Self {
333        Self::new()
334    }
335}
336
337#[cfg(test)]
338mod test {
339    use crate::app_state::AppState;
340    use crate::server::cookies::CookieKey;
341
342    use super::*;
343
344    #[derive(Clone)]
345    struct TestAppState {
346        db: sqlx::PgPool,
347        cookie_key: CookieKey,
348    }
349
350    impl AppState for TestAppState {
351        fn db(&self) -> &sqlx::PgPool {
352            &self.db
353        }
354
355        fn version(&self) -> &'static str {
356            "test"
357        }
358
359        fn cookie_key(&self) -> &CookieKey {
360            &self.cookie_key
361        }
362    }
363
364    #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
365    struct TestJob;
366
367    #[async_trait::async_trait]
368    impl Job<TestAppState> for TestJob {
369        const NAME: &'static str = "test_job";
370
371        async fn run(&self, _app_state: TestAppState) -> color_eyre::Result<()> {
372            Ok(())
373        }
374    }
375
376    #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
377    struct FailingJob;
378
379    #[async_trait::async_trait]
380    impl Job<TestAppState> for FailingJob {
381        const NAME: &'static str = "failing_job";
382
383        async fn run(&self, _app_state: TestAppState) -> color_eyre::Result<()> {
384            Err(color_eyre::eyre::eyre!("Test error"))
385        }
386    }
387
388    #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
389    struct SecondTestJob;
390
391    #[async_trait::async_trait]
392    impl Job<TestAppState> for SecondTestJob {
393        const NAME: &'static str = "second_test_job";
394
395        async fn run(&self, _app_state: TestAppState) -> color_eyre::Result<()> {
396            Ok(())
397        }
398    }
399
400    #[sqlx::test]
401    async fn test_tick_creates_new_cron_record(db: sqlx::PgPool) {
402        let app_state = TestAppState {
403            db: db.clone(),
404            cookie_key: CookieKey::generate(),
405        };
406        let mut registry = CronRegistry::new();
407        registry.register_job(TestJob, None, Duration::from_secs(1));
408
409        let cron_job = registry.jobs.get(TestJob::NAME).unwrap();
410        assert_eq!(cron_job.name, TestJob::NAME);
411        assert!(
412            matches!(cron_job.schedule, Schedule::Interval(IntervalSchedule(d)) if d == Duration::from_secs(1))
413        );
414
415        let worker = crate::cron::Worker::new(app_state.clone(), registry);
416
417        let existing_record =
418            sqlx::query!("SELECT cron_id FROM Crons where name = $1", TestJob::NAME)
419                .fetch_optional(&app_state.db)
420                .await
421                .unwrap();
422        assert!(
423            existing_record.is_none(),
424            "Record should not exist {}",
425            existing_record.unwrap().cron_id
426        );
427
428        worker.tick().await.unwrap();
429
430        let last_run = sqlx::query!(
431            "SELECT last_run_at FROM Crons WHERE name = $1",
432            TestJob::NAME
433        )
434        .fetch_one(&app_state.db)
435        .await
436        .unwrap();
437
438        let now = Utc::now();
439        let last_run_at = last_run.last_run_at;
440        let diff = now.signed_duration_since(last_run_at);
441        assert!(diff.num_milliseconds() < 1000);
442    }
443
444    #[sqlx::test]
445    async fn test_tick_skips_updating_existing_cron_record(db: sqlx::PgPool) {
446        let app_state = TestAppState {
447            db: db.clone(),
448            cookie_key: CookieKey::generate(),
449        };
450        let mut registry = CronRegistry::new();
451        registry.register_job(TestJob, None, Duration::from_secs(60));
452        let worker = crate::cron::Worker::new(app_state.clone(), registry);
453
454        let previously = Utc::now();
455        sqlx::query!(
456            "INSERT INTO Crons (cron_id, name, last_run_at, created_at, updated_at)
457            VALUES ($1, $2, $3, $3, $3)
458            ON CONFLICT (name)
459            DO UPDATE SET
460            last_run_at = $3",
461            uuid::Uuid::new_v4(),
462            TestJob::NAME,
463            previously
464        )
465        .execute(&app_state.db)
466        .await
467        .unwrap();
468
469        worker.tick().await.unwrap();
470
471        let last_run = sqlx::query!(
472            "SELECT last_run_at FROM Crons WHERE name = $1",
473            TestJob::NAME
474        )
475        .fetch_one(&app_state.db)
476        .await
477        .unwrap();
478
479        let diff = last_run.last_run_at.signed_duration_since(previously);
480        assert!(diff.num_milliseconds() < 50);
481    }
482
483    #[sqlx::test]
484    async fn test_tick_updates_cron_record_when_interval_elapsed(db: sqlx::PgPool) {
485        let app_state = TestAppState {
486            db: db.clone(),
487            cookie_key: CookieKey::generate(),
488        };
489        let mut registry = CronRegistry::new();
490        registry.register_job(TestJob, None, Duration::from_secs(1));
491        let worker = crate::cron::Worker::new(app_state.clone(), registry);
492
493        let two_seconds_ago = Utc::now() - chrono::Duration::seconds(2);
494        sqlx::query!(
495            "INSERT INTO Crons (cron_id, name, last_run_at, created_at, updated_at)
496            VALUES ($1, $2, $3, $3, $3)",
497            uuid::Uuid::new_v4(),
498            TestJob::NAME,
499            two_seconds_ago
500        )
501        .execute(&app_state.db)
502        .await
503        .unwrap();
504
505        worker.tick().await.unwrap();
506
507        let last_run = sqlx::query!(
508            "SELECT last_run_at FROM Crons WHERE name = $1",
509            TestJob::NAME
510        )
511        .fetch_one(&app_state.db)
512        .await
513        .unwrap();
514
515        assert!(last_run.last_run_at > two_seconds_ago);
516        let diff = Utc::now().signed_duration_since(last_run.last_run_at);
517        assert!(diff.num_milliseconds() < 1000);
518    }
519
520    #[sqlx::test]
521    async fn test_tick_enqueues_failing_job_successfully(db: sqlx::PgPool) {
522        let app_state = TestAppState {
523            db: db.clone(),
524            cookie_key: CookieKey::generate(),
525        };
526        let mut registry = CronRegistry::new();
527        registry.register_job(FailingJob, None, Duration::from_secs(1));
528
529        let cron_job = registry.jobs.get(FailingJob::NAME).unwrap();
530        let last_enqueue_map = HashMap::new();
531        let worker_started_at = Utc::now();
532
533        let result = cron_job
534            .tick(
535                app_state.clone(),
536                &last_enqueue_map,
537                worker_started_at,
538                chrono_tz::UTC,
539            )
540            .await;
541        assert!(result.is_ok());
542
543        let cron_record = sqlx::query!(
544            "SELECT last_run_at FROM Crons WHERE name = $1",
545            FailingJob::NAME
546        )
547        .fetch_one(&app_state.db)
548        .await
549        .unwrap();
550
551        let now = Utc::now();
552        let diff = now.signed_duration_since(cron_record.last_run_at);
553        assert!(diff.num_milliseconds() < 1000);
554
555        let job_count = sqlx::query!(
556            "SELECT COUNT(*) as count FROM jobs WHERE name = $1",
557            FailingJob::NAME
558        )
559        .fetch_one(&app_state.db)
560        .await
561        .unwrap();
562        assert_eq!(job_count.count.unwrap(), 1);
563    }
564
565    #[sqlx::test]
566    async fn test_tick_with_custom_function_error(db: sqlx::PgPool) {
567        let app_state = TestAppState {
568            db: db.clone(),
569            cookie_key: CookieKey::generate(),
570        };
571        #[derive(Debug, thiserror::Error)]
572        #[error("Custom function error")]
573        struct CustomError;
574
575        let mut registry = CronRegistry::new();
576        registry.register(
577            "custom_failing",
578            None,
579            Duration::from_secs(1),
580            |_app_state, _context| Box::pin(async { Err(CustomError) }),
581        );
582
583        let cron_job = registry.jobs.get("custom_failing").unwrap();
584        let last_enqueue_map = HashMap::new();
585        let worker_started_at = Utc::now();
586
587        let result = cron_job
588            .tick(
589                app_state.clone(),
590                &last_enqueue_map,
591                worker_started_at,
592                chrono_tz::UTC,
593            )
594            .await;
595        assert!(result.is_err());
596        match result.unwrap_err() {
597            TickError::JobError(err) => {
598                assert!(err.contains("CustomError"));
599            }
600            TickError::SqlxError(_) => panic!("Expected JobError"),
601        }
602
603        let cron_count = sqlx::query!(
604            "SELECT COUNT(*) as count FROM Crons WHERE name = $1",
605            "custom_failing"
606        )
607        .fetch_one(&app_state.db)
608        .await
609        .unwrap();
610        assert_eq!(cron_count.count.unwrap(), 0);
611    }
612
613    #[sqlx::test]
614    async fn test_worker_tick_with_multiple_jobs(db: sqlx::PgPool) {
615        let app_state = TestAppState {
616            db: db.clone(),
617            cookie_key: CookieKey::generate(),
618        };
619        let mut registry = CronRegistry::new();
620        registry.register_job(TestJob, None, Duration::from_secs(1));
621        registry.register_job(SecondTestJob, None, Duration::from_secs(1));
622
623        assert_eq!(registry.jobs.len(), 2);
624
625        let worker = crate::cron::Worker::new(app_state.clone(), registry);
626
627        worker.tick().await.unwrap();
628
629        let test_job_record = sqlx::query!(
630            "SELECT last_run_at FROM Crons WHERE name = $1",
631            TestJob::NAME
632        )
633        .fetch_one(&app_state.db)
634        .await
635        .unwrap();
636
637        let second_job_record = sqlx::query!(
638            "SELECT last_run_at FROM Crons WHERE name = $1",
639            SecondTestJob::NAME
640        )
641        .fetch_one(&app_state.db)
642        .await
643        .unwrap();
644
645        let now = Utc::now();
646        let diff1 = now.signed_duration_since(test_job_record.last_run_at);
647        let diff2 = now.signed_duration_since(second_job_record.last_run_at);
648
649        assert!(diff1.num_milliseconds() < 1000);
650        assert!(diff2.num_milliseconds() < 1000);
651    }
652
653    #[sqlx::test]
654    async fn test_worker_respects_existing_last_run_times(db: sqlx::PgPool) {
655        let app_state = TestAppState {
656            db: db.clone(),
657            cookie_key: CookieKey::generate(),
658        };
659        let mut registry = CronRegistry::new();
660        registry.register_job(TestJob, None, Duration::from_secs(10));
661        registry.register_job(SecondTestJob, None, Duration::from_secs(5));
662
663        let worker = crate::cron::Worker::new(app_state.clone(), registry);
664
665        let recent_time = Utc::now() - chrono::Duration::seconds(3);
666        let old_time = Utc::now() - chrono::Duration::seconds(10);
667
668        sqlx::query!(
669            "INSERT INTO Crons (cron_id, name, last_run_at, created_at, updated_at)
670                VALUES ($1, $2, $3, $3, $3)",
671            uuid::Uuid::new_v4(),
672            TestJob::NAME,
673            recent_time
674        )
675        .execute(&app_state.db)
676        .await
677        .unwrap();
678
679        sqlx::query!(
680            "INSERT INTO Crons (cron_id, name, last_run_at, created_at, updated_at)
681                VALUES ($1, $2, $3, $3, $3)",
682            uuid::Uuid::new_v4(),
683            SecondTestJob::NAME,
684            old_time
685        )
686        .execute(&app_state.db)
687        .await
688        .unwrap();
689
690        worker.tick().await.unwrap();
691
692        let test_job_record = sqlx::query!(
693            "SELECT last_run_at FROM Crons WHERE name = $1",
694            TestJob::NAME
695        )
696        .fetch_one(&app_state.db)
697        .await
698        .unwrap();
699
700        let second_job_record = sqlx::query!(
701            "SELECT last_run_at FROM Crons WHERE name = $1",
702            SecondTestJob::NAME
703        )
704        .fetch_one(&app_state.db)
705        .await
706        .unwrap();
707
708        let recent_diff = test_job_record
709            .last_run_at
710            .signed_duration_since(recent_time);
711        assert!(recent_diff.num_milliseconds() < 100);
712        assert!(second_job_record.last_run_at > old_time);
713    }
714
715    #[sqlx::test]
716    async fn test_tick_handles_future_last_run_time(db: sqlx::PgPool) {
717        let app_state = TestAppState {
718            db: db.clone(),
719            cookie_key: CookieKey::generate(),
720        };
721        let mut registry = CronRegistry::new();
722        registry.register_job(TestJob, None, Duration::from_secs(1));
723
724        let cron_job = registry.jobs.get(TestJob::NAME).unwrap();
725
726        let future_time = Utc::now() + chrono::Duration::hours(1);
727        let mut last_enqueue_map = HashMap::new();
728        last_enqueue_map.insert(TestJob::NAME.to_string(), future_time);
729        let worker_started_at = Utc::now();
730
731        let result = cron_job
732            .tick(
733                app_state.clone(),
734                &last_enqueue_map,
735                worker_started_at,
736                chrono_tz::UTC,
737            )
738            .await;
739
740        // With future last_run time, the job should not run
741        assert!(result.is_ok());
742
743        let cron_count = sqlx::query!(
744            "SELECT COUNT(*) as count FROM Crons WHERE name = $1",
745            TestJob::NAME
746        )
747        .fetch_one(&app_state.db)
748        .await
749        .unwrap();
750        assert_eq!(
751            cron_count.count.unwrap(),
752            0,
753            "Should not have created cron record with future last_run time"
754        );
755    }
756
757    #[sqlx::test]
758    async fn test_cron_expression_scheduling(db: sqlx::PgPool) {
759        let app_state = TestAppState {
760            db: db.clone(),
761            cookie_key: CookieKey::generate(),
762        };
763        let mut registry = CronRegistry::new();
764
765        // Register with cron expression that runs every minute
766        registry
767            .register_job_with_cron(TestJob, None, "0 * * * * * *")
768            .unwrap();
769
770        let cron_job = registry.jobs.get(TestJob::NAME).unwrap();
771        assert!(matches!(cron_job.schedule, Schedule::Cron(_)));
772
773        // First run should wait for the next scheduled time based on worker start
774        let last_enqueue_map = HashMap::new();
775        // Use a time in the past to ensure the cron will trigger
776        let worker_started_at = Utc::now() - chrono::Duration::minutes(2);
777        let result = cron_job
778            .tick(
779                app_state.clone(),
780                &last_enqueue_map,
781                worker_started_at,
782                chrono_tz::UTC,
783            )
784            .await;
785        assert!(result.is_ok());
786
787        // Verify cron record was created (if it ran)
788        let cron_record = sqlx::query!(
789            "SELECT last_run_at FROM Crons WHERE name = $1",
790            TestJob::NAME
791        )
792        .fetch_optional(&app_state.db)
793        .await
794        .unwrap();
795
796        // The job should have run since we used a worker start time 2 minutes ago
797        assert!(cron_record.is_some());
798        if let Some(record) = cron_record {
799            let now = Utc::now();
800            let diff = now.signed_duration_since(record.last_run_at);
801            assert!(diff.num_milliseconds() < 1000);
802        }
803    }
804
805    #[sqlx::test]
806    async fn test_cron_expression_respects_schedule(db: sqlx::PgPool) {
807        let app_state = TestAppState {
808            db: db.clone(),
809            cookie_key: CookieKey::generate(),
810        };
811        let mut registry = CronRegistry::new();
812
813        // Register with cron expression that runs at specific minute (e.g., minute 30)
814        registry
815            .register_job_with_cron(TestJob, None, "0 30 * * * * *")
816            .unwrap();
817
818        let cron_job = registry.jobs.get(TestJob::NAME).unwrap();
819
820        // Set last run to 29 minutes ago (shouldn't trigger if we're not at minute 30)
821        let last_run = Utc::now() - chrono::Duration::minutes(29);
822        let mut last_enqueue_map = HashMap::new();
823        last_enqueue_map.insert(TestJob::NAME.to_string(), last_run);
824
825        // This might or might not run depending on current minute
826        // We'll just verify it doesn't error
827        let worker_started_at = Utc::now();
828        let result = cron_job
829            .tick(
830                app_state.clone(),
831                &last_enqueue_map,
832                worker_started_at,
833                chrono_tz::UTC,
834            )
835            .await;
836        assert!(result.is_ok());
837    }
838
839    #[test]
840    fn test_invalid_cron_expression() {
841        let mut registry: CronRegistry<TestAppState> = CronRegistry::new();
842
843        // Invalid cron expression should return error
844        let result = registry.register_with_cron(
845            "invalid_cron",
846            None,
847            "invalid expression",
848            |_app_state, _context| Box::pin(async { Ok::<(), std::io::Error>(()) }),
849        );
850
851        assert!(result.is_err());
852    }
853
854    #[sqlx::test]
855    async fn test_mixed_interval_and_cron_jobs(db: sqlx::PgPool) {
856        let app_state = TestAppState {
857            db: db.clone(),
858            cookie_key: CookieKey::generate(),
859        };
860        let mut registry = CronRegistry::new();
861
862        // Register one job with interval
863        registry.register_job(TestJob, None, Duration::from_secs(1));
864
865        // Register another job with cron expression (every second)
866        registry
867            .register_job_with_cron(SecondTestJob, None, "* * * * * * *")
868            .unwrap();
869
870        assert_eq!(registry.jobs.len(), 2);
871
872        let interval_job = registry.jobs.get(TestJob::NAME).unwrap();
873        assert!(matches!(interval_job.schedule, Schedule::Interval(_)));
874
875        let cron_job = registry.jobs.get(SecondTestJob::NAME).unwrap();
876        assert!(matches!(cron_job.schedule, Schedule::Cron(_)));
877
878        // Create worker with start time in past to ensure both jobs run
879        let mut worker = crate::cron::Worker::new(app_state.clone(), registry);
880        worker.started_at = Utc::now() - chrono::Duration::seconds(2);
881
882        // Both jobs should run on first tick
883        worker.tick().await.unwrap();
884
885        let test_job_count = sqlx::query!(
886            "SELECT COUNT(*) as count FROM Crons WHERE name = $1",
887            TestJob::NAME
888        )
889        .fetch_one(&app_state.db)
890        .await
891        .unwrap();
892
893        let second_job_count = sqlx::query!(
894            "SELECT COUNT(*) as count FROM Crons WHERE name = $1",
895            SecondTestJob::NAME
896        )
897        .fetch_one(&app_state.db)
898        .await
899        .unwrap();
900
901        assert_eq!(test_job_count.count.unwrap(), 1);
902        assert_eq!(second_job_count.count.unwrap(), 1);
903    }
904}