Skip to content

Commit

Permalink
Merge pull request #5814 from WoltLab/unique-background-job
Browse files Browse the repository at this point in the history
Unique background job
  • Loading branch information
Cyperghost authored Feb 22, 2024
2 parents 5e94ad3 + 453e575 commit 295ce35
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 11 deletions.
11 changes: 11 additions & 0 deletions wcfsetup/install/files/acp/database/update_com.woltlab.wcf_6.1.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use wcf\system\database\table\column\DefaultFalseBooleanDatabaseTableColumn;
use wcf\system\database\table\column\NotNullInt10DatabaseTableColumn;
use wcf\system\database\table\column\NotNullVarchar191DatabaseTableColumn;
use wcf\system\database\table\column\VarcharDatabaseTableColumn;
use wcf\system\database\table\DatabaseTable;
use wcf\system\database\table\index\DatabaseTableForeignKey;
use wcf\system\database\table\index\DatabaseTableIndex;
Expand Down Expand Up @@ -41,5 +42,15 @@
DatabaseTableIndex::create('messageEmbeddedObject')
->type(DatabaseTableIndex::UNIQUE_TYPE)
->columns(['messageObjectTypeID', 'messageID', 'embeddedObjectTypeID', 'embeddedObjectID']),
]),
PartialDatabaseTable::create('wcf1_background_job')
->columns([
VarcharDatabaseTableColumn::create('identifier')
->length(191)
->defaultValue(null),
])
->indices([
DatabaseTableIndex::create('identifier')
->columns(['identifier']),
])
];
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use wcf\data\user\User;
use wcf\system\background\job\AbstractBackgroundJob;
use wcf\system\background\job\AbstractUniqueBackgroundJob;
use wcf\system\exception\ParentClassException;
use wcf\system\session\SessionHandler;
use wcf\system\SingletonFactory;
Expand Down Expand Up @@ -70,24 +71,51 @@ public function enqueueAt(AbstractBackgroundJob|array $jobs, int $time): void
if (!\is_array($jobs)) {
$jobs = [$jobs];
}

foreach ($jobs as $job) {
if (!($job instanceof AbstractBackgroundJob)) {
throw new ParentClassException(\get_class($job), AbstractBackgroundJob::class);
}
}

WCF::getDB()->beginTransaction();
$sql = "INSERT INTO wcf1_background_job
(job, time)
VALUES (?, ?)";
$statement = WCF::getDB()->prepare($sql);
foreach ($jobs as $job) {
$statement->execute([
\serialize($job),
$time,
]);
$committed = false;
try {
WCF::getDB()->beginTransaction();
$sql = "INSERT INTO wcf1_background_job
(job, time,identifier)
VALUES (?, ?, ?)";
$statement = WCF::getDB()->prepare($sql);
$sql = "SELECT jobID
FROM wcf1_background_job
WHERE identifier = ?
FOR UPDATE";
$selectJobStatement = WCF::getDB()->prepare($sql);

foreach ($jobs as $job) {
$identifier = null;
if ($job instanceof AbstractUniqueBackgroundJob) {
// Check if the job is already in the queue
$selectJobStatement->execute([$job->identifier()]);
$jobID = $selectJobStatement->fetchSingleColumn();
if ($jobID !== false) {
continue;
}
$identifier = $job->identifier();
}

$statement->execute([
\serialize($job),
$time,
$identifier
]);
}
WCF::getDB()->commitTransaction();
$committed = true;
} finally {
if (!$committed) {
WCF::getDB()->rollBackTransaction();
}
}
WCF::getDB()->commitTransaction();
}

/**
Expand Down Expand Up @@ -212,6 +240,9 @@ public function performNextJob(): bool
$statement = WCF::getDB()->prepare($sql);
$statement->execute([$row['jobID']]);
}
if ($job instanceof AbstractUniqueBackgroundJob && $job->queueAgain()) {
$this->enqueueIn($job->newInstance(), $job->retryAfter());
}

return true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

namespace wcf\system\background\job;

/**
* This background job is only queued once
* and is requeued when it has more work to do.
*
* @author Olaf Braun
* @copyright 2001-2024 WoltLab GmbH
* @license GNU Lesser General Public License <http://opensource.org/licenses/lgpl-license.php>
* @since 6.1
*/
abstract class AbstractUniqueBackgroundJob extends AbstractBackgroundJob
{
/**
* @inheritDoc
*/
final public const MAX_FAILURES = 0;

/**
* Returns a unique identifier for this job.
*/
public function identifier(): string
{
return static::class;
}

/**
* Returns a new instance of this job to be queued again.
* This will reset the fail counter.
*/
public function newInstance(): static
{
return new static();
}

/**
* Returns whether this job should be queued again because it has more to do.
*/
abstract public function queueAgain(): bool;

#[\Override]
final public function onFinalFailure()
{
// onFailure() and onFinalFailure() are called at the same time.
// Do your stuff in onFailure().
}

#[\Override]
public function retryAfter()
{
// change the default value to 60 seconds
return 60;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use wcf\data\cronjob\Cronjob;
use wcf\system\background\BackgroundQueueHandler;
use wcf\system\background\job\AbstractUniqueBackgroundJob;
use wcf\system\database\util\PreparedStatementConditionBuilder;
use wcf\system\WCF;

Expand All @@ -27,6 +28,8 @@ public function execute(Cronjob $cronjob)
WCF::getDB()->beginTransaction();
/** @noinspection PhpUnusedLocalVariableInspection */
$committed = false;
/** @var AbstractUniqueBackgroundJob[] $uniqueJobs */
$uniqueJobs = [];
try {
$sql = "SELECT jobID, job
FROM wcf" . WCF_N . "_background_job
Expand All @@ -52,6 +55,12 @@ public function execute(Cronjob $cronjob)

if ($job->getFailures() <= $job::MAX_FAILURES) {
BackgroundQueueHandler::getInstance()->enqueueIn($job, $job->retryAfter());
} else {
$job->onFinalFailure();

if ($job instanceof AbstractUniqueBackgroundJob) {
$uniqueJobs[] = $job;
}
}
}
} catch (\Exception $e) {
Expand Down Expand Up @@ -82,5 +91,12 @@ public function execute(Cronjob $cronjob)
WCF::getDB()->rollBackTransaction();
}
}

// Requeue unique jobs if needed
foreach ($uniqueJobs as $job) {
if ($job->queueAgain()) {
BackgroundQueueHandler::getInstance()->enqueueIn($job->newInstance(), $job->retryAfter());
}
}
}
}
3 changes: 3 additions & 0 deletions wcfsetup/setup/db/install.sql
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ CREATE TABLE wcf1_background_job (
job MEDIUMBLOB NOT NULL,
status ENUM('ready', 'processing') NOT NULL DEFAULT 'ready',
time INT(10) NOT NULL,
identifier VARCHAR(191) NULL,

KEY identifier (identifier),
KEY (status, time)
);

Expand Down

0 comments on commit 295ce35

Please sign in to comment.