Skip to content

Commit

Permalink
Added support for async functions, made job success implicit, and aut…
Browse files Browse the repository at this point in the history
…omatically storing function result to database
  • Loading branch information
msavin committed Mar 19, 2020
1 parent 06361bf commit b866938
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 50 deletions.
8 changes: 2 additions & 6 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ Jobs.register({
sendEmail: function (to, content) {
var send = Magic.sendEmail(to, content);

if (send) {
this.success()
} else {
if (!send) {
this.reschedule({
in: {
minutes: 5
Expand All @@ -97,7 +95,6 @@ Jobs.register({
})

if (doc) {
this.success(doc);
this.remove();
} else {
this.reschedule({
Expand All @@ -106,15 +103,14 @@ Jobs.register({
}
})
}
}

})
```
Each job is binded with a set of functions to give you maximum control over how the job runs:
- `this.document` - access job document
- `this.set(key, value)` - set a persistent key/value pair
- `this.get(key)` - get a persistent value from key
- `this.success(result)` - tell the queue the job is completed, and attach an optional result
- `this.failure(result)` - tell the queue the job failed, and attach an optional result
- `this.reschedule(config)` - tell the queue to schedule the job for a future date
- `this.remove()` - remove the job from the queue
Expand Down
26 changes: 13 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ Run scheduled tasks with Steve Jobs, the simple jobs queue made just for Meteor.

- Jobs run on one server at a time
- Jobs run predictably and consecutively
- Jobs and their history are stored in MongoDB
- Jobs, their history and returned data are stored in MongoDB
- Failed jobs are retried on server restart
- No third party dependencies

**The new 3.1 features repeating jobs and more improvements.** It can run hundreds of jobs in seconds with minimal CPU impact, making it a reasonable choice for many applications. To get started, check out the <a href="https://github.com/msavin/SteveJobs..meteor.jobs.scheduler.queue.background.tasks/blob/master/DOCUMENTATION.md">**documentation**</a> and the <a href="#quick-start">**quick start**</a> below.
**The new 4.0 features repeating jobs, async support and more!** It can run hundreds of jobs in seconds with minimal CPU impact, making it a reasonable choice for many applications. To get started, check out the <a href="https://github.com/msavin/SteveJobs..meteor.jobs.scheduler.queue.background.tasks/blob/master/DOCUMENTATION.md">**documentation**</a> and the <a href="#quick-start">**quick start**</a> below.

## Developer Friendly GUI and API

Expand Down Expand Up @@ -43,21 +43,22 @@ Then, write your background jobs like you would write your methods:
```javascript
Jobs.register({
"sendReminder": function (to, message) {
var instance = this;
const instance = this;

var call = HTTP.put("http://www.magic.com/sendEmail", {
const call = HTTP.put("http://www.magic.com/email/send", {
to: to,
message: message
message: message,
subject: "You've Got Mail!",
})

if (call.statusCode === 200) {
instance.success(call.result);
} else {
if (call.statusCode !== 200) {
instance.reschedule({
in: {
minutes: 5
}
});
} else {
return call.data;
}
}
});
Expand Down Expand Up @@ -93,9 +94,8 @@ Compared to a CRON Job, the Steve Jobs package gives you much more control over
```javascript
Jobs.register({
"syncData": function () {
var instance = this;

var call = HTTP.put("http://www.magic.com/syncData")
const instance = this;
const call = HTTP.put("http://www.magic.com/syncData")

if (call.statusCode === 200) {
instance.replicate({
Expand All @@ -104,8 +104,8 @@ Jobs.register({
}
});

// alternatively, you can use instance.remove to save storage
instance.success(call.result);
// to save storage, you can remove the document
instance.remove();
} else {
instance.reschedule({
in: {
Expand Down
4 changes: 2 additions & 2 deletions package/server/imports/actions/execute/index.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { Utilities } from "../../utilities"
import { process } from "./process.js"

var execute = function (job, callback) {
var execute = async function (job, callback) {
var jobDoc = Utilities.helpers.getJob(job, {
allow: ["pending", "failure"],
message: "Job is not valid or not found, or is already resolved:"
});

if (typeof jobDoc === "object") {
if (typeof Utilities.registry.data[jobDoc.name]) {
var result = process(jobDoc, callback);
var result = await process(jobDoc, callback);
return result;
} else {
Utilities.logger("Jobs: Job not found in registry: " + jobDoc.name);
Expand Down
15 changes: 12 additions & 3 deletions package/server/imports/actions/execute/process.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,26 @@ import { Utilities } from "../../utilities"
import { toolbelt } from "./toolbelt.js"
import { reschedule } from "../reschedule/"

var process = function (doc, callback) {
var process = async function (doc, callback) {
// Goals:
// 1- Execute the job
// 2- Update the document in database
// 3- Capture the result (if any)

var Toolbelt = new toolbelt(doc);
var jobResult;
var jobFunc = Utilities.registry.data[doc.name];
var isAsync = jobFunc.constructor.name === "AsyncFunction";

try {
var jobResult = Utilities.registry.data[doc.name].apply(Toolbelt, doc.arguments);
var resolution = Toolbelt.checkForResolution();
if (isAsync) {
jobResult = await jobFunc.apply(Toolbelt, doc.arguments);
} else {
jobResult = jobFunc.apply(Toolbelt, doc.arguments);
}


var resolution = Toolbelt.checkForResolution(jobResult);

if (typeof callback === "function") {
return callback(undefined, jobResult);
Expand Down
29 changes: 3 additions & 26 deletions package/server/imports/actions/execute/toolbelt.js
Original file line number Diff line number Diff line change
Expand Up @@ -223,35 +223,12 @@ var toolbelt = function (jobDoc) {
return update;
}

this.checkForResolution = function () {
this.checkForResolution = function (result) {
var docId = this.document._id;
var queueName = this.document.name;
var resolution = this.resolved;

if (!resolution) {
Utilities.logger([
"Job was not successfully terminated: " + queueName + ", " + docId,
"Every job must be resolved with this.success(), this.failure(), this.reschedule(), or this.remove()",
"Queue was stopped; please re-write your function and re-start the server"
]);

Operator.manager.queues[queueName].stop();

var update = Utilities.collection.update(docId, {
$set: {
state: "failure",
},
$push: {
history: {
date: new Date(),
state: "unresolved",
serverId: Utilities.config.getServerId()
}
}
})

return false;
}

if (!resolution) this.success(result)
}
}

Expand Down

0 comments on commit b866938

Please sign in to comment.