diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..fd02076 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,21 @@ +# EditorConfig helps developers define and maintain consistent coding styles between different editors and IDEs +# editorconfig.org + +root = true + +[*] +charset = utf-8 +end_of_line = lf +trim_trailing_whitespace = true +insert_final_newline = true +indent_style = space +indent_size = 4 + +[*.md] +trim_trailing_whitespace = false + +[*.json] +indent_size = 2 + +[composer.json] +indent_size = 4 diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..0e24132 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,8 @@ +* text=auto eol=lf + +*.json text eol=lf +*.md text eol=lf +*.php text eol=lf + +LICENSE export-ignore +README export-ignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9637274 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +###> editors ### +/nbproject/* +/.settings +/.buildpath +/.project +.idea +###< editors ### diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..145cd7a --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,6 @@ +CHANGELOG +========= + +## 1.0.0 + +[new] Implementation diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..20175a9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2022 snortlin + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..5d69254 --- /dev/null +++ b/README.md @@ -0,0 +1,62 @@ +# Google Pub/Sub Messenger + +## Installation + +### Step 1: Install + +The preferred method of installation is via [Composer](https://getcomposer.org/): + +```bash +composer require snortlin/google-pubsub-messenger +``` + +### Step 2: Register Messenger Transport + +```yaml +# config/services.yaml +services: + Snortlin\GooglePubsubMessenger\Transport\GpsTransportFactory: + tags: [ messenger.transport_factory ] +``` + +### Step 3: Configure Symfony Messenger + +Create a connection based on the DSN and options: + +```yaml +framework: + messenger: + transports: + google-pubsub: + dsn: 'gps://default?topic=topic_name&subscription=subscription_name&key=base64_key' +``` + +## Usage + +### Configuration options + +* `topic`: Topic name +* `subscription`: Subscription name +* `key`: GPS key JSON format +* `pull_max_messages`: Limit the amount of messages pulled; default=0 (GPC default => 1000) +* `pull_ack_deadline`: The new ack deadline pulled messages; default=0 (GPC default => 10) +* `redelivery_ack_deadline`: Default ack deadline on redelivered messages, default=0 + +### OrderingKeyStamp for ordering messages + +```php +use Snortlin\GooglePubsubMessenger\Transport\Stamp\OrderingKeyStamp +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\MessageBusInterface; + +public function index(MessageBusInterface $bus) +{ + $bus->dispatch(new MyMessage('...'), [ + new OrderingKeyStamp('my_ordering_key'), + ]); + + // ... +} +``` + +[Official documentation](https://cloud.google.com/pubsub/docs/ordering) diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..179718f --- /dev/null +++ b/composer.json @@ -0,0 +1,26 @@ +{ + "name": "snortlin/google-pubsub-messenger", + "description": "Symfony Google Pub/Sub extension Messenger Bridge", + "type": "symfony-messenger-bridge", + "license": "MIT", + "authors": [ + { + "name": "Petr Leixner", + "email": "snortlin@volny.cz" + } + ], + "require": { + "php": "^8.0", + "google/cloud-pubsub": "^1.35", + "symfony/messenger": "^5.4|^6.0" + }, + "require-dev": { + "jetbrains/phpstorm-attributes": "^1.0" + }, + "autoload": { + "psr-4": { + "Snortlin\\GooglePubsubMessenger\\": "src/" + } + }, + "minimum-stability": "dev" +} diff --git a/src/Transport/Connection.php b/src/Transport/Connection.php new file mode 100644 index 0000000..ee4cd8d --- /dev/null +++ b/src/Transport/Connection.php @@ -0,0 +1,140 @@ +getMessage()), $e->getCode(), $e); + } + + try { + $this->client = new PubSubClient([ + 'keyFile' => $key, + 'projectId' => $key['project_id'] ?? null, + ]); + } catch (\Throwable $e) { + throw new InvalidArgumentException(sprintf('Error when creating Google Pub/Sub Client: %s', $e->getMessage()), $e->getCode(), $e); + } + } + + /** + * Creates a connection based on the DSN and options. + * + * Available options: + * + * * topic: Topic name + * * subscription: Subscription name + * * key: GPS key JSON format + * * pull_max_messages: Limit the amount of messages pulled + * * pull_ack_deadline: The new ack deadline pulled messages + * * redelivery_ack_deadline: The new ack deadline on reject messages + * + * gps://default?topic=topic_name&subscription=subscription_name&key=base64_key + * gps://default?topic=topic_name&subscription=subscription_name&key=base64_key&pull_max_messages=100 + * gps://default?topic=topic_name&subscription=subscription_name&key=base64_key&pull_max_messages=100&pull_ack_deadline=10&redelivery_ack_deadline=10 + */ + public static function fromDsn(string $dsn, array $options = []): self + { + if (false === $parsedUrl = parse_url($dsn)) { + throw new InvalidArgumentException(sprintf('The given GPS DSN "%s" is invalid.', $dsn)); + } + + parse_str($parsedUrl['query'] ?? '', $parsedQuery); + + $options['topic'] = $parsedQuery['topic'] ?? $options['topic'] ?? ''; + $options['subscription'] = $parsedQuery['subscription'] ?? $options['subscription'] ?? ''; + $options['key'] = base64_decode($parsedQuery['key'] ?? $options['key'] ?? ''); + $options['pull_max_messages'] = (int)($parsedQuery['pull_max_messages'] ?? $options['pull_max_messages'] ?? 0); + $options['pull_ack_deadline'] = (int)($parsedQuery['pull_ack_deadline'] ?? $options['pull_ack_deadline'] ?? 0); + $options['redelivery_ack_deadline'] = (int)($parsedQuery['redelivery_ack_deadline'] ?? $options['redelivery_ack_deadline'] ?? 0); + + return new self($options['key'], $options['topic'], $options['subscription'], $options['pull_max_messages'], $options['pull_ack_deadline'], $options['redelivery_ack_deadline']); + } + + public function publish(string $body, array $attributes = [], ?string $orderingKey = null): array + { + $messageBuilder = (new MessageBuilder()) + ->setData($body) + ->setAttributes($attributes) + ->setOrderingKey($orderingKey); + + return $this->client + ->topic($this->topicName) + ->publish($messageBuilder->build()); + } + + public function get(): iterable + { + $options = []; + + if ($this->pullMaxMessages > 0) { + $options['maxMessages'] = $this->pullMaxMessages; + } + + $messages = $this->client + ->subscription($this->subscriptionName) + ->pull($options); + + if ($this->pullAckDeadline > 0 && !empty($messages)) { + $this->modifyAckDeadlineBatch($messages, $this->pullAckDeadline); + } + + return $messages; + } + + public function ack(Message $message): void + { + $this->client + ->subscription($this->subscriptionName) + ->acknowledge($message); + } + + public function reject(Message $message): void + { + $this->ack($message); + } + + public function modifyAckDeadline(Message $message, int $seconds = 0): void + { + $this->client + ->subscription($this->subscriptionName) + ->modifyAckDeadline($message, $seconds > 0 ? $seconds : $this->redeliveryAckDeadline); + } + + public function modifyAckDeadlineBatch(array $messages, int $seconds): void + { + $this->client + ->subscription($this->subscriptionName) + ->modifyAckDeadlineBatch($messages, $seconds); + } + + #[ArrayShape([ + 'client' => 'array', + 'subscription' => 'array', + ])] + public function getDebugInfo(): array + { + return [ + 'client' => $this->client->__debugInfo(), + 'subscription' => $this->client->subscription($this->subscriptionName)->__debugInfo(), + ]; + } +} diff --git a/src/Transport/GpsReceiver.php b/src/Transport/GpsReceiver.php new file mode 100644 index 0000000..932d159 --- /dev/null +++ b/src/Transport/GpsReceiver.php @@ -0,0 +1,94 @@ +serializer = $serializer ?? new PhpSerializer(); + } + + /** + * @inheritDoc + */ + public function get(): iterable + { + try { + foreach ($this->connection->get() as $message) { + yield from $this->getEnvelope($message); + } + } catch (MessageDecodingFailedException $e) { + throw $e; + } catch (\Throwable $e) { + throw new TransportException($e->getMessage(), 0, $e); + } + } + + /** + * @inheritDoc + */ + public function ack(Envelope $envelope): void + { + try { + $this->connection->ack($this->findReceivedStamp($envelope)->getMessage()); + } catch (\Throwable $e) { + throw new TransportException($e->getMessage(), 0, $e); + } + } + + /** + * @inheritDoc + */ + public function reject(Envelope $envelope): void + { + try { + $stamp = $envelope->last(RedeliveryAsModifyAckDeadlineStamp::class); + + if ($stamp instanceof RedeliveryAsModifyAckDeadlineStamp) { + $this->connection->modifyAckDeadline($this->findReceivedStamp($envelope)->getMessage(), $stamp->getDelay()); + } else { + $this->connection->ack($this->findReceivedStamp($envelope)->getMessage()); + } + } catch (\Throwable $e) { + throw new TransportException($e->getMessage(), 0, $e); + } + } + + private function getEnvelope(Message $message): iterable + { + try { + $envelope = $this->serializer->decode([ + 'body' => $message->data(), + 'headers' => $message->attributes(), + 'message' => $message, + ]); + } catch (MessageDecodingFailedException $e) { + $this->connection->reject($message); + + throw $e; + } + + yield $envelope->with(new GpsReceivedStamp($message)); + } + + private function findReceivedStamp(Envelope $envelope): GpsReceivedStamp + { + $stamp = $envelope->last(GpsReceivedStamp::class); + + return $stamp instanceof GpsReceivedStamp + ? $stamp + : throw new \LogicException(sprintf('No "%s" stamp found on the Envelope.', GpsReceivedStamp::class)); + } +} diff --git a/src/Transport/GpsSender.php b/src/Transport/GpsSender.php new file mode 100644 index 0000000..c610f7e --- /dev/null +++ b/src/Transport/GpsSender.php @@ -0,0 +1,50 @@ +serializer = $serializer ?? new PhpSerializer(); + } + + /** + * @inheritDoc + * @throws \Exception + */ + public function send(Envelope $envelope): Envelope + { + if ($envelope->last(RedeliveryStamp::class) instanceof RedeliveryStamp) { + if ($envelope->last(RedeliveryAsModifyAckDeadlineStamp::class) instanceof RedeliveryAsModifyAckDeadlineStamp) { + return $envelope; + } + } + + $encodedMessage = $this->serializer->encode($envelope); + $orderingKeyStamp = $envelope->last(OrderingKeyStamp::class); + + try { + $id = $this->connection->publish( + $encodedMessage['body'], + $encodedMessage['headers'] ?? [], + $orderingKeyStamp instanceof OrderingKeyStamp ? $orderingKeyStamp->getOrderingKey() : null + )['messageIds']; + } catch (\Throwable $e) { + throw new TransportException($e->getMessage(), 0, $e); + } + + return $envelope->with(new TransportMessageIdStamp($id)); + } +} diff --git a/src/Transport/GpsTransport.php b/src/Transport/GpsTransport.php new file mode 100644 index 0000000..967939c --- /dev/null +++ b/src/Transport/GpsTransport.php @@ -0,0 +1,81 @@ +serializer = $serializer ?? new PhpSerializer(); + } + + /** + * @inheritDoc + */ + public function get(): iterable + { + return $this->getReceiver()->get(); + } + + /** + * @inheritDoc + */ + public function ack(Envelope $envelope): void + { + $this->getReceiver()->ack($envelope); + } + + /** + * @inheritDoc + */ + public function reject(Envelope $envelope): void + { + $this->getReceiver()->reject($envelope); + } + + /** + * @inheritDoc + * @throws \Exception + */ + public function send(Envelope $envelope): Envelope + { + return $this->getSender()->send($envelope); + } + + #[ArrayShape([ + 'client' => 'array', + 'subscription' => 'array', + ])] + public function getDebugInfo(): array + { + return $this->connection->getDebugInfo(); + } + + private function getReceiver(): GpsReceiver + { + if (!isset($this->receiver)) { + $this->receiver = new GpsReceiver($this->connection, $this->serializer); + } + + return $this->receiver; + } + + private function getSender(): GpsSender + { + if (!isset($this->sender)) { + $this->sender = new GpsSender($this->connection, $this->serializer); + } + + return $this->sender; + } +} diff --git a/src/Transport/GpsTransportFactory.php b/src/Transport/GpsTransportFactory.php new file mode 100644 index 0000000..8e285a9 --- /dev/null +++ b/src/Transport/GpsTransportFactory.php @@ -0,0 +1,20 @@ +message; + } +} diff --git a/src/Transport/Stamp/OrderingKeyStamp.php b/src/Transport/Stamp/OrderingKeyStamp.php new file mode 100644 index 0000000..454dfe1 --- /dev/null +++ b/src/Transport/Stamp/OrderingKeyStamp.php @@ -0,0 +1,17 @@ +orderingKey; + } +} diff --git a/src/Transport/Stamp/RedeliveryAsModifyAckDeadlineStamp.php b/src/Transport/Stamp/RedeliveryAsModifyAckDeadlineStamp.php new file mode 100644 index 0000000..4cf56a1 --- /dev/null +++ b/src/Transport/Stamp/RedeliveryAsModifyAckDeadlineStamp.php @@ -0,0 +1,17 @@ +delay; + } +}