diff options
author | eug-vs <eug-vs@keemail.me> | 2020-11-27 22:41:25 +0300 |
---|---|---|
committer | eug-vs <eug-vs@keemail.me> | 2020-11-27 22:41:25 +0300 |
commit | 9f7115217c34732753102e044a94711400b5affc (patch) | |
tree | 364390decc2b093162ad740838c8add16d2955c7 | |
parent | 6a4e4402a52c6649f0ffb1c6a1e9183caaf6e313 (diff) | |
download | bsu-fantom-9f7115217c34732753102e044a94711400b5affc.tar.gz |
feat: use cron to schedule jobs
-rw-r--r-- | src/handlers/index.js | 16 | ||||
-rw-r--r-- | src/services/events/event.model.js | 55 | ||||
-rw-r--r-- | src/services/events/event.schema.js | 26 | ||||
-rw-r--r-- | src/services/events/event.service.js | 107 |
4 files changed, 136 insertions, 68 deletions
diff --git a/src/handlers/index.js b/src/handlers/index.js index 09cd726..2234b05 100644 --- a/src/handlers/index.js +++ b/src/handlers/index.js @@ -12,12 +12,12 @@ const { EDUFPMI_URL, NODE_ENV, HEADFUL } = process.env; const headless = NODE_ENV === 'production' || !HEADFUL; -const handleJobAsUser = async (job, browser, user) => { - console.log(`Running job as ${user.username}`); +const handleEventAsUser = async (event, browser, user) => { + console.log(`Running event as ${user.username}`); const browserContext = await browser.createIncognitoBrowserContext(); - const { conferenceId } = job.attrs.data; + const { conferenceId } = event; const conferenceUrl = `${EDUFPMI_URL}/mod/bigbluebuttonbn/view.php?id=${conferenceId}`; const page = await launchUserSession(user, browserContext); @@ -37,12 +37,10 @@ const handleJobAsUser = async (job, browser, user) => { await browserContext.close(); }; -const handleJob = async job => { - const { data } = job.attrs; - +const handleEvent = async event => { const participants = await UserModel.find({ username: { - $in: data.participants + $in: event.participants } }); @@ -51,12 +49,12 @@ const handleJob = async job => { const browser = await puppeteer.launch({ headless, args: ['--no-sandbox', '--incognito'] }); try { - await Bluebird.map(participants, participant => handleJobAsUser(job, browser, participant)); + await Bluebird.map(participants, participant => handleEventAsUser(event, browser, participant)); } finally { await browser.close(); } }; -module.exports = handleJob; +module.exports = handleEvent; diff --git a/src/services/events/event.model.js b/src/services/events/event.model.js new file mode 100644 index 0000000..ef86572 --- /dev/null +++ b/src/services/events/event.model.js @@ -0,0 +1,55 @@ +const cron = require('cron'); +const Bluebird = require('bluebird'); +const { model } = require('mongoose'); +const schema = require('./event.schema.js'); + +const CronJob = cron.CronJob; + +schema.methods.computeNextRunAt = function() { + const job = new CronJob(this.schedule); + const nextRunAt = job.nextDates(); + return new Date(nextRunAt); +}; + +schema.pre('save', function(next) { + this.nextRunAt = this.computeNextRunAt(); + next(); +}); + +schema.statics.rescheduleOldEvents = async function () { + console.log('Reschedule old events'); + const oldEvents = await this.find({ + nextRunAt: { + // TODO: skip single-fire events + $lt: new Date() + }, + }); + + // Saving events triggers computing new nextRunAt + return Bluebird.map(oldEvents, event => event.save()); +}; + +schema.statics.findNextEvent = function () { + return this.findOne( + { + nextRunAt: { + $exists: 1, + $gt: new Date() + }, + }, + null, + { + sort: { + nextRunAt: 1 + } + } + ) +}; + +const Model = model('Event', schema); + + +module.exports = Model; + + + diff --git a/src/services/events/event.schema.js b/src/services/events/event.schema.js new file mode 100644 index 0000000..0bbb978 --- /dev/null +++ b/src/services/events/event.schema.js @@ -0,0 +1,26 @@ +const { Schema, Types } = require('mongoose'); + +module.exports = new Schema({ + name: { + type: String, + required: true, + unique: true + }, + schedule: { + type: String, + required: true + }, + participants: { + type: [String], + default: [] + }, + status: { + type: String, + default: 'notStarted' + }, + conferenceId: String, + attendanceId: String, + nextRunAt: Date, + lastRunAt: Date +}, { timestamps: true }); + diff --git a/src/services/events/event.service.js b/src/services/events/event.service.js index a8b0e72..6c0454e 100644 --- a/src/services/events/event.service.js +++ b/src/services/events/event.service.js @@ -1,75 +1,64 @@ const { Types } = require('mongoose'); -const Agenda = require('agenda'); +const { Service } = require('feathers-mongoose'); const _ = require('lodash'); -const { getConnection } = require('../../connectDb.js'); -const handleAttendClassJob = require('../../handlers'); - - -class Events { - setup(app) { - this.collectionName = 'events'; - - // Reuse mongoose connection - const connection = getConnection(); - this.agenda = new Agenda(); - this.agenda.mongo( - connection.collection(this.collectionName).conn.db, - this.collectionName - ); - - // Define jobs - this.agenda.define('attend class', handleAttendClassJob); +const cron = require('cron'); +const Model = require('./event.model.js'); +const handleAttendClassJob = require('../../handlers'); +const CronJob = cron.CronJob; - // Logs - this.agenda.on('start', job => { - console.log(`Starting ${job.attrs.data.name} job`); - job.attrs.status = 'running'; - job.save(); - }); - this.agenda.on('complete', job => { - console.log(`Job ${job.attrs.data.name} finished`); - if (job.attrs.status === 'running') { - job.attrs.status = 'complete'; - job.save(); - } - }); - this.agenda.on('fail', (err, job) => { - console.log(`Job ${job.attrs.data.name} failed with the error ${err.message}`); - job.attrs.status = 'failed'; - job.save(); - }); - - return this.agenda.start(); - } - - create(data, params) { - return this.agenda.schedule(data.date, 'attend class', data); +class Events extends Service { + async setup(app, path) { + await this.Model.rescheduleOldEvents(); + const fetchNextEvent = () => this.fetchNextEvent(); + this.fetchEventJob = new CronJob('*/10 * * * * *', fetchNextEvent); + this.fetchEventJob.start() } - find(params) { - return this.agenda.jobs({}); - } + async fetchNextEvent() { + const event = await this.Model.findNextEvent(); - findOneById(id) { - return this.agenda - .jobs({ _id: Types.ObjectId(id) }) - .then(results => results[0]); + if (event) { + this.log(`Found an upcoming event: ${event.name}`); + if (this.nextJob) this.nextJob.stop(); + this.nextJob = new CronJob(event.schedule, () => this.process(event._id)) + this.nextJob.start() + } else this.log('No upcoming events.'); } - async patch(id, attrs, params) { - console.log(`Patch ${id}`); - const job = await this.findOneById(id); - job.attrs = _.merge(job.attrs, attrs); - return this.agenda.saveJob(job); + log(message) { + const dateOpts = { timeStyle: 'medium', dateStyle: 'short' }; + const timestamp = new Date().toLocaleString('en', dateOpts); + return console.log(`[${timestamp}] ${message}`); } - async remove(id) { - console.log(`Remove ${id}`); - return this.agenda.cancel({ _id: Types.ObjectId(id) }); + async process(id) { + const event = await this.Model.findById(id); + this.log(`Running event ${event.name}`); + event.status = 'running'; + await event.save(); + await this.fetchNextEvent(); + try { + // await handleAttendClassJob(event); + await new Promise(res => { + console.log('Job started') + setTimeout(() => { + console.log('Job ended') + res(); + }, 10000) + }); + } catch (error) { + event.status = 'failed'; + } + if (event.status === 'running') { + event.status = 'complete'; + event.save(); + } } } -module.exports = app => app.use('/events', new Events()); +module.exports = app => { + app.use('/events', new Events({ Model })); +}; |