diff options
| author | eug-vs <eug-vs@keemail.me> | 2020-11-28 00:21:26 +0300 | 
|---|---|---|
| committer | eug-vs <eug-vs@keemail.me> | 2020-11-28 00:21:26 +0300 | 
| commit | 7c69da67cf43dd076d20df87b5a7ea52a47e69ac (patch) | |
| tree | 04473c6b40cda94d5af714072ddb2ac0ffef59af /src/services | |
| parent | 9f7115217c34732753102e044a94711400b5affc (diff) | |
| download | bsu-fantom-7c69da67cf43dd076d20df87b5a7ea52a47e69ac.tar.gz | |
feat: correctly re-schedule old events
Diffstat (limited to 'src/services')
| -rw-r--r-- | src/services/events/event.model.js | 21 | ||||
| -rw-r--r-- | src/services/events/event.schema.js | 9 | ||||
| -rw-r--r-- | src/services/events/event.service.js | 74 | 
3 files changed, 59 insertions, 45 deletions
| diff --git a/src/services/events/event.model.js b/src/services/events/event.model.js index ef86572..b8a13bc 100644 --- a/src/services/events/event.model.js +++ b/src/services/events/event.model.js @@ -1,5 +1,4 @@  const cron = require('cron'); -const Bluebird = require('bluebird');  const { model } = require('mongoose');  const schema = require('./event.schema.js'); @@ -11,26 +10,27 @@ schema.methods.computeNextRunAt = function() {    return new Date(nextRunAt);  }; +schema.methods.setStatus = function(status) { +  this.status = status;; +  this.save(); +}; +  schema.pre('save', function(next) {    this.nextRunAt = this.computeNextRunAt();    next();  }); -schema.statics.rescheduleOldEvents = async function () { -  console.log('Reschedule old events'); -  const oldEvents = await this.find({ +schema.statics.findMissedEvents = async function () { +  return this.find({      nextRunAt: {        // TODO: skip single-fire events        $lt: new Date()      },    }); - -  // Saving events triggers computing new nextRunAt -  return Bluebird.map(oldEvents, event => event.save());  }; -schema.statics.findNextEvent = function () { -  return this.findOne( +schema.statics.findNextEvents = function(limit = 10) { +  return this.find(      {        nextRunAt: {          $exists: 1, @@ -41,7 +41,8 @@ schema.statics.findNextEvent = function () {      {        sort: {          nextRunAt: 1 -      } +      }, +      limit      }    )  }; diff --git a/src/services/events/event.schema.js b/src/services/events/event.schema.js index 0bbb978..2250700 100644 --- a/src/services/events/event.schema.js +++ b/src/services/events/event.schema.js @@ -1,4 +1,4 @@ -const { Schema, Types } = require('mongoose'); +const { Schema } = require('mongoose');  module.exports = new Schema({    name: { @@ -10,16 +10,11 @@ module.exports = new Schema({      type: String,      required: true    }, -  participants: { -    type: [String], -    default: [] -  },    status: {      type: String,      default: 'notStarted'    }, -  conferenceId: String, -  attendanceId: String, +  context: {},    nextRunAt: Date,    lastRunAt: Date  }, { timestamps: true }); diff --git a/src/services/events/event.service.js b/src/services/events/event.service.js index 6c0454e..4d87c6c 100644 --- a/src/services/events/event.service.js +++ b/src/services/events/event.service.js @@ -2,28 +2,53 @@ const { Types } = require('mongoose');  const { Service } = require('feathers-mongoose');  const _ = require('lodash');  const cron = require('cron'); +const Bluebird = require('bluebird');  const Model = require('./event.model.js');  const handleAttendClassJob = require('../../handlers');  const CronJob = cron.CronJob; +const handleTestJob = () => new Promise(res => setTimeout(res, 10000)); + +  class Events extends Service {    async setup(app, path) { -    await this.Model.rescheduleOldEvents(); -    const fetchNextEvent = () => this.fetchNextEvent(); -    this.fetchEventJob = new CronJob('*/10 * * * * *', fetchNextEvent); -    this.fetchEventJob.start() +    this.jobs = []; +    await this.rescheduleMissedEvents(); + +    const job = new CronJob('*/10 * * * * *', () => this.updateJobs()); +    job.start(); +  } + +  startAllJobs() { +    this.jobs.forEach(job => job.start()); +  } + +  stopAllJobs() { +    this.jobs.forEach(job => job.stop()); +  } + +  async rescheduleMissedEvents() { +    const missedEvents = await this.Model.findMissedEvents(); +    return Bluebird.map(missedEvents, event => event.save());    } -  async fetchNextEvent() { -    const event = await this.Model.findNextEvent(); +  async updateJobs() { +    // Reschedule missed events before we stop jobs to avoid +    // accidentally stopping the job that has not triggered yet +    // (if event schedule resonates with updateJobs schedule) +    await this.rescheduleMissedEvents(); -    if (event) { -      this.log(`Found an upcoming event: ${event.name}`); -      if (this.nextJob) this.nextJob.stop(); -      this.nextJob = new CronJob(event.schedule, () => this.process(event._id)) -      this.nextJob.start() -    } else this.log('No upcoming events.'); +    this.stopAllJobs(); + +    const events = await this.Model.findNextEvents(); +    if (!events.length) this.log('No upcoming events.'); +    this.jobs = events.map(event => new CronJob( +      event.schedule, +      () => this.run(event._id) +    )); + +    this.startAllJobs();    }    log(message) { @@ -32,27 +57,20 @@ class Events extends Service {      return console.log(`[${timestamp}] ${message}`);    } -  async process(id) { +  async run(id) {      const event = await this.Model.findById(id); -    this.log(`Running event ${event.name}`); -    event.status = 'running'; -    await event.save(); -    await this.fetchNextEvent(); +    event.lastRunAt = new Date(); +    event.setStatus('running') +      try { -      // await handleAttendClassJob(event); -      await new Promise(res => { -        console.log('Job started') -        setTimeout(() => { -          console.log('Job ended') -          res(); -        }, 10000) -      }); +      this.log(`Event ${event.name} started`) +      await handleTestJob(event); +      this.log(`Event ${event.name} ended`)      } catch (error) { -      event.status = 'failed'; +      event.setStatus('failed')      }      if (event.status === 'running') { -      event.status = 'complete'; -      event.save(); +      event.setStatus('complete')      }    }  } | 
