Skip to content

Commit

Permalink
Add option to pass the consumer tag
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Hughes committed Jan 17, 2019
1 parent 7538b29 commit 1532cbc
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 19 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ We've bundled the queues in docker so you should be able to run examples locally
|---|---|---|---|
| blockingConsumer | Whether the consume method should block execution or only process 1 message | true | true/false |
| prefetchSize | Get the limit of messages that can be consumed by the channel | null | any numeric value |
| prefetchCount | The limit of messages that we fetch from the queue | 1 | any numeric value|
| prefetchCount | The limit of messages that we fetch from the queue | 1 | any numeric value |
| consumerTag | A tag for the consumer | default.consumer.tag | any value |

### SQS

Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"mockery/mockery": "^1.2",
"jakub-onderka/php-parallel-lint": "^1.0",
"phpro/grumphp": "^0.14",
"sensiolabs/security-checker": "^4.1",
"sensiolabs/security-checker": "^5.0",
"nikic/php-parser": "^4.0",
"povils/phpmnd": "^2.0",
"squizlabs/php_codesniffer": "^3.4"
Expand Down
29 changes: 15 additions & 14 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions src/Connector/RabbitMQ.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

final class RabbitMQ extends AMQPLazyConnection implements QueueInterface
{
private const CONSUMER_TAG = '.consumer.tag';
private const CONSUMER_TAG = 'default.consumer.tag';

private const DEFAULT_PREFETCH_SIZE = null;

Expand All @@ -32,7 +32,8 @@ final class RabbitMQ extends AMQPLazyConnection implements QueueInterface
private $queueOptions = [
'blockingConsumer' => true,
'prefetchSize' => self::DEFAULT_PREFETCH_SIZE,
'prefetchCount' => self::PREFETCH_COUNT
'prefetchCount' => self::PREFETCH_COUNT,
'consumerTag' => self::CONSUMER_TAG
];

public function queueMessage(string $queue, array $message): void
Expand All @@ -57,7 +58,7 @@ public function consume(string $queue, callable $callback, callable $idleCallbac

$this->channel->basic_consume(
$queue,
$queue . self::CONSUMER_TAG,
$this->queueOptions['consumerTag'],
false,
false,
false,
Expand Down

0 comments on commit 1532cbc

Please sign in to comment.