1use std::{collections::HashMap, error::Error, future::Future, pin::Pin, time::Duration};
2
3use chrono::Utc;
4use chrono_tz::Tz;
5
6use crate::app_state::AppState as AS;
7#[cfg(feature = "jobs")]
8use crate::jobs::Job;
9
10pub struct CronRegistry<AppState: AS> {
11 pub(super) jobs: HashMap<&'static str, CronJob<AppState>>,
12}
13
14#[async_trait::async_trait]
15pub trait CronFn<AppState: AS> {
16 async fn run(&self, app_state: AppState, context: String) -> Result<(), String>;
20}
21
22pub struct CronFnClosure<
23 AppState: AS,
24 FnError: Error + Send + Sync + 'static,
25 F: Fn(AppState, String) -> Pin<Box<dyn Future<Output = Result<(), FnError>> + Send>>
26 + Send
27 + Sync
28 + 'static,
29> {
30 pub(super) func: F,
31 _marker: std::marker::PhantomData<AppState>,
32}
33
34#[async_trait::async_trait]
35impl<
36 AppState: AS,
37 FnError: Error + Send + Sync + 'static,
38 F: Fn(AppState, String) -> Pin<Box<dyn Future<Output = Result<(), FnError>> + Send>>
39 + Send
40 + Sync
41 + 'static,
42> CronFn<AppState> for CronFnClosure<AppState, FnError, F>
43{
44 async fn run(&self, app_state: AppState, context: String) -> Result<(), String> {
45 (self.func)(app_state, context)
46 .await
47 .map_err(|err| format!("{err:?}"))
48 }
49}
50
51#[derive(Clone, Debug)]
52pub struct IntervalSchedule(pub Duration);
53
54impl IntervalSchedule {
55 fn should_run(
56 &self,
57 last_run: Option<&chrono::DateTime<Utc>>,
58 now: chrono::DateTime<Utc>,
59 _worker_started_at: chrono::DateTime<Utc>,
60 _timezone: Tz,
61 ) -> bool {
62 if let Some(last_run) = last_run {
63 let elapsed = now - last_run;
64 if elapsed < chrono::Duration::zero() {
66 return false;
67 }
68 let elapsed = elapsed.to_std().unwrap_or(Duration::ZERO);
70 elapsed > self.0
71 } else {
72 true
73 }
74 }
75
76 pub fn next_run(
77 &self,
78 last_run: Option<&chrono::DateTime<Utc>>,
79 now: chrono::DateTime<Utc>,
80 timezone: Tz,
81 ) -> chrono::DateTime<Tz> {
82 let last_run = last_run.unwrap_or(&now);
83 let last_run_tz = last_run.with_timezone(&timezone);
84 let duration: chrono::Duration = chrono::Duration::from_std(self.0).unwrap();
85 let next_run = last_run_tz.checked_add_signed(duration).unwrap();
86 next_run.with_timezone(&timezone)
87 }
88}
89
90#[derive(Clone, Debug)]
91pub struct CronSchedule(pub Box<cron::Schedule>);
92
93impl CronSchedule {
94 fn should_run(
95 &self,
96 last_run: Option<&chrono::DateTime<Utc>>,
97 now: chrono::DateTime<Utc>,
98 worker_started_at: chrono::DateTime<Utc>,
99 timezone: Tz,
100 ) -> bool {
101 let last_run = last_run.unwrap_or(&worker_started_at);
103 let last_run_tz = last_run.with_timezone(&timezone);
104
105 if let Some(next_run) = self.0.after(&last_run_tz).next() {
106 let now_tz = now.with_timezone(&timezone);
107 now_tz >= next_run
108 } else {
109 false
110 }
111 }
112
113 pub fn next_run(
114 &self,
115 last_run: Option<&chrono::DateTime<Utc>>,
116 now: chrono::DateTime<Utc>,
117 timezone: Tz,
118 ) -> chrono::DateTime<Tz> {
119 let last_run = last_run.unwrap_or(&now);
120 let last_run_tz = last_run.with_timezone(&timezone);
121 self.0.after(&last_run_tz).next().unwrap()
122 }
123}
124
125#[derive(Clone, Debug)]
126pub enum Schedule {
127 Interval(IntervalSchedule),
128 Cron(CronSchedule),
129}
130
131impl Schedule {
132 pub fn should_run(
133 &self,
134 last_run: Option<&chrono::DateTime<Utc>>,
135 now: chrono::DateTime<Utc>,
136 worker_started_at: chrono::DateTime<Utc>,
137 timezone: Tz,
138 ) -> bool {
139 match self {
140 Schedule::Interval(interval) => {
141 interval.should_run(last_run, now, worker_started_at, timezone)
142 }
143 Schedule::Cron(cron) => cron.should_run(last_run, now, worker_started_at, timezone),
144 }
145 }
146
147 pub fn next_run(
148 &self,
149 last_run: Option<&chrono::DateTime<Utc>>,
150 now: chrono::DateTime<Utc>,
151 timezone: Tz,
152 ) -> chrono::DateTime<Tz> {
153 match self {
154 Schedule::Interval(interval) => interval.next_run(last_run, now, timezone),
155 Schedule::Cron(cron) => cron.next_run(last_run, now, timezone),
156 }
157 }
158}
159
160#[allow(clippy::type_complexity)]
161pub struct CronJob<AppState: AS> {
162 pub name: &'static str,
163 pub description: Option<&'static str>,
164 func: Box<dyn CronFn<AppState> + Send + Sync + 'static>,
165 pub schedule: Schedule,
166}
167
168#[derive(Debug, thiserror::Error)]
169#[error("TickError: {0}")]
170pub enum TickError {
171 JobError(String),
172 SqlxError(sqlx::Error),
173}
174
175impl<AppState: AS> CronJob<AppState> {
176 #[tracing::instrument(
177 name = "cron_job.tick",
178 skip_all,
179 fields(
180 cron_job.name = self.name,
181 cron_job.schedule = ?self.schedule
182 )
183 )]
184 pub(crate) async fn tick(
185 &self,
186 app_state: AppState,
187 last_enqueue_map: &HashMap<String, chrono::DateTime<Utc>>,
188 worker_started_at: chrono::DateTime<Utc>,
189 timezone: Tz,
190 ) -> Result<(), TickError> {
191 let last_enqueue = last_enqueue_map.get(self.name);
192 let context = format!("Cron@{}", app_state.version());
193 let now = Utc::now();
194
195 let should_run = self
196 .schedule
197 .should_run(last_enqueue, now, worker_started_at, timezone);
198
199 if should_run {
200 tracing::info!(
201 task_name = self.name,
202 last_run = ?last_enqueue,
203 "Enqueuing Task"
204 );
205 (self.func)
206 .run(app_state.clone(), context)
207 .await
208 .map_err(TickError::JobError)?;
209
210 sqlx::query!(
211 "INSERT INTO Crons (cron_id, name, last_run_at, created_at, updated_at)
212 VALUES ($1, $2, $3, $4, $5)
213 ON CONFLICT (name)
214 DO UPDATE SET
215 last_run_at = $3",
216 uuid::Uuid::new_v4(),
217 self.name,
218 now,
219 now,
220 now
221 )
222 .execute(app_state.db())
223 .await
224 .map_err(TickError::SqlxError)?;
225 }
226
227 Ok(())
228 }
229
230 pub async fn run(&self, app_state: AppState, context: String) -> Result<(), String> {
231 (self.func).run(app_state, context).await
232 }
233}
234
235impl<AppState: AS> CronRegistry<AppState> {
236 pub fn new() -> Self {
237 Self {
238 jobs: HashMap::new(),
239 }
240 }
241
242 #[tracing::instrument(name = "cron.register", skip_all, fields(cron_job.name = name, cron_job.interval = ?interval))]
243 pub fn register<FnError: Error + Send + Sync + 'static>(
244 &mut self,
245 name: &'static str,
246 description: Option<&'static str>,
247 interval: Duration,
248 job: impl Fn(AppState, String) -> Pin<Box<dyn Future<Output = Result<(), FnError>> + Send>>
249 + Send
250 + Sync
251 + 'static,
252 ) {
253 let cron_job = CronJob {
254 name,
255 description,
256 func: Box::new(CronFnClosure {
257 func: job,
258 _marker: std::marker::PhantomData,
259 }),
260 schedule: Schedule::Interval(IntervalSchedule(interval)),
261 };
262 self.jobs.insert(name, cron_job);
263 }
264
265 #[tracing::instrument(name = "cron.register_with_cron", skip_all, fields(cron_job.name = name, cron_job.cron = cron_expr))]
266 pub fn register_with_cron<FnError: Error + Send + Sync + 'static>(
267 &mut self,
268 name: &'static str,
269 description: Option<&'static str>,
270 cron_expr: &str,
271 job: impl Fn(AppState, String) -> Pin<Box<dyn Future<Output = Result<(), FnError>> + Send>>
272 + Send
273 + Sync
274 + 'static,
275 ) -> Result<(), cron::error::Error> {
276 let cron_schedule = cron_expr.parse::<cron::Schedule>()?;
277 let cron_job = CronJob {
278 name,
279 description,
280 func: Box::new(CronFnClosure {
281 func: job,
282 _marker: std::marker::PhantomData,
283 }),
284 schedule: Schedule::Cron(CronSchedule(Box::new(cron_schedule))),
285 };
286 self.jobs.insert(name, cron_job);
287 Ok(())
288 }
289
290 #[cfg(feature = "jobs")]
291 #[tracing::instrument(name = "cron.register_job", skip_all, fields(cron_job.name = J::NAME, cron_job.interval = ?interval))]
292 pub fn register_job<J: Job<AppState>>(
293 &mut self,
294 job: J,
295 description: Option<&'static str>,
296 interval: Duration,
297 ) {
298 self.register(J::NAME, description, interval, move |app_state, context| {
299 J::enqueue(job.clone(), app_state, context, None)
300 });
301 }
302
303 #[cfg(feature = "jobs")]
304 #[tracing::instrument(name = "cron.register_job_with_cron", skip_all, fields(cron_job.name = J::NAME, cron_job.cron = cron_expr))]
305 pub fn register_job_with_cron<J: Job<AppState>>(
306 &mut self,
307 job: J,
308 description: Option<&'static str>,
309 cron_expr: &str,
310 ) -> Result<(), cron::error::Error> {
311 self.register_with_cron(
312 J::NAME,
313 description,
314 cron_expr,
315 move |app_state, context| J::enqueue(job.clone(), app_state, context, None),
316 )
317 }
318
319 #[cfg(feature = "jobs")]
320 #[tracing::instrument(name = "cron.get", skip_all, fields(cron_job.name = name))]
321 pub fn get(&self, name: &str) -> Option<&CronJob<AppState>> {
322 self.jobs.get(name)
323 }
324
325 pub fn jobs(&self) -> &HashMap<&'static str, CronJob<AppState>> {
327 &self.jobs
328 }
329}
330
331impl<AppState: AS> Default for CronRegistry<AppState> {
332 fn default() -> Self {
333 Self::new()
334 }
335}
336
337#[cfg(test)]
338mod test {
339 use crate::app_state::AppState;
340 use crate::server::cookies::CookieKey;
341
342 use super::*;
343
344 #[derive(Clone)]
345 struct TestAppState {
346 db: sqlx::PgPool,
347 cookie_key: CookieKey,
348 }
349
350 impl AppState for TestAppState {
351 fn db(&self) -> &sqlx::PgPool {
352 &self.db
353 }
354
355 fn version(&self) -> &'static str {
356 "test"
357 }
358
359 fn cookie_key(&self) -> &CookieKey {
360 &self.cookie_key
361 }
362 }
363
364 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
365 struct TestJob;
366
367 #[async_trait::async_trait]
368 impl Job<TestAppState> for TestJob {
369 const NAME: &'static str = "test_job";
370
371 async fn run(&self, _app_state: TestAppState) -> color_eyre::Result<()> {
372 Ok(())
373 }
374 }
375
376 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
377 struct FailingJob;
378
379 #[async_trait::async_trait]
380 impl Job<TestAppState> for FailingJob {
381 const NAME: &'static str = "failing_job";
382
383 async fn run(&self, _app_state: TestAppState) -> color_eyre::Result<()> {
384 Err(color_eyre::eyre::eyre!("Test error"))
385 }
386 }
387
388 #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
389 struct SecondTestJob;
390
391 #[async_trait::async_trait]
392 impl Job<TestAppState> for SecondTestJob {
393 const NAME: &'static str = "second_test_job";
394
395 async fn run(&self, _app_state: TestAppState) -> color_eyre::Result<()> {
396 Ok(())
397 }
398 }
399
400 #[sqlx::test]
401 async fn test_tick_creates_new_cron_record(db: sqlx::PgPool) {
402 let app_state = TestAppState {
403 db: db.clone(),
404 cookie_key: CookieKey::generate(),
405 };
406 let mut registry = CronRegistry::new();
407 registry.register_job(TestJob, None, Duration::from_secs(1));
408
409 let cron_job = registry.jobs.get(TestJob::NAME).unwrap();
410 assert_eq!(cron_job.name, TestJob::NAME);
411 assert!(
412 matches!(cron_job.schedule, Schedule::Interval(IntervalSchedule(d)) if d == Duration::from_secs(1))
413 );
414
415 let worker = crate::cron::Worker::new(app_state.clone(), registry);
416
417 let existing_record =
418 sqlx::query!("SELECT cron_id FROM Crons where name = $1", TestJob::NAME)
419 .fetch_optional(&app_state.db)
420 .await
421 .unwrap();
422 assert!(
423 existing_record.is_none(),
424 "Record should not exist {}",
425 existing_record.unwrap().cron_id
426 );
427
428 worker.tick().await.unwrap();
429
430 let last_run = sqlx::query!(
431 "SELECT last_run_at FROM Crons WHERE name = $1",
432 TestJob::NAME
433 )
434 .fetch_one(&app_state.db)
435 .await
436 .unwrap();
437
438 let now = Utc::now();
439 let last_run_at = last_run.last_run_at;
440 let diff = now.signed_duration_since(last_run_at);
441 assert!(diff.num_milliseconds() < 1000);
442 }
443
444 #[sqlx::test]
445 async fn test_tick_skips_updating_existing_cron_record(db: sqlx::PgPool) {
446 let app_state = TestAppState {
447 db: db.clone(),
448 cookie_key: CookieKey::generate(),
449 };
450 let mut registry = CronRegistry::new();
451 registry.register_job(TestJob, None, Duration::from_secs(60));
452 let worker = crate::cron::Worker::new(app_state.clone(), registry);
453
454 let previously = Utc::now();
455 sqlx::query!(
456 "INSERT INTO Crons (cron_id, name, last_run_at, created_at, updated_at)
457 VALUES ($1, $2, $3, $3, $3)
458 ON CONFLICT (name)
459 DO UPDATE SET
460 last_run_at = $3",
461 uuid::Uuid::new_v4(),
462 TestJob::NAME,
463 previously
464 )
465 .execute(&app_state.db)
466 .await
467 .unwrap();
468
469 worker.tick().await.unwrap();
470
471 let last_run = sqlx::query!(
472 "SELECT last_run_at FROM Crons WHERE name = $1",
473 TestJob::NAME
474 )
475 .fetch_one(&app_state.db)
476 .await
477 .unwrap();
478
479 let diff = last_run.last_run_at.signed_duration_since(previously);
480 assert!(diff.num_milliseconds() < 50);
481 }
482
483 #[sqlx::test]
484 async fn test_tick_updates_cron_record_when_interval_elapsed(db: sqlx::PgPool) {
485 let app_state = TestAppState {
486 db: db.clone(),
487 cookie_key: CookieKey::generate(),
488 };
489 let mut registry = CronRegistry::new();
490 registry.register_job(TestJob, None, Duration::from_secs(1));
491 let worker = crate::cron::Worker::new(app_state.clone(), registry);
492
493 let two_seconds_ago = Utc::now() - chrono::Duration::seconds(2);
494 sqlx::query!(
495 "INSERT INTO Crons (cron_id, name, last_run_at, created_at, updated_at)
496 VALUES ($1, $2, $3, $3, $3)",
497 uuid::Uuid::new_v4(),
498 TestJob::NAME,
499 two_seconds_ago
500 )
501 .execute(&app_state.db)
502 .await
503 .unwrap();
504
505 worker.tick().await.unwrap();
506
507 let last_run = sqlx::query!(
508 "SELECT last_run_at FROM Crons WHERE name = $1",
509 TestJob::NAME
510 )
511 .fetch_one(&app_state.db)
512 .await
513 .unwrap();
514
515 assert!(last_run.last_run_at > two_seconds_ago);
516 let diff = Utc::now().signed_duration_since(last_run.last_run_at);
517 assert!(diff.num_milliseconds() < 1000);
518 }
519
520 #[sqlx::test]
521 async fn test_tick_enqueues_failing_job_successfully(db: sqlx::PgPool) {
522 let app_state = TestAppState {
523 db: db.clone(),
524 cookie_key: CookieKey::generate(),
525 };
526 let mut registry = CronRegistry::new();
527 registry.register_job(FailingJob, None, Duration::from_secs(1));
528
529 let cron_job = registry.jobs.get(FailingJob::NAME).unwrap();
530 let last_enqueue_map = HashMap::new();
531 let worker_started_at = Utc::now();
532
533 let result = cron_job
534 .tick(
535 app_state.clone(),
536 &last_enqueue_map,
537 worker_started_at,
538 chrono_tz::UTC,
539 )
540 .await;
541 assert!(result.is_ok());
542
543 let cron_record = sqlx::query!(
544 "SELECT last_run_at FROM Crons WHERE name = $1",
545 FailingJob::NAME
546 )
547 .fetch_one(&app_state.db)
548 .await
549 .unwrap();
550
551 let now = Utc::now();
552 let diff = now.signed_duration_since(cron_record.last_run_at);
553 assert!(diff.num_milliseconds() < 1000);
554
555 let job_count = sqlx::query!(
556 "SELECT COUNT(*) as count FROM jobs WHERE name = $1",
557 FailingJob::NAME
558 )
559 .fetch_one(&app_state.db)
560 .await
561 .unwrap();
562 assert_eq!(job_count.count.unwrap(), 1);
563 }
564
565 #[sqlx::test]
566 async fn test_tick_with_custom_function_error(db: sqlx::PgPool) {
567 let app_state = TestAppState {
568 db: db.clone(),
569 cookie_key: CookieKey::generate(),
570 };
571 #[derive(Debug, thiserror::Error)]
572 #[error("Custom function error")]
573 struct CustomError;
574
575 let mut registry = CronRegistry::new();
576 registry.register(
577 "custom_failing",
578 None,
579 Duration::from_secs(1),
580 |_app_state, _context| Box::pin(async { Err(CustomError) }),
581 );
582
583 let cron_job = registry.jobs.get("custom_failing").unwrap();
584 let last_enqueue_map = HashMap::new();
585 let worker_started_at = Utc::now();
586
587 let result = cron_job
588 .tick(
589 app_state.clone(),
590 &last_enqueue_map,
591 worker_started_at,
592 chrono_tz::UTC,
593 )
594 .await;
595 assert!(result.is_err());
596 match result.unwrap_err() {
597 TickError::JobError(err) => {
598 assert!(err.contains("CustomError"));
599 }
600 TickError::SqlxError(_) => panic!("Expected JobError"),
601 }
602
603 let cron_count = sqlx::query!(
604 "SELECT COUNT(*) as count FROM Crons WHERE name = $1",
605 "custom_failing"
606 )
607 .fetch_one(&app_state.db)
608 .await
609 .unwrap();
610 assert_eq!(cron_count.count.unwrap(), 0);
611 }
612
613 #[sqlx::test]
614 async fn test_worker_tick_with_multiple_jobs(db: sqlx::PgPool) {
615 let app_state = TestAppState {
616 db: db.clone(),
617 cookie_key: CookieKey::generate(),
618 };
619 let mut registry = CronRegistry::new();
620 registry.register_job(TestJob, None, Duration::from_secs(1));
621 registry.register_job(SecondTestJob, None, Duration::from_secs(1));
622
623 assert_eq!(registry.jobs.len(), 2);
624
625 let worker = crate::cron::Worker::new(app_state.clone(), registry);
626
627 worker.tick().await.unwrap();
628
629 let test_job_record = sqlx::query!(
630 "SELECT last_run_at FROM Crons WHERE name = $1",
631 TestJob::NAME
632 )
633 .fetch_one(&app_state.db)
634 .await
635 .unwrap();
636
637 let second_job_record = sqlx::query!(
638 "SELECT last_run_at FROM Crons WHERE name = $1",
639 SecondTestJob::NAME
640 )
641 .fetch_one(&app_state.db)
642 .await
643 .unwrap();
644
645 let now = Utc::now();
646 let diff1 = now.signed_duration_since(test_job_record.last_run_at);
647 let diff2 = now.signed_duration_since(second_job_record.last_run_at);
648
649 assert!(diff1.num_milliseconds() < 1000);
650 assert!(diff2.num_milliseconds() < 1000);
651 }
652
653 #[sqlx::test]
654 async fn test_worker_respects_existing_last_run_times(db: sqlx::PgPool) {
655 let app_state = TestAppState {
656 db: db.clone(),
657 cookie_key: CookieKey::generate(),
658 };
659 let mut registry = CronRegistry::new();
660 registry.register_job(TestJob, None, Duration::from_secs(10));
661 registry.register_job(SecondTestJob, None, Duration::from_secs(5));
662
663 let worker = crate::cron::Worker::new(app_state.clone(), registry);
664
665 let recent_time = Utc::now() - chrono::Duration::seconds(3);
666 let old_time = Utc::now() - chrono::Duration::seconds(10);
667
668 sqlx::query!(
669 "INSERT INTO Crons (cron_id, name, last_run_at, created_at, updated_at)
670 VALUES ($1, $2, $3, $3, $3)",
671 uuid::Uuid::new_v4(),
672 TestJob::NAME,
673 recent_time
674 )
675 .execute(&app_state.db)
676 .await
677 .unwrap();
678
679 sqlx::query!(
680 "INSERT INTO Crons (cron_id, name, last_run_at, created_at, updated_at)
681 VALUES ($1, $2, $3, $3, $3)",
682 uuid::Uuid::new_v4(),
683 SecondTestJob::NAME,
684 old_time
685 )
686 .execute(&app_state.db)
687 .await
688 .unwrap();
689
690 worker.tick().await.unwrap();
691
692 let test_job_record = sqlx::query!(
693 "SELECT last_run_at FROM Crons WHERE name = $1",
694 TestJob::NAME
695 )
696 .fetch_one(&app_state.db)
697 .await
698 .unwrap();
699
700 let second_job_record = sqlx::query!(
701 "SELECT last_run_at FROM Crons WHERE name = $1",
702 SecondTestJob::NAME
703 )
704 .fetch_one(&app_state.db)
705 .await
706 .unwrap();
707
708 let recent_diff = test_job_record
709 .last_run_at
710 .signed_duration_since(recent_time);
711 assert!(recent_diff.num_milliseconds() < 100);
712 assert!(second_job_record.last_run_at > old_time);
713 }
714
715 #[sqlx::test]
716 async fn test_tick_handles_future_last_run_time(db: sqlx::PgPool) {
717 let app_state = TestAppState {
718 db: db.clone(),
719 cookie_key: CookieKey::generate(),
720 };
721 let mut registry = CronRegistry::new();
722 registry.register_job(TestJob, None, Duration::from_secs(1));
723
724 let cron_job = registry.jobs.get(TestJob::NAME).unwrap();
725
726 let future_time = Utc::now() + chrono::Duration::hours(1);
727 let mut last_enqueue_map = HashMap::new();
728 last_enqueue_map.insert(TestJob::NAME.to_string(), future_time);
729 let worker_started_at = Utc::now();
730
731 let result = cron_job
732 .tick(
733 app_state.clone(),
734 &last_enqueue_map,
735 worker_started_at,
736 chrono_tz::UTC,
737 )
738 .await;
739
740 assert!(result.is_ok());
742
743 let cron_count = sqlx::query!(
744 "SELECT COUNT(*) as count FROM Crons WHERE name = $1",
745 TestJob::NAME
746 )
747 .fetch_one(&app_state.db)
748 .await
749 .unwrap();
750 assert_eq!(
751 cron_count.count.unwrap(),
752 0,
753 "Should not have created cron record with future last_run time"
754 );
755 }
756
757 #[sqlx::test]
758 async fn test_cron_expression_scheduling(db: sqlx::PgPool) {
759 let app_state = TestAppState {
760 db: db.clone(),
761 cookie_key: CookieKey::generate(),
762 };
763 let mut registry = CronRegistry::new();
764
765 registry
767 .register_job_with_cron(TestJob, None, "0 * * * * * *")
768 .unwrap();
769
770 let cron_job = registry.jobs.get(TestJob::NAME).unwrap();
771 assert!(matches!(cron_job.schedule, Schedule::Cron(_)));
772
773 let last_enqueue_map = HashMap::new();
775 let worker_started_at = Utc::now() - chrono::Duration::minutes(2);
777 let result = cron_job
778 .tick(
779 app_state.clone(),
780 &last_enqueue_map,
781 worker_started_at,
782 chrono_tz::UTC,
783 )
784 .await;
785 assert!(result.is_ok());
786
787 let cron_record = sqlx::query!(
789 "SELECT last_run_at FROM Crons WHERE name = $1",
790 TestJob::NAME
791 )
792 .fetch_optional(&app_state.db)
793 .await
794 .unwrap();
795
796 assert!(cron_record.is_some());
798 if let Some(record) = cron_record {
799 let now = Utc::now();
800 let diff = now.signed_duration_since(record.last_run_at);
801 assert!(diff.num_milliseconds() < 1000);
802 }
803 }
804
805 #[sqlx::test]
806 async fn test_cron_expression_respects_schedule(db: sqlx::PgPool) {
807 let app_state = TestAppState {
808 db: db.clone(),
809 cookie_key: CookieKey::generate(),
810 };
811 let mut registry = CronRegistry::new();
812
813 registry
815 .register_job_with_cron(TestJob, None, "0 30 * * * * *")
816 .unwrap();
817
818 let cron_job = registry.jobs.get(TestJob::NAME).unwrap();
819
820 let last_run = Utc::now() - chrono::Duration::minutes(29);
822 let mut last_enqueue_map = HashMap::new();
823 last_enqueue_map.insert(TestJob::NAME.to_string(), last_run);
824
825 let worker_started_at = Utc::now();
828 let result = cron_job
829 .tick(
830 app_state.clone(),
831 &last_enqueue_map,
832 worker_started_at,
833 chrono_tz::UTC,
834 )
835 .await;
836 assert!(result.is_ok());
837 }
838
839 #[test]
840 fn test_invalid_cron_expression() {
841 let mut registry: CronRegistry<TestAppState> = CronRegistry::new();
842
843 let result = registry.register_with_cron(
845 "invalid_cron",
846 None,
847 "invalid expression",
848 |_app_state, _context| Box::pin(async { Ok::<(), std::io::Error>(()) }),
849 );
850
851 assert!(result.is_err());
852 }
853
854 #[sqlx::test]
855 async fn test_mixed_interval_and_cron_jobs(db: sqlx::PgPool) {
856 let app_state = TestAppState {
857 db: db.clone(),
858 cookie_key: CookieKey::generate(),
859 };
860 let mut registry = CronRegistry::new();
861
862 registry.register_job(TestJob, None, Duration::from_secs(1));
864
865 registry
867 .register_job_with_cron(SecondTestJob, None, "* * * * * * *")
868 .unwrap();
869
870 assert_eq!(registry.jobs.len(), 2);
871
872 let interval_job = registry.jobs.get(TestJob::NAME).unwrap();
873 assert!(matches!(interval_job.schedule, Schedule::Interval(_)));
874
875 let cron_job = registry.jobs.get(SecondTestJob::NAME).unwrap();
876 assert!(matches!(cron_job.schedule, Schedule::Cron(_)));
877
878 let mut worker = crate::cron::Worker::new(app_state.clone(), registry);
880 worker.started_at = Utc::now() - chrono::Duration::seconds(2);
881
882 worker.tick().await.unwrap();
884
885 let test_job_count = sqlx::query!(
886 "SELECT COUNT(*) as count FROM Crons WHERE name = $1",
887 TestJob::NAME
888 )
889 .fetch_one(&app_state.db)
890 .await
891 .unwrap();
892
893 let second_job_count = sqlx::query!(
894 "SELECT COUNT(*) as count FROM Crons WHERE name = $1",
895 SecondTestJob::NAME
896 )
897 .fetch_one(&app_state.db)
898 .await
899 .unwrap();
900
901 assert_eq!(test_job_count.count.unwrap(), 1);
902 assert_eq!(second_job_count.count.unwrap(), 1);
903 }
904}