Skip to content

Commit

Permalink
Merge pull request #43 from tractorcow/pulls/job-recovery-stability
Browse files Browse the repository at this point in the history
API Better restarting of stalled jobs (when StepsProcessed == 0)
  • Loading branch information
nyeholt committed May 22, 2015
2 parents aaf60b6 + 609b55e commit 6036a98
Show file tree
Hide file tree
Showing 5 changed files with 437 additions and 114 deletions.
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,38 @@ _must_ detect whether they're present or not before using them. See [this issue]
and [this wiki page](https://github.com/silverstripe-australia/silverstripe-queuedjobs/wiki/Defining-queued-jobs) for
more information

Ensure that notifications are configured so that you can get updates or stalled or broken jobs. You can
set the notification email address in your config as below:


:::yaml
Email:
queued_job_admin_email: [email protected]

## Performance configuration

By default this task will run until either 128mb or the limit specified by php_ini('memory_limit') is reached.

You can adjust this with the below config change


:::yaml
# Force memory limit to 256 megabytes
QueuedJobsService:
# Accepts b, k, m, or b suffixes
memory_limit: 256m


You can also enforce a time limit for each queue, after which the task will attempt a restart to release all
resources. By default this is disabled, so you must specify this in your project as below:


:::yaml
# Force limit to 10 minutes
QueuedJobsService:
time_limit: 600


## Indexes

ALTER TABLE `QueuedJobDescriptor` ADD INDEX ( `JobStatus` , `JobType` )
Expand Down
62 changes: 55 additions & 7 deletions code/dataobjects/QueuedJobDescriptor.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,23 @@
* this will determine which queue it is placed within so that some shorter jobs can execute immediately without needing
* to wait for a potentially long running job.
*
* @property string $JobTitle Name of job
* @property string $Signature Unique identifier for this job instance
* @property string $Implementation Classname of underlying job
* @property string $StartAfter Don't start until this date, if set
* @property string $JobStarted When this job was started
* @property string $JobFinished When this job was finished
* @property int $TotalSteps Number of steps
* @property int $StepsProcessed Number of completed steps
* @property int $LastProcessedCount Number at which StepsProcessed was last checked for stalled jobs
* @property int $ResumeCounts Number of times this job has been resumed
* @property string $SavedJobData serialised data for the job to use as storage
* @property string $SavedJobMessages List of messages saved for this job
* @property string $JobStatus Status of this job
* @property string $JobType Type of job
*
* @method Member RunAs() Member to run this job as
*
* @author Marcus Nyeholt <[email protected]>
* @license BSD http://silverstripe.org/bsd-license/
*/
Expand All @@ -22,7 +39,7 @@ class QueuedJobDescriptor extends DataObject {
'JobFinished' => 'SS_Datetime',
'TotalSteps' => 'Int',
'StepsProcessed' => 'Int',
'LastProcessedCount' => 'Int',
'LastProcessedCount' => 'Int(-1)', // -1 means never checked, 0 means checked but no work is done
'ResumeCounts' => 'Int',
'SavedJobData' => 'Text',
'SavedJobMessages' => 'Text',
Expand All @@ -37,6 +54,7 @@ class QueuedJobDescriptor extends DataObject {
private static $defaults = array(
'JobStatus' => 'New',
'ResumeCounts' => 0,
'LastProcessedCount' => -1 // -1 means never checked, 0 means checked and none were processed
);

private static $indexes = array(
Expand Down Expand Up @@ -74,21 +92,47 @@ public function summaryFields() {
);
return $columns;
}

public function pause() {
if ($this->JobStatus == QueuedJob::STATUS_WAIT || $this->JobStatus == QueuedJob::STATUS_RUN || $this->JobStatus == QueuedJob::STATUS_INIT) {

/**
* Pause this job, but only if it is waiting, running, or init
*
* @param bool $force Pause this job even if it's not waiting, running, or init
* @return bool Return true if this job was paused
*/
public function pause($force = false) {
if($force || in_array(
$this->JobStatus,
array(QueuedJob::STATUS_WAIT, QueuedJob::STATUS_RUN, QueuedJob::STATUS_INIT)
)) {
$this->JobStatus = QueuedJob::STATUS_PAUSED;
$this->write();
return true;
}
return false;
}

public function resume() {
if ($this->JobStatus == QueuedJob::STATUS_PAUSED || $this->JobStatus == QueuedJob::STATUS_BROKEN) {
/**
* Resume this job and schedules it for execution
*
* @param bool $force Resume this job even if it's not paused or broken
* @return bool Return true if this job was resumed
*/
public function resume($force = false) {
if($force || in_array($this->JobStatus, array(QueuedJob::STATUS_PAUSED, QueuedJob::STATUS_BROKEN))) {
$this->JobStatus = QueuedJob::STATUS_WAIT;
$this->ResumeCounts++;
$this->write();
singleton('QueuedJobService')->startJob($this);
return true;
}
return false;
}

/**
* Restarts this job via a forced resume
*/
public function restart() {
$this->resume(true);
}

/**
Expand All @@ -97,13 +141,17 @@ public function resume() {
*/
public function activateOnQueue() {
// if it's an immediate job, lets cache it to disk to be picked up later
if ($this->JobType == QueuedJob::IMMEDIATE && !Config::inst()->get('QueuedJobService', 'use_shutdown_function')) {
if ($this->JobType == QueuedJob::IMMEDIATE
&& !Config::inst()->get('QueuedJobService', 'use_shutdown_function')
) {
touch($this->getJobDir() . '/' . 'queuedjob-' . $this->ID);
}
}

/**
* Gets the path to the queuedjob cache directory
*
* @return string
*/
protected function getJobDir() {
// make sure our temp dir is in place. This is what will be inotify watched
Expand Down
Loading

0 comments on commit 6036a98

Please sign in to comment.