diff options
author | eug-vs <eug-vs@keemail.me> | 2020-11-28 00:21:26 +0300 |
---|---|---|
committer | eug-vs <eug-vs@keemail.me> | 2020-11-28 00:21:26 +0300 |
commit | 7c69da67cf43dd076d20df87b5a7ea52a47e69ac (patch) | |
tree | 04473c6b40cda94d5af714072ddb2ac0ffef59af | |
parent | 9f7115217c34732753102e044a94711400b5affc (diff) | |
download | bsu-fantom-7c69da67cf43dd076d20df87b5a7ea52a47e69ac.tar.gz |
feat: correctly re-schedule old events
-rw-r--r-- | src/services/events/event.model.js | 21 | ||||
-rw-r--r-- | src/services/events/event.schema.js | 9 | ||||
-rw-r--r-- | src/services/events/event.service.js | 74 |
3 files changed, 59 insertions, 45 deletions
diff --git a/src/services/events/event.model.js b/src/services/events/event.model.js index ef86572..b8a13bc 100644 --- a/src/services/events/event.model.js +++ b/src/services/events/event.model.js @@ -1,5 +1,4 @@ const cron = require('cron'); -const Bluebird = require('bluebird'); const { model } = require('mongoose'); const schema = require('./event.schema.js'); @@ -11,26 +10,27 @@ schema.methods.computeNextRunAt = function() { return new Date(nextRunAt); }; +schema.methods.setStatus = function(status) { + this.status = status;; + this.save(); +}; + schema.pre('save', function(next) { this.nextRunAt = this.computeNextRunAt(); next(); }); -schema.statics.rescheduleOldEvents = async function () { - console.log('Reschedule old events'); - const oldEvents = await this.find({ +schema.statics.findMissedEvents = async function () { + return this.find({ nextRunAt: { // TODO: skip single-fire events $lt: new Date() }, }); - - // Saving events triggers computing new nextRunAt - return Bluebird.map(oldEvents, event => event.save()); }; -schema.statics.findNextEvent = function () { - return this.findOne( +schema.statics.findNextEvents = function(limit = 10) { + return this.find( { nextRunAt: { $exists: 1, @@ -41,7 +41,8 @@ schema.statics.findNextEvent = function () { { sort: { nextRunAt: 1 - } + }, + limit } ) }; diff --git a/src/services/events/event.schema.js b/src/services/events/event.schema.js index 0bbb978..2250700 100644 --- a/src/services/events/event.schema.js +++ b/src/services/events/event.schema.js @@ -1,4 +1,4 @@ -const { Schema, Types } = require('mongoose'); +const { Schema } = require('mongoose'); module.exports = new Schema({ name: { @@ -10,16 +10,11 @@ module.exports = new Schema({ type: String, required: true }, - participants: { - type: [String], - default: [] - }, status: { type: String, default: 'notStarted' }, - conferenceId: String, - attendanceId: String, + context: {}, nextRunAt: Date, lastRunAt: Date }, { timestamps: true }); diff --git a/src/services/events/event.service.js b/src/services/events/event.service.js index 6c0454e..4d87c6c 100644 --- a/src/services/events/event.service.js +++ b/src/services/events/event.service.js @@ -2,28 +2,53 @@ const { Types } = require('mongoose'); const { Service } = require('feathers-mongoose'); const _ = require('lodash'); const cron = require('cron'); +const Bluebird = require('bluebird'); const Model = require('./event.model.js'); const handleAttendClassJob = require('../../handlers'); const CronJob = cron.CronJob; +const handleTestJob = () => new Promise(res => setTimeout(res, 10000)); + + class Events extends Service { async setup(app, path) { - await this.Model.rescheduleOldEvents(); - const fetchNextEvent = () => this.fetchNextEvent(); - this.fetchEventJob = new CronJob('*/10 * * * * *', fetchNextEvent); - this.fetchEventJob.start() + this.jobs = []; + await this.rescheduleMissedEvents(); + + const job = new CronJob('*/10 * * * * *', () => this.updateJobs()); + job.start(); + } + + startAllJobs() { + this.jobs.forEach(job => job.start()); + } + + stopAllJobs() { + this.jobs.forEach(job => job.stop()); + } + + async rescheduleMissedEvents() { + const missedEvents = await this.Model.findMissedEvents(); + return Bluebird.map(missedEvents, event => event.save()); } - async fetchNextEvent() { - const event = await this.Model.findNextEvent(); + async updateJobs() { + // Reschedule missed events before we stop jobs to avoid + // accidentally stopping the job that has not triggered yet + // (if event schedule resonates with updateJobs schedule) + await this.rescheduleMissedEvents(); - if (event) { - this.log(`Found an upcoming event: ${event.name}`); - if (this.nextJob) this.nextJob.stop(); - this.nextJob = new CronJob(event.schedule, () => this.process(event._id)) - this.nextJob.start() - } else this.log('No upcoming events.'); + this.stopAllJobs(); + + const events = await this.Model.findNextEvents(); + if (!events.length) this.log('No upcoming events.'); + this.jobs = events.map(event => new CronJob( + event.schedule, + () => this.run(event._id) + )); + + this.startAllJobs(); } log(message) { @@ -32,27 +57,20 @@ class Events extends Service { return console.log(`[${timestamp}] ${message}`); } - async process(id) { + async run(id) { const event = await this.Model.findById(id); - this.log(`Running event ${event.name}`); - event.status = 'running'; - await event.save(); - await this.fetchNextEvent(); + event.lastRunAt = new Date(); + event.setStatus('running') + try { - // await handleAttendClassJob(event); - await new Promise(res => { - console.log('Job started') - setTimeout(() => { - console.log('Job ended') - res(); - }, 10000) - }); + this.log(`Event ${event.name} started`) + await handleTestJob(event); + this.log(`Event ${event.name} ended`) } catch (error) { - event.status = 'failed'; + event.setStatus('failed') } if (event.status === 'running') { - event.status = 'complete'; - event.save(); + event.setStatus('complete') } } } |