From baffdc3e0359447aa2ac0c52b0198b093763aafb Mon Sep 17 00:00:00 2001 From: Max Savin Date: Fri, 12 Jan 2018 17:36:05 +0100 Subject: [PATCH] =?UTF-8?q?v2=20=F0=9F=8E=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ICON.png | Bin LICENSE.md | 0 PATENTS.md | 0 README.md | 45 ++- package.js | 14 + package/package.js | 23 -- package/server/control.js | 59 ---- package/server/private.js | 277 ------------------ package/server/public.js | 96 ------ package/server/runner.js | 102 ------- package/server/startup.js | 14 - package/server/utilities.js | 146 --------- server/.DS_Store | Bin 0 -> 8196 bytes server/api.js | 155 ++++++++++ server/imports/.DS_Store | Bin 0 -> 8196 bytes server/imports/actions/add/generateDoc.js | 23 ++ server/imports/actions/add/index.js | 27 ++ server/imports/actions/add/processInput.js | 42 +++ server/imports/actions/cancel/index.js | 47 +++ server/imports/actions/clear/index.js | 33 +++ server/imports/actions/execute/index.js | 31 ++ server/imports/actions/execute/process.js | 36 +++ server/imports/actions/execute/toolbelt.js | 96 ++++++ server/imports/actions/get/index.js | 7 + server/imports/actions/index.js | 17 ++ server/imports/actions/reschedule/index.js | 58 ++++ server/imports/operator/dominator/README.md | 15 + server/imports/operator/dominator/index.js | 56 ++++ server/imports/operator/index.js | 12 + server/imports/operator/manager/index.js | 51 ++++ server/imports/operator/queue/README.md | 19 ++ server/imports/operator/queue/index.js | 122 ++++++++ server/imports/operator/startup/README.md | 5 + server/imports/operator/startup/index.js | 17 ++ server/imports/utilities/.DS_Store | Bin 0 -> 6148 bytes server/imports/utilities/collection/index.js | 10 + server/imports/utilities/config/index.js | 9 + server/imports/utilities/helpers/date.js | 148 ++++++++++ .../utilities/helpers/generateDueDate.js | 24 ++ server/imports/utilities/helpers/index.js | 11 + server/imports/utilities/helpers/number.js | 27 ++ server/imports/utilities/index.js | 15 + server/imports/utilities/logger/index.js | 21 ++ server/imports/utilities/registry/index.js | 9 + 44 files changed, 1187 insertions(+), 732 deletions(-) mode change 100644 => 100755 ICON.png mode change 100644 => 100755 LICENSE.md mode change 100644 => 100755 PATENTS.md mode change 100644 => 100755 README.md create mode 100644 package.js delete mode 100644 package/package.js delete mode 100644 package/server/control.js delete mode 100644 package/server/private.js delete mode 100644 package/server/public.js delete mode 100644 package/server/runner.js delete mode 100644 package/server/startup.js delete mode 100644 package/server/utilities.js create mode 100644 server/.DS_Store create mode 100644 server/api.js create mode 100644 server/imports/.DS_Store create mode 100644 server/imports/actions/add/generateDoc.js create mode 100644 server/imports/actions/add/index.js create mode 100644 server/imports/actions/add/processInput.js create mode 100644 server/imports/actions/cancel/index.js create mode 100644 server/imports/actions/clear/index.js create mode 100644 server/imports/actions/execute/index.js create mode 100644 server/imports/actions/execute/process.js create mode 100644 server/imports/actions/execute/toolbelt.js create mode 100644 server/imports/actions/get/index.js create mode 100644 server/imports/actions/index.js create mode 100644 server/imports/actions/reschedule/index.js create mode 100644 server/imports/operator/dominator/README.md create mode 100644 server/imports/operator/dominator/index.js create mode 100644 server/imports/operator/index.js create mode 100644 server/imports/operator/manager/index.js create mode 100644 server/imports/operator/queue/README.md create mode 100644 server/imports/operator/queue/index.js create mode 100644 server/imports/operator/startup/README.md create mode 100644 server/imports/operator/startup/index.js create mode 100644 server/imports/utilities/.DS_Store create mode 100644 server/imports/utilities/collection/index.js create mode 100644 server/imports/utilities/config/index.js create mode 100644 server/imports/utilities/helpers/date.js create mode 100644 server/imports/utilities/helpers/generateDueDate.js create mode 100644 server/imports/utilities/helpers/index.js create mode 100644 server/imports/utilities/helpers/number.js create mode 100644 server/imports/utilities/index.js create mode 100644 server/imports/utilities/logger/index.js create mode 100644 server/imports/utilities/registry/index.js diff --git a/ICON.png b/ICON.png old mode 100644 new mode 100755 diff --git a/LICENSE.md b/LICENSE.md old mode 100644 new mode 100755 diff --git a/PATENTS.md b/PATENTS.md old mode 100644 new mode 100755 diff --git a/README.md b/README.md old mode 100644 new mode 100755 index c62c1b6..fcf5e2d --- a/README.md +++ b/README.md @@ -4,35 +4,48 @@ ### The Simple Jobs Queue That Just Works -Run scheduled tasks with Steve Jobs, the simple jobs queue made just for Meteor. With tight MongoDB integration and fibers-based timing functions, this package is reliable, quick and effortless. +Run scheduled tasks with Steve Jobs, the simple jobs queue made just for Meteor. With tight MongoDB integration and fibers-based timing functions, this package is quick, reliable and effortless to use. - - Runs on one server at a time - - Runs predictably and consecutively - - Logs all the jobs and their outcomes - - Retries failed jobs on restart + - Jobs run on one server at a time + - Jobs run predictably and consecutively + - Jobs are logged with their outcomes + - Failed jobs are retried on server restart - No third party dependencies -**The package has been production tested and is ready for action.** It can run hundreds of jobs in seconds with minimal CPU impact, making it a reasonable choice for many applications. To get started, check out the Quick Start below, take a look at the **documentation**, and/or try the **live demo**. +**The new 2.0 runs well but needs to be tested for bugs.** It can run hundreds of jobs in seconds with minimal CPU impact, making it a reasonable choice for many applications. To get started, check out the Quick Start below, take a look at the **documentation**, and/or try the **live demo**. ## Quick Start -First, install the package: +First, install the package, and import if necessary: ```bash meteor add msavin:sjobs ``` +```javascript +import { Jobs } from 'meteor/msavin:jobs'; +``` Then, write your background jobs like you would write your methods: ```javascript Jobs.register({ - sendReminder: function (to, content) { - Email.send({ + "sendReminder": function (to, message) { + var self = this; + + var call = HTTP.put("http://www.mocky.io/v2/5a58d79c2d00006a29d2e66a/?mocky-delay=2000ms", { to: to, - from: "no-reply@jobs.com", - subject: "Your Reminder", - content: content, + message: message }) + + if (call.statusCode === 200) { + self.success(call); + return; + } else { + self.failure(call); + return; + } + + var cantTouchThis = "$99999999999" } }); ``` @@ -43,7 +56,7 @@ Finally, schedule a background job like you would call a method: Jobs.run("sendReminder", "tcook@apple.com", "Don't forget about the launch!"); ``` -One more thing: the function above will schedule the job to run on the moment that the function was called. However, you can delay it by passing in a special **configuration object** at the end. +One more thing: the function above will schedule the job to run on the moment that the function was called. However, you can delay it by passing in a special **configuration object** at the end: ```javascript Jobs.run("sendReminder", "jony@apple.com", "The future is here!", { @@ -58,10 +71,12 @@ Jobs.run("sendReminder", "jony@apple.com", "The future is here!", { }); ``` +The configuration object supports `date`, `in`, `on`, `priority`, and `state`, all of which are completely optional. + ## More Information - [**Primary Features**](https://github.com/msavin/SteveJobs-meteor-jobs-queue/wiki)
Learn how to use the three R's - [**Secondary Features**]()
Learn how to handle edge cases - [**How It Works**](https://github.com/msavin/SteveJobs-meteor-jobs-queue/wiki/How-It-Works)
Learn about the possibilities and limitations -- [**Roadmap**](https://github.com/msavin/SteveJobs-meteor-jobs-queue/projects/1)
See what's next and get involved -- [**Brought to you by Meteor Candy**](https://www.meteorcandy.com/?ref=sjgh)
Add an admin panel to your Meteor app in 5 minutes \ No newline at end of file + +[**Brought to you by Meteor Candy**](https://www.meteorcandy.com/?ref=sjgh) \ No newline at end of file diff --git a/package.js b/package.js new file mode 100644 index 0000000..3b3d282 --- /dev/null +++ b/package.js @@ -0,0 +1,14 @@ +Package.describe({ + name: "msavin:sjobs", + summary: "The simple jobs queue that just works [synced, schedule, tasks, background, later, worker, cron]", + version: "2.0.0", + documentation: "README.md", + git: "https://github.com/msavin/SteveJobs.git", +}); + +Package.onUse(function(api) { + api.versionsFrom('1.3'); + api.use(["mongo", "random", "ecmascript", "check"], "server"); + api.mainModule("server/api.js", "server"); + api.export(["Jobs", "JobsInternal"]); +}); \ No newline at end of file diff --git a/package/package.js b/package/package.js deleted file mode 100644 index ad0905b..0000000 --- a/package/package.js +++ /dev/null @@ -1,23 +0,0 @@ -Package.describe({ - name: "msavin:sjobs", - summary: "The simple jobs queue that just works [synced, schedule, tasks, background, later, worker, cron]", - version: "1.2.0", - documentation: "README.md", - git: 'https://github.com/msavin/SteveJobs.git', -}); - -serverFiles = [ - "server/private.js", - "server/utilities.js", - "server/control.js", - "server/runner.js", - "server/public.js", - "server/startup.js", -]; - -Package.onUse(function(api) { - api.use(["mongo", "random"], "server"); - api.addFiles(serverFiles, "server"); - api.versionsFrom("1.0"); - api.export(["Jobs", "JobsControl", "JobsRunner"]) -}); \ No newline at end of file diff --git a/package/server/control.js b/package/server/control.js deleted file mode 100644 index 4f4f1cb..0000000 --- a/package/server/control.js +++ /dev/null @@ -1,59 +0,0 @@ -/* - This object ensures that only one server is processing jobs - - How it works - 1. Each server is automatically given an id - 2. Every 5 seconds, the server checks to see if it is active - 3. If it is active, it updates a MongoDB document with its id and a timestamp - 4. If it is not active, it checks to see when a server was last active - 4a. If the last active timestamp was greater than `activityGap`, the server who discovered this will step up to manage - 4a. If the last active timestamp was less than `activeDelay`, the server will not do anything - - These checks happen every 5 seconds, or as as defined with `timer` in `runner.js`. - - `activityGap` refers to how much time is permitted between the last timestamp update and current time. - -*/ - -JobsControl = { - collection: new Mongo.Collection("jobs_config"), - serverId: Random.id(), - isActive: function () { - if (Meteor.isDevelopment) { - return true; - } - - var self = this; - - doc = self.collection.find({name: "ActiveServer"}).fetch(); - - if (!doc) { - return self.setAsActive(); - } else if (doc.serverId === self.serverId) { - return self.setAsActive(); - } else { - var timeGap = new Date () - doc.lastPing; - var timeSpacer = Jobs.private.configuration.activityGap; - - if (timeGap >= timeSpacer) { - return self.setAsActive() - } - } - }, - setAsActive: function () { - var self = this; - - var result = self.collection.upsert({name: "ActiveServer"}, { - $set: { - lastPing: new Date(), - serverId: self.serverId - } - }) - - return result; - }, - reset: function () { - var self = this; - self.serverId = Math.random(); - } -} \ No newline at end of file diff --git a/package/server/private.js b/package/server/private.js deleted file mode 100644 index d6b8ab5..0000000 --- a/package/server/private.js +++ /dev/null @@ -1,277 +0,0 @@ -Jobs = {}; - -Jobs.private = {}; - -Jobs.private.collection = new Mongo.Collection("jobs_data"); - -Meteor.startup(function () { - Jobs.private.collection._ensureIndex({ - due: 1, - state: 1 - }) -}) - -Jobs.private.registry = {}; - -Jobs.private.configuration = { - autoStart: true, - timer: 5 * 1000, - activityGap: 5 * 60 * 1000, - activityDelay: 5 * 1000 -} - -Jobs.private.execute = function (doc, jobCallback) { - // Goals: - // 1- Execute the job - // 2- Update the document in database - // 3- Capture the result (if any) - - if (typeof Jobs.private.registry[doc.name] === "function") { - // should probably switch to - // pending: true/false - // ranSuccessfully: true/false - try { - var jobResult = Jobs.private.registry[doc.name].apply(null, doc.arguments); - - var jobUpdate = Jobs.private.collection.update(doc._id, { - $set: { - state: "successful", - lastRun: new Date(), - completed: new Date() - }, - $push: { - history: { - date: new Date(), - result: jobResult, - state: "successful" - } - } - }) - - if (typeof jobCallback === "function") { - jobCallback(null, jobResult); - } else if (typeof jobCallback !== "undefined") { - console.log("Jobs: Invalid callback, but job still ran"); - console.log("----") - } - } catch (e) { - console.log(e); - var jobUpdate = Jobs.private.collection.update(doc._id, { - $set: { - lastRun: new Date(), - lastServer: JobsControl.serverId, - state: "failed" - }, - $push: { - history: { - date: new Date(), - state: "failed" - } - } - }); - - if (jobUpdate) { - console.log("Jobs: Job failed to run: " + doc.name) - } - - if (typeof jobCallback === "function") { - jobCallback(true, null) - } else if (typeof jobCallback !== "undefined") { - console.log("Jobs: Invalid callback, but job still ran"); - console.log("----") - } - } - } else { - console.log("Jobs: Job not found in registry: " + doc.name); - } -} - -Jobs.private.cancel = function (id) { - job = Jobs.private.collection.findOne(id) - - if (job) { - if (job.state === "pending" || job.state === "failed") { - result = Jobs.private.collection.update(id, { - $set: { - state: "cancelled" - }, - $push: { - history: { - date: new Date(), - state: "cancelled" - } - } - }) - - return result; - } else { - console.log("Jobs: Cancel failed for " + id); - console.log("Jobs: Job has completed successful or is already cancelled."); - console.log("----"); - return false; - } - } else { - console.log("Jobs: Cancel failed for " + id); - console.log("Jobs: No such job found."); - console.log("----"); - return false; - } -} - -Jobs.private.clear = function (count, name) { - - /* Future Idea: - - allow array as input for count and/or name - - verify array to make sure it has allowed inputs - */ - - var state = ["cancelled"]; - - if (count >= 2) { - state.push("successful") - } - - if (count >= 3) { - state.push("failed") - } - - if (count >= 4) { - state.push("pending") - } - - if (name) { - var result = Jobs.private.collection.remove({ - name: name, - state: { - $in: state - } - }) - } else { - var result = Jobs.private.collection.remove({ - state: { - $in: state - } - }) - } - - return result; -} - -Jobs.private.start = function (doc, jobCallback) { - if (typeof doc === "object") { - var result = Jobs.private.execute(doc, jobCallback); - return result; - } else if (typeof doc === "string") { - jobDoc = Jobs.private.collection.findOne(doc); - - if (jobDoc) { - var result = Jobs.private.execute(jobDoc, jobCallback); - return result; - } - } else { - console.log("Jobs: Invalid input for Jobs.execute();"); - console.log(doc); - console.log("----") - } -} - -Jobs.private.run = function () { - // 0. Convert arguments to array + prepare necessary data - var args = Array.prototype.slice.call(arguments); - var config = args[args.length - 1]; - - // 1. Check that the job being added exists - if (!Jobs.private.registry[args[0]]) { - console.log("Jobs: Invalid job name: " + job.name); - console.log("----"); - } - - // 2. Ready set fire - var doc = { - name: args[0], - created: new Date(), - due: function () { - var due = new Date(); - - if (typeof config === "object") { - if (config.in || config.on) { - due = Jobs.utilities.date(config); - } - } - - return due; - }(), - priority: function () { - if (typeof config === "object" && config.priority) { - return Jobs.utilities.number(config.priority, "priority") || 0; - } else { - return 0; - } - }(), - arguments: function () { - argz = args.splice(0, 1) - - if (typeof config === "object") { - if (config.in || config.on || config.priority || config.tz) { - argz = args.splice(-1) - } - } - - return args; - }(), - state: "pending" - } - - var result = Jobs.private.collection.insert(doc); - return result; -} - -// Pending: - -Jobs.private.reschedule = function (jobId, config) { - // TODO: Allow to set a new priority too? or make it a different function? - - // First, check if the doc is available - // Second, check if the doc has ran - - var jobDoc = Jobs.private.collection.findOne(jobId); - - if (!jobDoc) { - console.log("Jobs: no such job found: " + jobId); - return; - } - - if (['successful', 'cancelled'].indexOf(jobDoc.state) >= 0) { - console.log('Jobs: job has already completed: ' + jobId); - return; - } - - // If not create the patch and update the document - - newDueDate = function () { - var due = new Date(); - - if (typeof config === "object") { - if (config.in || config.on) { - due = Jobs.utilities.date(config); - } - } - - return due; - }() - - var update = Jobs.private.collection.update(jobId, { - $set: { - due: newDueDate - }, - $push: { - history: { - date: new Date(), - type: "reschedule", - result: newDueDate - } - } - }) - - return update; -} \ No newline at end of file diff --git a/package/server/public.js b/package/server/public.js deleted file mode 100644 index 42fef24..0000000 --- a/package/server/public.js +++ /dev/null @@ -1,96 +0,0 @@ -// Configure the package (optional) - -Jobs.configure = function (data) { - Object.keys(data).forEach(function (key) { - Jobs.private.configuration[key] = data[key]; - }); -} - -// Register jobs in a similar style to Meteor methods - -Jobs.queues = {} - -Jobs.register = function (jobs) { - Object.keys(jobs).forEach(function (job) { - if (typeof jobs[job] === "function") { - Jobs.private.registry[job] = jobs[job]; - - Meteor.setTimeout(function () { - Jobs.queues[job] = new JobsRunner(job); - }, 3000); - } else { - console.log("Jobs: Error registering " + job); - console.log("Jobs: Please make sure its a valid function"); - console.log("----"); - } - }); -} - -// Add a new job to MongoDB - -Jobs.run = function () { - return Jobs.private.run.apply(null, arguments) -} - -// Cancel a job without removing it from MongoDB - -Jobs.cancel = function (id) { - return Jobs.private.cancel(id); -} - -// Start or stop the queue - intended for debugging - -Jobs.start = function (queueName) { - if (queueName) { - Jobs.queues[queueName].start() - } else { - Object.keys(Jobs.queues).forEach(function (queueName) { - Jobs.queues[queueName].start() - }); - } -} - -Jobs.stop = function (queueName) { - if (queueName) { - Jobs.queues[queueName].stop() - } else { - Object.keys(Jobs.queues).forEach(function (queueName) { - Jobs.queues[queueName].stop() - }); - } -} - -// Get info on a job - -Jobs.get = function (id) { - return Jobs.private.collection.findOne(id); -} - -// Run a job ahead of time - -Jobs.execute = function (doc, callback, force) { - if (typeof doc === "string") { - doc = Jobs.private.collection.findOne(doc) - } - - if (force || Jobs.queues[doc.name].available) { - return Jobs.private.start(doc, callback); - } else { - console.log("Jobs: Could not run job because job queue is active"); - } -} - -// Reschedule a job - -Jobs.reschedule = function (jobId, config) { - return Jobs.private.reschedule(jobId, config); -} - -// Clear resolved jobs - or all of them - -Jobs.clear = function (count) { - return Jobs.private.clear(count) -} - -// Access to MongoDB Collection -Jobs.collection = Jobs.private.collection; \ No newline at end of file diff --git a/package/server/runner.js b/package/server/runner.js deleted file mode 100644 index 093de2d..0000000 --- a/package/server/runner.js +++ /dev/null @@ -1,102 +0,0 @@ -/* - This object ensures that only job is processed at a time (and on one server) - It starts a new runner for each queue that you register - Queues can be accessed with Jobs.queue - - How it works - 1. when a server starts up, it will run the start function after a slight delay - 2. When the start function runs, it'll start an interval timer for every X seconds (5 is default) - 3. When the interval timer runs, it will check that the server is active via `control.js` - 4. If the server is active, it will look for jobs to run - - How jobs are ran - 1. first, the queue will check if there are failed jobs. if there are, it will try to run them again - 2. after the failed jobs are processed, it will check for new jobs and run them - 3. jobs are based on whichever comes first (and ultimately, whatever MongoDB returns to us) - - When a job is running, JobsRunner will ignore the setInterval calls. - When a job completes, Jobs will check if there are more jobs to run. - If there are more jobs to run, Jobs will continue to run them consecutively. - If there are no more jobs to run, Jobs will go back to polling with setInterval - -*/ - -JobsRunner = function (name) { - this.name = name; - this.interval = null; - this.state = "failed"; - this.available = true; - - - this.start = function () { - var self = this; - - self.interval = Meteor.setInterval(function () { - // console.log(name) - self.trigger(); - }, Jobs.private.configuration.timer); - } - - this.stop = function () { - var self = this; - self.state = "failed"; - return Meteor.clearInterval(self.interval); - } - - this.trigger = function () { - var self = this; - - if (self.available === true) { - self.available = false; - - if (JobsControl.isActive) { - self.run(); - } else { - self.available = true; - } - } - } - - this.grabDoc = function () { - var self = this; - var state = self.state; - var name = self.name; - - var jobDoc = Jobs.private.collection.findOne({ - name: name, - due: { - $lt: new Date() - }, - state: state, - lastServer: { - $ne: JobsControl.serverId - } - }, { - sort: { - due: 1, - priority: 1 - } - }); - - return jobDoc; - } - - this.run = function () { - var self = this; - var state = self.state; - var jobDoc = self.grabDoc(); - - if (jobDoc) { - Jobs.private.start(jobDoc, function () { - self.available = true; - self.trigger() - }); - } else { - self.available = true; - if (self.state === "failed") { - self.state = "pending"; - self.trigger(); - } - } - } -} \ No newline at end of file diff --git a/package/server/startup.js b/package/server/startup.js deleted file mode 100644 index dd0fbc2..0000000 --- a/package/server/startup.js +++ /dev/null @@ -1,14 +0,0 @@ -/* - Since packages load first, we need to add a slight delay to the startup - in order to let the end-users startupDelay value register -*/ - -Meteor.startup(function () { - Meteor.setTimeout(function () { - if (Jobs.private.configuration.autoStart) { - Meteor.setTimeout(function () { - Jobs.start(); - }, Jobs.private.configuration.activityDelay); - } - }, 3000) -}); \ No newline at end of file diff --git a/package/server/utilities.js b/package/server/utilities.js deleted file mode 100644 index 6a260e8..0000000 --- a/package/server/utilities.js +++ /dev/null @@ -1,146 +0,0 @@ -Jobs.utilities = {}; - -Jobs.utilities.number = function (thing, note) { - if (typeof thing === "function") { - thing = thing(); - } - - if (typeof thing === "string") { - thing = Number(thing); - - if (isNaN(thing)) { - console.log("Jobs: invalid input for " + note || "number"); - return 0 - } - } - - if (typeof thing === "number") { - return thing; - } else { - console.log("Jobs: invalid input for " + note || "number"); - return 0; - } -} - -Jobs.utilities.date = function (input1, input2) { - var currentDate = new Date(); - var ignoreList = ["priority"] - var action; - - if (input2) { - try { - currentDate = new Date(input1); - action = input2; - } catch (e) { - console.log("Jobs: Invalid date entered"); - return; - } - } else { - action = input1 || {}; - } - - // Hacky - var utilities = { - in: { - milliseconds: function (int) { - int = currentDate.getMilliseconds() + int; - currentDate.setMilliseconds(int); - }, - seconds: function (int) { - int = currentDate.getSeconds() + int; - currentDate.setSeconds(int); - }, - minutes: function (int) { - int = currentDate.getMinutes() + int; - currentDate.setMinutes(int); - }, - hours: function (int) { - int = currentDate.getHours() + int; - currentDate.setHours(int); - }, - days: function (int) { - int = currentDate.getDate() + int; - currentDate.setDate(int); - }, - months: function (int) { - int = currentDate.getMonth() + int; - currentDate.setMonth(int); - }, - years: function (int) { - int = currentDate.getYear() + int; - currentDate.setYear(int); - } - }, - on: { - milliseconds: function (int) { - currentDate.setMilliseconds(int); - }, - seconds: function (int) { - currentDate.setSeconds(int); - }, - minutes: function (int) { - currentDate.setMinutes(int); - }, - hours: function (int) { - currentDate.setHours(int); - }, - days: function (int) { - currentDate.setDate(int); - }, - months: function (int) { - currentDate.setMonth(int); - }, - years: function (int) { - currentDate.setYear(int); - } - } - } - - utilities.in.millisecond = utilities.in.milliseconds; - utilities.in.second = utilities.in.seconds; - utilities.in.minute = utilities.in.minutes; - utilities.in.hour = utilities.in.hours; - utilities.in.day = utilities.in.days; - utilities.in.month = utilities.in.months; - utilities.in.year = utilities.in.years; - - utilities.on.millisecond = utilities.on.milliseconds; - utilities.on.second = utilities.on.seconds; - utilities.on.minute = utilities.on.minutes; - utilities.on.hour = utilities.on.hours; - utilities.on.day = utilities.on.days; - utilities.on.month = utilities.on.months; - utilities.on.year = utilities.on.years; - - if (typeof action === "object") { - Object.keys(action).forEach(function (key1) { - if (["in","on"].indexOf(key1) > -1) { - - Object.keys(action[key1]).forEach(function (key2) { - try { - newNumber = Jobs.utilities.number(action[key1][key2]); - - if (typeof newNumber === "number") { - utilities[key1][key2](newNumber); - } else { - console.log("Jobs: invalid type was inputted: " + key1 + "." + key2); - } - } catch (e) { - console.log("Jobs: invalid argument was ignored: " + key1 + "." + key2); - } - }); - - } else if (key1 === "tz") { - console.log("Jobs: Oooo - you found a hidden feature - timezone is not working yet!"); - } else if (ignoreList.indexOf(key1) > 0) { - // ignore - } else { - console.log("Jobs: invalid argument was ignored: " + key1); - } - }); - - return currentDate; - } else { - console.log("Jobs: Invalid input for second argument"); - } -} \ No newline at end of file diff --git a/server/.DS_Store b/server/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..720a4395443efd4cfaf78409ac8b5bb7e8622e2a GIT binary patch literal 8196 zcmeHMOKjXk82yRXDvo5qRPUe?~- zwrLJQUnM1XT}>fNps-WGMe+xj0e`^pl7{0ME36|ylz}J% zQ3j$6L>Y)O@L$LPy|Y=9R(bCWZ8Sz1h%&GyGr*n?aq5gFLb@O*e{|5mO8}xpB<79g zR0nXMct{f=T@aMJ(wd@rz|a-L5(9-h>Eq0tXde0TDTb92Ci zpwSp*Aj-hX46xe07<5R34SSc>?{q6+73mxP4%?fplOgT0nCP4*!gI>x$#!<=6iE>=S(gx(B8^AhbV?)dr^9a;pBZV#_Jii; zuQ9f+Xnk>s#M(yb+|aa_9}4s!>-XrHGmIP5gJ~n24||@oIMmA#%D=(cNR zxU+3qWO8=Qc14t#mRS*q-Z;S{a{cZK7EmTfSYoWqVC+LsM(#!Ni@7o61zBLa7|f z*}iQ!)=1tqgXsy~Pg<^NyBAJV-rT@`(zY^kIX23}uAZ{)sGG_*ZmrU^@vLD#m9>LQ zYON%RwRUQ|Wc5AitVt(1?wQQfkPo?v0 z<72uXoJ?CT?ae~y-l=I5Ox|spmPrt-O*Di_QrWw68|;R5I0S<*LHAD{uEBM95nhGc z@GiU$pTOtv1$+r#!Cm+seuSUkXZQu~!Ef+8{DBgdV+C%;8f?Hl_y{&)3m(9O*oBW_ zFCN3=coGM36en>C4NT%J2AIVhK7-HU^Y{W@!&mSYzJYJzTlhA9fFFv3#R^b7D)_>P zAJJ^t9Y;_M@3`2h2SKr#pjh{>Lh+kYhQIQP&6Qhr)iyM?bR0}9UcF@(Yzd(5M-c~( zKn6!l0TBYJgwk!>cPP~(geB|Za ^ua+=uOLv%cO}x5VWrz`MU46BpA|YDl+CBB8 zjfJ|Vy`)=5;1#;oHlo+q?T z;|w~OLKpLR3Fq(%&f~L$xvLy=*YPEM8E@cCd~F%t`U^-C6mPA>TW%=hI-YxhGzF9{ zby^GDIuWGC>i;9FfB#<#sfc2XGO%VdfJJ>neLWQZYL7xzYsaZiQD=?WEeOh8XyD~I qQC^M{UHrq4=5aD*K8cVn2uc!a|M-W1zxh)%djCi7e-7CH$?z9*E=CXl literal 0 HcmV?d00001 diff --git a/server/api.js b/server/api.js new file mode 100644 index 0000000..4650cfe --- /dev/null +++ b/server/api.js @@ -0,0 +1,155 @@ +import { Actions } from './imports/actions' +import { Utilities } from './imports/utilities' +import { Operator } from './imports/operator' + +Jobs = {} + +// Configure the package (optional) + +Jobs.configure = function (config) { + check(config, { + autoStart: Match.Maybe(Boolean), + interval: Match.Maybe(Number), + startupDelay: Match.Maybe(Number), + maxWait: Match.Maybe(Number) + }) + + Object.keys(config).forEach(function (key) { + Utilities.config[key] = config[key]; + }); +} + +// Register jobs in a similar style to Meteor methods + +Jobs.register = function (jobs) { + check(jobs, Object); + + Object.keys(jobs).forEach(function (job) { + if (typeof jobs[job] === "function") { + // Register the job and start the queue + Operator.manager.add(job); + Utilities.registry.add(job, jobs[job]); + + // If Jobs has already started, start the queue automatically + if (Utilities.config.started) { + Operator.manager.queues[job].start(); + } + } else { + Utilities.logger("Register failed - this key should be a function: " + job); + } + }) +} + +// Adds a new job to MongoDB + +Jobs.run = function () { + check(arguments[0], String) + + if (Utilities.registry.data[arguments[0]]) { + return Actions.add.apply(null, arguments); + } else { + Utilities.logger("invalid job name: " + arguments[0] || "not specified"); + return false; + } +} + +// Cancel a job without removing it from MongoDB + +Jobs.cancel = function (jobId) { + check(jobId, String); + return Actions.cancel(jobId); +} + +// Start or stop the queue - intended for debugging and/or single server deployments + +Jobs.start = function (name) { + check(name, Match.OneOf(undefined, String, [String])) + Operator.manager.start(name); +} + +Jobs.stop = function (name) { + check(name, Match.OneOf(undefined, String, [String])) + Operator.manager.stop(name); +} + +// Get info on a job + +Jobs.get = function (jobId) { + check(jobId, String); + return Actions.get(jobId); +} + +// Run a job ahead of time + +Jobs.execute = function (jobId, callback, force) { + check(jobId, String) + check(force, Match.Optional(Boolean)) + + // 1. Get the job + var doc = Utilities.collection.findOne({ + _id: jobId, + state: { + $nin: ["success", "cancelled"] + } + }); + + // 1a. Ensure the job is legit + if (!doc) { + Utilities.logger("Unable to execute job. Job is completed or cannot be found."); + return false; + } + + // 2. Figure out the execution plan + if (typeof callback !== "undefined") { + if (typeof callback === "function") { + // business as usual + } else if (typeof callback === "boolean") { + force = true; + callback = undefined; + } else { + Utilities.logger("Execute call abandoned for " + jobId + " because of invalid callback"); + return false; + } + } + + // 3. Ensure the job is real + + if (Operator.manager.isAvailable(doc.name) || force) { + return Actions.execute(doc, callback, force) + } else { + Utilities.logger("Unable to execute job - queue is busy: " + doc.name + "/" + jobId); + return false; + } +} + +// Reschedule a job + +Jobs.reschedule = function (jobId, config) { + check(jobId, String) + check(config, { + date: Match.Maybe(Object), + in: Match.Maybe(Object), + on: Match.Maybe(Object), + priority: Match.Maybe(Number), + }) + + return Actions.reschedule(jobId, config); +} + +// Clear resolved jobs - or all of them + +Jobs.clear = function (state, name) { + check(state, Match.OneOf(undefined, String, [String])) + check(name, Match.Optional(String)) + + return Actions.clear(state, name); +} + +// Internals for debugging + +var JobsInternal = {} +JobsInternal.Actions = Actions; +JobsInternal.Utilities = Utilities; +JobsInternal.Operator = Operator; + +export { Jobs, JobsInternal } \ No newline at end of file diff --git a/server/imports/.DS_Store b/server/imports/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..edd9aabb22378bfdb6f4622613dbb0b491148d81 GIT binary patch literal 8196 zcmeHMO-vnC6h7xc%gg}ff)w67+VU*LRtx%QDcI6Bk3WS_Q_=_Jr&K!c%@igMGx%oS zgF-Qw>aJrsadCRIj?IjLLz+h(#G87cMlU-b> z115b8+b{xQ1lC4?KRz<#MBZ-QnZLW!Y0F8I^o#VgWx8G>@eM`_3O8*ol6WbVPNycF znUtq_Ik!X0p5mi^&6`XI{Q<|ZR{L7|q-o5Iv&v4#_B7KrvclNZ4N@5!GHu2Pwk z?uvC%zCh+>oJHs7n_CYw#G9MZIXZ+I&7|dguH-YSo8jc0(G|={V0g zI$u#2617&Xm!wmCzB6l3t1q)HWu?3KsLFuLPpnC@m}zKKl!52yR+%gNGfn#xbZ(=M5kB9p>xNFetV{0kuU)t=*Ci7H3nWnCar&6_z!?2NH4PWwB3y#Y@H)H;*WrEm z5I%>Sa0|YKCAbYgz)$cC{0hIp@9-x|C}S~}po%-O4tHS!58_cwVmo$VANJ!3OyK~Y z#ta&G7AJ5TXK@ba@jSkam+&&af>-b=zJYJzb-aNe;m7z1eu}s7D_rtV7g^!#vEZ}; z;BD%x^LP?xgU2&6vL0t+UERvl{|#rqE8%xmUhz=n)*W?w8uzsxPOe_)JvVqKlLUc$ zh6v`YFhgA9-Bct>tFano`$m)=B;pyf5`P9*(XvV`sg1{Cj8jMw!(H_>j1a{o{&=X$ zV=>ayBvv4Xu_u%!iCD@u`O;>kMJ67zLSK4NX_JVqY_nMENR6T}Ad>TKs zBH;iI;t&qwD3MSXBs4LLF6J;#WPA}X;7hAye3i)f7QXFsY?XRF0p|k1_o%ZD_428# zZ8`P?NmodFw_!NCUm{T8A2iDI|L%={|G!_{3C|fuAdJ92hyWILrMf!EaHAJG&$Z)J z&rs!s>n-{yOsL^1d|ssEgsXoTQa>KZk^wIIND?al`VRros_^?Ce*Xpc9^~dPYD?k~ literal 0 HcmV?d00001 diff --git a/server/imports/actions/add/generateDoc.js b/server/imports/actions/add/generateDoc.js new file mode 100644 index 0000000..290a0ff --- /dev/null +++ b/server/imports/actions/add/generateDoc.js @@ -0,0 +1,23 @@ +import { Utilities } from '../../utilities' + +var generateDoc = function (input) { + return { + name: input.name, + created: new Date(), + state: "pending", + due: function () { + return Utilities.helpers.generateDueDate(input.config) + }(), + priority: function () { + return Utilities.helpers.number(input.config.priority, "priority") || 0; + }(), + data: function () { + return input.config.state || {}; + }(), + arguments: function () { + return input.arguments || []; + }() + } +} + +export { generateDoc } \ No newline at end of file diff --git a/server/imports/actions/add/index.js b/server/imports/actions/add/index.js new file mode 100644 index 0000000..e00ae57 --- /dev/null +++ b/server/imports/actions/add/index.js @@ -0,0 +1,27 @@ +import { Utilities } from '../../utilities' +import { generateDoc } from './generateDoc.js' +import { processInput } from './processInput.js' + +var add = function () { + // 1. Convert arguments to array + prepare necessary data + var userEntry = processInput(arguments); + + // 2. Generate job document + var jobInfo = generateDoc(userEntry); + + // 3. Add to the database + var jobId = Utilities.collection.insert(jobInfo); + + // 4. Simulate the document (this saves us a database request) + var simulatedDoc = jobInfo; + simulatedDoc._id = jobId; + + // 5. Mission accomplished + if (typeof userEntry.config.callback === "function") { + return userEntry.config.callback(undefined, simulatedDoc); + } else { + return simulatedDoc; + } +} + +export { add } \ No newline at end of file diff --git a/server/imports/actions/add/processInput.js b/server/imports/actions/add/processInput.js new file mode 100644 index 0000000..999b695 --- /dev/null +++ b/server/imports/actions/add/processInput.js @@ -0,0 +1,42 @@ +var configIsPresent = function (input) { + var lastItem = input[input.length - 1], + lastItemIsConfig = false, + reservedWords = ["in", "on", "priority", "date", "data", "callback"]; + + if (typeof lastItem === "object") { + reservedWords.forEach(function (word) { + if (lastItem[word]) { + lastItemIsConfig = true; + } + }); + } + + return lastItemIsConfig; +} + +var processInput = function (args) { + var output = {}, + args = Array.prototype.slice.call(args); + + output.name = function () { + var name = args.shift(); + return name; + }() + + output.config = function () { + if (configIsPresent(args)) { + var config = args.pop(); + return config; + } else { + return {}; + } + }() + + output.arguments = function () { + return args; + }() + + return output; +} + +export { processInput } \ No newline at end of file diff --git a/server/imports/actions/cancel/index.js b/server/imports/actions/cancel/index.js new file mode 100644 index 0000000..cd17cf8 --- /dev/null +++ b/server/imports/actions/cancel/index.js @@ -0,0 +1,47 @@ +import { Utilities } from '../../utilities' + +var cancel = function (id, callback) { + check(id, String) + + var job = Utilities.collection.findOne(id); + + + if (job) { + if (job.state === "pending" || job.state === "failed") { + + var result = Utilities.collection.update(id, { + $set: { + state: "cancelled" + }, + $push: { + history: { + date: new Date(), + state: "cancelled" + } + } + }) + + if (callback) callback(undefined, result); + return result; + } else { + Utilities.logger([ + "Cancel failed for " + id, + "Job has completed successfully or is already cancelled" + ]) + + if (callback) callback(true, undefined); + + return false; + } + } else { + Utilities.logger([ + "Cancel failed for " + id, + "No such job found" + ]) + + if (callback) callback(true, undefined); + return false; + } +} + +export { cancel } \ No newline at end of file diff --git a/server/imports/actions/clear/index.js b/server/imports/actions/clear/index.js new file mode 100644 index 0000000..67bf314 --- /dev/null +++ b/server/imports/actions/clear/index.js @@ -0,0 +1,33 @@ +import { Utilities } from '../../utilities' + +var clear = function (state, name) { + action = { + state: { + $in: state || ["cancelled", "success"] + } + } + + if (typeof name === "string") { + action.name = name + } + + if (typeof name === "object") { + action.name = { + $in: name + } + } + + if (state === "*") { + action = { + state: { + $in: ["cancelled", "success", "pending", "failure"] + } + } + } + + var result = Utilities.collection.remove(action) + + return result; +} + +export { clear } \ No newline at end of file diff --git a/server/imports/actions/execute/index.js b/server/imports/actions/execute/index.js new file mode 100644 index 0000000..fdb71ad --- /dev/null +++ b/server/imports/actions/execute/index.js @@ -0,0 +1,31 @@ +import { Utilities } from '../../utilities' +import { process } from './process.js' + +var execute = function (job, callback) { + + // 1. Get the job Document + if (typeof job === "string") { + job = Utilities.collection.findOne({ + _id: job, + state: { + $nin: ["success", "cancelled"] + } + }); + } + + // 2. ... + if (typeof job === "object") { + if (typeof Utilities.registry.data[job.name]) { + var result = process(job, callback); + return result; + } else { + Utilities.logger("Jobs: Job not found in registry: " + doc.name); + return false; + } + } else { + Utilities.logger(["Job not valid or not found:", job]); + return false; + } +} + +export { execute } \ No newline at end of file diff --git a/server/imports/actions/execute/process.js b/server/imports/actions/execute/process.js new file mode 100644 index 0000000..875ade9 --- /dev/null +++ b/server/imports/actions/execute/process.js @@ -0,0 +1,36 @@ +import { Utilities } from '../../utilities' +import { toolbelt } from './toolbelt.js' +import { reschedule } from '../reschedule/' +import { Operator } from '../../operator' + +var process = function (doc, callback) { + // Goals: + // 1- Execute the job + // 2- Update the document in database + // 3- Capture the result (if any) + + try { + var Toolbelt = new toolbelt(doc); + var jobResult = Utilities.registry.data[doc.name].apply(Toolbelt, doc.arguments); + + if (typeof callback === "function") { + return callback(undefined, jobResult); + } else { + return jobResult; + } + } + + catch (e) { + var Toolbelt = new toolbelt(doc); + var failure = Toolbelt.failure(); + + Utilities.logger("job failed to run: " + doc.name) + console.log(e); + + if (typeof callback === "function") { + return callback(true, undefined); + } + } +} + +export { process } \ No newline at end of file diff --git a/server/imports/actions/execute/toolbelt.js b/server/imports/actions/execute/toolbelt.js new file mode 100644 index 0000000..be20a46 --- /dev/null +++ b/server/imports/actions/execute/toolbelt.js @@ -0,0 +1,96 @@ +import { Utilities } from '../../utilities' +import { Operator } from '../../operator' +import { reschedule } from '../reschedule/' + +var toolbelt = function (jobDoc) { + this.doc = jobDoc; + + this.set = function (key, value) { + check(key, String) + + if (key.indexOf()) + var docId = this.doc._id; + + var patch = {} + patch["data." + key] = value; + + var doc = Utilities.collection.update(docId, { + $set: patch + }) + + return doc; + } + + this.get = function (key) { + check(key, String) + + var docId = this.doc._id; + var result = null; + + // Get the latest doc + latestDoc = Utilities.collection.findOne(docId); + + // Update the cached doc while we're at it + this.doc = latestDoc; + + // Return the result + if (latestDoc) { + if (latestDoc.data) { + result = latestDoc.data; + + if (key) { + result = latestDoc.data.key || null; + } + } else { + console.log("WTF") + } + } + return result; + } + + this.success = function (result) { + var docId = this.doc._id; + + Utilities.collection.update(docId, { + $set: { + state: "success", + }, + $push: { + history: { + date: new Date(), + state: "success", + server: Operator.dominator.serverId, + result: result + } + } + }) + } + + this.failure = function (result) { + var docId = this.doc._id; + + var update = Utilities.collection.update(docId, { + $set: { + state: "failure", + }, + $push: { + history: { + date: new Date(), + state: "failure", + server: Operator.dominator.serverId, + result: result + } + } + }) + + return update; + } + + this.reschedule = function (config) { + var docId = this.doc._id; + var newDate = reschedule(docId, config); + return newDate; + } +} + +export { toolbelt } \ No newline at end of file diff --git a/server/imports/actions/get/index.js b/server/imports/actions/get/index.js new file mode 100644 index 0000000..514fe08 --- /dev/null +++ b/server/imports/actions/get/index.js @@ -0,0 +1,7 @@ +import { Utilities } from '../../utilities' + +var get = function (id) { + return Utilities.collection.findOne(id); +} + +export { get } \ No newline at end of file diff --git a/server/imports/actions/index.js b/server/imports/actions/index.js new file mode 100644 index 0000000..5c14c84 --- /dev/null +++ b/server/imports/actions/index.js @@ -0,0 +1,17 @@ +import { add } from './add' +import { get } from './get' +import { clear } from './clear' +import { cancel } from './cancel' +import { execute } from './execute' +import { reschedule } from './reschedule' + +var Actions = { + add: add, + get: get, + clear: clear, + cancel: cancel, + execute: execute, + reschedule: reschedule +} + +export { Actions } \ No newline at end of file diff --git a/server/imports/actions/reschedule/index.js b/server/imports/actions/reschedule/index.js new file mode 100644 index 0000000..fcc8c3e --- /dev/null +++ b/server/imports/actions/reschedule/index.js @@ -0,0 +1,58 @@ +import { Utilities } from '../../utilities' + +getJob = function (jobId) { + var jobDoc = Utilities.collection.findOne(jobId); + + // Second, check if its possible to reschedule the document + if (!jobDoc) { + Utilities.logger("no such job found: " + jobId); + return false; + } else if (['success', 'cancelled'].indexOf(jobDoc.state) >= 0) { + Utilities.logger('job has already completed: ' + jobId); + return false; + } else { + return jobDoc; + } +} + +reschedule = function (jobId, config) { + // First, check if the doc is available + var jobDoc = getJob(jobId); + + if (!jobDoc) { + return false; + } + + // Second, prepare the update + var jobUpdate = { + $set: {}, + $push: { + history: { + date: new Date(), + type: "reschedule", + } + } + } + + // Third, prepare the priority, if any + var determineNewPriority = function () { + if (config.priority) { + var val = Utilities.number(config.priority, "priority") || 0; + jobUpdate.$set.priority = val; + jobUpdate.$push.history.newPriority = val; + } + }(); + + // Fourth, create the new date, if any + var newDueDate = function () { + var val = Utilities.helpers.generateDueDate(config); + jobUpdate.$set.due = val; + jobUpdate.$push.history.newDue = val; + }(); + + // Finally, run the update + var update = Utilities.collection.update(jobId, jobUpdate) + return update; +} + +export { reschedule } \ No newline at end of file diff --git a/server/imports/operator/dominator/README.md b/server/imports/operator/dominator/README.md new file mode 100644 index 0000000..ac94f71 --- /dev/null +++ b/server/imports/operator/dominator/README.md @@ -0,0 +1,15 @@ +# Jobs - Operator.dominator + +This object ensures that only one server is processing jobs + +How it works +1. Each server is automatically given an id +2. Every 5 seconds, the server checks to see if it is active +3. If it is active, it updates a MongoDB document with its id and a timestamp +4. If it is not active, it checks to see when a server was last active +4a. If the last active timestamp was greater than `activityGap`, the server who discovered this will step up to manage +4a. If the last active timestamp was less than `activeDelay`, the server will not do anything + +These checks happen every 5 seconds, or as as defined with `timer` in `runner.js`. + +`activityGap` refers to how much time is permitted between the last timestamp update and current time. diff --git a/server/imports/operator/dominator/index.js b/server/imports/operator/dominator/index.js new file mode 100644 index 0000000..dd07203 --- /dev/null +++ b/server/imports/operator/dominator/index.js @@ -0,0 +1,56 @@ +import { Mongo } from 'meteor/mongo' +import { Random } from 'meteor/random' +import { Utilities } from '../../utilities/' + +var dominator = { + collection: new Mongo.Collection("jobs_dominator"), + serverId: function () { + return Random.id(); + }(), + isActive: function () { + var self = this; + + if (Meteor.isDevelopment) { + return self.setAsActive(); + } + + var doc = self.collection.findOne({}, { + sort: { + lastPing: 1, + } + }); + + if (!doc || doc.serverId === self.serverId) { + return self.setAsActive(); + } else { + var timeGap = new Date () - doc.lastPing; + var timeSpacer = Utilities.config.maxWait; + + if (timeGap >= timeSpacer) { + return self.setAsActive() + } + } + }, + setAsActive: function () { + var self = this; + + var result = self.collection.upsert({ + serverId: self.serverId + }, { + $set: { + lastPing: new Date(), + }, + $setOnInsert: { + created: new Date() + } + }) + + return result; + }, + reset: function () { + self = this; + self.serverId = Random.id(); + } +} + +export { dominator } \ No newline at end of file diff --git a/server/imports/operator/index.js b/server/imports/operator/index.js new file mode 100644 index 0000000..833b208 --- /dev/null +++ b/server/imports/operator/index.js @@ -0,0 +1,12 @@ +import { queue } from './queue/' +import { dominator } from './dominator/' +import { startup } from './startup/' +import { manager } from './manager/' + +Operator = { + dominator: dominator, + queue: queue, + manager: manager +} + +export { Operator } \ No newline at end of file diff --git a/server/imports/operator/manager/index.js b/server/imports/operator/manager/index.js new file mode 100644 index 0000000..8d80e29 --- /dev/null +++ b/server/imports/operator/manager/index.js @@ -0,0 +1,51 @@ +import { queue } from "../queue" + +var manager = {} + +manager.queues = {} + +manager.add = function (name) { + manager.queues[name] = new queue(name) +} + +manager.start = function (name) { + var action = function (queue) { + manager.queues[queue].start() + } + + if (typeof name === "string") { + action(name); + } else if (typeof name === "object") { + name.forEach(function (item) { + action(item); + }); + } else { + Object.keys(manager.queues).forEach(function (item) { + action(item); + }); + } +} + +manager.stop = function (name) { + var action = function (queue) { + manager.queues[queue].stop() + } + + if (typeof name === "string") { + action(name); + } else if (typeof name === "object") { + name.forEach(function (item) { + action(item); + }); + } else { + Object.keys(manager.queues).forEach(function (item) { + action(item); + }); + } +} + +manager.isAvailable = function (name) { + return manager.queues[name].available; +} + +export { manager, start, stop } \ No newline at end of file diff --git a/server/imports/operator/queue/README.md b/server/imports/operator/queue/README.md new file mode 100644 index 0000000..4fa4537 --- /dev/null +++ b/server/imports/operator/queue/README.md @@ -0,0 +1,19 @@ +# Jobs - Operator.queue + +This object ensures that only job is processed at a time (and on one server). It starts a new "runner" for each queue that you register. + +How it works +1. when a server starts up, it will run the start function after a slight delay +2. When the start function runs, it'll start an interval timer for every X seconds (5 is default) +3. When the interval timer runs, it will check that the server is active via `control.js` +4. If the server is active, it will look for jobs to run + +How jobs are ran +1. first, the queue will check if there are failed jobs. if there are, it will try to run them again +2. after the failed jobs are processed, it will check for new jobs and run them +3. jobs are based on whichever comes first (and ultimately, whatever MongoDB returns to us) + +When a job is running, JobsRunner will ignore the setInterval calls. +When a job completes, Jobs will check if there are more jobs to run. +If there are more jobs to run, Jobs will continue to run them consecutively. +If there are no more jobs to run, Jobs will go back to polling with setInterval \ No newline at end of file diff --git a/server/imports/operator/queue/index.js b/server/imports/operator/queue/index.js new file mode 100644 index 0000000..8be8145 --- /dev/null +++ b/server/imports/operator/queue/index.js @@ -0,0 +1,122 @@ +import { Meteor } from 'meteor/meteor'; +import { Utilities } from '../../utilities/' +import { execute } from '../../actions/execute'; +import { dominator } from '../dominator'; + +/* + This object ensures that only job is processed at a time (and on one server) + It starts a new runner for each queue that you register + Queues can be accessed with Jobs.queue + + How it works + 1. when a server starts up, it will run the start function after a slight delay + 2. When the start function runs, it'll start an interval timer for every X seconds (5 is default) + 3. When the interval timer runs, it will check that the server is active via `control.js` + 4. If the server is active, it will look for jobs to run + + How jobs are ran + 1. first, the queue will check if there are failed jobs. if there are, it will try to run them again + 2. after the failed jobs are processed, it will check for new jobs and run them + 3. jobs are based on whichever comes first (and ultimately, whatever MongoDB returns to us) + + When a job is running, JobsRunner will ignore the setInterval calls. + When a job completes, Jobs will check if there are more jobs to run. + If there are more jobs to run, Jobs will continue to run them consecutively. + If there are no more jobs to run, Jobs will go back to polling with setInterval + +*/ + +var queue = function (name) { + this.name = name; + this.state = "failure"; + this.interval = null; + this.available = true; +} + +queue.prototype.start = function () { + var self = this; + + if (self.interval) { + Utilities.logger('Cannot start queue because it has already been started: ' + self.name); + return; + } + + var action = self.trigger.bind(self); + self.interval = Meteor.setInterval(action, Utilities.config.interval); +} + +queue.prototype.stop = function () { + var self = this; + + if (!self.interval) { + Utilities.logger('Cannot stop queue because it has already been stopped: ' + self.name); + return; + } + + self.state = "failure"; + self.interval = Meteor.clearInterval(self.interval); +} + +queue.prototype.trigger = function () { + var self = this; + + if (self.available === true) { + self.available = false; + + if (dominator.isActive()) { + self.run() + } else { + self.available = true; + } + } +} + +queue.prototype.grabDoc = function () { + var self = this; + var state = self.state; + var name = self.name; + + var jobDoc = Utilities.collection.findOne({ + name: name, + due: { + $lt: new Date() + }, + state: state, + history: { + $not: { + $elemMatch: { + state: "failure", + server: dominator.serverId + } + } + } + }, { + sort: { + due: 1, + priority: 1 + } + }); + + return jobDoc; +} + +queue.prototype.run = function () { + var self = this; + var jobDoc = self.grabDoc(); + + if (jobDoc) { + execute(jobDoc, function () { + self.available = true; + self.trigger() + }); + } else { + self.available = true; + + if (self.state === "failure") { + self.state = "pending"; + self.trigger(); + } + } +} + +export { queue } \ No newline at end of file diff --git a/server/imports/operator/startup/README.md b/server/imports/operator/startup/README.md new file mode 100644 index 0000000..08232f1 --- /dev/null +++ b/server/imports/operator/startup/README.md @@ -0,0 +1,5 @@ +# Jobs - Startup + +The startup function will wait 5 seconds for the code to load, and then it will start all queues. + +If a job is registered after the queue has been started, that job will be started automatically. \ No newline at end of file diff --git a/server/imports/operator/startup/index.js b/server/imports/operator/startup/index.js new file mode 100644 index 0000000..7d2478e --- /dev/null +++ b/server/imports/operator/startup/index.js @@ -0,0 +1,17 @@ +import { Meteor } from 'meteor/meteor' +import { Utilities } from '../../utilities/' +import { manager } from '../manager' + +// 1. Wait 5 seconds for all the code to initialize +// 2. Start Jobs is required + +Meteor.startup(function () { + Meteor.setTimeout(function () { + if (Utilities.config.autoStart) { + Meteor.setTimeout(function () { + manager.start(); + Utilities.config.started = true; + }, Utilities.config.startupDelay); + } + }, 5000) +}); \ No newline at end of file diff --git a/server/imports/utilities/.DS_Store b/server/imports/utilities/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..b10dd16c1ea84871bedcf5195d5049afb41caa1e GIT binary patch literal 6148 zcmeHKOHRW;47E#zT42#7%h|K)4WcNVpcg<&pdd9(7wv}C+=EMS1`feNI04UNt4hdJ zVues;S2J(oPsVu@#WfLed1#hI3nH3f2u|jhG9q>vBbk^+jGXTANHy*0g*w`fyj}4h z8IZHP!l`w%!LN6IoAq*4ckQm1bk?u!s;W8cYogEBuemq*5)*}q4DFP#Brz!^9T22is_X4i^7I|I&uGjL=;z7GLI zurQ2@>C=HJwE(~=)Jd?GUP5AmVPO~*VS%uQ0yUJa#b6DGKA2x&7!@^~*qRTvo!L4R zPP^m&A-NM5iat97&cKj?3%y)O{l7om{||%w${BD5&WZt^R;#MSBUx=7J)G3qfboDK oB7RZv7=lGA#qgC -1) { + Object.keys(action[key1]).forEach(function (key2) { + try { + newNumber = number(action[key1][key2]); + + if (typeof newNumber === "number") { + utilities[key1][key2](newNumber); + } else { + logger("invalid type was inputted: " + key1 + "." + key2); + } + } catch (e) { + logger("invalid argument was ignored: " + key1 + "." + key2); + } + }); + } + }); + + return currentDate; + } else { + console.log("invalid argument(s) date generator"); + } +} + +export { date } \ No newline at end of file diff --git a/server/imports/utilities/helpers/generateDueDate.js b/server/imports/utilities/helpers/generateDueDate.js new file mode 100644 index 0000000..52da5a4 --- /dev/null +++ b/server/imports/utilities/helpers/generateDueDate.js @@ -0,0 +1,24 @@ +import { logger } from '../logger' +import { date } from './date.js' + +generateDueDate = function (config) { + var due = new Date(); + + if (config && config.date) { + if (typeof config.date.getDate === "function") { + due = config.date; + } else { + logger("Invalid input for 'date' field. Used current date instead.") + } + } + + if (typeof config === "object") { + if (config.in || config.on) { + due = date(due, config); + } + } + + return due; +} + +export { generateDueDate } \ No newline at end of file diff --git a/server/imports/utilities/helpers/index.js b/server/imports/utilities/helpers/index.js new file mode 100644 index 0000000..fc14daa --- /dev/null +++ b/server/imports/utilities/helpers/index.js @@ -0,0 +1,11 @@ +import { generateDueDate } from './generateDueDate.js' +import { number } from './number.js' +import { date } from './date.js' + +var helpers = { + generateDueDate: generateDueDate, + number: number, + date: date +} + +export { helpers} \ No newline at end of file diff --git a/server/imports/utilities/helpers/number.js b/server/imports/utilities/helpers/number.js new file mode 100644 index 0000000..a73a5f7 --- /dev/null +++ b/server/imports/utilities/helpers/number.js @@ -0,0 +1,27 @@ +var number = function (input, note) { + if (typeof input === "undefined") { + return 0; + } + + if (typeof input === "function") { + input = input(); + } + + if (typeof input === "string") { + input = Number(input); + + if (isNaN(input)) { + console.log("Jobs: invalid input for " + note || "number"); + return 0 + } + } + + if (typeof input === "number") { + return input; + } else { + console.log("Jobs: invalid input for " + note || "number"); + return 0; + } +} + +export { number } \ No newline at end of file diff --git a/server/imports/utilities/index.js b/server/imports/utilities/index.js new file mode 100644 index 0000000..50201b0 --- /dev/null +++ b/server/imports/utilities/index.js @@ -0,0 +1,15 @@ +import { config } from './config' +import { logger } from './logger' +import { registry } from './registry' +import { helpers } from './helpers' +import { collection } from './collection' + +var Utilities = { + config: config, + collection: collection, + logger: logger, + registry: registry, + helpers: helpers +} + +export { Utilities } \ No newline at end of file diff --git a/server/imports/utilities/logger/index.js b/server/imports/utilities/logger/index.js new file mode 100644 index 0000000..b80021d --- /dev/null +++ b/server/imports/utilities/logger/index.js @@ -0,0 +1,21 @@ +var logAsString = function (message) { + console.log("Jobs: " + message); +} + +var logAsArray = function (messages) { + messages.forEach(function (message) { + console.log("Jobs: " + message); + }); +} + +var logger = function (messages) { + if (typeof messages === "string") { + logAsString(messages); + } else if (typeof messages === "object") { + logAsArray(messages); + } + + console.log("----") +} + +export { logger } \ No newline at end of file diff --git a/server/imports/utilities/registry/index.js b/server/imports/utilities/registry/index.js new file mode 100644 index 0000000..db6a2c5 --- /dev/null +++ b/server/imports/utilities/registry/index.js @@ -0,0 +1,9 @@ +var registry = {}; + +registry.data = {}; + +registry.add = function (job, jobs) { + registry.data[job] = jobs; +} + +export { registry } \ No newline at end of file