Skip to content

Commit

Permalink
Merge pull request #16 from plastiik/TES-3412-Add-fifo-support-to-queues
Browse files Browse the repository at this point in the history
TES-3412 Add support for FIFO queues.
  • Loading branch information
exussum12 authored Mar 26, 2021
2 parents d33c427 + 39dd901 commit 7105304
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 7 deletions.
7 changes: 6 additions & 1 deletion src/Connector/Contract/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ interface Queue
/**
* @param string[] $message
*/
public function queueMessage(string $queue, array $message) : void;
public function queueMessage(
string $queue,
array $message,
?string $messageId = null,
?string $duplicationId = null
) : void;

public function consume(string $queue, callable $callback, callable $idleCallback) : void;

Expand Down
8 changes: 6 additions & 2 deletions src/Connector/RabbitMQ.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ class RabbitMQ extends AMQPLazyConnection implements Queue
/**
* @param string[] $message
*/
public function queueMessage(string $queue, array $message) : void
{
public function queueMessage(
string $queue,
array $message,
?string $messageId = null,
?string $duplicationId = null
) : void {
$this->connectToChannel();

$this->channel->basic_publish(
Expand Down
22 changes: 18 additions & 4 deletions src/Connector/SQS.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,27 @@ class SQS extends SqsClient implements Queue
/**
* @param string[] $message
*/
public function queueMessage(string $queue, array $message) : void
{
$this->sendMessage([
public function queueMessage(
string $queue,
array $message,
?string $messageId = null,
?string $duplicationId = null
) : void {
$message = [
'QueueUrl' => $queue,
'MessageBody' => json_encode($message),
'MessageAttributes' => $this->getMessageAttributes($message),
]);
];

if ($messageId) {
$message['MessageGroupId'] = $messageId;
}

if ($duplicationId) {
$message['MessageDeduplicationId'] = $duplicationId;
}

$this->sendMessage($message);
}

public function consume(string $queue, callable $callback, callable $idleCallback) : void
Expand Down

0 comments on commit 7105304

Please sign in to comment.