Skip to content

Commit

Permalink
ix: prevent job execution on server restart when resumeOnRestart is f…
Browse files Browse the repository at this point in the history
…alse
  • Loading branch information
code-xhyun committed Aug 14, 2024
1 parent c269cb6 commit bc9e8bb
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 6 deletions.
23 changes: 18 additions & 5 deletions src/job/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ export const run: RunMethod = async function (this: Job) {

return new Promise(async (resolve, reject) => {
this.attrs.lastRunAt = new Date();
this.attrs.runCount = (this.attrs.runCount || 0) + 1;

const previousRunAt = this.attrs.nextRunAt;
debug('[%s:%s] setting lastRunAt to: %s', this.attrs.name, this.attrs._id, this.attrs.lastRunAt.toISOString());
this.computeNextRunAt();
await this.save();

let finished = false;
let resumeOnRestartSkipped = false;
const jobCallback = async (error?: Error, result?: unknown) => {
// We don't want to complete the job multiple times
if (finished) {
Expand All @@ -33,11 +35,13 @@ export const run: RunMethod = async function (this: Job) {
if (error) {
this.fail(error);
} else {
this.attrs.lastFinishedAt = new Date();
this.attrs.finishedCount = (this.attrs.finishedCount || 0) + 1;
if (!resumeOnRestartSkipped) {
this.attrs.lastFinishedAt = new Date();
this.attrs.finishedCount = (this.attrs.finishedCount || 0) + 1;

if (this.attrs.shouldSaveResult && result) {
this.attrs.result = result;
if (this.attrs.shouldSaveResult && result) {
this.attrs.result = result;
}
}
}

Expand Down Expand Up @@ -81,6 +85,15 @@ export const run: RunMethod = async function (this: Job) {
throw new JobError('Undefined job');
}

if (!this.pulse._resumeOnRestart && previousRunAt && this.pulse._readyAt >= previousRunAt) {
debug('[%s:%s] job resumeOnRestart skipped', this.attrs.name, this.attrs._id);
resumeOnRestartSkipped = true;
await jobCallback(undefined, 'skipped');
return;
}

this.attrs.runCount = (this.attrs.runCount || 0) + 1;

if (definition.fn.length === 2) {
debug('[%s:%s] process function being called', this.attrs.name, this.attrs._id);
await definition.fn(this, jobCallback);
Expand Down
2 changes: 2 additions & 0 deletions src/pulse/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class Pulse extends EventEmitter {
_collection!: Collection;
_nextScanAt: any;
_processInterval: any;
_readyAt: Date;

/**
* Constructs a new Pulse object.
Expand Down Expand Up @@ -143,6 +144,7 @@ class Pulse extends EventEmitter {
this._ready = new Promise((resolve) => {
this.once('ready', resolve);
});
this._readyAt = new Date();

this.init(config, cb);
}
Expand Down
9 changes: 8 additions & 1 deletion src/pulse/resume-on-restart.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res
.updateMany(
{
$or: [
{ lockedAt: { $exists: true }, lastFinishedAt: { $exists: false } },
{
lockedAt: { $exists: true },
$expr: { $eq: ['$runCount', '$finishedCount'] },
},
{
lockedAt: { $exists: true },
lastFinishedAt: { $exists: false },
},
{
$and: [
{ lockedAt: { $exists: false } },
Expand Down

0 comments on commit bc9e8bb

Please sign in to comment.