aboutsummaryrefslogtreecommitdiff
path: root/lib/scheduler.ts
blob: 871573304e9e92c23fd51cb0847f595d59b358e9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import cron from 'cron';
import Bluebird from 'bluebird';
import { EventModel, Event } from './event.model';

/** Signature of a callback that receives the event being processed. */
export type Handler = (event: Event<any>) => void;

// Job constructor taken from the default export of the 'cron' package.
const CronJob = cron.CronJob;

// Six-field cron expression used when the caller supplies no polling
// interval. In the cron package's syntax the leading field is seconds,
// so this fires every 10 seconds.
const defaultPollingInterval = '*/10 * * * * *';


/**
 * Keeps a set of cron jobs in sync with the events stored in `Model`.
 *
 * A polling cron job periodically re-saves missed events, discards the
 * current event jobs and creates one new job per event returned by
 * `Model.findNextEvents()`. When an event job fires, the event is loaded
 * by id and passed to the handler registered for its `type`.
 */
class Scheduler {
  /** Event jobs created by the most recent `updateJobs()` pass. */
  private jobs: cron.CronJob[];

  /** Cron job whose tick triggers `updateJobs()`. */
  private readonly pollingJob: cron.CronJob;

  /** Registered handlers, keyed by event type. */
  private readonly handlers: Record<string, Handler>;

  /** Model used to query missed events, upcoming events and events by id. */
  public Model: EventModel<any>;

  /**
   * Creates the scheduler and immediately starts polling.
   *
   * @param model - Event model queried on every poll and every job run.
   * @param pollingInterval - Cron expression for the polling job.
   */
  constructor(model: EventModel<any>, pollingInterval = defaultPollingInterval) {
    this.Model = model;
    this.jobs = [];
    // Prototype-less dictionary: an event type such as "constructor" or
    // "toString" must not resolve to an inherited Object.prototype member
    // and be mistaken for a registered handler.
    this.handlers = Object.create(null);

    // The cron tick has no consumer for a rejected promise, so failures
    // (e.g. a model query that rejects) are logged here instead of
    // surfacing as unhandled rejections. The next tick simply retries.
    this.pollingJob = new CronJob(
      pollingInterval,
      () => this.updateJobs().catch((error: unknown) => {
        console.error('ERROR: failed to update scheduled jobs', error);
      }),
    );
    this.startPolling();
  }

  /**
   * Registers (or replaces) the handler for events of the given type.
   *
   * @param name - Event type the handler is responsible for.
   * @param handler - Callback invoked with the event when its job fires.
   */
  public registerHandler(name: string, handler: Handler) {
    this.handlers[name] = handler;
  }

  /** Starts the polling job. Event jobs are created on its next tick. */
  public startPolling() {
    this.pollingJob.start();
  }

  /** Stops the polling job and every event job currently held. */
  public stopPolling() {
    this.pollingJob.stop();
    this.stopAllJobs();
  }

  /** Starts every event job currently held. */
  private startAllJobs() {
    this.jobs.forEach(job => job.start());
  }

  /** Stops every event job currently held. */
  private stopAllJobs() {
    this.jobs.forEach(job => job.stop());
  }

  /**
   * Loads the events reported as missed by the model and saves each one.
   *
   * @returns The promise produced by `Bluebird.map` over the saves.
   */
  private async rescheduleMissedEvents() {
    const missedEvents = await this.Model.findMissedEvents();
    return Bluebird.map(missedEvents, event => event.save());
  }

  /**
   * Replaces the current event jobs with jobs for the upcoming events.
   *
   * NOTE(review): this method awaits between stopping the old jobs and
   * starting the new ones. If `stopPolling()` is called, or another poll
   * begins, during that window, the jobs created here are still started.
   * Confirm whether callers need protection against that.
   */
  private async updateJobs(): Promise<void> {
    // Reschedule missed events before stopping jobs, so that a job which
    // has not fired yet is not stopped by accident when an event's
    // schedule coincides with the polling schedule.
    await this.rescheduleMissedEvents();

    this.stopAllJobs();

    const events = await this.Model.findNextEvents();
    if (!events.length) console.log('WARNING: no upcoming events');
    this.jobs = events.map(event => new CronJob(
      event.schedule,
      // Same reasoning as for the polling job: log instead of leaving a
      // rejected promise unhandled when loading or updating the event fails.
      () => this.run(event.id).catch((error: unknown) => {
        console.error(`ERROR: failed to run event ${event.id}`, error);
      }),
    ));

    this.startAllJobs();
  }

  /**
   * Loads the event with the given id and runs its registered handler.
   *
   * The event is marked via `start()` before the handler runs,
   * `complete()` after it finishes, and `fail(error)` if no handler is
   * registered or anything inside the `try` block throws or rejects.
   *
   * @param id - Identifier of the event to run.
   * @returns Whatever `complete()` / `fail()` return, or `undefined` when
   *   the event no longer exists.
   */
  private async run(id: string) {
    const event = await this.Model.findById(id);
    if (!event) return console.log('WARNING: locked event does not exist');

    try {
      const handleEvent = this.handlers[event.type];
      if (!handleEvent) throw new Error('No handler found');

      // Awaited so that, if start() returns a promise, its rejection is
      // routed to event.fail() and the handler only runs once it settles.
      // Awaiting a plain value is harmless.
      // NOTE(review): confirm start() is asynchronous.
      await event.start();
      await handleEvent(event);
      return event.complete();
    } catch (error) {
      return event.fail(error);
    }
  }
}


export default Scheduler;