Skip to content

Commit

Permalink
Merge pull request #37 from kbond/disable-retries
Browse files Browse the repository at this point in the history
Disable retries
  • Loading branch information
kbond authored Jan 8, 2022
2 parents 13376bb + 8a434ad commit 53750d2
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 11 deletions.
20 changes: 17 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,10 @@ framework:
async: test://?intercept=false
```
### Testing serialization
### Testing Serialization
By default, the bundle tests if the envelopes could be serialized and deserialized.
This behavior can be disabled at transport level:
By default, the `TestTransport` tests that messages can be serialized and deserialized.
This behavior can be disabled with the transport dsn:
```yaml
# config/packages/test/messenger.yaml
Expand All @@ -282,6 +282,20 @@ framework:
async: test://?test_serialization=false
```
### Enable Retries
By default, the `TestTransport` does not retry failed messages (your retry settings
are ignored). This behavior can be disabled with the transport dsn:
```yaml
# config/packages/test/messenger.yaml
framework:
messenger:
transports:
async: test://?disable_retries=false
```
### Multiple Transports
If you have multiple transports you'd like to test, change all their dsn's to
Expand Down
16 changes: 16 additions & 0 deletions src/Transport/TestTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Messenger\Worker;
Expand All @@ -24,6 +25,7 @@ final class TestTransport implements TransportInterface
'intercept' => true,
'catch_exceptions' => true,
'test_serialization' => true,
'disable_retries' => true,
];

private string $name;
Expand All @@ -40,6 +42,9 @@ final class TestTransport implements TransportInterface
/** @var array<string, bool> */
private static array $testSerialization = [];

/** @var array<string, bool> */
private static array $disableRetries = [];

/** @var array<string, Envelope[]> */
private static array $dispatched = [];

Expand Down Expand Up @@ -67,6 +72,7 @@ public function __construct(string $name, MessageBusInterface $bus, EventDispatc
self::$intercept[$name] ??= $options['intercept'];
self::$catchExceptions[$name] ??= $options['catch_exceptions'];
self::$testSerialization[$name] ??= $options['test_serialization'];
self::$disableRetries[$name] ??= $options['disable_retries'];
}

/**
Expand Down Expand Up @@ -228,6 +234,11 @@ public function reject(Envelope $envelope): void

public function send(Envelope $envelope): Envelope
{
if ($this->isRetriesDisabled() && $envelope->last(RedeliveryStamp::class)) {
// message is being retried, don't process
return $envelope;
}

if ($this->shouldTestSerialization()) {
Assert::try(
fn() => $this->serializer->decode($this->serializer->encode($envelope)),
Expand Down Expand Up @@ -276,6 +287,11 @@ private function shouldTestSerialization(): bool
return self::$testSerialization[$this->name];
}

private function isRetriesDisabled(): bool
{
return self::$disableRetries[$this->name];
}

private function hasMessagesToProcess(): bool
{
return !empty(self::$queue[$this->name] ?? []);
Expand Down
1 change: 1 addition & 0 deletions src/Transport/TestTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ private function parseDsn(string $dsn): array
'intercept' => \filter_var($query['intercept'] ?? true, \FILTER_VALIDATE_BOOLEAN),
'catch_exceptions' => \filter_var($query['catch_exceptions'] ?? true, \FILTER_VALIDATE_BOOLEAN),
'test_serialization' => \filter_var($query['test_serialization'] ?? true, \FILTER_VALIDATE_BOOLEAN),
'disable_retries' => \filter_var($query['disable_retries'] ?? true, \FILTER_VALIDATE_BOOLEAN),
];
}
}
8 changes: 3 additions & 5 deletions tests/Fixture/config/multi_transport.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ framework:
transports:
async1:
dsn: test://
retry_strategy:
max_retries: 0
async2:
dsn: test://?intercept=false&catch_exceptions=false&test_serialization=false
retry_strategy:
max_retries: 0
async3: in-memory://
async4:
dsn: test://?disable_retries=false
routing:
Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageA: [async1]
Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageA: [async1, async4]
Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageB: [async2]
Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageG: [async2]
2 changes: 0 additions & 2 deletions tests/Fixture/config/single_transport.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ framework:
transports:
async:
dsn: test://
retry_strategy:
max_retries: 0
routing:
Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageA: [async]
Zenstruck\Messenger\Test\Tests\Fixture\Messenger\MessageB: [async]
Expand Down
18 changes: 17 additions & 1 deletion tests/InteractsWithMessengerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public function queue_name_is_required_if_using_multiple_transports(): void
self::bootKernel(['environment' => 'multi_transport']);

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('Multiple transports are registered (async1, async2, async3), you must specify a name.');
$this->expectExceptionMessage('Multiple transports are registered (async1, async2, async3, async4), you must specify a name.');

$this->messenger();
}
Expand Down Expand Up @@ -781,6 +781,22 @@ public function serialization_problem_assertions(): void
Assert::run(fn() => $this->messenger('async2')->send(new Envelope(new MessageG())));
}

/**
* @test
*/
public function can_enable_retries(): void
{
self::bootKernel(['environment' => 'multi_transport']);

self::getContainer()->get(MessageBusInterface::class)->dispatch(new MessageA(true));

$this->messenger('async4')
->process()
->rejected()
->assertContains(MessageA::class, 4)
;
}

protected static function bootKernel(array $options = []): KernelInterface
{
return parent::bootKernel(\array_merge(['environment' => 'single_transport'], $options));
Expand Down

0 comments on commit 53750d2

Please sign in to comment.