diff options
| author | eug-vs <eug-vs@keemail.me> | 2020-11-27 22:41:25 +0300 | 
|---|---|---|
| committer | eug-vs <eug-vs@keemail.me> | 2020-11-27 22:41:25 +0300 | 
| commit | 9f7115217c34732753102e044a94711400b5affc (patch) | |
| tree | 364390decc2b093162ad740838c8add16d2955c7 /src/services | |
| parent | 6a4e4402a52c6649f0ffb1c6a1e9183caaf6e313 (diff) | |
| download | bsu-fantom-9f7115217c34732753102e044a94711400b5affc.tar.gz | |
feat: use cron to schedule jobs
Diffstat (limited to 'src/services')
| -rw-r--r-- | src/services/events/event.model.js | 55 | ||||
| -rw-r--r-- | src/services/events/event.schema.js | 26 | ||||
| -rw-r--r-- | src/services/events/event.service.js | 107 | 
3 files changed, 129 insertions, 59 deletions
| diff --git a/src/services/events/event.model.js b/src/services/events/event.model.js new file mode 100644 index 0000000..ef86572 --- /dev/null +++ b/src/services/events/event.model.js @@ -0,0 +1,55 @@ +const cron = require('cron'); +const Bluebird = require('bluebird'); +const { model } = require('mongoose'); +const schema = require('./event.schema.js'); + +const CronJob = cron.CronJob; + +schema.methods.computeNextRunAt = function() { +  const job = new CronJob(this.schedule); +  const nextRunAt = job.nextDates(); +  return new Date(nextRunAt); +}; + +schema.pre('save', function(next) { +  this.nextRunAt = this.computeNextRunAt(); +  next(); +}); + +schema.statics.rescheduleOldEvents = async function () { +  console.log('Reschedule old events'); +  const oldEvents = await this.find({ +    nextRunAt: { +      // TODO: skip single-fire events +      $lt: new Date() +    }, +  }); + +  // Saving events triggers computing new nextRunAt +  return Bluebird.map(oldEvents, event => event.save()); +}; + +schema.statics.findNextEvent = function () { +  return this.findOne( +    { +      nextRunAt: { +        $exists: 1, +        $gt: new Date() +      }, +    }, +    null, +    { +      sort: { +        nextRunAt: 1 +      } +    } +  ) +}; + +const Model = model('Event', schema); + + +module.exports = Model; + + + diff --git a/src/services/events/event.schema.js b/src/services/events/event.schema.js new file mode 100644 index 0000000..0bbb978 --- /dev/null +++ b/src/services/events/event.schema.js @@ -0,0 +1,26 @@ +const { Schema, Types } = require('mongoose'); + +module.exports = new Schema({ +  name: { +    type: String, +    required: true, +    unique: true +  }, +  schedule: { +    type: String, +    required: true +  }, +  participants: { +    type: [String], +    default: [] +  }, +  status: { +    type: String, +    default: 'notStarted' +  }, +  conferenceId: String, +  attendanceId: String, +  nextRunAt: Date, +  lastRunAt: Date +}, { timestamps: true }); + diff --git a/src/services/events/event.service.js b/src/services/events/event.service.js index a8b0e72..6c0454e 100644 --- a/src/services/events/event.service.js +++ b/src/services/events/event.service.js @@ -1,75 +1,64 @@  const { Types } = require('mongoose'); -const Agenda = require('agenda'); +const { Service } = require('feathers-mongoose');  const _ = require('lodash'); -const { getConnection } = require('../../connectDb.js'); -const handleAttendClassJob = require('../../handlers'); - - -class Events { -  setup(app) { -    this.collectionName = 'events'; - -    // Reuse mongoose connection -    const connection = getConnection(); -    this.agenda = new Agenda(); -    this.agenda.mongo( -      connection.collection(this.collectionName).conn.db, -      this.collectionName -    ); - -    // Define jobs -    this.agenda.define('attend class', handleAttendClassJob); +const cron = require('cron'); +const Model = require('./event.model.js'); +const handleAttendClassJob = require('../../handlers'); +const CronJob = cron.CronJob; -    // Logs -    this.agenda.on('start', job => { -      console.log(`Starting ${job.attrs.data.name} job`); -      job.attrs.status = 'running'; -      job.save(); -    }); -    this.agenda.on('complete', job => { -      console.log(`Job ${job.attrs.data.name} finished`); -      if (job.attrs.status === 'running') { -        job.attrs.status = 'complete'; -        job.save(); -      } -    }); -    this.agenda.on('fail', (err, job) => { -      console.log(`Job ${job.attrs.data.name} failed with the error ${err.message}`); -      job.attrs.status = 'failed'; -      job.save(); -    }); - -    return this.agenda.start(); -  } - -  create(data, params) { -    return this.agenda.schedule(data.date, 'attend class', data); +class Events extends Service { +  async setup(app, path) { +    await this.Model.rescheduleOldEvents(); +    const fetchNextEvent = () => this.fetchNextEvent(); +    this.fetchEventJob = new CronJob('*/10 * * * * *', fetchNextEvent); +    this.fetchEventJob.start()    } -  find(params) { -    return this.agenda.jobs({}); -  } +  async fetchNextEvent() { +    const event = await this.Model.findNextEvent(); -  findOneById(id) { -    return this.agenda -      .jobs({ _id: Types.ObjectId(id) }) -      .then(results => results[0]); +    if (event) { +      this.log(`Found an upcoming event: ${event.name}`); +      if (this.nextJob) this.nextJob.stop(); +      this.nextJob = new CronJob(event.schedule, () => this.process(event._id)) +      this.nextJob.start() +    } else this.log('No upcoming events.');    } -  async patch(id, attrs, params) { -    console.log(`Patch ${id}`); -    const job = await this.findOneById(id); -    job.attrs = _.merge(job.attrs, attrs); -    return this.agenda.saveJob(job); +  log(message) { +    const dateOpts = { timeStyle: 'medium', dateStyle: 'short' }; +    const timestamp = new Date().toLocaleString('en', dateOpts); +    return console.log(`[${timestamp}] ${message}`);    } -  async remove(id) { -    console.log(`Remove ${id}`); -    return this.agenda.cancel({ _id: Types.ObjectId(id) }); +  async process(id) { +    const event = await this.Model.findById(id); +    this.log(`Running event ${event.name}`); +    event.status = 'running'; +    await event.save(); +    await this.fetchNextEvent(); +    try { +      // await handleAttendClassJob(event); +      await new Promise(res => { +        console.log('Job started') +        setTimeout(() => { +          console.log('Job ended') +          res(); +        }, 10000) +      }); +    } catch (error) { +      event.status = 'failed'; +    } +    if (event.status === 'running') { +      event.status = 'complete'; +      event.save(); +    }    }  } -module.exports = app => app.use('/events', new Events()); +module.exports = app => { +  app.use('/events', new Events({ Model })); +}; | 
