summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoreug-vs <eug-vs@keemail.me>2020-11-28 00:21:26 +0300
committereug-vs <eug-vs@keemail.me>2020-11-28 00:21:26 +0300
commit7c69da67cf43dd076d20df87b5a7ea52a47e69ac (patch)
tree04473c6b40cda94d5af714072ddb2ac0ffef59af
parent9f7115217c34732753102e044a94711400b5affc (diff)
downloadbsu-fantom-7c69da67cf43dd076d20df87b5a7ea52a47e69ac.tar.gz
feat: correctly re-schedule old events
-rw-r--r--src/services/events/event.model.js21
-rw-r--r--src/services/events/event.schema.js9
-rw-r--r--src/services/events/event.service.js74
3 files changed, 59 insertions, 45 deletions
diff --git a/src/services/events/event.model.js b/src/services/events/event.model.js
index ef86572..b8a13bc 100644
--- a/src/services/events/event.model.js
+++ b/src/services/events/event.model.js
@@ -1,5 +1,4 @@
const cron = require('cron');
-const Bluebird = require('bluebird');
const { model } = require('mongoose');
const schema = require('./event.schema.js');
@@ -11,26 +10,27 @@ schema.methods.computeNextRunAt = function() {
return new Date(nextRunAt);
};
+schema.methods.setStatus = function(status) {
+ this.status = status;;
+ this.save();
+};
+
schema.pre('save', function(next) {
this.nextRunAt = this.computeNextRunAt();
next();
});
-schema.statics.rescheduleOldEvents = async function () {
- console.log('Reschedule old events');
- const oldEvents = await this.find({
+schema.statics.findMissedEvents = async function () {
+ return this.find({
nextRunAt: {
// TODO: skip single-fire events
$lt: new Date()
},
});
-
- // Saving events triggers computing new nextRunAt
- return Bluebird.map(oldEvents, event => event.save());
};
-schema.statics.findNextEvent = function () {
- return this.findOne(
+schema.statics.findNextEvents = function(limit = 10) {
+ return this.find(
{
nextRunAt: {
$exists: 1,
@@ -41,7 +41,8 @@ schema.statics.findNextEvent = function () {
{
sort: {
nextRunAt: 1
- }
+ },
+ limit
}
)
};
diff --git a/src/services/events/event.schema.js b/src/services/events/event.schema.js
index 0bbb978..2250700 100644
--- a/src/services/events/event.schema.js
+++ b/src/services/events/event.schema.js
@@ -1,4 +1,4 @@
-const { Schema, Types } = require('mongoose');
+const { Schema } = require('mongoose');
module.exports = new Schema({
name: {
@@ -10,16 +10,11 @@ module.exports = new Schema({
type: String,
required: true
},
- participants: {
- type: [String],
- default: []
- },
status: {
type: String,
default: 'notStarted'
},
- conferenceId: String,
- attendanceId: String,
+ context: {},
nextRunAt: Date,
lastRunAt: Date
}, { timestamps: true });
diff --git a/src/services/events/event.service.js b/src/services/events/event.service.js
index 6c0454e..4d87c6c 100644
--- a/src/services/events/event.service.js
+++ b/src/services/events/event.service.js
@@ -2,28 +2,53 @@ const { Types } = require('mongoose');
const { Service } = require('feathers-mongoose');
const _ = require('lodash');
const cron = require('cron');
+const Bluebird = require('bluebird');
const Model = require('./event.model.js');
const handleAttendClassJob = require('../../handlers');
const CronJob = cron.CronJob;
+const handleTestJob = () => new Promise(res => setTimeout(res, 10000));
+
+
class Events extends Service {
async setup(app, path) {
- await this.Model.rescheduleOldEvents();
- const fetchNextEvent = () => this.fetchNextEvent();
- this.fetchEventJob = new CronJob('*/10 * * * * *', fetchNextEvent);
- this.fetchEventJob.start()
+ this.jobs = [];
+ await this.rescheduleMissedEvents();
+
+ const job = new CronJob('*/10 * * * * *', () => this.updateJobs());
+ job.start();
+ }
+
+ startAllJobs() {
+ this.jobs.forEach(job => job.start());
+ }
+
+ stopAllJobs() {
+ this.jobs.forEach(job => job.stop());
+ }
+
+ async rescheduleMissedEvents() {
+ const missedEvents = await this.Model.findMissedEvents();
+ return Bluebird.map(missedEvents, event => event.save());
}
- async fetchNextEvent() {
- const event = await this.Model.findNextEvent();
+ async updateJobs() {
+ // Reschedule missed events before we stop jobs to avoid
+ // accidentally stopping the job that has not triggered yet
+ // (if event schedule resonates with updateJobs schedule)
+ await this.rescheduleMissedEvents();
- if (event) {
- this.log(`Found an upcoming event: ${event.name}`);
- if (this.nextJob) this.nextJob.stop();
- this.nextJob = new CronJob(event.schedule, () => this.process(event._id))
- this.nextJob.start()
- } else this.log('No upcoming events.');
+ this.stopAllJobs();
+
+ const events = await this.Model.findNextEvents();
+ if (!events.length) this.log('No upcoming events.');
+ this.jobs = events.map(event => new CronJob(
+ event.schedule,
+ () => this.run(event._id)
+ ));
+
+ this.startAllJobs();
}
log(message) {
@@ -32,27 +57,20 @@ class Events extends Service {
return console.log(`[${timestamp}] ${message}`);
}
- async process(id) {
+ async run(id) {
const event = await this.Model.findById(id);
- this.log(`Running event ${event.name}`);
- event.status = 'running';
- await event.save();
- await this.fetchNextEvent();
+ event.lastRunAt = new Date();
+ event.setStatus('running')
+
try {
- // await handleAttendClassJob(event);
- await new Promise(res => {
- console.log('Job started')
- setTimeout(() => {
- console.log('Job ended')
- res();
- }, 10000)
- });
+ this.log(`Event ${event.name} started`)
+ await handleTestJob(event);
+ this.log(`Event ${event.name} ended`)
} catch (error) {
- event.status = 'failed';
+ event.setStatus('failed')
}
if (event.status === 'running') {
- event.status = 'complete';
- event.save();
+ event.setStatus('complete')
}
}
}