Skip to content

Commit

Permalink
Use Celery task protocol version 2
Browse files Browse the repository at this point in the history
By using the newer protocol, helps ensure that celery-php remains
forward compatible.

For details on the protocol, see the Celery docs:

http://docs.celeryproject.org/en/latest/internals/protocol.html

The protocol was first introduce with Celery 4.0.

http://docs.celeryproject.org/en/latest/history/whatsnew-4.0.html#new-task-message-protocol
  • Loading branch information
jdufresne committed Nov 22, 2018
1 parent 64e67a7 commit 869e946
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 67 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
results exchange.
- The `Celery` constructor no longer accepts the argument
`persistent_messages`. It was previously unused.
- celery-php now uses Celery task protocol version 2 and requires Celery 4.0+.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Last PHP-amqplib version tested is 2.5.1.

Last predis version tested is 1.0.1.

Tested on Celery 4.0+.
Requires Celery 4.0+.

[API documentation](https://massivescale.net/celery-php/li_celery-php.html) is dead, [help wanted](https://github.com/gjedeer/celery-php/issues/82)

Expand Down
8 changes: 3 additions & 5 deletions src/AMQPLibConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public function Connect($connection)
{
}

public function PostToExchange($connection, $details, $task, $params)
public function PostToExchange($connection, $details, $body, $properties, $headers)
{
$ch = $connection->channel();

Expand All @@ -106,10 +106,8 @@ public function PostToExchange($connection, $details, $task, $params)
$details['exchange'] /* exchange name - "celery" */
);

$msg = new \PhpAmqpLib\Message\AMQPMessage(
$task,
$params
);
$properties['application_headers'] = new \PhpAmqpLib\Wire\AMQPTable($headers);
$msg = new \PhpAmqpLib\Message\AMQPMessage($body, $properties);

$ch->basic_publish($msg, $details['exchange'], $details['routing_key']);

Expand Down
7 changes: 4 additions & 3 deletions src/AbstractAMQPConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,12 @@ abstract public function Connect($connection);
* Post a task to exchange specified in $details
* @param AMQPConnection $connection Connection object
* @param array $details Array of connection details
* @param string $task JSON-encoded task
* @param array $params AMQP message parameters
* @param string $body JSON-encoded task body
* @param array $properties AMQP message properties
* @param array $headers Celery task headers
* @return bool true if posted successfuly
*/
abstract public function PostToExchange($connection, $details, $task, $params);
abstract public function PostToExchange($connection, $details, $body, $properties, $headers);

/**
* Return result of task execution for $task_id
Expand Down
50 changes: 28 additions & 22 deletions src/CeleryAbstract.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ public static function InitializeAMQPConnection($details)
* @param array $args Array of arguments (kwargs call when $args is associative)
* @param bool $async_result Set to false if you don't need the AsyncResult object returned
* @param string $routing_key Set to routing key name if you're using something other than "celery"
* @param array $task_args Additional settings for Celery - normally not needed
* @param array $options Additional settings for Celery - normally not needed
* @return AsyncResult
*/
public function PostTask($task, $args, $async_result=true, $routing_key="celery", $task_args=[])
public function PostTask($task, $args, $async_result=true, $routing_key="celery", $options=[])
{
if (!is_array($args)) {
throw new CeleryException("Args should be an array");
Expand All @@ -112,46 +112,52 @@ public function PostTask($task, $args, $async_result=true, $routing_key="celery"
}

/*
* $task_args may contain additional arguments such as eta which are useful in task execution
* The usecase of this field is as follows:
* $task_args = array( 'eta' => "2014-12-02T16:00:00" );
* $options may contain additional arguments such as eta which are
* useful in task execution. The usecase of this field is as follows:
* $options = ['eta' => "2014-12-02T16:00:00"];
*/
$task_array = array_merge(
[
'id' => $id,
'task' => $task,
'args' => $args,
'kwargs' => (object)$kwargs,
],
$task_args
);

$task = json_encode($task_array);
$params = [
// http://docs.celeryproject.org/en/latest/internals/protocol.html
$properties = [
'correlation_id' => $id,
'content_type' => 'application/json',
'content_encoding' => 'UTF-8',
'immediate' => false,
'content_encoding' => 'utf-8',
];

$headers = [
'lang' => 'py',
'task' => $task,
'id' => $id,
];
$headers = array_merge($headers, $options);

$body = [
$args,
(object)$kwargs,
null,
];
$body = json_encode($body);

if ($this->broker_connection_details['persistent_messages']) {
$params['delivery_mode'] = 2;
$properties['delivery_mode'] = 2;
}

$this->broker_connection_details['routing_key'] = $routing_key;

$success = $this->broker_amqp->PostToExchange(
$this->broker_connection,
$this->broker_connection_details,
$task,
$params
$body,
$properties,
$headers
);

if (!$success) {
throw new CeleryPublishException();
}

if ($async_result) {
return new AsyncResult($id, $this->backend_connection_details, $task_array['task'], $args);
return new AsyncResult($id, $this->backend_connection_details, $task, $args);
} else {
return true;
}
Expand Down
12 changes: 7 additions & 5 deletions src/PECLAMQPConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,20 @@ public function Connect($connection)
* Post a task to exchange specified in $details
* @param AMQPConnection $connection Connection object
* @param array $details Array of connection details
* @param string $task JSON-encoded task
* @param array $params AMQP message parameters
* @param string $body JSON-encoded task body
* @param array $properties AMQP message properties
* @param array $headers Celery task headers
* @return bool true if posted successfuly
*/
public function PostToExchange($connection, $details, $task, $params)
public function PostToExchange($connection, $details, $body, $properties, $headers)
{
$ch = $connection->channel;
$xchg = new \AMQPExchange($ch);
$xchg->setName($details['exchange']);

$success = $xchg->publish($task, $details['binding'], 0, $params);
$properties['headers'] = $headers;

return $success;
return $xchg->publish($body, $details['binding'], 0, $properties);
}

/**
Expand Down
50 changes: 19 additions & 31 deletions src/RedisConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,27 @@ class RedisConnector extends AbstractAMQPConnector

public $celery_result_prefix = 'celery-task-meta-';

/**
* Return headers used sent to Celery
* Override this function to set custom headers
*/
protected function GetHeaders()
{
return new \stdClass;
}

/**
* Prepare the message sent to Celery
*/
protected function GetMessage($task)
protected function GetMessage($details, $body, $properties, $headers)
{
$result = [];
$result['body'] = base64_encode($task);
$result['headers'] = $this->GetHeaders();
$result['content-type'] = $this->content_type;
$result['content-encoding'] = 'binary';
return $result;
return [
'content-type' => $this->content_type,
'content-encoding' => 'binary',
'properties' => [
'reply_to' => $headers['id'],
'delivery_info' => [
'priority' => 0,
'routing_key' => $details['binding'],
'exchange' => $details['exchange'],
],
'delivery_mode' => $this->GetDeliveryMode($properties),
'delivery_tag' => $headers['id'],
],
'headers' => $headers,
'body' => $body,
];
}

/**
Expand Down Expand Up @@ -114,24 +115,11 @@ protected function ToDict($raw_json)
* Post the message to Redis
* This function implements the AbstractAMQPConnector interface
*/
public function PostToExchange($connection, $details, $task, $params)
public function PostToExchange($connection, $details, $body, $properties, $headers)
{
$connection = $this->Connect($connection);
$body = json_decode($task, true);
$message = $this->GetMessage($task);
$message['properties'] = [
'body_encoding' => 'base64',
'reply_to' => $body['id'],
'delivery_info' => [
'priority' => 0,
'routing_key' => $details['binding'],
'exchange' => $details['exchange'],
],
'delivery_mode' => $this->GetDeliveryMode($params),
'delivery_tag' => $body['id']
];
$message = $this->GetMessage($details, $body, $properties, $headers);
$connection->lPush($details['exchange'], $this->ToStr($message));

return true;
}

Expand Down

0 comments on commit 869e946

Please sign in to comment.