Skip to content

Commit

Permalink
test: prove HandlerArgumentsStamp is supported
Browse files Browse the repository at this point in the history
  • Loading branch information
nikophil committed Sep 12, 2024
1 parent 60bc93d commit b1e9795
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 53 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
php: [8.1, 8.2, 8.3]
php: [8.3]
deps: [highest]
symfony: [5.4.*, 6.3.*, 6.4.*, 7.0.*]
symfony: [ 7.1.* ]
include:
- php: 8.1
deps: lowest
Expand All @@ -23,7 +23,7 @@ jobs:
- php: 8.1
symfony: 7.0.*
steps:
- uses: zenstruck/.github@php-test-symfony
- uses: zenstruck/.github/actions/php-test-symfony@main
with:
php: ${{ matrix.php }}
symfony: ${{ matrix.symfony }}
Expand Down
2 changes: 2 additions & 0 deletions tests/Fixture/Messenger/MessageA.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ final class MessageA
{
public bool $fail;

public string|null $additionalArgument = null;

public function __construct(bool $fail = false)
{
$this->fail = $fail;
Expand Down
4 changes: 3 additions & 1 deletion tests/Fixture/Messenger/MessageAHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ final class MessageAHandler
/** @var MessageA[] */
public array $messages = [];

public function __invoke(MessageA $message): void
public function __invoke(MessageA $message, string|null $additionalArgument = null): void
{
if ($message->fail) {
throw new \RuntimeException('handling failed...');
}

$message->additionalArgument = $additionalArgument;

$this->messages[] = $message;
}
}
122 changes: 73 additions & 49 deletions tests/InteractsWithMessengerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp;
use Symfony\Component\Messenger\Stamp\SerializerStamp;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Zenstruck\Assert;
Expand Down Expand Up @@ -79,9 +80,9 @@ public function can_interact_with_queue(): void
$this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
$this->assertEmpty(self::getContainer()->get(MessageBHandler::class)->messages);

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageB());
$this->messageBus()->dispatch(new MessageA());

$this->transport()->queue()->assertCount(3);
$this->transport()->queue()->assertContains(MessageA::class);
Expand Down Expand Up @@ -127,7 +128,7 @@ public function can_use_envelope_collection_back(): void
self::assertInstanceOf(TestBus::class, ($bus = $this->bus())->dispatched()->assertEmpty()->back());
self::assertSame($bus, $this->bus()->dispatched()->back());

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageA());

$this->transport()
->queue()->assertCount(1)->back()
Expand Down Expand Up @@ -158,9 +159,9 @@ public function can_disable_intercept(): void
$this->transport()->acknowledged()->assertEmpty();
$this->transport()->dispatched()->assertEmpty();

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageB());
$this->messageBus()->dispatch(new MessageA());

$this->transport()->queue()->assertEmpty();
$this->transport()->queue()->assertNotContains(MessageA::class);
Expand All @@ -181,9 +182,9 @@ public function disabling_intercept_with_items_on_queue_processes_all(): void
{
self::bootKernel();

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageB());
$this->messageBus()->dispatch(new MessageA());

$this->transport()->queue()->assertCount(3);

Expand All @@ -201,8 +202,8 @@ public function unblocking_processes_existing_messages_on_queue(): void
{
self::bootKernel();

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
$this->messageBus()->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageB());

$this->transport()->queue()->assertCount(2);
$this->transport()->acknowledged()->assertEmpty();
Expand All @@ -220,9 +221,9 @@ public function can_access_envelope_collection_items_via_first(): void
{
self::bootKernel();

self::getContainer()->get(MessageBusInterface::class)->dispatch($m1 = new MessageA());
self::getContainer()->get(MessageBusInterface::class)->dispatch($m2 = new MessageB());
self::getContainer()->get(MessageBusInterface::class)->dispatch($m3 = new MessageA(true));
$this->messageBus()->dispatch($m1 = new MessageA());
$this->messageBus()->dispatch($m2 = new MessageB());
$this->messageBus()->dispatch($m3 = new MessageA(true));

$this->transport()->queue()->assertCount(3);

Expand Down Expand Up @@ -252,8 +253,8 @@ public function can_make_stamp_assertions_on_test_envelope(): void
{
self::bootKernel();

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(), [new DelayStamp(1000)]);
self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
$this->messageBus()->dispatch(new MessageA(), [new DelayStamp(1000)]);
$this->messageBus()->dispatch(new MessageB());

$this->transport()->queue()->first()->assertHasStamp(DelayStamp::class);
$this->transport()->queue()->first(MessageB::class)->assertNotHasStamp(DelayStamp::class);
Expand Down Expand Up @@ -296,9 +297,9 @@ public function can_interact_with_multiple_queues(): void
$this->assertEmpty(self::getContainer()->get(MessageAHandler::class)->messages);
$this->assertEmpty(self::getContainer()->get(MessageBHandler::class)->messages);

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageB());
$this->messageBus()->dispatch(new MessageA());

$this->transport('async1')->queue()->assertCount(2);
$this->transport('async2')->queue()->assertEmpty();
Expand Down Expand Up @@ -338,8 +339,8 @@ public function can_enable_intercept(): void
$this->transport('async2')->queue()->assertEmpty();
$this->assertEmpty(self::getContainer()->get(MessageBHandler::class)->messages);

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB());
$this->messageBus()->dispatch(new MessageB());
$this->messageBus()->dispatch(new MessageB());

$this->transport('async2')->queue()->assertCount(2);
}
Expand Down Expand Up @@ -390,9 +391,9 @@ public function can_access_message_objects_on_queue(): void
{
self::bootKernel();

self::getContainer()->get(MessageBusInterface::class)->dispatch($m1 = new MessageA());
self::getContainer()->get(MessageBusInterface::class)->dispatch($m2 = new MessageB());
self::getContainer()->get(MessageBusInterface::class)->dispatch($m3 = new MessageA());
$this->messageBus()->dispatch($m1 = new MessageA());
$this->messageBus()->dispatch($m2 = new MessageB());
$this->messageBus()->dispatch($m3 = new MessageA());

$this->assertSame([$m1, $m2, $m3], $this->transport()->queue()->messages());
$this->assertSame([$m1, $m3], $this->transport()->queue()->messages(MessageA::class));
Expand All @@ -406,9 +407,9 @@ public function can_access_envelopes_on_envelope_collection(): void
{
self::bootKernel();

self::getContainer()->get(MessageBusInterface::class)->dispatch($m1 = new MessageA());
self::getContainer()->get(MessageBusInterface::class)->dispatch($m2 = new MessageB());
self::getContainer()->get(MessageBusInterface::class)->dispatch($m3 = new MessageA());
$this->messageBus()->dispatch($m1 = new MessageA());
$this->messageBus()->dispatch($m2 = new MessageB());
$this->messageBus()->dispatch($m3 = new MessageA());

$messages = \array_map(fn(TestEnvelope $envelope) => $envelope->getMessage(), $this->transport()->queue()->all());
$messagesFromIterator = \array_map(fn(TestEnvelope $envelope) => $envelope->getMessage(), \iterator_to_array($this->transport()->queue()));
Expand All @@ -424,8 +425,8 @@ public function can_access_sent_acknowledged_and_rejected(): void
{
self::bootKernel();

self::getContainer()->get(MessageBusInterface::class)->dispatch($m1 = new MessageA(true));
self::getContainer()->get(MessageBusInterface::class)->dispatch($m2 = new MessageB());
$this->messageBus()->dispatch($m1 = new MessageA(true));
$this->messageBus()->dispatch($m2 = new MessageB());

$this->assertCount(2, $this->transport()->queue());
$this->assertCount(2, $this->transport()->dispatched());
Expand Down Expand Up @@ -464,7 +465,7 @@ public function can_configure_throwing_exceptions(): void

$this->transport()->throwExceptions();

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true));
$this->messageBus()->dispatch(new MessageA(true));

$this->expectException(\RuntimeException::class);
$this->expectExceptionMessage('handling failed...');
Expand All @@ -484,7 +485,7 @@ public function can_configure_throwing_exceptions_with_intercept_disabled(): voi
$this->expectException(\RuntimeException::class);
$this->expectExceptionMessage('handling failed...');

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true));
$this->messageBus()->dispatch(new MessageA(true));
}

/**
Expand All @@ -497,7 +498,7 @@ public function can_disable_exception_catching_in_transport_config(): void
$this->expectException(\RuntimeException::class);
$this->expectExceptionMessage('handling failed...');

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB(true));
$this->messageBus()->dispatch(new MessageB(true));
}

/**
Expand All @@ -511,7 +512,7 @@ public function can_re_enable_exception_catching_if_disabled_in_transport_config

$this->transport('async2')->rejected()->assertEmpty();

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageB(true));
$this->messageBus()->dispatch(new MessageB(true));

$this->transport('async2')->rejected()->assertCount(1);
}
Expand All @@ -523,8 +524,8 @@ public function transport_data_is_persisted_between_requests_and_kernel_shutdown
{
self::bootKernel();

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true));
$this->messageBus()->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageA(true));

$this->transport()->queue()->assertCount(2);

Expand Down Expand Up @@ -569,7 +570,7 @@ public function can_reset_transport_data(): void
{
self::bootKernel();

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageA());

$this->transport()->queue()->assertNotEmpty();

Expand All @@ -587,15 +588,15 @@ public function disabling_intercept_is_remembered_between_kernel_reboots(): void

$this->transport()->unblock();

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageA());

$this->transport()->queue()->assertEmpty();
$this->transport()->dispatched()->assertCount(1);

self::ensureKernelShutdown();
self::bootKernel();

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageA());

$this->transport()->queue()->assertEmpty();
$this->transport()->dispatched()->assertCount(2);
Expand All @@ -613,7 +614,7 @@ public function throwing_exceptions_is_remembered_between_kernel_reboots(): void
self::ensureKernelShutdown();
self::bootKernel();

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true));
$this->messageBus()->dispatch(new MessageA(true));

$this->expectException(\RuntimeException::class);
$this->expectExceptionMessage('handling failed...');
Expand Down Expand Up @@ -693,7 +694,7 @@ public function process_all_is_recursive(): void
{
self::bootKernel();

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageD());
$this->messageBus()->dispatch(new MessageD());

$this->transport()->queue()->assertCount(1);
$this->transport()->queue()->assertContains(MessageD::class, 1);
Expand All @@ -715,7 +716,7 @@ public function process_x_messages_is_recursive(): void
{
self::bootKernel();

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageD());
$this->messageBus()->dispatch(new MessageD());

$this->transport()->queue()->assertCount(1);
$this->transport()->queue()->assertContains(MessageD::class, 1);
Expand Down Expand Up @@ -743,7 +744,7 @@ public function process_x_recursive_when_intercept_disabled(): void
self::bootKernel();

$this->transport()->unblock();
self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageD());
$this->messageBus()->dispatch(new MessageD());

$this->transport()->acknowledged()->assertCount(3);
$this->transport()->acknowledged()->assertContains(MessageD::class, 1);
Expand All @@ -758,7 +759,7 @@ public function fails_if_trying_to_process_more_messages_than_can_be_processed()
{
self::bootKernel();

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageA());

Assert::that(fn() => $this->transport()->process(2))->throws(function(AssertionFailedError $e) {
$this->assertStringContainsString('Expected to process 2 messages but only processed 1.', $e->getMessage());
Expand All @@ -774,7 +775,7 @@ public function process_or_fail_processes_messages(): void
{
self::bootKernel();

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageA());

$this->transport()->queue()->assertCount(1);
$this->transport()->queue()->assertContains(MessageA::class, 1);
Expand Down Expand Up @@ -815,7 +816,7 @@ public function envelope_collection_assertions(): void
->throws(AssertionFailedError::class, 'Expected some messages but found none.')
;

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA());
$this->messageBus()->dispatch(new MessageA());

Assert::that(fn() => $this->transport()->dispatched()->assertEmpty())
->throws(AssertionFailedError::class, 'Expected 0 messages but 1 messages found.')
Expand Down Expand Up @@ -844,7 +845,7 @@ static function(WorkerMessageHandledEvent $event) use (&$messages) {
},
);

self::getContainer()->get(MessageBusInterface::class)->dispatch($message = new MessageA());
$this->messageBus()->dispatch($message = new MessageA());

$this->transport()->process();

Expand Down Expand Up @@ -876,7 +877,7 @@ public function can_enable_retries(): void

self::bootKernel(['environment' => 'multi_transport']);

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true));
$this->messageBus()->dispatch(new MessageA(true));

$this->transport('async4')->process()->rejected()->assertContains(MessageA::class, 1);

Expand All @@ -898,11 +899,29 @@ public function can_enable_retries_without_delay_stamp(): void
{
self::bootKernel(['environment' => 'delay_stamp_disabled']);

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true));
$this->messageBus()->dispatch(new MessageA(true));

$this->transport('async')->process()->rejected()->assertContains(MessageA::class, 4);
}

/**
* @test
*/
public function can_add_handler_argument(): void
{
if (!class_exists(HandlerArgumentsStamp::class)) {
self::markTestSkipped('Needs symfony/messenger 6.2.');
}

self::bootKernel();

$this->messageBus()->dispatch($message = new MessageA(), [new HandlerArgumentsStamp(['someAdditionalArgument'])]);

$this->transport('async')->process()->acknowledged()->assertContains(MessageA::class, 1);

self::assertSame('someAdditionalArgument', $message->additionalArgument);
}

/**
* @test
*/
Expand All @@ -921,4 +940,9 @@ protected static function bootKernel(array $options = []): KernelInterface // @p
{
return parent::bootKernel(\array_merge(['environment' => 'single_transport'], $options));
}

private function messageBus(): MessageBusInterface
{
return self::getContainer()->get(MessageBusInterface::class);
}
}

0 comments on commit b1e9795

Please sign in to comment.