This repository has been archived by the owner on Sep 22, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathconsume_with_ack_test.exs
150 lines (119 loc) · 4.37 KB
/
consume_with_ack_test.exs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
defmodule RabbitHole.Protocol.ConsumeWithAckTest do
use ExUnit.Case
alias RabbitHole.Protocol.{Connection, Channel, Queue, Basic}
require Logger
@my_queue "my_queue"
@my_message "Hello Rabbit, this is Elixir! We have a lot in common..."
setup do
{:ok, conn} = Connection.open()
{:ok, chan} = Channel.open(conn)
{:ok, _} = Queue.delete(chan, @my_queue)
{:ok, queue} = Queue.declare(chan, @my_queue)
on_exit(fn ->
:ok = Channel.close(chan)
:ok = Connection.close(conn)
end)
[conn: conn, chan: chan, queue: queue]
end
describe "with auto acks" do
test "message received by a consumer is already acked", params do
# GIVEN
:ok = Basic.publish(params.chan, "", params.queue, @my_message)
# WHEN
consume_once(params.queue, no_ack: true)
{:ok, tag} = Basic.consume(params.chan, params.queue, no_ack: false)
{:ok, ^tag} = Basic.cancel(params.chan, tag)
# THEN
refute_received {:basic_deliver, @my_message, _meta}, 100
end
end
describe "with manual acks" do
test "message not acked by a consumer will be consumed by another one", params do
# GIVEN
:ok = Basic.publish(params.chan, "", params.queue, @my_message)
# WHEN
consume_once(params.queue, no_ack: false)
{:ok, tag} = Basic.consume(params.chan, params.queue, no_ack: false)
# THEN
assert_receive {:basic_deliver, @my_message, meta}, 100
assert meta.redelivered == true
assert :ok = Basic.ack(params.chan, meta.delivery_tag)
# CLEAN-UP
{:ok, ^tag} = Basic.cancel(params.chan, tag)
end
test "prefetch will limit the number of messages until ack is send", params do
# GIVEN
# manual acks (i.e. no_ack: false) is set by default
Basic.qos(params.chan, prefetch_count: 2)
{:ok, tag} = Basic.consume(params.chan, params.queue)
# WHEN
for _ <- 1..3 do
:ok = Basic.publish(params.chan, "", params.queue, @my_message)
end
# THEN
# get 2 messages and ack
assert_receive {:basic_deliver, @my_message, _meta}, 100
assert_receive {:basic_deliver, @my_message, meta1}, 100
refute_receive {:basic_deliver, @my_message, _meta}, 100
Basic.ack(params.chan, meta1.delivery_tag, [:multiple])
# get the last message and confirm
assert_receive {:basic_deliver, @my_message, meta2}, 100
Basic.ack(params.chan, meta2.delivery_tag)
# CLEAN-UP
{:ok, ^tag} = Basic.cancel(params.chan, tag)
end
test "messages over the prefetch limit are sent to another consumer", params do
# GIVEN
# manual acks (i.e. no_ack: false) is set by default
Basic.qos(params.chan, prefetch_count: 2)
{:ok, tag1} = Basic.consume(params.chan, params.queue)
{:ok, tag2} = Basic.consume(params.chan, params.queue)
# WHEN
for _ <- 1..5 do
:ok = Basic.publish(params.chan, "", params.queue, @my_message)
end
# THEN
# get 4 message and ack
metas =
for _ <- 1..4 do
assert_receive {:basic_deliver, @my_message, meta}, 100
meta
end
refute_receive {:basic_deliver, @my_message, _meta}, 100
assert 2 == Enum.count(metas, &(&1.consumer_tag == tag1))
assert 2 == Enum.count(metas, &(&1.consumer_tag == tag2))
Basic.ack(params.chan, List.last(metas).delivery_tag, [:multiple])
# get the last message and confirm
assert_receive {:basic_deliver, @my_message, meta}, 100
Basic.ack(params.chan, meta.delivery_tag)
# CLEAN-UP
for t <- [tag1, tag2], do: {:ok, ^t} = Basic.cancel(params.chan, t)
end
end
defp consume_once(queue, opts) do
Process.flag(:trap_exit, true)
pid =
spawn_link(fn ->
{_, chan} = conn_chan = setup_conn_chan()
{:ok, tag} = Basic.consume(chan, queue, no_ack: opts[:no_ack])
receive do
{:basic_deliver, @my_message, _meta} ->
Logger.debug("Got message on #{tag}")
end
close_conn_chan(conn_chan)
end)
receive do
{:EXIT, ^pid, :normal} -> :ok
other -> Logger.debug("Instead of exit signal got #{inspect(other)}")
end
end
defp setup_conn_chan() do
{:ok, conn} = Connection.open()
{:ok, chan} = Channel.open(conn)
{conn, chan}
end
defp close_conn_chan({conn, chan}) do
:ok = Channel.close(chan)
:ok = Connection.close(conn)
end
end