From 9efe217e9086e1d7afc7d27ed4bd1728cca2eabd Mon Sep 17 00:00:00 2001 From: desperado Date: Thu, 23 Aug 2018 14:57:33 +0300 Subject: [PATCH 01/11] scheduler --- composer.lock | 27 +- src/Application/KernelContext.php | 63 ++-- src/Application/ServiceBusKernel.php | 19 +- .../ExecutionContext/LoggingInContext.php | 52 ++++ .../MessageDeliveryContext.php | 29 ++ .../Sql/SqlEventStreamStore.php | 8 - .../SnapshotStore/SqlSnapshotStore.php | 10 +- src/Sagas/SagaStore/Sql/SQLSagaStore.php | 14 - src/Scheduler/Data/NextScheduledOperation.php | 64 +++++ src/Scheduler/Data/ScheduledOperation.php | 116 ++++++++ ...InvalidScheduledOperationExecutionDate.php | 25 ++ .../Command/EmitSchedulerOperation.php | 54 ++++ .../Messages/Event/OperationScheduled.php | 128 +++++++++ .../Event/SchedulerOperationCanceled.php | 97 +++++++ .../Event/SchedulerOperationEmitted.php | 76 +++++ src/Scheduler/ScheduledOperationId.php | 51 ++++ src/Scheduler/SchedulerListener.php | 168 +++++++++++ src/Scheduler/Store/SchedulerRegistry.php | 175 ++++++++++++ src/Scheduler/Store/SchedulerStore.php | 62 ++++ src/Scheduler/Store/Sql/SqlSchedulerStore.php | 123 ++++++++ .../Store/Sql/schema/scheduler_registry.sql | 5 + src/SchedulerProvider.php | 269 ++++++++++++++++++ .../AmpPostgreSQL/AmpPostgreSQLAdapter.php | 4 +- .../SQL/DoctrineDBAL/DoctrineDBALAdapter.php | 9 +- src/Storage/StorageAdapter.php | 4 +- tests/Stubs/Context/TestContext.php | 22 ++ 26 files changed, 1600 insertions(+), 74 deletions(-) create mode 100644 src/Common/ExecutionContext/LoggingInContext.php create mode 100644 src/Scheduler/Data/NextScheduledOperation.php create mode 100644 src/Scheduler/Data/ScheduledOperation.php create mode 100644 src/Scheduler/Exceptions/InvalidScheduledOperationExecutionDate.php create mode 100644 src/Scheduler/Messages/Command/EmitSchedulerOperation.php create mode 100644 src/Scheduler/Messages/Event/OperationScheduled.php create mode 100644 src/Scheduler/Messages/Event/SchedulerOperationCanceled.php create mode 100644 src/Scheduler/Messages/Event/SchedulerOperationEmitted.php create mode 100644 src/Scheduler/ScheduledOperationId.php create mode 100644 src/Scheduler/SchedulerListener.php create mode 100644 src/Scheduler/Store/SchedulerRegistry.php create mode 100644 src/Scheduler/Store/SchedulerStore.php create mode 100644 src/Scheduler/Store/Sql/SqlSchedulerStore.php create mode 100644 src/Scheduler/Store/Sql/schema/scheduler_registry.sql create mode 100644 src/SchedulerProvider.php diff --git a/composer.lock b/composer.lock index 0fe53d3..c1c99d8 100644 --- a/composer.lock +++ b/composer.lock @@ -2871,16 +2871,16 @@ }, { "name": "doctrine/cache", - "version": "v1.7.1", + "version": "v1.8.0", "source": { "type": "git", "url": "https://github.com/doctrine/cache.git", - "reference": "b3217d58609e9c8e661cd41357a54d926c4a2a1a" + "reference": "d768d58baee9a4862ca783840eca1b9add7a7f57" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/doctrine/cache/zipball/b3217d58609e9c8e661cd41357a54d926c4a2a1a", - "reference": "b3217d58609e9c8e661cd41357a54d926c4a2a1a", + "url": "https://api.github.com/repos/doctrine/cache/zipball/d768d58baee9a4862ca783840eca1b9add7a7f57", + "reference": "d768d58baee9a4862ca783840eca1b9add7a7f57", "shasum": "" }, "require": { @@ -2891,8 +2891,9 @@ }, "require-dev": { "alcaeus/mongo-php-adapter": "^1.1", + "doctrine/coding-standard": "^4.0", "mongodb/mongodb": "^1.1", - "phpunit/phpunit": "^5.7", + "phpunit/phpunit": "^7.0", "predis/predis": "~1.0" }, "suggest": { @@ -2901,7 +2902,7 @@ "type": "library", "extra": { "branch-alias": { - "dev-master": "1.7.x-dev" + "dev-master": "1.8.x-dev" } }, "autoload": { @@ -2936,12 +2937,12 @@ } ], "description": "Caching library offering an object-oriented API for many cache backends", - "homepage": "http://www.doctrine-project.org", + "homepage": "https://www.doctrine-project.org", "keywords": [ "cache", "caching" ], - "time": "2017-08-25 07:02:50" + "time": "2018-08-21 18:01:43" }, { "name": "doctrine/dbal", @@ -3821,16 +3822,16 @@ }, { "name": "phpunit/phpunit", - "version": "7.3.1", + "version": "7.3.2", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/phpunit.git", - "reference": "f9b14c17860eccb440a0352a117a81eb754cff5a" + "reference": "34705f81bddc3f505b9599a2ef96e2b4315ba9b8" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/f9b14c17860eccb440a0352a117a81eb754cff5a", - "reference": "f9b14c17860eccb440a0352a117a81eb754cff5a", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/34705f81bddc3f505b9599a2ef96e2b4315ba9b8", + "reference": "34705f81bddc3f505b9599a2ef96e2b4315ba9b8", "shasum": "" }, "require": { @@ -3901,7 +3902,7 @@ "testing", "xunit" ], - "time": "2018-08-07 06:44:28" + "time": "2018-08-22 06:39:21" }, { "name": "sebastian/code-unit-reverse-lookup", diff --git a/src/Application/KernelContext.php b/src/Application/KernelContext.php index d72c7a0..0eea404 100644 --- a/src/Application/KernelContext.php +++ b/src/Application/KernelContext.php @@ -15,7 +15,10 @@ use function Amp\call; use Amp\Promise; +use Desperado\ServiceBus\Common\Contract\Messages\Command; +use Desperado\ServiceBus\Common\Contract\Messages\Event; use Desperado\ServiceBus\Common\Contract\Messages\Message; +use Desperado\ServiceBus\Common\ExecutionContext\LoggingInContext; use Desperado\ServiceBus\Common\ExecutionContext\MessageDeliveryContext; use Desperado\ServiceBus\Transport\IncomingEnvelope; use Psr\Log\LoggerInterface; @@ -24,7 +27,7 @@ /** * */ -final class KernelContext implements MessageDeliveryContext +final class KernelContext implements MessageDeliveryContext, LoggingInContext { /** * Send message handler @@ -62,7 +65,7 @@ public function __construct(IncomingEnvelope $incomingEnvelope, \Generator $mess */ public function delivery(Message ...$messages): Promise { - $messageSender = $this->messageSender; + $messageSender = $this->messageSender; $incomingEnvelope = $this->incomingEnvelope; /** @psalm-suppress InvalidArgument */ @@ -71,13 +74,51 @@ static function(array $messages) use ($messageSender, $incomingEnvelope): void { foreach($messages as $message) { - $messageSender->send([$message, $incomingEnvelope]); + $messageSender->send([$message, $incomingEnvelope, []]); } }, $messages ); } + /** + * @inheritDoc + */ + public function send(Command $command, array $headers = []): Promise + { + $messageSender = $this->messageSender; + $incomingEnvelope = $this->incomingEnvelope; + + /** @psalm-suppress InvalidArgument */ + return call( + static function(Command $command, array $headers) use ($messageSender, $incomingEnvelope): void + { + $messageSender->send([$command, $incomingEnvelope, $headers]); + }, + $command, + $headers + ); + } + + /** + * @inheritDoc + */ + public function publish(Event $event, array $headers = []): Promise + { + $messageSender = $this->messageSender; + $incomingEnvelope = $this->incomingEnvelope; + + /** @psalm-suppress InvalidArgument */ + return call( + static function(Event $event, array $headers) use ($messageSender, $incomingEnvelope): void + { + $messageSender->send([$event, $incomingEnvelope, $headers]); + }, + $event, + $headers + ); + } + /** * Receive incoming envelope * @@ -89,13 +130,7 @@ public function incomingEnvelope(): IncomingEnvelope } /** - * Log message with context details - * - * @param string $logMessage - * @param array $extra - * @param string $level - * - * @return void + * @inheritdoc */ public function logContextMessage( string $logMessage, @@ -112,13 +147,7 @@ public function logContextMessage( } /** - * Log Throwable in execution context - * - * @param \Throwable $throwable - * @param array $extra - * @param string $level - * - * @return void + * @inheritdoc */ public function logContextThrowable( \Throwable $throwable, diff --git a/src/Application/ServiceBusKernel.php b/src/Application/ServiceBusKernel.php index 5808771..ffd69aa 100644 --- a/src/Application/ServiceBusKernel.php +++ b/src/Application/ServiceBusKernel.php @@ -272,8 +272,9 @@ private static function createMessageSender( /** * @var Message $message * @var IncomingEnvelope $incomingEnvelope + * @var array $headers */ - [$message, $incomingEnvelope] = yield; + [$message, $incomingEnvelope, $headers] = yield; $messageClass = \get_class($message); $destinations = $messageRoutes->destinationsFor($messageClass); @@ -285,11 +286,15 @@ private static function createMessageSender( $outboundEnvelope = self::createOutboundEnvelope( $publisher, $incomingEnvelope->operationId(), - $message, [ - 'x-message-class' => $messageClass, - 'x-created-after-message' => \get_class($incomingEnvelope->denormalized()), - 'x-hostname' => \gethostname() - ] + $message, + \array_merge( + [ + 'x-message-class' => $messageClass, + 'x-created-after-message' => \get_class($incomingEnvelope->denormalized()), + 'x-hostname' => \gethostname() + ], + $headers + ) ); self::beforeMessageSend($logger, $messageClass, $destination, $outboundEnvelope); @@ -305,7 +310,7 @@ private static function createMessageSender( } } - unset($message, $incomingEnvelope, $messageClass, $destinations); + unset($message, $incomingEnvelope, $messageClass, $destinations, $headers); } } diff --git a/src/Common/ExecutionContext/LoggingInContext.php b/src/Common/ExecutionContext/LoggingInContext.php new file mode 100644 index 0000000..1b2e9fa --- /dev/null +++ b/src/Common/ExecutionContext/LoggingInContext.php @@ -0,0 +1,52 @@ + + * @license MIT + * @license https://opensource.org/licenses/MIT + */ + +declare(strict_types = 1); + +namespace Desperado\ServiceBus\Common\ExecutionContext; + +use Psr\Log\LogLevel; + +/** + * + */ +interface LoggingInContext +{ + /** + * Log message with context details + * + * @param string $logMessage + * @param array $extra + * @param string $level + * + * @return void + */ + public function logContextMessage( + string $logMessage, + array $extra = [], + string $level = LogLevel::INFO + ): void; + + /** + * Log Throwable in execution context + * + * @param \Throwable $throwable + * @param array $extra + * @param string $level + * + * @return void + */ + public function logContextThrowable( + \Throwable $throwable, + string $level = LogLevel::ERROR, + array $extra = [] + ): void; +} diff --git a/src/Common/ExecutionContext/MessageDeliveryContext.php b/src/Common/ExecutionContext/MessageDeliveryContext.php index dd3867c..6ab6531 100644 --- a/src/Common/ExecutionContext/MessageDeliveryContext.php +++ b/src/Common/ExecutionContext/MessageDeliveryContext.php @@ -14,6 +14,8 @@ namespace Desperado\ServiceBus\Common\ExecutionContext; use Amp\Promise; +use Desperado\ServiceBus\Common\Contract\Messages\Command; +use Desperado\ServiceBus\Common\Contract\Messages\Event; use Desperado\ServiceBus\Common\Contract\Messages\Message; /** @@ -21,5 +23,32 @@ */ interface MessageDeliveryContext { + /** + * Execute simple messages (commands\events) delivery + * + * @param Message ...$messages + * + * @return Promise + */ public function delivery(Message ...$messages): Promise; + + /** + * Send command with specified options + * + * @param Command headers + * @param array $headers + * + * @return Promise + */ + public function send(Command $command, array $headers = []): Promise; + + /** + * Publish event with specified headers + * + * @param Event $event + * @param array $headers + * + * @return Promise + */ + public function publish(Event $event, array $headers = []): Promise; } diff --git a/src/EventSourcing/EventStreamStore/Sql/SqlEventStreamStore.php b/src/EventSourcing/EventStreamStore/Sql/SqlEventStreamStore.php index 9e323e2..b204681 100644 --- a/src/EventSourcing/EventStreamStore/Sql/SqlEventStreamStore.php +++ b/src/EventSourcing/EventStreamStore/Sql/SqlEventStreamStore.php @@ -116,8 +116,6 @@ static function(StoredAggregateEventStream $eventsStream) use ($adapter, $afterS yield call($afterSaveHandler); yield $transaction->commit(); - - return yield new Success(); } catch(\Throwable $throwable) { @@ -186,8 +184,6 @@ static function(AggregateId $id) use ($adapter): \Generator ->compile(); yield $adapter->execute($query->sql(), $query->params()); - - return yield new Success(); }, $id ); @@ -225,8 +221,6 @@ static function(StoredAggregateEventStream $eventsStream) use ($transaction): \G yield $transaction->execute($query->sql(), $query->params()); - - return yield new Success(); }, $eventsStream ); @@ -263,8 +257,6 @@ static function(TransactionAdapter $transaction, StoredAggregateEventStream $eve self::collectSaveEventQueryParameters($eventsStream) ); } - - return yield new Success(); }, $transaction, $eventsStream diff --git a/src/EventSourcingSnapshots/SnapshotStore/SqlSnapshotStore.php b/src/EventSourcingSnapshots/SnapshotStore/SqlSnapshotStore.php index 3edf763..2834b7b 100644 --- a/src/EventSourcingSnapshots/SnapshotStore/SqlSnapshotStore.php +++ b/src/EventSourcingSnapshots/SnapshotStore/SqlSnapshotStore.php @@ -63,8 +63,6 @@ static function(StoredAggregateSnapshot $aggregateSnapshot) use ($adapter): \Gen ])->compile(); yield $adapter->execute($query->sql(), $query->params()); - - return yield new Success(); }, $aggregateSnapshot ); @@ -96,11 +94,7 @@ static function(AggregateId $id) use ($adapter): \Generator if(true === \is_array($data) && 0 !== \count($data)) { - /** For DoctrineDBAL */ - if(true === \is_resource($data['payload'])) - { - $data['payload'] = \stream_get_contents($data['payload'], -1, 0); - } + $data['payload'] = \stream_get_contents($data['payload'], -1, 0); $storedSnapshot = new StoredAggregateSnapshot( $data['id'], @@ -136,8 +130,6 @@ static function(AggregateId $id) use ($adapter): \Generator ->compile(); yield $adapter->execute($query->sql(), $query->params()); - - return yield new Success(); }, $id ); diff --git a/src/Sagas/SagaStore/Sql/SQLSagaStore.php b/src/Sagas/SagaStore/Sql/SQLSagaStore.php index 11281cf..f15357d 100644 --- a/src/Sagas/SagaStore/Sql/SQLSagaStore.php +++ b/src/Sagas/SagaStore/Sql/SQLSagaStore.php @@ -70,8 +70,6 @@ static function(StoredSaga $storedSaga) use ($adapter, $afterSaveHandler): \Gene yield $adapter->execute($query->sql(), $query->params()); yield call($afterSaveHandler); - - return yield new Success(); }, $storedSaga ); @@ -108,8 +106,6 @@ static function(StoredSaga $storedSaga) use ($adapter, $afterSaveHandler): \Gene yield call($afterSaveHandler); yield $transaction->commit(); - - return yield new Success(); } catch(\Throwable $throwable) { @@ -146,18 +142,10 @@ static function(SagaId $id) use ($adapter): \Generator if(null !== $result && true === isset($result['payload'])) { - /** For DoctrineDBAL */ - if(true === \is_resource($result['payload'])) - { - $result['payload'] = \stream_get_contents($result['payload'], -1, 0); - } - $result['payload'] = $adapter->unescapeBinary($result['payload']); return yield new Success(StoredSaga::fromRow($result)); } - - return yield new Success(); }, $id ); @@ -181,8 +169,6 @@ static function(SagaId $id) use ($adapter): \Generator ->compile(); yield $adapter->execute($query->sql(), $query->params()); - - return yield new Success(); }, $id ); diff --git a/src/Scheduler/Data/NextScheduledOperation.php b/src/Scheduler/Data/NextScheduledOperation.php new file mode 100644 index 0000000..041e68e --- /dev/null +++ b/src/Scheduler/Data/NextScheduledOperation.php @@ -0,0 +1,64 @@ + + * @license MIT + * @license https://opensource.org/licenses/MIT + */ + +declare(strict_types = 1); + +namespace Desperado\ServiceBus\Scheduler\Data; + +/** + * Scheduled job data (for next job) + */ +final class NextScheduledOperation +{ + /** + * Job identifier + * + * @var string + */ + private $id; + + /** + * Time in milliseconds + * + * @var int + */ + private $time; + + /** + * @param string $id + * @param int $time + */ + public function __construct(string $id, int $time) + { + $this->id = $id; + $this->time = $time; + } + + /** + * Receive identifier + * + * @return string + */ + public function id(): string + { + return $this->id; + } + + /** + * Receive time in microseconds + * + * @return int + */ + public function time(): int + { + return $this->time; + } +} diff --git a/src/Scheduler/Data/ScheduledOperation.php b/src/Scheduler/Data/ScheduledOperation.php new file mode 100644 index 0000000..065ca8a --- /dev/null +++ b/src/Scheduler/Data/ScheduledOperation.php @@ -0,0 +1,116 @@ + + * @license MIT + * @license https://opensource.org/licenses/MIT + */ + +declare(strict_types = 1); + +namespace Desperado\ServiceBus\Scheduler\Data; + +use Desperado\ServiceBus\Common\Contract\Messages\Command; +use Desperado\ServiceBus\Scheduler\ScheduledOperationId; + +/** + * Scheduled job data + */ +final class ScheduledOperation +{ + /** + * Identifier + * + * @var ScheduledOperationId + */ + private $id; + + /** + * Scheduled message + * + * @var Command + */ + private $command; + + /** + * Execution date + * + * @var \DateTimeImmutable + */ + private $date; + + /** + * Is command sent + * + * @var bool + */ + private $isSent = false; + + /** + * @param ScheduledOperationId $id + * @param Command $command + * @param \DateTimeImmutable $dateTime + */ + public function __construct(ScheduledOperationId $id, Command $command, \DateTimeImmutable $dateTime) + { + $this->id = $id; + $this->command = $command; + $this->date = $dateTime; + } + + /** + * Sent job + * + * @return self + */ + public function sent(): self + { + $self = new self($this->id, $this->command, $this->date); + $self->isSent = true; + + return $self; + } + + /** + * Receive identifier + * + * @return ScheduledOperationId + */ + public function id(): ScheduledOperationId + { + return $this->id; + } + + /** + * Receive command + * + * @return Command + */ + public function command(): Command + { + return $this->command; + } + + /** + * Receive execution date + * + * @return \DateTimeImmutable + */ + public function date(): \DateTimeImmutable + { + return $this->date; + } + + /** + * Is command sent + * + * @return bool + */ + public function isSent(): bool + { + return $this->isSent; + } +} diff --git a/src/Scheduler/Exceptions/InvalidScheduledOperationExecutionDate.php b/src/Scheduler/Exceptions/InvalidScheduledOperationExecutionDate.php new file mode 100644 index 0000000..aef31a2 --- /dev/null +++ b/src/Scheduler/Exceptions/InvalidScheduledOperationExecutionDate.php @@ -0,0 +1,25 @@ + + * @license MIT + * @license https://opensource.org/licenses/MIT + */ + +declare(strict_types = 1); + +namespace Desperado\ServiceBus\Scheduler\Exceptions; + +use Desperado\ServiceBus\Common\Exceptions\ServiceBusExceptionMarker; + +/** + * + */ +final class InvalidScheduledOperationExecutionDate extends \InvalidArgumentException + implements ServiceBusExceptionMarker +{ + +} diff --git a/src/Scheduler/Messages/Command/EmitSchedulerOperation.php b/src/Scheduler/Messages/Command/EmitSchedulerOperation.php new file mode 100644 index 0000000..e3e021d --- /dev/null +++ b/src/Scheduler/Messages/Command/EmitSchedulerOperation.php @@ -0,0 +1,54 @@ + + * @license MIT + * @license https://opensource.org/licenses/MIT + */ + +declare(strict_types = 1); + +namespace Desperado\ServiceBus\Scheduler\Messages\Command; + +use Desperado\ServiceBus\Common\Contract\Messages\Command; +use Desperado\ServiceBus\Scheduler\ScheduledOperationId; + +/** + * Fulfill the task of the scheduler + * + * @see SchedulerOperationEmitted + */ +final class EmitSchedulerOperation implements Command +{ + /** + * Scheduled operation identifier + * + * @var ScheduledOperationId + */ + private $id; + + /** + * @param ScheduledOperationId $id + * + * @return self + */ + public static function create(ScheduledOperationId $id): self + { + $self = new self(); + + $self->id = $id; + + return $self; + } + + /** + * @return ScheduledOperationId + */ + public function id(): ScheduledOperationId + { + return $this->id; + } +} diff --git a/src/Scheduler/Messages/Event/OperationScheduled.php b/src/Scheduler/Messages/Event/OperationScheduled.php new file mode 100644 index 0000000..b5384fd --- /dev/null +++ b/src/Scheduler/Messages/Event/OperationScheduled.php @@ -0,0 +1,128 @@ + + * @license MIT + * @license https://opensource.org/licenses/MIT + */ + +declare(strict_types = 1); + +namespace Desperado\ServiceBus\Scheduler\Messages\Event; + +use Desperado\ServiceBus\Common\Contract\Messages\Event; +use Desperado\ServiceBus\Scheduler\Data\NextScheduledOperation; +use Desperado\ServiceBus\Scheduler\ScheduledOperationId; +use Desperado\ServiceBus\Common\Contract\Messages\Command; + +/** + * + */ +final class OperationScheduled implements Event +{ + /** + * Operation identifier + * + * @var ScheduledOperationId + */ + private $id; + + /** + * Command namespace + * + * @var string + */ + private $commandNamespace; + + /** + * Execution date + * + * @var \DateTimeImmutable + */ + private $executionDate; + + /** + * Next operation data + * + * @var NextScheduledOperation|null + */ + private $nextOperation; + + /** + * @param ScheduledOperationId $id + * @param Command $command , + * @param \DateTimeImmutable $executionDate + * @param NextScheduledOperation|null $nextOperation + * + * @return self + */ + public static function create( + ScheduledOperationId $id, + Command $command, + \DateTimeImmutable $executionDate, + ?NextScheduledOperation $nextOperation + ): self + { + $self = new self(); + + $self->id = $id; + $self->commandNamespace = \get_class($command); + $self->executionDate = $executionDate; + $self->nextOperation = $nextOperation; + + return $self; + } + + /** + * Receive identifier + * + * @return ScheduledOperationId + */ + public function id(): ScheduledOperationId + { + return $this->id; + } + + /** + * Receive command namespace + * + * @return string + */ + public function commandNamespace(): string + { + return $this->commandNamespace; + } + + /** + * Receive execution date + * + * @return \DateTimeImmutable + */ + public function executionDate(): \DateTimeImmutable + { + return $this->executionDate; + } + + /** + * Receive next operation data + * + * @return NextScheduledOperation|null + */ + public function nextOperation(): ?NextScheduledOperation + { + return $this->nextOperation; + } + + /** + * Has next operation data + * + * @return bool + */ + public function hasNextOperation(): bool + { + return null !== $this->nextOperation; + } +} diff --git a/src/Scheduler/Messages/Event/SchedulerOperationCanceled.php b/src/Scheduler/Messages/Event/SchedulerOperationCanceled.php new file mode 100644 index 0000000..a3f4023 --- /dev/null +++ b/src/Scheduler/Messages/Event/SchedulerOperationCanceled.php @@ -0,0 +1,97 @@ + + * @license MIT + * @license https://opensource.org/licenses/MIT + */ + +declare(strict_types = 1); + +namespace Desperado\ServiceBus\Scheduler\Messages\Event; + +use Desperado\ServiceBus\Common\Contract\Messages\Event; +use Desperado\ServiceBus\Scheduler\Data\NextScheduledOperation; +use Desperado\ServiceBus\Scheduler\ScheduledOperationId; + +/** + * Scheduler operation canceled + */ +final class SchedulerOperationCanceled implements Event +{ + /** + * Operation identifier + * + * @var ScheduledOperationId + */ + private $id; + + /** + * Reason + * + * @var string|null + */ + private $reason; + + /** + * Next operation data + * + * @var NextScheduledOperation|null + */ + private $nextOperation; + + /** + * @param ScheduledOperationId $id + * @param string|null $reason + * @param NextScheduledOperation|null $nextScheduledOperation + * + * @return self + */ + public static function create( + ScheduledOperationId $id, + ?string $reason, + ?NextScheduledOperation $nextScheduledOperation + ): self + { + $self = new self(); + + $self->id = $id; + $self->reason = $reason; + $self->nextOperation = $nextScheduledOperation; + + return $self; + } + + /** + * Receive identifier + * + * @return ScheduledOperationId + */ + public function id(): ScheduledOperationId + { + return $this->id; + } + + /** + * Receive next operation data + * + * @return NextScheduledOperation|null + */ + public function nextOperation(): ?NextScheduledOperation + { + return $this->nextOperation; + } + + /** + * Receive cancellation reason + * + * @return null|string + */ + public function reason(): ?string + { + return $this->reason; + } +} diff --git a/src/Scheduler/Messages/Event/SchedulerOperationEmitted.php b/src/Scheduler/Messages/Event/SchedulerOperationEmitted.php new file mode 100644 index 0000000..38ef8eb --- /dev/null +++ b/src/Scheduler/Messages/Event/SchedulerOperationEmitted.php @@ -0,0 +1,76 @@ + + * @license MIT + * @license https://opensource.org/licenses/MIT + */ + +declare(strict_types = 1); + +namespace Desperado\ServiceBus\Scheduler\Messages\Event; + +use Desperado\ServiceBus\Common\Contract\Messages\Event; +use Desperado\ServiceBus\Scheduler\Data\NextScheduledOperation; +use Desperado\ServiceBus\Scheduler\ScheduledOperationId; + +/** + * Scheduler operation emitted + * + * @see EmitSchedulerOperation + */ +final class SchedulerOperationEmitted implements Event +{ + /** + * Scheduled operation identifier + * + * @var ScheduledOperationId + */ + private $id; + + /** + * Next operation data + * + * @var NextScheduledOperation|null + */ + private $nextOperation; + + /** + * @param ScheduledOperationId $id + * @param NextScheduledOperation|null $nextScheduledOperation + * + * @return self + */ + public static function create(ScheduledOperationId $id, ?NextScheduledOperation $nextScheduledOperation = null): self + { + $self = new self(); + + $self->id = $id; + $self->nextOperation = $nextScheduledOperation; + + return $self; + } + + /** + * Receive scheduled operation identifier + * + * @return ScheduledOperationId + */ + public function id(): ScheduledOperationId + { + return $this->id; + } + + /** + * Receive next operation data + * + * @return NextScheduledOperation|null + */ + public function nextOperation(): ?NextScheduledOperation + { + return $this->nextOperation; + } +} \ No newline at end of file diff --git a/src/Scheduler/ScheduledOperationId.php b/src/Scheduler/ScheduledOperationId.php new file mode 100644 index 0000000..772c1cc --- /dev/null +++ b/src/Scheduler/ScheduledOperationId.php @@ -0,0 +1,51 @@ + + * @license MIT + * @license https://opensource.org/licenses/MIT + */ + +declare(strict_types = 1); + +namespace Desperado\ServiceBus\Scheduler; + +use function Desperado\ServiceBus\Common\uuid; + +/** + * + */ +final class ScheduledOperationId +{ + /** + * @var string + */ + private $value; + + /** + * @param string $value + */ + public function __construct(string $value) + { + $this->value = $value; + } + + /** + * @return self + */ + public static function new(): self + { + return new self(uuid()); + } + + /** + * @return string + */ + public function __toString(): string + { + return $this->value; + } +} diff --git a/src/Scheduler/SchedulerListener.php b/src/Scheduler/SchedulerListener.php new file mode 100644 index 0000000..6045ebd --- /dev/null +++ b/src/Scheduler/SchedulerListener.php @@ -0,0 +1,168 @@ + + * @license MIT + * @license https://opensource.org/licenses/MIT + */ + +declare(strict_types = 1); + +namespace Desperado\ServiceBus\Scheduler; + +use Amp\Promise; +use Amp\Success; +use Desperado\ServiceBus\Application\KernelContext; +use Desperado\ServiceBus\Common\Contract\Messages\Event; +use function Desperado\ServiceBus\Common\invokeReflectionMethod; +use Desperado\ServiceBus\Scheduler\Data\NextScheduledOperation; +use Desperado\ServiceBus\Scheduler\Messages\Command\EmitSchedulerOperation; +use Desperado\ServiceBus\Scheduler\Messages\Event\OperationScheduled; +use Desperado\ServiceBus\Scheduler\Messages\Event\SchedulerOperationCanceled; +use Desperado\ServiceBus\Scheduler\Messages\Event\SchedulerOperationEmitted; +use Desperado\ServiceBus\SchedulerProvider; +use Desperado\ServiceBus\Services\Annotations\CommandHandler; +use Desperado\ServiceBus\Services\Annotations\EventListener; + +/** + * + */ +final class SchedulerListener +{ + /** + * Emit command + * + * @CommandHandler() + * + * @param EmitSchedulerOperation $command + * @param KernelContext $context + * @param SchedulerProvider $schedulerProvider + * + * @return Promise + */ + public function handleEmit( + EmitSchedulerOperation $command, + KernelContext $context, + SchedulerProvider $schedulerProvider + ): Promise + { + try + { + /** + * @see SchedulerProvider::emit() + * + * @var Promise $promise + */ + $promise = invokeReflectionMethod($schedulerProvider, 'emit', $command->id(), $context); + + return $promise; + } + catch(\Throwable $throwable) + { + $context->logContextMessage( + 'Emit scheduled operation "{scheduledOperationId}" failed with message "{throwableMessage}"', [ + 'scheduledOperationId' => (string) $command->id(), + 'throwableMessage' => $throwable->getMessage(), + 'throwablePoint' => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()) + ] + ); + } + + return new Success(); + } + + /** + * Scheduler operation emitted + * + * @EventListener() + * + * @param SchedulerOperationEmitted $event + * @param KernelContext $context + * + * @return Promise + */ + public function whenSchedulerOperationEmitted(SchedulerOperationEmitted $event, KernelContext $context): Promise + { + return self::processNextOperationEmit($event, $context); + } + + /** + * Scheduler operation canceled + * + * @EventListener() + * + * @param SchedulerOperationCanceled $event + * @param KernelContext $context + * + * @return Promise + */ + public function whenSchedulerOperationCanceled(SchedulerOperationCanceled $event, KernelContext $context): Promise + { + return self::processNextOperationEmit($event, $context); + } + + /** + * Operation scheduled + * + * @EventListener() + * + * @param OperationScheduled $event + * @param KernelContext $context + * + * @return Promise + */ + public function whenOperationScheduled(OperationScheduled $event, KernelContext $context): Promise + { + return self::processNextOperationEmit($event, $context); + } + + /** + * Emit next operation + * + * @param Event $event + * @param KernelContext $context + * + * @return Promise + */ + private static function processNextOperationEmit(Event $event, KernelContext $context): Promise + { + /** @var SchedulerOperationEmitted|SchedulerOperationCanceled|OperationScheduled $event */ + + $nextOperation = $event->nextOperation(); + + if(null !== $nextOperation) + { + /** Send a message that will return after a specified time interval */ + return $context->send( + EmitSchedulerOperation::create(new ScheduledOperationId($nextOperation->id())), [ + 'expiration' => 0, + 'x-delay' => self::calculateExecutionDelay($nextOperation) + ] + ); + } + + return new Success(); + } + + /** + * Calculate next execution delay + * + * @param NextScheduledOperation $nextScheduledOperation + * + * @return int + */ + private static function calculateExecutionDelay(NextScheduledOperation $nextScheduledOperation): int + { + $executionDelay = $nextScheduledOperation->time() - (\microtime(true) * 1000); + + if(0 > $executionDelay) + { + $executionDelay *= -1; + } + + return $executionDelay; + } +} diff --git a/src/Scheduler/Store/SchedulerRegistry.php b/src/Scheduler/Store/SchedulerRegistry.php new file mode 100644 index 0000000..ba8b188 --- /dev/null +++ b/src/Scheduler/Store/SchedulerRegistry.php @@ -0,0 +1,175 @@ + + * @license MIT + * @license https://opensource.org/licenses/MIT + */ + +declare(strict_types = 1); + +namespace Desperado\ServiceBus\Scheduler\Store; + +use Desperado\ServiceBus\Scheduler\Data\NextScheduledOperation; +use Desperado\ServiceBus\Scheduler\Data\ScheduledOperation; +use Desperado\ServiceBus\Scheduler\ScheduledOperationId; + +/** + * Registry representation + */ +final class SchedulerRegistry implements \Serializable +{ + /** + * @var string + */ + private $id; + + /** + * Operations collection + * + * @var array + */ + private $operations; + + /** + * Time table of operations in milliseconds + * + * @var array + */ + private $timetable; + + /** + * Create registry + * + * @return self + */ + public static function create(string $registryId): self + { + $self = new self(); + + $self->id = $registryId; + + return $self; + } + + /** + * @return string + */ + public function id(): string + { + return $this->id; + } + + /** + * @inheritdoc + */ + public function serialize(): string + { + return \serialize([$this->id, $this->operations, $this->timetable]); + } + + /** + * @inheritdoc + */ + public function unserialize($serialized): void + { + [$this->id, $this->operations, $this->timetable] = \unserialize($serialized, ['allowed_classes' => true]); + } + + /** + * Add operation to registry + * + * @param ScheduledOperation $operation + * + * @return void + */ + public function add(ScheduledOperation $operation): void + { + $operationId = (string) $operation->id(); + + $this->operations[$operationId] = $operation; + $this->timetable[$operationId] = (int) $operation->date()->format('U.u') * 1000; + } + + /** + * Extract operation from registry + * + * @param ScheduledOperationId $id + * + * @return ScheduledOperation|null + */ + public function extract(ScheduledOperationId $id): ?ScheduledOperation + { + $identifier = (string) $id; + + if(true === isset($this->timetable[$identifier]) && true === isset($this->operations[$identifier])) + { + $operation = $this->operations[$identifier]; + + $this->remove($id); + + return $operation; + } + + return null; + } + + /** + * Remove from registry + * + * @param ScheduledOperationId $id + * + * @return void + */ + public function remove(ScheduledOperationId $id): void + { + $identifier = (string) $id; + + unset($this->timetable[$identifier], $this->operations[$identifier]); + } + + /** + * Need to call after remove or add of new operation + * + * @return NextScheduledOperation|null + */ + public function fetchNextOperation(): ?NextScheduledOperation + { + if(0 !== \count($this->timetable)) + { + $minTime = \min($this->timetable); + $id = (string) \array_search($minTime, $this->timetable, true); + + // @codeCoverageIgnoreStart + if('' === $id) + { + return null; + } + // @codeCoverageIgnoreEnd + + /** @var ScheduledOperation $operation */ + $operation = $this->operations[$id]; + + if(false === $operation->isSent()) + { + $this->operations[$id] = $operation->sent(); + + return new NextScheduledOperation($id, $minTime); + } + } + + return null; + } + + /** + * Close constructor + */ + private function __construct() + { + $this->operations = []; + $this->timetable = []; + } +} diff --git a/src/Scheduler/Store/SchedulerStore.php b/src/Scheduler/Store/SchedulerStore.php new file mode 100644 index 0000000..83f9930 --- /dev/null +++ b/src/Scheduler/Store/SchedulerStore.php @@ -0,0 +1,62 @@ + + * @license MIT + * @license https://opensource.org/licenses/MIT + */ + +declare(strict_types = 1); + +namespace Desperado\ServiceBus\Scheduler\Store; + +use Amp\Promise; + +/** + * + */ +interface SchedulerStore +{ + /** + * Load registry + * + * @param string $id + * + * @return Promise<\Desperado\ServiceBus\Scheduler\Store\SchedulerRegistry> + * + * @throws \Desperado\ServiceBus\Storage\Exceptions\ConnectionFailed + * @throws \Desperado\ServiceBus\Storage\Exceptions\OperationFailed + * @throws \Desperado\ServiceBus\Storage\Exceptions\StorageInteractingFailed + */ + public function load(string $id): Promise; + + /** + * Save new registry + * + * @param SchedulerRegistry $registry + * + * @return Promise + * + * @throws \Desperado\ServiceBus\Storage\Exceptions\UniqueConstraintViolationCheckFailed + * @throws \Desperado\ServiceBus\Storage\Exceptions\ConnectionFailed + * @throws \Desperado\ServiceBus\Storage\Exceptions\OperationFailed + * @throws \Desperado\ServiceBus\Storage\Exceptions\StorageInteractingFailed + */ + public function add(SchedulerRegistry $registry): Promise; + + /** + * Update registry + * + * @param SchedulerRegistry $registry + * + * @return Promise + * + * @throws \Desperado\ServiceBus\Storage\Exceptions\ConnectionFailed + * @throws \Desperado\ServiceBus\Storage\Exceptions\OperationFailed + * @throws \Desperado\ServiceBus\Storage\Exceptions\StorageInteractingFailed + */ + public function update(SchedulerRegistry $registry): Promise; +} diff --git a/src/Scheduler/Store/Sql/SqlSchedulerStore.php b/src/Scheduler/Store/Sql/SqlSchedulerStore.php new file mode 100644 index 0000000..5749785 --- /dev/null +++ b/src/Scheduler/Store/Sql/SqlSchedulerStore.php @@ -0,0 +1,123 @@ + + * @license MIT + * @license https://opensource.org/licenses/MIT + */ + +declare(strict_types = 1); + +namespace Desperado\ServiceBus\Scheduler\Store\Sql; + +use function Amp\call; +use Amp\Promise; +use Amp\Success; +use Desperado\ServiceBus\Scheduler\Store\SchedulerRegistry; +use Desperado\ServiceBus\Scheduler\Store\SchedulerStore; +use function Desperado\ServiceBus\Storage\fetchOne; +use function Desperado\ServiceBus\Storage\SQL\createInsertQuery; +use function Desperado\ServiceBus\Storage\SQL\equalsCriteria; +use function Desperado\ServiceBus\Storage\SQL\selectQuery; +use function Desperado\ServiceBus\Storage\SQL\updateQuery; +use Desperado\ServiceBus\Storage\StorageAdapter; + +/** + * + */ +final class SqlSchedulerStore implements SchedulerStore +{ + /** + * @var StorageAdapter + */ + private $adapter; + + /** + * @param StorageAdapter $adapter + */ + public function __construct(StorageAdapter $adapter) + { + $this->adapter = $adapter; + } + + /** + * @inheritDoc + */ + public function load(string $id): Promise + { + $adapter = $this->adapter; + + /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */ + return call( + static function(string $id) use ($adapter): \Generator + { + $registry = null; + + $query = selectQuery('scheduler_registry') + ->where(equalsCriteria('id', $id)) + ->compile(); + + /** @var array|null $row */ + $row = yield fetchOne( + yield $adapter->execute($query->sql(), $query->params()) + ); + + if(true === \is_array($row) && 0 !== \count($row)) + { + $row['payload'] = $adapter->unescapeBinary($row['payload']); + + $registry = \unserialize(\base64_decode($row['payload']), ['allowed_classes' => true]); + } + + return yield new Success($registry); + }, + $id + ); + } + + /** + * @inheritDoc + */ + public function add(SchedulerRegistry $registry): Promise + { + $adapter = $this->adapter; + + /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */ + return call( + static function(SchedulerRegistry $registry) use ($adapter): \Generator + { + $query = createInsertQuery( + 'scheduler_registry', + ['id' => $registry->id(), 'payload' => \base64_encode(\serialize($registry))] + )->compile(); + + yield $adapter->execute($query->sql(), $query->params()); + } + ); + } + + + /** + * @inheritDoc + */ + public function update(SchedulerRegistry $registry): Promise + { + $adapter = $this->adapter; + + /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */ + return call( + static function(SchedulerRegistry $registry) use ($adapter): \Generator + { + $query = updateQuery('scheduler_registry', ['payload' => \base64_encode(\serialize($registry))]) + ->where(equalsCriteria('id', $registry->id())) + ->compile(); + + yield $adapter->execute($query->sql(), $query->params()); + }, + $registry + ); + } +} diff --git a/src/Scheduler/Store/Sql/schema/scheduler_registry.sql b/src/Scheduler/Store/Sql/schema/scheduler_registry.sql new file mode 100644 index 0000000..bf7a3f8 --- /dev/null +++ b/src/Scheduler/Store/Sql/schema/scheduler_registry.sql @@ -0,0 +1,5 @@ +CREATE TABLE IF NOT EXISTS scheduler_registry +( + id uuid PRIMARY KEY, + payload bytea NOT NULL +); \ No newline at end of file diff --git a/src/SchedulerProvider.php b/src/SchedulerProvider.php new file mode 100644 index 0000000..01af2f1 --- /dev/null +++ b/src/SchedulerProvider.php @@ -0,0 +1,269 @@ + + * @license MIT + * @license https://opensource.org/licenses/MIT + */ + +declare(strict_types = 1); + +namespace Desperado\ServiceBus; + +use function Amp\call; +use Amp\Promise; +use Amp\Success; +use Desperado\ServiceBus\Common\Contract\Messages\Command; +use function Desperado\ServiceBus\Common\datetimeInstantiator; +use Desperado\ServiceBus\Common\ExecutionContext\LoggingInContext; +use Desperado\ServiceBus\Common\ExecutionContext\MessageDeliveryContext; +use Desperado\ServiceBus\Scheduler\Data\ScheduledOperation; +use Desperado\ServiceBus\Scheduler\Exceptions\InvalidScheduledOperationExecutionDate; +use Desperado\ServiceBus\Scheduler\Messages\Event\OperationScheduled; +use Desperado\ServiceBus\Scheduler\Messages\Event\SchedulerOperationCanceled; +use Desperado\ServiceBus\Scheduler\Messages\Event\SchedulerOperationEmitted; +use Desperado\ServiceBus\Scheduler\ScheduledOperationId; +use Desperado\ServiceBus\Scheduler\Store\SchedulerRegistry; +use Desperado\ServiceBus\Scheduler\Store\SchedulerStore; +use Ramsey\Uuid\Uuid; + +/** + * + */ +final class SchedulerProvider +{ + /** + * Registry identifier + * + * @var string + */ + private $registryId; + + /** + * @var SchedulerStore + */ + private $store; + + /** + * @param SchedulerStore $store + */ + public function __construct(SchedulerStore $store, string $registryName = 'scheduler_registry') + { + $this->registryId = Uuid::uuid5(SchedulerRegistry::class, $registryName)->toString(); + $this->store = $store; + } + + /** + * Schedule operation + * + * @param ScheduledOperationId $id + * @param Command $command + * @param \DateTimeImmutable $executionDate + * @param MessageDeliveryContext $context + * + * @return Promise + */ + public function schedule( + ScheduledOperationId $id, + Command $command, + \DateTimeImmutable $executionDate, + MessageDeliveryContext $context + ): Promise + { + self::guardOperationExecutionDate($executionDate); + + $store = $this->store; + $registryId = $this->registryId; + + /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */ + return call( + static function(ScheduledOperation $operation) use ($store, $registryId, $context): \Generator + { + /** @var SchedulerRegistry $registry */ + $registry = yield self::obtainRegistry($store, $registryId); + + $registry->add($operation); + + yield self::updateRegistry($store, $registry, false); + + yield $context->delivery( + OperationScheduled::create( + $operation->id(), + $operation->command(), + $operation->date(), + $registry->fetchNextOperation() + ) + ); + }, + new ScheduledOperation($id, $command, $executionDate) + ); + } + + /** + * Cancel scheduled operation + * + * @param ScheduledOperationId $id + * @param MessageDeliveryContext $context + * + * @return Promise + */ + public function cancel(ScheduledOperationId $id, MessageDeliveryContext $context, ?string $reason = null): Promise + { + $store = $this->store; + $registryId = $this->registryId; + + /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */ + return call( + static function(ScheduledOperationId $id, ?string $reason) use ($store, $registryId, $context): \Generator + { + /** @var SchedulerRegistry $registry */ + $registry = yield self::obtainRegistry($store, $registryId); + + $registry->remove($id); + + yield self::updateRegistry($store, $registry, false); + + yield $context->delivery( + SchedulerOperationCanceled::create( + $id, + $reason, + $registry->fetchNextOperation() + ) + ); + }, + $id, + $reason + ); + } + + /** + * Emit operation + * Called by infrastructure components + * + * @noinspection PhpUnusedPrivateMethodInspection + * + * @param ScheduledOperationId $operationId + * @param MessageDeliveryContext $context + * + * @return Promise + */ + private function emit(ScheduledOperationId $id, MessageDeliveryContext $context): Promise + { + $store = $this->store; + $registryId = $this->registryId; + + /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */ + return call( + static function(ScheduledOperationId $id) use ($store, $registryId, $context): \Generator + { + /** @var SchedulerRegistry $registry */ + $registry = yield self::obtainRegistry($store, $registryId); + + $operation = $registry->extract($id); + + if(null !== $operation) + { + $command = $operation->command(); + + yield self::updateRegistry($store, $registry, false); + yield $context->delivery($command); + + if($context instanceof LoggingInContext) + { + $context->logContextMessage( + 'The delayed "{messageClass}" command has been sent to the transport', [ + 'messageClass' => \get_class($command), + 'scheduledOperationId' => (string) $id + ] + ); + } + } + + yield $context->delivery( + SchedulerOperationEmitted::create($id, $registry->fetchNextOperation()) + ); + }, + $id + ); + } + + /** + * Obtain registry + * + * @param SchedulerStore $store + * @param string $registryId + * + * @return Promise<\Desperado\ServiceBus\Scheduler\Store\SchedulerRegistry> + */ + private static function obtainRegistry(SchedulerStore $store, string $registryId): Promise + { + /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */ + return call( + static function(string $registryId) use ($store): \Generator + { + /** @var SchedulerRegistry|null $registry */ + $registry = yield $store->load($registryId); + + if(null === $registry) + { + $registry = SchedulerRegistry::create($registryId); + + yield self::updateRegistry($store, $registry, true); + } + + return yield new Success($registry); + }, + $registryId + ); + } + + /** + * Update\Save new registry + * + * @param SchedulerStore $store + * @param SchedulerRegistry $registry + * + * @return Promise + */ + private static function updateRegistry(SchedulerStore $store, SchedulerRegistry $registry, bool $isNew = false): Promise + { + /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */ + return call( + static function(SchedulerRegistry $registry, bool $isNew) use ($store): \Generator + { + true === $isNew + ? yield $store->add($registry) + : yield $store->update($registry); + + return yield new Success(); + }, + $registry, + $isNew + ); + } + + /** + * Make sure that the specific date of the operation is specified + * + * @param \DateTimeImmutable $dateTime + * + * @return void + * + * @throws \Desperado\ServiceBus\Scheduler\Exceptions\InvalidScheduledOperationExecutionDate + */ + private static function guardOperationExecutionDate(\DateTimeImmutable $dateTime): void + { + /** @var \DateTimeImmutable $currentDate */ + $currentDate = datetimeInstantiator('NOW'); + + if($currentDate->getTimestamp() > $dateTime->getTimestamp()) + { + throw new InvalidScheduledOperationExecutionDate( + 'Scheduled operation date must be greater then current' + ); + } + } +} diff --git a/src/Storage/SQL/AmpPostgreSQL/AmpPostgreSQLAdapter.php b/src/Storage/SQL/AmpPostgreSQL/AmpPostgreSQLAdapter.php index 762f741..44e74fa 100644 --- a/src/Storage/SQL/AmpPostgreSQL/AmpPostgreSQLAdapter.php +++ b/src/Storage/SQL/AmpPostgreSQL/AmpPostgreSQLAdapter.php @@ -137,9 +137,9 @@ static function() use ($connectionsPool): \Generator /** * @inheritdoc */ - public function unescapeBinary(string $string): string + public function unescapeBinary($payload): string { /** @noinspection PhpComposerExtensionStubsInspection */ - return \pg_unescape_bytea($string); + return \pg_unescape_bytea($payload); } } \ No newline at end of file diff --git a/src/Storage/SQL/DoctrineDBAL/DoctrineDBALAdapter.php b/src/Storage/SQL/DoctrineDBAL/DoctrineDBALAdapter.php index 32fbf2b..2613ebf 100644 --- a/src/Storage/SQL/DoctrineDBAL/DoctrineDBALAdapter.php +++ b/src/Storage/SQL/DoctrineDBAL/DoctrineDBALAdapter.php @@ -114,8 +114,13 @@ public function transaction(): Promise /** * @inheritdoc */ - public function unescapeBinary(string $string): string + public function unescapeBinary($payload): string { - return $string; + if(true === \is_resource($payload)) + { + return \stream_get_contents($payload, -1, 0); + } + + return $payload; } } diff --git a/src/Storage/StorageAdapter.php b/src/Storage/StorageAdapter.php index fed4753..129cdab 100644 --- a/src/Storage/StorageAdapter.php +++ b/src/Storage/StorageAdapter.php @@ -56,9 +56,9 @@ public function transaction(): Promise; /** * Unescape binary string * - * @param string $string + * @param string|resource $payload * * @return string */ - public function unescapeBinary(string $string): string; + public function unescapeBinary($payload): string; } diff --git a/tests/Stubs/Context/TestContext.php b/tests/Stubs/Context/TestContext.php index 365aec9..b8d33bb 100644 --- a/tests/Stubs/Context/TestContext.php +++ b/tests/Stubs/Context/TestContext.php @@ -15,6 +15,8 @@ use Amp\Promise; use Amp\Success; +use Desperado\ServiceBus\Common\Contract\Messages\Command; +use Desperado\ServiceBus\Common\Contract\Messages\Event; use Desperado\ServiceBus\Common\Contract\Messages\Message; use Desperado\ServiceBus\Common\ExecutionContext\MessageDeliveryContext; @@ -34,4 +36,24 @@ public function delivery(Message ...$messages): Promise return new Success(); } + + /** + * @inheritDoc + */ + public function send(Command $command, array $headers = []): Promise + { + $this->messages[] = $command; + + return new Success(); + } + + /** + * @inheritDoc + */ + public function publish(Event $event, array $headers = []): Promise + { + $this->messages[] = $event; + + return new Success(); + } } From d214d66580f45baef6a07e484365b9d890ea9ad3 Mon Sep 17 00:00:00 2001 From: desperado Date: Thu, 23 Aug 2018 18:12:39 +0300 Subject: [PATCH 02/11] scheduler --- src/Scheduler/SchedulerListener.php | 3 ++- src/Scheduler/Store/Sql/SqlSchedulerStore.php | 3 ++- src/SchedulerProvider.php | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/Scheduler/SchedulerListener.php b/src/Scheduler/SchedulerListener.php index 6045ebd..8c9e058 100644 --- a/src/Scheduler/SchedulerListener.php +++ b/src/Scheduler/SchedulerListener.php @@ -156,7 +156,8 @@ private static function processNextOperationEmit(Event $event, KernelContext $co */ private static function calculateExecutionDelay(NextScheduledOperation $nextScheduledOperation): int { - $executionDelay = $nextScheduledOperation->time() - (\microtime(true) * 1000); + /** @noinspection UnnecessaryCastingInspection */ + $executionDelay = $nextScheduledOperation->time() - (int) (\microtime(true) * 1000); if(0 > $executionDelay) { diff --git a/src/Scheduler/Store/Sql/SqlSchedulerStore.php b/src/Scheduler/Store/Sql/SqlSchedulerStore.php index 5749785..568652a 100644 --- a/src/Scheduler/Store/Sql/SqlSchedulerStore.php +++ b/src/Scheduler/Store/Sql/SqlSchedulerStore.php @@ -95,7 +95,8 @@ static function(SchedulerRegistry $registry) use ($adapter): \Generator )->compile(); yield $adapter->execute($query->sql(), $query->params()); - } + }, + $registry ); } diff --git a/src/SchedulerProvider.php b/src/SchedulerProvider.php index 01af2f1..5ae7177 100644 --- a/src/SchedulerProvider.php +++ b/src/SchedulerProvider.php @@ -52,7 +52,7 @@ final class SchedulerProvider */ public function __construct(SchedulerStore $store, string $registryName = 'scheduler_registry') { - $this->registryId = Uuid::uuid5(SchedulerRegistry::class, $registryName)->toString(); + $this->registryId = Uuid::uuid5(Uuid::NAMESPACE_X500, $registryName)->toString(); $this->store = $store; } From e243bdbea9e5d29887f12f8551e4859eff2ede2d Mon Sep 17 00:00:00 2001 From: desperado Date: Fri, 24 Aug 2018 12:47:33 +0300 Subject: [PATCH 03/11] change default\custom destinations logic --- src/Application/TransportConfigurator.php | 6 ++-- src/OutboundMessage/OutboundMessageRoutes.php | 28 +++++++++++-------- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/src/Application/TransportConfigurator.php b/src/Application/TransportConfigurator.php index 37d8bbe..79fc1dc 100644 --- a/src/Application/TransportConfigurator.php +++ b/src/Application/TransportConfigurator.php @@ -48,8 +48,7 @@ public function __construct(Transport $transport, OutboundMessageRoutes $outboun } /** - * Configure default destinations - * In them, messages will be sent regardless of the availability of individual routes + * Configure default routes for messages * * @noinspection PhpDocSignatureInspection * @@ -66,7 +65,8 @@ public function addDefaultDestinations(Destination ...$destinations): self } /** - * Add directions for a specific message (in addition to the default directions) + * Add routes for specific messages + * If the message has its own route, it will not be sent to the default route * * @noinspection PhpDocSignatureInspection * diff --git a/src/OutboundMessage/OutboundMessageRoutes.php b/src/OutboundMessage/OutboundMessageRoutes.php index 82c304e..31bbdbb 100644 --- a/src/OutboundMessage/OutboundMessageRoutes.php +++ b/src/OutboundMessage/OutboundMessageRoutes.php @@ -24,7 +24,8 @@ final class OutboundMessageRoutes { /** - * Routes to which messages will be sent + * Routes for specific messages + * If the message has its own route, it will not be sent to the default route * * [ * 'SomeClassNamespace' => Destination[] @@ -32,15 +33,14 @@ final class OutboundMessageRoutes * * @var array */ - private $destinations; + private $customDestinations = []; /** - * Default destinations - * In them, messages will be sent regardless of the availability of individual routes + * The default routes for messages * * @var array */ - private $defaultDestinations; + private $defaultDestinations = []; /** * Receive destinations for specified message @@ -54,10 +54,16 @@ final class OutboundMessageRoutes */ public function destinationsFor(string $messageClass): array { - return \array_merge( - $this->defaultDestinations, - $this->destinations[$messageClass] ?? [] - ); + if( + true === isset($this->customDestinations[$messageClass]) && + true === \is_array($this->customDestinations[$messageClass]) && + 0 !== \count($this->customDestinations[$messageClass]) + ) + { + return $this->customDestinations[$messageClass]; + } + + return $this->defaultDestinations; } /** @@ -81,7 +87,7 @@ public function addDefaultRoutes(array $destinations): void } /** - * Add directions for a specific message (in addition to the default directions) + * Add directions for a specific message * * @param string $messageClass * @param Destination $destination @@ -97,7 +103,7 @@ public function addRoute(string $messageClass, Destination $destination): void self::validateDestination($destination); /** @psalm-suppress InvalidArrayAssignment */ - $this->destinations[$messageClass][] = $destination; + $this->customDestinations[$messageClass][] = $destination; } /** From ad6e58e6cc607985d75bab969a0dca9f8f140fdb Mon Sep 17 00:00:00 2001 From: desperado Date: Fri, 24 Aug 2018 12:52:11 +0300 Subject: [PATCH 04/11] unused import --- tests/Application/Kernel/ServiceBusKernelTest.php | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/Application/Kernel/ServiceBusKernelTest.php b/tests/Application/Kernel/ServiceBusKernelTest.php index 86e1dc9..f33e2ae 100644 --- a/tests/Application/Kernel/ServiceBusKernelTest.php +++ b/tests/Application/Kernel/ServiceBusKernelTest.php @@ -40,7 +40,6 @@ use Monolog\Handler\TestHandler; use PHPUnit\Framework\TestCase; use Psr\Container\ContainerInterface; -use Psr\Log\LoggerInterface; /** * From b4ccd6b706c828a153d1416a049c5ac69a3f8966 Mon Sep 17 00:00:00 2001 From: desperado Date: Fri, 24 Aug 2018 14:32:02 +0300 Subject: [PATCH 05/11] add delayed options to queue and exchange --- src/Transport/AmqpExt/AmqpQueue.php | 19 ++++++++++++++++++- src/Transport/AmqpExt/AmqpTopic.php | 7 +++++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/src/Transport/AmqpExt/AmqpQueue.php b/src/Transport/AmqpExt/AmqpQueue.php index 841ee0c..49e37fc 100644 --- a/src/Transport/AmqpExt/AmqpQueue.php +++ b/src/Transport/AmqpExt/AmqpQueue.php @@ -116,6 +116,23 @@ public static function create(string $name): self return new self($name); } + /** + * Create delayed queue + * + * @see https://github.com/rabbitmq/rabbitmq-delayed-message-exchange + * + * @param string $name + * + * @return self + */ + public static function createDelayedQueue(string $name): self + { + $self = new self($name); + $self->arguments['x-dead-letter-exchange'] = 'delayed'; + + return $self; + } + /** * @return string */ @@ -175,7 +192,7 @@ public function enableAutoDelete(): self */ public function wthArguments(array $arguments): self { - $this->arguments = $arguments; + $this->arguments = \array_merge($this->arguments, $arguments); return $this; } diff --git a/src/Transport/AmqpExt/AmqpTopic.php b/src/Transport/AmqpExt/AmqpTopic.php index 0ab5e36..cb5d976 100644 --- a/src/Transport/AmqpExt/AmqpTopic.php +++ b/src/Transport/AmqpExt/AmqpTopic.php @@ -121,7 +121,10 @@ public static function topic(string $name): self */ public static function delayed(string $name): self { - return new self($name, self::TYPE_DELAYED); + $self = new self($name, self::TYPE_DELAYED); + $self->arguments['x-delayed-type'] = 'fanout'; + + return $self; } /** @@ -161,7 +164,7 @@ public function durable(): self */ public function wthArguments(array $arguments): self { - $this->arguments = $arguments; + $this->arguments = \array_merge($this->arguments, $arguments); return $this; } From 9ec7697a522ce8492f03f412cbc4a11d984535d9 Mon Sep 17 00:00:00 2001 From: desperado Date: Fri, 24 Aug 2018 14:36:11 +0300 Subject: [PATCH 06/11] change exception log keys --- src/Application/ServiceBusKernel.php | 5 ++-- src/HttpClient/Artax/ArtaxHttpClient.php | 6 ++-- src/Scheduler/SchedulerListener.php | 3 +- src/Transport/AmqpExt/AmqpConsumer.php | 36 +++++++++++------------- 4 files changed, 23 insertions(+), 27 deletions(-) diff --git a/src/Application/ServiceBusKernel.php b/src/Application/ServiceBusKernel.php index ffd69aa..52b39c1 100644 --- a/src/Application/ServiceBusKernel.php +++ b/src/Application/ServiceBusKernel.php @@ -382,9 +382,10 @@ private static function onSendMessageFailed( ): void { $logger->critical( - 'Error sending message "{messageClass}" to broker: "{exceptionMessage}"', [ + 'Error sending message "{messageClass}" to broker: "{throwableMessage}"', [ 'messageClass' => $messageClass, - 'exceptionMessage' => $throwable->getMessage() + 'throwableMessage' => $throwable->getMessage(), + 'throwablePoint' => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()) ] ); diff --git a/src/HttpClient/Artax/ArtaxHttpClient.php b/src/HttpClient/Artax/ArtaxHttpClient.php index 3e5f90d..19630e5 100644 --- a/src/HttpClient/Artax/ArtaxHttpClient.php +++ b/src/HttpClient/Artax/ArtaxHttpClient.php @@ -271,11 +271,11 @@ private static function logResponse(LoggerInterface $logger, Psr7Response $respo private static function logThrowable(LoggerInterface $logger, \Throwable $throwable, string $requestId): void { $logger->error( - 'During the execution of the request with identifier "{requestId}" an exception was caught: "{exceptionMessage}"', + 'During the execution of the request with identifier "{requestId}" an exception was caught: "{throwableMessage}"', [ 'requestId' => $requestId, - 'exceptionMessage' => $throwable->getMessage(), - 'file' => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()) + 'throwableMessage' => $throwable->getMessage(), + 'throwablePoint' => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()) ] ); } diff --git a/src/Scheduler/SchedulerListener.php b/src/Scheduler/SchedulerListener.php index 8c9e058..be1d6af 100644 --- a/src/Scheduler/SchedulerListener.php +++ b/src/Scheduler/SchedulerListener.php @@ -138,8 +138,7 @@ private static function processNextOperationEmit(Event $event, KernelContext $co /** Send a message that will return after a specified time interval */ return $context->send( EmitSchedulerOperation::create(new ScheduledOperationId($nextOperation->id())), [ - 'expiration' => 0, - 'x-delay' => self::calculateExecutionDelay($nextOperation) + 'x-delay' => self::calculateExecutionDelay($nextOperation) ] ); } diff --git a/src/Transport/AmqpExt/AmqpConsumer.php b/src/Transport/AmqpExt/AmqpConsumer.php index 34debd6..e91b721 100644 --- a/src/Transport/AmqpExt/AmqpConsumer.php +++ b/src/Transport/AmqpExt/AmqpConsumer.php @@ -212,8 +212,7 @@ private static function transformEnvelope( $unserialized['namespace'], $unserialized['message'] ), - /** @todo: only custom headers */ - [] + $envelope->getHeaders() ); } @@ -233,10 +232,9 @@ private function acknowledge(\AMQPEnvelope $envelope): void catch(\Throwable $throwable) { $this->logger->error( - 'Acknowledge error: "{exceptionMessage}"', [ - 'exceptionMessage' => $throwable->getMessage(), - 'exceptionFile' => $throwable->getFile(), - 'exceptionLine' => $throwable->getLine() + 'Acknowledge error: "{throwableMessage}"', [ + 'throwableMessage' => $throwable->getMessage(), + 'throwablePoint' => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()) ] ); } @@ -262,10 +260,9 @@ private function reject(\AMQPEnvelope $envelope, $retry = false): void catch(\Throwable $throwable) { $this->logger->error( - 'Error while rejecting message: "{exceptionMessage}"', [ - 'exceptionMessage' => $throwable->getMessage(), - 'exceptionFile' => $throwable->getFile(), - 'exceptionLine' => $throwable->getLine() + 'Error while rejecting message: "{throwableMessage}"', [ + 'throwableMessage' => $throwable->getMessage(), + 'throwablePoint' => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()) ] ); } @@ -346,10 +343,9 @@ private function garbageCleaning(): void private function handleDecodeFailed(string $operationId, \AMQPEnvelope $envelope, DecodeMessageFailed $exception): void { $this->logger->error( - 'An incorrectly serialized message was received. Error details: "{exceptionMessage}"', [ - 'exceptionMessage' => $exception->getMessage(), - 'exceptionFile' => $exception->getFile(), - 'exceptionLine' => $exception->getLine(), + 'An incorrectly serialized message was received. Error details: "{throwableMessage}"', [ + 'throwableMessage' => $exception->getMessage(), + 'throwablePoint' => \sprintf('%s:%d', $exception->getFile(), $exception->getLine()), 'operationId' => $operationId, 'rawMessagePayload' => $envelope->getBody() ] @@ -366,8 +362,9 @@ private function handleDecodeFailed(string $operationId, \AMQPEnvelope $envelope private function handleConnectionFail(\Exception $connectionException): void { $this->logger->emergency( - 'Connection to broker failed: "{exceptionMessage}". Cancel subscription', [ - 'exceptionMessage' => $connectionException->getMessage() + 'Connection to broker failed: "{throwableMessage}". Cancel subscription', [ + 'throwableMessage' => $connectionException->getMessage(), + 'throwablePoint' => \sprintf('%s:%d', $connectionException->getFile(), $connectionException->getLine()) ] ); @@ -384,10 +381,9 @@ private function handleConnectionFail(\Exception $connectionException): void private function handleThrowable(string $operationId, \AMQPEnvelope $envelope, \Throwable $throwable): void { $this->logger->error( - 'Error processing message: "{exceptionMessage}"', [ - 'exceptionMessage' => $throwable->getMessage(), - 'exceptionFile' => $throwable->getFile(), - 'exceptionLine' => $throwable->getLine(), + 'Error processing message: "{throwableMessage}"', [ + 'throwableMessage' => $throwable->getMessage(), + 'throwablePoint' => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()), 'operationId' => $operationId, 'rawMessagePayload' => $envelope->getBody(), ] From 958202237834e6661ef42f848f96b157ccafaec0 Mon Sep 17 00:00:00 2001 From: desperado Date: Fri, 24 Aug 2018 14:38:02 +0300 Subject: [PATCH 07/11] add transport headers logging --- src/Application/ServiceBusKernel.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Application/ServiceBusKernel.php b/src/Application/ServiceBusKernel.php index 52b39c1..96fc4cc 100644 --- a/src/Application/ServiceBusKernel.php +++ b/src/Application/ServiceBusKernel.php @@ -418,7 +418,8 @@ private static function beforeMessageSend( 'Sending a "{messageClass}" message to "{destinationTopic}/{destinationRoutingKey}"', [ 'messageClass' => $messageClass, 'destinationTopic' => $destination->topicName(), - 'destinationRoutingKey' => $destination->routingKey() + 'destinationRoutingKey' => $destination->routingKey(), + 'headers' => $outboundEnvelope->headers() ] ); From 0903e250fc2f7b13f8da0ba8b6fa4a3d320f82c5 Mon Sep 17 00:00:00 2001 From: desperado Date: Fri, 24 Aug 2018 14:41:44 +0300 Subject: [PATCH 08/11] fix tests --- tests/Application/Kernel/ServiceBusKernelTest.php | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/Application/Kernel/ServiceBusKernelTest.php b/tests/Application/Kernel/ServiceBusKernelTest.php index f33e2ae..8eef283 100644 --- a/tests/Application/Kernel/ServiceBusKernelTest.php +++ b/tests/Application/Kernel/ServiceBusKernelTest.php @@ -205,6 +205,11 @@ public function successExecutionWithResponseMessage(): void 'messageClass' => SuccessResponseEvent::class, 'destinationTopic' => 'test_topic', 'destinationRoutingKey' => 'test_key', + 'headers' => [ + 'x-message-class' => SuccessResponseEvent::class, + 'x-created-after-message' => TriggerResponseEventCommand::class, + 'x-hostname' => \gethostname() + ] ], $latest['context'] ); @@ -231,14 +236,16 @@ public function failedResponseDelivery(): void /** @see VirtualPublisher::send() */ static::assertEquals( - 'Error sending message "{messageClass}" to broker: "{exceptionMessage}"', + 'Error sending message "{messageClass}" to broker: "{throwableMessage}"', $messageEntry['message'] ); + unset($messageEntry['context']['throwablePoint']); + static::assertEquals( [ 'messageClass' => FailedMessageSendMarkerEvent::class, - 'exceptionMessage' => 'shit happens' + 'throwableMessage' => 'shit happens' ], $messageEntry['context'] ); From 419d3ca28e1252cbe63b725c4b876dc2a5121753 Mon Sep 17 00:00:00 2001 From: desperado Date: Fri, 24 Aug 2018 18:13:23 +0300 Subject: [PATCH 09/11] transport logic changes --- src/Application/TransportConfigurator.php | 54 ++++++++++- src/Transport/AmqpExt/AmqpExt.php | 95 ++++++++++++------- src/Transport/AmqpExt/AmqpQueue.php | 45 ++++++--- src/Transport/AmqpExt/AmqpTopic.php | 46 ++++++--- src/Transport/Exceptions/BindFailed.php | 22 +++++ src/Transport/QueueBind.php | 16 +++- src/Transport/TopicBind.php | 70 ++++++++++++++ src/Transport/Transport.php | 31 +++++- .../Kernel/ServiceBusKernelTest.php | 3 +- tests/Stubs/Transport/VirtualTransport.php | 19 +++- 10 files changed, 326 insertions(+), 75 deletions(-) create mode 100644 src/Transport/Exceptions/BindFailed.php create mode 100644 src/Transport/TopicBind.php diff --git a/src/Application/TransportConfigurator.php b/src/Application/TransportConfigurator.php index 79fc1dc..b26c828 100644 --- a/src/Application/TransportConfigurator.php +++ b/src/Application/TransportConfigurator.php @@ -18,6 +18,7 @@ use Desperado\ServiceBus\Transport\Queue; use Desperado\ServiceBus\Transport\QueueBind; use Desperado\ServiceBus\Transport\Topic; +use Desperado\ServiceBus\Transport\TopicBind; use Desperado\ServiceBus\Transport\Transport; /** @@ -91,28 +92,71 @@ public function registerCustomMessageDestinations(string $message, Destination . * @param QueueBind|null $bindTo * * @return $this + * + * @throws \Desperado\ServiceBus\Transport\Exceptions\ConnectionFail + * @throws \Desperado\ServiceBus\Transport\Exceptions\BindFailed */ public function addQueue(Queue $queue, ?QueueBind $bindTo = null): self { + $this->transport->createQueue($queue); + if(null !== $bindTo) { - $this->transport->createTopic($bindTo->topic()); + $this->bindQueue($bindTo); } - $this->transport->createQueue($queue, $bindTo); - return $this; } /** - * @param Topic $topic + * @param Topic $topic + * @param TopicBind|null $bindTo * * @return $this + * + * @throws \Desperado\ServiceBus\Transport\Exceptions\ConnectionFail + * @throws \Desperado\ServiceBus\Transport\Exceptions\BindFailed */ - public function createTopic(Topic $topic): self + public function createTopic(Topic $topic, ?TopicBind $bindTo = null): self { $this->transport->createTopic($topic); + if(null !== $bindTo) + { + $this->bindTopic($bindTo); + } + return $this; } + + /** + * @param TopicBind $bindTo + * + * @return $this + * + * @throws \Desperado\ServiceBus\Transport\Exceptions\ConnectionFail + * @throws \Desperado\ServiceBus\Transport\Exceptions\BindFailed + */ + public function bindTopic(TopicBind $bindTo): self + { + $this->transport->bindTopic($bindTo); + + return $this; + } + + /** + * @param QueueBind $bindTo + * + * @return $this + * + * @throws \Desperado\ServiceBus\Transport\Exceptions\ConnectionFail + * @throws \Desperado\ServiceBus\Transport\Exceptions\BindFailed + */ + public function bindQueue(QueueBind $bindTo): self + { + $this->transport->bindQueue($bindTo); + + return $this; + + } } \ No newline at end of file diff --git a/src/Transport/AmqpExt/AmqpExt.php b/src/Transport/AmqpExt/AmqpExt.php index c8309cc..de78add 100644 --- a/src/Transport/AmqpExt/AmqpExt.php +++ b/src/Transport/AmqpExt/AmqpExt.php @@ -14,6 +14,7 @@ namespace Desperado\ServiceBus\Transport\AmqpExt; use Desperado\ServiceBus\Transport\Consumer; +use Desperado\ServiceBus\Transport\Exceptions\BindFailed; use Desperado\ServiceBus\Transport\Exceptions\ConnectionFail; use Desperado\ServiceBus\Transport\Exceptions\CreateQueueFailed; use Desperado\ServiceBus\Transport\Exceptions\CreateTopicFailed; @@ -26,6 +27,7 @@ use Desperado\ServiceBus\Transport\Queue; use Desperado\ServiceBus\Transport\QueueBind; use Desperado\ServiceBus\Transport\Topic; +use Desperado\ServiceBus\Transport\TopicBind; use Desperado\ServiceBus\Transport\Transport; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -78,13 +80,6 @@ final class AmqpExt implements Transport */ private $queues = []; - /** - * Relation exchange -> routing keys - * - * @var array - */ - private $exchangeBinds = []; - /** * @param AmqpConfiguration $amqpConfiguration * @param TransportMessageEncoder|null $messageEncoder @@ -167,10 +162,39 @@ public function createTopic(Topic $topic): void } } + /** + * @inheritDoc + */ + public function bindTopic(TopicBind $to): void + { + try + { + if(true === isset($this->exchanges[(string) $to->sourceTopic()])) + { + /** @var \AMQPExchange $exchange */ + $exchange = $this->exchanges[(string) $to->sourceTopic()]; + + $exchange->bind((string) $to->destinationTopic(), $to->routingKey()); + + return; + } + + throw new \LogicException('Queue not configured. Use createQueue method'); + } + catch(\AMQPConnectionException $exception) + { + throw new ConnectionFail($exception->getMessage(), $exception->getCode(), $exception); + } + catch(\Throwable $throwable) + { + throw new BindFailed($throwable->getMessage(), $throwable->getCode(), $throwable); + } + } + /** * @inheritdoc */ - public function createQueue(Queue $queue, QueueBind $bind = null): void + public function createQueue(Queue $queue): void { /** Queue already added */ if(true === isset($this->queues[(string) $queue])) @@ -189,8 +213,6 @@ public function createQueue(Queue $queue, QueueBind $bind = null): void $amqpQueue->declareQueue(); - $this->bindQueue($amqpQueue, $bind); - /** @psalm-suppress InvalidArrayAssignment */ $this->queues[(string) $queue] = $amqpQueue; } @@ -204,6 +226,35 @@ public function createQueue(Queue $queue, QueueBind $bind = null): void } } + /** + * @inheritDoc + */ + public function bindQueue(QueueBind $to): void + { + try + { + if(true === isset($this->queues[(string) $to->queue()])) + { + /** @var \AMQPQueue $queue */ + $queue = $this->queues[(string) $to->queue()]; + + $queue->bind((string) $to->topic(), $to->routingKey()); + + return; + } + + throw new \LogicException('Queue not configured. Use createQueue method'); + } + catch(\AMQPConnectionException $exception) + { + throw new ConnectionFail($exception->getMessage(), $exception->getCode(), $exception); + } + catch(\Throwable $throwable) + { + throw new BindFailed($throwable->getMessage(), $throwable->getCode(), $throwable); + } + } + /** * @inheritdoc */ @@ -248,30 +299,6 @@ public function close(): void } } - /** - * Bind queue to exchange with specified routing key - * - * @param \AMQPQueue $amqpQueue - * @param QueueBind|null $bind - * - * @return void - * - * @throws \AMQPChannelException - * @throws \AMQPConnectionException - */ - private function bindQueue(\AMQPQueue $amqpQueue, ?QueueBind $bind): void - { - if(null !== $bind && null !== $bind->routingKey() && '' !== (string) $bind->routingKey()) - { - $routingKey = (string) $bind->routingKey(); - - /** @psalm-suppress InvalidPropertyAssignmentValue */ - $this->exchangeBinds[(string) $bind->topic()][] = $routingKey; - - $amqpQueue->bind((string) $bind->topic(), $routingKey); - } - } - /** * @param string $listenQueue * diff --git a/src/Transport/AmqpExt/AmqpQueue.php b/src/Transport/AmqpExt/AmqpQueue.php index 49e37fc..f07fa5f 100644 --- a/src/Transport/AmqpExt/AmqpQueue.php +++ b/src/Transport/AmqpExt/AmqpQueue.php @@ -101,9 +101,14 @@ final class AmqpQueue implements Queue /** * @param string $name */ - public function __construct(string $name) + public function __construct(string $name, bool $durable = false) { $this->name = $name; + + if(true === $durable) + { + $this->makeDurable(); + } } /** @@ -111,9 +116,9 @@ public function __construct(string $name) * * @return self */ - public static function create(string $name): self + public static function create(string $name, bool $durable = false): self { - return new self($name); + return new self($name, $durable); } /** @@ -144,10 +149,13 @@ public function __toString(): string /** * @return $this */ - public function passive(): self + public function makePassive(): self { - $this->passive = true; - $this->flags += \AMQP_PASSIVE; + if(false === $this->passive) + { + $this->passive = true; + $this->flags += \AMQP_PASSIVE; + } return $this; } @@ -155,10 +163,13 @@ public function passive(): self /** * @return $this */ - public function exclusive(): self + public function makeExclusive(): self { - $this->exclusive = true; - $this->flags += \AMQP_EXCLUSIVE; + if(false === $this->exclusive) + { + $this->exclusive = true; + $this->flags += \AMQP_EXCLUSIVE; + } return $this; } @@ -166,10 +177,13 @@ public function exclusive(): self /** * @return $this */ - public function durable(): self + public function makeDurable(): self { - $this->durable = true; - $this->flags += \AMQP_DURABLE; + if(false === $this->durable) + { + $this->durable = true; + $this->flags += \AMQP_DURABLE; + } return $this; } @@ -179,8 +193,11 @@ public function durable(): self */ public function enableAutoDelete(): self { - $this->autoDelete = true; - $this->flags += \AMQP_AUTODELETE; + if(false === $this->autoDelete) + { + $this->autoDelete = true; + $this->flags += \AMQP_AUTODELETE; + } return $this; } diff --git a/src/Transport/AmqpExt/AmqpTopic.php b/src/Transport/AmqpExt/AmqpTopic.php index cb5d976..db8a84d 100644 --- a/src/Transport/AmqpExt/AmqpTopic.php +++ b/src/Transport/AmqpExt/AmqpTopic.php @@ -86,32 +86,35 @@ final class AmqpTopic implements Topic /** * @param string $name + * @param bool $durable * * @return self */ - public static function fanout(string $name): self + public static function fanout(string $name, bool $durable = false): self { - return new self($name, self::TYPE_FANOUT); + return new self($name, self::TYPE_FANOUT, $durable); } /** * @param string $name + * @param bool $durable * * @return self */ - public static function direct(string $name): self + public static function direct(string $name, bool $durable = false): self { - return new self($name, self::TYPE_DIRECT); + return new self($name, self::TYPE_DIRECT, $durable); } /** * @param string $name + * @param bool $durable * * @return self */ - public static function topic(string $name): self + public static function topic(string $name, bool $durable = false): self { - return new self($name, self::TYPE_TOPIC); + return new self($name, self::TYPE_TOPIC, $durable); } /** @@ -121,8 +124,9 @@ public static function topic(string $name): self */ public static function delayed(string $name): self { - $self = new self($name, self::TYPE_DELAYED); - $self->arguments['x-delayed-type'] = 'fanout'; + $self = new self($name, self::TYPE_DELAYED, true); + + $self->arguments['x-delayed-type'] = self::TYPE_FANOUT; return $self; } @@ -138,10 +142,13 @@ public function __toString(): string /** * @return $this */ - public function passive(): self + public function makePassive(): self { - $this->passive = true; - $this->flags += \AMQP_PASSIVE; + if(false === $this->passive) + { + $this->passive = true; + $this->flags += \AMQP_PASSIVE; + } return $this; } @@ -149,10 +156,13 @@ public function passive(): self /** * @return $this */ - public function durable(): self + public function makeDurable(): self { - $this->durable = true; - $this->flags += \AMQP_DURABLE; + if(false === $this->durable) + { + $this->durable = true; + $this->flags += \AMQP_DURABLE; + } return $this; } @@ -196,10 +206,16 @@ public function arguments(): array /** * @param string $name * @param string $type + * @param bool $durable */ - private function __construct(string $name, string $type) + private function __construct(string $name, string $type, bool $durable) { $this->name = $name; $this->type = $type; + + if(true === $durable) + { + $this->makeDurable(); + } } } diff --git a/src/Transport/Exceptions/BindFailed.php b/src/Transport/Exceptions/BindFailed.php new file mode 100644 index 0000000..3509a62 --- /dev/null +++ b/src/Transport/Exceptions/BindFailed.php @@ -0,0 +1,22 @@ + + * @license MIT + * @license https://opensource.org/licenses/MIT + */ + +declare(strict_types = 1); + +namespace Desperado\ServiceBus\Transport\Exceptions; + +/** + * Bind operation failed + */ +final class BindFailed extends \RuntimeException implements TransportFail +{ + +} diff --git a/src/Transport/QueueBind.php b/src/Transport/QueueBind.php index d165645..f9879e8 100644 --- a/src/Transport/QueueBind.php +++ b/src/Transport/QueueBind.php @@ -18,6 +18,11 @@ */ final class QueueBind { + /** + * @var Queue + */ + private $queue; + /** * @var Topic */ @@ -32,12 +37,21 @@ final class QueueBind * @param Topic $topic * @param string|null $routingKey */ - public function __construct(Topic $topic, ?string $routingKey = null) + public function __construct(Queue $queue, Topic $topic, ?string $routingKey = null) { + $this->queue = $queue; $this->topic = $topic; $this->routingKey = $routingKey; } + /** + * @return Queue + */ + public function queue(): Queue + { + return $this->queue; + } + /** * @return Topic */ diff --git a/src/Transport/TopicBind.php b/src/Transport/TopicBind.php new file mode 100644 index 0000000..968a3ed --- /dev/null +++ b/src/Transport/TopicBind.php @@ -0,0 +1,70 @@ + + * @license MIT + * @license https://opensource.org/licenses/MIT + */ + +declare(strict_types = 1); + +namespace Desperado\ServiceBus\Transport; + +/** + * + */ +final class TopicBind +{ + /** + * @var Topic + */ + private $sourceTopic; + + /** + * @var Topic + */ + private $destinationTopic; + + /** + * @var string|null + */ + private $routingKey; + + /** + * @param Topic $topic + * @param string|null $routingKey + */ + public function __construct(Topic $sourceTopic, Topic $destinationTopic, ?string $routingKey = null) + { + $this->sourceTopic = $sourceTopic; + $this->destinationTopic = $destinationTopic; + $this->routingKey = $routingKey; + } + + /** + * @return Topic + */ + public function sourceTopic(): Topic + { + return $this->sourceTopic; + } + + /** + * @return Topic + */ + public function destinationTopic(): Topic + { + return $this->destinationTopic; + } + + /** + * @return string|null + */ + public function routingKey(): ?string + { + return $this->routingKey; + } +} diff --git a/src/Transport/Transport.php b/src/Transport/Transport.php index b851ef3..9190cec 100644 --- a/src/Transport/Transport.php +++ b/src/Transport/Transport.php @@ -30,18 +30,41 @@ interface Transport */ public function createTopic(Topic $topic): void; + /** + * Bind topic to topic with specified key + * + * @param TopicBind $to + * + * @return void + * + * @throws \Desperado\ServiceBus\Transport\Exceptions\ConnectionFail + * @throws \Desperado\ServiceBus\Transport\Exceptions\BindFailed + */ + public function bindTopic(TopicBind $to): void; + /** * Create queue * - * @param Queue $queue - * @param QueueBind $bind + * @param Queue $queue * * @return void * * @throws \Desperado\ServiceBus\Transport\Exceptions\ConnectionFail * @throws \Desperado\ServiceBus\Transport\Exceptions\CreateQueueFailed */ - public function createQueue(Queue $queue, QueueBind $bind = null): void; + public function createQueue(Queue $queue): void; + + /** + * Bind queue to topic with specified key + * + * @param TopicBind $to + * + * @return void + * + * @throws \Desperado\ServiceBus\Transport\Exceptions\ConnectionFail + * @throws \Desperado\ServiceBus\Transport\Exceptions\BindFailed + */ + public function bindQueue(QueueBind $to): void; /** * Create publisher @@ -53,7 +76,7 @@ public function createPublisher(): Publisher; /** * Create consumer * - * @param Queue $listenQueue + * @param Queue $listenQueue * * @return Consumer * diff --git a/tests/Application/Kernel/ServiceBusKernelTest.php b/tests/Application/Kernel/ServiceBusKernelTest.php index 8eef283..894a8bd 100644 --- a/tests/Application/Kernel/ServiceBusKernelTest.php +++ b/tests/Application/Kernel/ServiceBusKernelTest.php @@ -101,7 +101,8 @@ protected function setUp(): void $this->kernel ->transportConfigurator() ->createTopic($topic) - ->addQueue($queue, new QueueBind($topic, 'test_key')) + ->addQueue($queue) + ->bindQueue(new QueueBind($queue, $topic, 'test_key')) ->addDefaultDestinations($defaultOutboundDestination) ->registerCustomMessageDestinations(FirstEmptyEvent::class, $customOutboundDestination); diff --git a/tests/Stubs/Transport/VirtualTransport.php b/tests/Stubs/Transport/VirtualTransport.php index bb59600..0fa8abc 100644 --- a/tests/Stubs/Transport/VirtualTransport.php +++ b/tests/Stubs/Transport/VirtualTransport.php @@ -22,6 +22,7 @@ use Desperado\ServiceBus\Transport\Queue; use Desperado\ServiceBus\Transport\QueueBind; use Desperado\ServiceBus\Transport\Topic; +use Desperado\ServiceBus\Transport\TopicBind; use Desperado\ServiceBus\Transport\Transport; /** @@ -66,7 +67,23 @@ public function createTopic(Topic $topic): void /** * @inheritDoc */ - public function createQueue(Queue $queue, QueueBind $bind = null): void + public function bindTopic(TopicBind $to): void + { + + } + + /** + * @inheritDoc + */ + public function bindQueue(QueueBind $to): void + { + + } + + /** + * @inheritDoc + */ + public function createQueue(Queue $queue): void { } From 0d8d3863c13e4cb593f42f881297eab24d363c35 Mon Sep 17 00:00:00 2001 From: desperado Date: Fri, 24 Aug 2018 18:30:22 +0300 Subject: [PATCH 10/11] types fix --- src/Transport/AmqpExt/AmqpExt.php | 4 ++-- src/Transport/AmqpExt/AmqpQueue.php | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/Transport/AmqpExt/AmqpExt.php b/src/Transport/AmqpExt/AmqpExt.php index de78add..c259d7d 100644 --- a/src/Transport/AmqpExt/AmqpExt.php +++ b/src/Transport/AmqpExt/AmqpExt.php @@ -174,7 +174,7 @@ public function bindTopic(TopicBind $to): void /** @var \AMQPExchange $exchange */ $exchange = $this->exchanges[(string) $to->sourceTopic()]; - $exchange->bind((string) $to->destinationTopic(), $to->routingKey()); + $exchange->bind((string) $to->destinationTopic(), (string) $to->routingKey()); return; } @@ -238,7 +238,7 @@ public function bindQueue(QueueBind $to): void /** @var \AMQPQueue $queue */ $queue = $this->queues[(string) $to->queue()]; - $queue->bind((string) $to->topic(), $to->routingKey()); + $queue->bind((string) $to->topic(), (string) $to->routingKey()); return; } diff --git a/src/Transport/AmqpExt/AmqpQueue.php b/src/Transport/AmqpExt/AmqpQueue.php index f07fa5f..a441955 100644 --- a/src/Transport/AmqpExt/AmqpQueue.php +++ b/src/Transport/AmqpExt/AmqpQueue.php @@ -14,6 +14,7 @@ namespace Desperado\ServiceBus\Transport\AmqpExt; use Desperado\ServiceBus\Transport\Queue; +use Desperado\ServiceBus\Transport\Topic; /** * Queue details @@ -130,10 +131,11 @@ public static function create(string $name, bool $durable = false): self * * @return self */ - public static function createDelayedQueue(string $name): self + public static function createDelayedQueue(string $name, Topic $toTopic): self { - $self = new self($name); - $self->arguments['x-dead-letter-exchange'] = 'delayed'; + $self = new self($name, true); + + $self->arguments['x-dead-letter-exchange'] = (string) $toTopic; return $self; } From 3ccd40e60a59692293c6f71e3e573347d4d5a038 Mon Sep 17 00:00:00 2001 From: desperado Date: Mon, 3 Sep 2018 13:22:49 +0300 Subject: [PATCH 11/11] scheduler fixes --- composer.lock | 86 +++++++++---------- psalm.xml | 49 ++++++----- src/Application/TransportConfigurator.php | 1 - .../Normalizer/SymfonyPropertyNormalizer.php | 2 +- src/Scheduler/Data/NextScheduledOperation.php | 26 +++--- src/Scheduler/SchedulerListener.php | 10 ++- src/Scheduler/Store/SchedulerRegistry.php | 7 +- src/Storage/StorageAdapterFactory.php | 3 + src/Transport/AmqpExt/AmqpConsumer.php | 16 +++- src/Transport/AmqpExt/AmqpQueue.php | 4 +- src/Transport/AmqpExt/AmqpTopic.php | 2 +- src/Transport/Transport.php | 2 +- 12 files changed, 116 insertions(+), 92 deletions(-) diff --git a/composer.lock b/composer.lock index c1c99d8..10b7292 100644 --- a/composer.lock +++ b/composer.lock @@ -1853,16 +1853,16 @@ }, { "name": "symfony/cache", - "version": "v4.1.3", + "version": "v4.1.4", "source": { "type": "git", "url": "https://github.com/symfony/cache.git", - "reference": "c666a5bbfeb1fe05c7b91d46810f405c8bea14cf" + "reference": "b8440ff4635c6631aca21a09ab72e0b7e3a6cb7a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/cache/zipball/c666a5bbfeb1fe05c7b91d46810f405c8bea14cf", - "reference": "c666a5bbfeb1fe05c7b91d46810f405c8bea14cf", + "url": "https://api.github.com/repos/symfony/cache/zipball/b8440ff4635c6631aca21a09ab72e0b7e3a6cb7a", + "reference": "b8440ff4635c6631aca21a09ab72e0b7e3a6cb7a", "shasum": "" }, "require": { @@ -1918,7 +1918,7 @@ "caching", "psr6" ], - "time": "2018-07-26 11:24:31" + "time": "2018-08-27 09:36:56" }, { "name": "symfony/config", @@ -2056,7 +2056,7 @@ }, { "name": "symfony/dotenv", - "version": "v4.1.3", + "version": "v4.1.4", "source": { "type": "git", "url": "https://github.com/symfony/dotenv.git", @@ -2113,16 +2113,16 @@ }, { "name": "symfony/filesystem", - "version": "v4.1.3", + "version": "v4.1.4", "source": { "type": "git", "url": "https://github.com/symfony/filesystem.git", - "reference": "2e30335e0aafeaa86645555959572fe7cea22b43" + "reference": "c0f5f62db218fa72195b8b8700e4b9b9cf52eb5e" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/filesystem/zipball/2e30335e0aafeaa86645555959572fe7cea22b43", - "reference": "2e30335e0aafeaa86645555959572fe7cea22b43", + "url": "https://api.github.com/repos/symfony/filesystem/zipball/c0f5f62db218fa72195b8b8700e4b9b9cf52eb5e", + "reference": "c0f5f62db218fa72195b8b8700e4b9b9cf52eb5e", "shasum": "" }, "require": { @@ -2159,11 +2159,11 @@ ], "description": "Symfony Filesystem Component", "homepage": "https://symfony.com", - "time": "2018-07-26 11:24:31" + "time": "2018-08-18 16:52:46" }, { "name": "symfony/inflector", - "version": "v4.1.3", + "version": "v4.1.4", "source": { "type": "git", "url": "https://github.com/symfony/inflector.git", @@ -2338,16 +2338,16 @@ }, { "name": "symfony/property-access", - "version": "v4.1.3", + "version": "v4.1.4", "source": { "type": "git", "url": "https://github.com/symfony/property-access.git", - "reference": "e97a399cb40333fc79724ddee952aa926a30c743" + "reference": "ae8561ba08af38e12dc5e21582ddb4baf66719ca" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/property-access/zipball/e97a399cb40333fc79724ddee952aa926a30c743", - "reference": "e97a399cb40333fc79724ddee952aa926a30c743", + "url": "https://api.github.com/repos/symfony/property-access/zipball/ae8561ba08af38e12dc5e21582ddb4baf66719ca", + "reference": "ae8561ba08af38e12dc5e21582ddb4baf66719ca", "shasum": "" }, "require": { @@ -2401,20 +2401,20 @@ "property path", "reflection" ], - "time": "2018-07-26 09:10:45" + "time": "2018-08-24 10:22:26" }, { "name": "symfony/property-info", - "version": "v4.1.3", + "version": "v4.1.4", "source": { "type": "git", "url": "https://github.com/symfony/property-info.git", - "reference": "807e103da26f1ea0256e587fe918fe2ba68989fb" + "reference": "a3a785d23b4fe1d1cea02be14011d654432aeb89" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/property-info/zipball/807e103da26f1ea0256e587fe918fe2ba68989fb", - "reference": "807e103da26f1ea0256e587fe918fe2ba68989fb", + "url": "https://api.github.com/repos/symfony/property-info/zipball/a3a785d23b4fe1d1cea02be14011d654432aeb89", + "reference": "a3a785d23b4fe1d1cea02be14011d654432aeb89", "shasum": "" }, "require": { @@ -2477,11 +2477,11 @@ "type", "validator" ], - "time": "2018-07-26 08:55:25" + "time": "2018-08-03 11:13:38" }, { "name": "symfony/serializer", - "version": "v4.1.3", + "version": "v4.1.4", "source": { "type": "git", "url": "https://github.com/symfony/serializer.git", @@ -2561,16 +2561,16 @@ }, { "name": "symfony/translation", - "version": "v4.1.3", + "version": "v4.1.4", "source": { "type": "git", "url": "https://github.com/symfony/translation.git", - "reference": "6fcd1bd44fd6d7181e6ea57a6f4e08a09b29ef65" + "reference": "fa2182669f7983b7aa5f1a770d053f79f0ef144f" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/translation/zipball/6fcd1bd44fd6d7181e6ea57a6f4e08a09b29ef65", - "reference": "6fcd1bd44fd6d7181e6ea57a6f4e08a09b29ef65", + "url": "https://api.github.com/repos/symfony/translation/zipball/fa2182669f7983b7aa5f1a770d053f79f0ef144f", + "reference": "fa2182669f7983b7aa5f1a770d053f79f0ef144f", "shasum": "" }, "require": { @@ -2626,7 +2626,7 @@ ], "description": "Symfony Translation Component", "homepage": "https://symfony.com", - "time": "2018-07-26 11:24:31" + "time": "2018-08-07 12:45:11" }, { "name": "symfony/validator", @@ -2827,16 +2827,16 @@ "packages-dev": [ { "name": "composer/xdebug-handler", - "version": "1.2.0", + "version": "1.3.0", "source": { "type": "git", "url": "https://github.com/composer/xdebug-handler.git", - "reference": "e1809da56ce1bd1b547a752936817341ac244d8e" + "reference": "b8e9745fb9b06ea6664d8872c4505fb16df4611c" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/composer/xdebug-handler/zipball/e1809da56ce1bd1b547a752936817341ac244d8e", - "reference": "e1809da56ce1bd1b547a752936817341ac244d8e", + "url": "https://api.github.com/repos/composer/xdebug-handler/zipball/b8e9745fb9b06ea6664d8872c4505fb16df4611c", + "reference": "b8e9745fb9b06ea6664d8872c4505fb16df4611c", "shasum": "" }, "require": { @@ -2867,7 +2867,7 @@ "Xdebug", "performance" ], - "time": "2018-08-16 10:54:23" + "time": "2018-08-31 19:07:57" }, { "name": "doctrine/cache", @@ -3822,16 +3822,16 @@ }, { "name": "phpunit/phpunit", - "version": "7.3.2", + "version": "7.3.3", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/phpunit.git", - "reference": "34705f81bddc3f505b9599a2ef96e2b4315ba9b8" + "reference": "1bd5629cccfb2c0a9ef5474b4ff772349e1ec898" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/34705f81bddc3f505b9599a2ef96e2b4315ba9b8", - "reference": "34705f81bddc3f505b9599a2ef96e2b4315ba9b8", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/1bd5629cccfb2c0a9ef5474b4ff772349e1ec898", + "reference": "1bd5629cccfb2c0a9ef5474b4ff772349e1ec898", "shasum": "" }, "require": { @@ -3902,7 +3902,7 @@ "testing", "xunit" ], - "time": "2018-08-22 06:39:21" + "time": "2018-09-01 15:49:55" }, { "name": "sebastian/code-unit-reverse-lookup", @@ -4509,16 +4509,16 @@ }, { "name": "vimeo/psalm", - "version": "2.0.10", + "version": "2.0.11", "source": { "type": "git", "url": "https://github.com/vimeo/psalm.git", - "reference": "366f625c83000a701ce0a69b8a05b387c7bf4bce" + "reference": "3e69a333c9683596ad4a4325e0039374516295a0" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/vimeo/psalm/zipball/366f625c83000a701ce0a69b8a05b387c7bf4bce", - "reference": "366f625c83000a701ce0a69b8a05b387c7bf4bce", + "url": "https://api.github.com/repos/vimeo/psalm/zipball/3e69a333c9683596ad4a4325e0039374516295a0", + "reference": "3e69a333c9683596ad4a4325e0039374516295a0", "shasum": "" }, "require": { @@ -4572,7 +4572,7 @@ "inspection", "php" ], - "time": "2018-08-14 15:51:17" + "time": "2018-08-30 00:54:48" } ], "aliases": [], diff --git a/psalm.xml b/psalm.xml index 79c806b..a3841a2 100644 --- a/psalm.xml +++ b/psalm.xml @@ -1,38 +1,43 @@ - + + + + + + - + - - - + + + - - - - - + + + + + - - - - + + + + - - + + - + - + diff --git a/src/Application/TransportConfigurator.php b/src/Application/TransportConfigurator.php index b26c828..ec1a3fb 100644 --- a/src/Application/TransportConfigurator.php +++ b/src/Application/TransportConfigurator.php @@ -157,6 +157,5 @@ public function bindQueue(QueueBind $bindTo): self $this->transport->bindQueue($bindTo); return $this; - } } \ No newline at end of file diff --git a/src/Marshal/Normalizer/SymfonyPropertyNormalizer.php b/src/Marshal/Normalizer/SymfonyPropertyNormalizer.php index a6452ae..bcb2c32 100644 --- a/src/Marshal/Normalizer/SymfonyPropertyNormalizer.php +++ b/src/Marshal/Normalizer/SymfonyPropertyNormalizer.php @@ -43,7 +43,7 @@ public function __construct() ), new EmptyDataNormalizer(), new Serializer\Normalizer\DateTimeNormalizer( - \DateTime::RFC3339, + 'c', new \DateTimeZone('UTC') ) ] diff --git a/src/Scheduler/Data/NextScheduledOperation.php b/src/Scheduler/Data/NextScheduledOperation.php index 041e68e..5d78c42 100644 --- a/src/Scheduler/Data/NextScheduledOperation.php +++ b/src/Scheduler/Data/NextScheduledOperation.php @@ -13,6 +13,8 @@ namespace Desperado\ServiceBus\Scheduler\Data; +use Desperado\ServiceBus\Scheduler\ScheduledOperationId; + /** * Scheduled job data (for next job) */ @@ -21,44 +23,40 @@ final class NextScheduledOperation /** * Job identifier * - * @var string + * @var ScheduledOperationId */ private $id; /** * Time in milliseconds * - * @var int + * @var \DateTimeImmutable */ private $time; /** - * @param string $id - * @param int $time + * @param ScheduledOperationId $id + * @param \DateTimeImmutable $time */ - public function __construct(string $id, int $time) + public function __construct(ScheduledOperationId $id, \DateTimeImmutable $time) { $this->id = $id; $this->time = $time; } /** - * Receive identifier - * - * @return string + * @return ScheduledOperationId */ - public function id(): string + public function id(): ScheduledOperationId { return $this->id; } /** - * Receive time in microseconds - * - * @return int + * @return \DateTimeImmutable */ - public function time(): int + public function time(): \DateTimeImmutable { return $this->time; - } + } } diff --git a/src/Scheduler/SchedulerListener.php b/src/Scheduler/SchedulerListener.php index be1d6af..7bf0b66 100644 --- a/src/Scheduler/SchedulerListener.php +++ b/src/Scheduler/SchedulerListener.php @@ -17,6 +17,7 @@ use Amp\Success; use Desperado\ServiceBus\Application\KernelContext; use Desperado\ServiceBus\Common\Contract\Messages\Event; +use function Desperado\ServiceBus\Common\datetimeInstantiator; use function Desperado\ServiceBus\Common\invokeReflectionMethod; use Desperado\ServiceBus\Scheduler\Data\NextScheduledOperation; use Desperado\ServiceBus\Scheduler\Messages\Command\EmitSchedulerOperation; @@ -137,7 +138,7 @@ private static function processNextOperationEmit(Event $event, KernelContext $co { /** Send a message that will return after a specified time interval */ return $context->send( - EmitSchedulerOperation::create(new ScheduledOperationId($nextOperation->id())), [ + EmitSchedulerOperation::create($nextOperation->id()), [ 'x-delay' => self::calculateExecutionDelay($nextOperation) ] ); @@ -155,12 +156,15 @@ private static function processNextOperationEmit(Event $event, KernelContext $co */ private static function calculateExecutionDelay(NextScheduledOperation $nextScheduledOperation): int { + /** @var \DateTimeImmutable $currentDate */ + $currentDate = datetimeInstantiator('NOW'); + /** @noinspection UnnecessaryCastingInspection */ - $executionDelay = $nextScheduledOperation->time() - (int) (\microtime(true) * 1000); + $executionDelay = $nextScheduledOperation->time()->getTimestamp() - $currentDate->getTimestamp(); if(0 > $executionDelay) { - $executionDelay *= -1; + $executionDelay = \abs($executionDelay); } return $executionDelay; diff --git a/src/Scheduler/Store/SchedulerRegistry.php b/src/Scheduler/Store/SchedulerRegistry.php index ba8b188..6e2cf76 100644 --- a/src/Scheduler/Store/SchedulerRegistry.php +++ b/src/Scheduler/Store/SchedulerRegistry.php @@ -37,7 +37,7 @@ final class SchedulerRegistry implements \Serializable /** * Time table of operations in milliseconds * - * @var array + * @var array */ private $timetable; @@ -91,7 +91,7 @@ public function add(ScheduledOperation $operation): void $operationId = (string) $operation->id(); $this->operations[$operationId] = $operation; - $this->timetable[$operationId] = (int) $operation->date()->format('U.u') * 1000; + $this->timetable[$operationId] = (float) $operation->date()->format('U.u'); } /** @@ -140,6 +140,7 @@ public function fetchNextOperation(): ?NextScheduledOperation { if(0 !== \count($this->timetable)) { + /** @var float $minTime */ $minTime = \min($this->timetable); $id = (string) \array_search($minTime, $this->timetable, true); @@ -157,7 +158,7 @@ public function fetchNextOperation(): ?NextScheduledOperation { $this->operations[$id] = $operation->sent(); - return new NextScheduledOperation($id, $minTime); + return new NextScheduledOperation($operation->id(), $operation->date()); } } diff --git a/src/Storage/StorageAdapterFactory.php b/src/Storage/StorageAdapterFactory.php index 2114285..6f85e00 100644 --- a/src/Storage/StorageAdapterFactory.php +++ b/src/Storage/StorageAdapterFactory.php @@ -37,6 +37,9 @@ public static function inMemory(): DoctrineDBALAdapter } /** + * @psalm-suppress MoreSpecificReturnType + * @psalm-suppress LessSpecificReturnStatement + * * @param string $adapter * @param string $connectionDSN * diff --git a/src/Transport/AmqpExt/AmqpConsumer.php b/src/Transport/AmqpExt/AmqpConsumer.php index e91b721..1f30226 100644 --- a/src/Transport/AmqpExt/AmqpConsumer.php +++ b/src/Transport/AmqpExt/AmqpConsumer.php @@ -212,10 +212,24 @@ private static function transformEnvelope( $unserialized['namespace'], $unserialized['message'] ), - $envelope->getHeaders() + self::extractHeaders($envelope) ); } + /** + * Extract message headers + * + * @todo: only custom headers + * + * @param \AMQPEnvelope $envelope + * + * @return array + */ + private static function extractHeaders(\AMQPEnvelope $envelope): array + { + return $envelope->getHeaders(); + } + /** * Message accepted for processing * diff --git a/src/Transport/AmqpExt/AmqpQueue.php b/src/Transport/AmqpExt/AmqpQueue.php index a441955..adb3e50 100644 --- a/src/Transport/AmqpExt/AmqpQueue.php +++ b/src/Transport/AmqpExt/AmqpQueue.php @@ -117,7 +117,7 @@ public function __construct(string $name, bool $durable = false) * * @return self */ - public static function create(string $name, bool $durable = false): self + public static function default(string $name, bool $durable = false): self { return new self($name, $durable); } @@ -131,7 +131,7 @@ public static function create(string $name, bool $durable = false): self * * @return self */ - public static function createDelayedQueue(string $name, Topic $toTopic): self + public static function delayed(string $name, Topic $toTopic): self { $self = new self($name, true); diff --git a/src/Transport/AmqpExt/AmqpTopic.php b/src/Transport/AmqpExt/AmqpTopic.php index db8a84d..d7ddaa1 100644 --- a/src/Transport/AmqpExt/AmqpTopic.php +++ b/src/Transport/AmqpExt/AmqpTopic.php @@ -126,7 +126,7 @@ public static function delayed(string $name): self { $self = new self($name, self::TYPE_DELAYED, true); - $self->arguments['x-delayed-type'] = self::TYPE_FANOUT; + $self->arguments['x-delayed-type'] = self::TYPE_DIRECT; return $self; } diff --git a/src/Transport/Transport.php b/src/Transport/Transport.php index 9190cec..fa4cf12 100644 --- a/src/Transport/Transport.php +++ b/src/Transport/Transport.php @@ -57,7 +57,7 @@ public function createQueue(Queue $queue): void; /** * Bind queue to topic with specified key * - * @param TopicBind $to + * @param QueueBind $to * * @return void *