From 869e9462a443a1bc5e608f1da3712629b3463f90 Mon Sep 17 00:00:00 2001 From: Jon Dufresne Date: Thu, 19 Jul 2018 18:29:12 -0700 Subject: [PATCH] Use Celery task protocol version 2 By using the newer protocol, helps ensure that celery-php remains forward compatible. For details on the protocol, see the Celery docs: http://docs.celeryproject.org/en/latest/internals/protocol.html The protocol was first introduce with Celery 4.0. http://docs.celeryproject.org/en/latest/history/whatsnew-4.0.html#new-task-message-protocol --- CHANGELOG.md | 1 + README.md | 2 +- src/AMQPLibConnector.php | 8 +++--- src/AbstractAMQPConnector.php | 7 ++--- src/CeleryAbstract.php | 50 ++++++++++++++++++++--------------- src/PECLAMQPConnector.php | 12 +++++---- src/RedisConnector.php | 50 +++++++++++++---------------------- 7 files changed, 63 insertions(+), 67 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c938d96..391aef1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,3 +10,4 @@ results exchange. - The `Celery` constructor no longer accepts the argument `persistent_messages`. It was previously unused. +- celery-php now uses Celery task protocol version 2 and requires Celery 4.0+. diff --git a/README.md b/README.md index cdc659a..4be20a1 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ Last PHP-amqplib version tested is 2.5.1. Last predis version tested is 1.0.1. -Tested on Celery 4.0+. +Requires Celery 4.0+. [API documentation](https://massivescale.net/celery-php/li_celery-php.html) is dead, [help wanted](https://github.com/gjedeer/celery-php/issues/82) diff --git a/src/AMQPLibConnector.php b/src/AMQPLibConnector.php index a862914..f61f7a3 100644 --- a/src/AMQPLibConnector.php +++ b/src/AMQPLibConnector.php @@ -81,7 +81,7 @@ public function Connect($connection) { } - public function PostToExchange($connection, $details, $task, $params) + public function PostToExchange($connection, $details, $body, $properties, $headers) { $ch = $connection->channel(); @@ -106,10 +106,8 @@ public function PostToExchange($connection, $details, $task, $params) $details['exchange'] /* exchange name - "celery" */ ); - $msg = new \PhpAmqpLib\Message\AMQPMessage( - $task, - $params - ); + $properties['application_headers'] = new \PhpAmqpLib\Wire\AMQPTable($headers); + $msg = new \PhpAmqpLib\Message\AMQPMessage($body, $properties); $ch->basic_publish($msg, $details['exchange'], $details['routing_key']); diff --git a/src/AbstractAMQPConnector.php b/src/AbstractAMQPConnector.php index d90c314..331f933 100644 --- a/src/AbstractAMQPConnector.php +++ b/src/AbstractAMQPConnector.php @@ -77,11 +77,12 @@ abstract public function Connect($connection); * Post a task to exchange specified in $details * @param AMQPConnection $connection Connection object * @param array $details Array of connection details - * @param string $task JSON-encoded task - * @param array $params AMQP message parameters + * @param string $body JSON-encoded task body + * @param array $properties AMQP message properties + * @param array $headers Celery task headers * @return bool true if posted successfuly */ - abstract public function PostToExchange($connection, $details, $task, $params); + abstract public function PostToExchange($connection, $details, $body, $properties, $headers); /** * Return result of task execution for $task_id diff --git a/src/CeleryAbstract.php b/src/CeleryAbstract.php index 019cd6b..0169bd1 100644 --- a/src/CeleryAbstract.php +++ b/src/CeleryAbstract.php @@ -85,10 +85,10 @@ public static function InitializeAMQPConnection($details) * @param array $args Array of arguments (kwargs call when $args is associative) * @param bool $async_result Set to false if you don't need the AsyncResult object returned * @param string $routing_key Set to routing key name if you're using something other than "celery" - * @param array $task_args Additional settings for Celery - normally not needed + * @param array $options Additional settings for Celery - normally not needed * @return AsyncResult */ - public function PostTask($task, $args, $async_result=true, $routing_key="celery", $task_args=[]) + public function PostTask($task, $args, $async_result=true, $routing_key="celery", $options=[]) { if (!is_array($args)) { throw new CeleryException("Args should be an array"); @@ -112,29 +112,34 @@ public function PostTask($task, $args, $async_result=true, $routing_key="celery" } /* - * $task_args may contain additional arguments such as eta which are useful in task execution - * The usecase of this field is as follows: - * $task_args = array( 'eta' => "2014-12-02T16:00:00" ); + * $options may contain additional arguments such as eta which are + * useful in task execution. The usecase of this field is as follows: + * $options = ['eta' => "2014-12-02T16:00:00"]; */ - $task_array = array_merge( - [ - 'id' => $id, - 'task' => $task, - 'args' => $args, - 'kwargs' => (object)$kwargs, - ], - $task_args - ); - $task = json_encode($task_array); - $params = [ + // http://docs.celeryproject.org/en/latest/internals/protocol.html + $properties = [ + 'correlation_id' => $id, 'content_type' => 'application/json', - 'content_encoding' => 'UTF-8', - 'immediate' => false, + 'content_encoding' => 'utf-8', + ]; + + $headers = [ + 'lang' => 'py', + 'task' => $task, + 'id' => $id, + ]; + $headers = array_merge($headers, $options); + + $body = [ + $args, + (object)$kwargs, + null, ]; + $body = json_encode($body); if ($this->broker_connection_details['persistent_messages']) { - $params['delivery_mode'] = 2; + $properties['delivery_mode'] = 2; } $this->broker_connection_details['routing_key'] = $routing_key; @@ -142,8 +147,9 @@ public function PostTask($task, $args, $async_result=true, $routing_key="celery" $success = $this->broker_amqp->PostToExchange( $this->broker_connection, $this->broker_connection_details, - $task, - $params + $body, + $properties, + $headers ); if (!$success) { @@ -151,7 +157,7 @@ public function PostTask($task, $args, $async_result=true, $routing_key="celery" } if ($async_result) { - return new AsyncResult($id, $this->backend_connection_details, $task_array['task'], $args); + return new AsyncResult($id, $this->backend_connection_details, $task, $args); } else { return true; } diff --git a/src/PECLAMQPConnector.php b/src/PECLAMQPConnector.php index ad99b05..8f8172d 100644 --- a/src/PECLAMQPConnector.php +++ b/src/PECLAMQPConnector.php @@ -40,18 +40,20 @@ public function Connect($connection) * Post a task to exchange specified in $details * @param AMQPConnection $connection Connection object * @param array $details Array of connection details - * @param string $task JSON-encoded task - * @param array $params AMQP message parameters + * @param string $body JSON-encoded task body + * @param array $properties AMQP message properties + * @param array $headers Celery task headers + * @return bool true if posted successfuly */ - public function PostToExchange($connection, $details, $task, $params) + public function PostToExchange($connection, $details, $body, $properties, $headers) { $ch = $connection->channel; $xchg = new \AMQPExchange($ch); $xchg->setName($details['exchange']); - $success = $xchg->publish($task, $details['binding'], 0, $params); + $properties['headers'] = $headers; - return $success; + return $xchg->publish($body, $details['binding'], 0, $properties); } /** diff --git a/src/RedisConnector.php b/src/RedisConnector.php index 7aea019..58d10d5 100644 --- a/src/RedisConnector.php +++ b/src/RedisConnector.php @@ -54,26 +54,27 @@ class RedisConnector extends AbstractAMQPConnector public $celery_result_prefix = 'celery-task-meta-'; - /** - * Return headers used sent to Celery - * Override this function to set custom headers - */ - protected function GetHeaders() - { - return new \stdClass; - } - /** * Prepare the message sent to Celery */ - protected function GetMessage($task) + protected function GetMessage($details, $body, $properties, $headers) { - $result = []; - $result['body'] = base64_encode($task); - $result['headers'] = $this->GetHeaders(); - $result['content-type'] = $this->content_type; - $result['content-encoding'] = 'binary'; - return $result; + return [ + 'content-type' => $this->content_type, + 'content-encoding' => 'binary', + 'properties' => [ + 'reply_to' => $headers['id'], + 'delivery_info' => [ + 'priority' => 0, + 'routing_key' => $details['binding'], + 'exchange' => $details['exchange'], + ], + 'delivery_mode' => $this->GetDeliveryMode($properties), + 'delivery_tag' => $headers['id'], + ], + 'headers' => $headers, + 'body' => $body, + ]; } /** @@ -114,24 +115,11 @@ protected function ToDict($raw_json) * Post the message to Redis * This function implements the AbstractAMQPConnector interface */ - public function PostToExchange($connection, $details, $task, $params) + public function PostToExchange($connection, $details, $body, $properties, $headers) { $connection = $this->Connect($connection); - $body = json_decode($task, true); - $message = $this->GetMessage($task); - $message['properties'] = [ - 'body_encoding' => 'base64', - 'reply_to' => $body['id'], - 'delivery_info' => [ - 'priority' => 0, - 'routing_key' => $details['binding'], - 'exchange' => $details['exchange'], - ], - 'delivery_mode' => $this->GetDeliveryMode($params), - 'delivery_tag' => $body['id'] - ]; + $message = $this->GetMessage($details, $body, $properties, $headers); $connection->lPush($details['exchange'], $this->ToStr($message)); - return true; }