-
Notifications
You must be signed in to change notification settings - Fork 0
/
Program.cs
88 lines (76 loc) · 2.4 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
using RabbitMQ.Client;
using System.Text;
// AckProblemTest: repeatedly fills a queue with <targetCount> messages, drains it with
// BasicGet (manual ack), acknowledges every delivery, closes the consuming channel and then
// checks that the broker reports the queue as empty. The program loops until a pass fails.
//
// Arguments:
//   args[0] - name of an existing queue on the broker at localhost (it is purged on every pass).
//   args[1] - number of messages to publish per pass; must be a positive integer.
if( args.Length != 2 )
{
    Console.WriteLine( "Usage: AckProblemTest <queueName> <targetCount>" );
    return;
}

var queueName = args[ 0 ];

// Validate instead of letting int.Parse throw an unhandled FormatException/OverflowException.
// Zero or negative counts would make every pass meaningless, so they are rejected as well.
if( !int.TryParse( args[ 1 ], out var targetCount ) || targetCount < 1 )
{
    Console.WriteLine( $"Invalid targetCount '{args[ 1 ]}': expected a positive integer." );
    Console.WriteLine( "Usage: AckProblemTest <queueName> <targetCount>" );
    return;
}

var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var sendingChannel = connection.CreateModel();

// Publisher confirms are enabled so WaitForConfirmsOrDie below can be used after publishing.
sendingChannel.ConfirmSelect();

var messageBody = Encoding.UTF8.GetBytes( "Hello world" );

int pass = 1;
while( true )
{
    Console.WriteLine( "-----------------" );
    Console.WriteLine( "Pass: " + pass++ );

    // Start every pass from an empty queue.
    sendingChannel.QueuePurge( queueName );

    for( var i = 1; i <= targetCount; i++ )
    {
        var properties = sendingChannel.CreateBasicProperties();
        properties.MessageId = i.ToString();
        // Default exchange ("") with the queue name as routing key.
        sendingChannel.BasicPublish( "", queueName, properties, messageBody );
    }

    sendingChannel.WaitForConfirmsOrDie();

    // Short pause before asking the broker for the queue depth.
    Thread.Sleep( 200 );

    var count = sendingChannel.MessageCount( queueName );
    if( count != targetCount )
    {
        Console.WriteLine( $"Start count is wrong: {count}!!!" );
        return;
    }

    Console.WriteLine( "Sent: " + count );

    var receivedDeliveryTags = new List<ulong>( targetCount );

    // The using statement guarantees the channel is disposed on the early-return path too;
    // previously it leaked when fewer messages than expected were received.
    using( var receivingChannel = connection.CreateModel() )
    {
        // Without this quorum queue would limit us to 2000 unacked messages
        receivingChannel.BasicQos( 0, 65535, false );

        // Drain the queue without auto-ack, remembering every delivery tag.
        while( true )
        {
            var receivedMessage = receivingChannel.BasicGet( queueName, false );
            if( receivedMessage == null )
            {
                break;
            }

            receivedDeliveryTags.Add( receivedMessage.DeliveryTag );
        }

        if( receivedDeliveryTags.Count != targetCount )
        {
            Console.WriteLine( $"We received less messages than sent: {receivedDeliveryTags.Count}!!!" );
            return;
        }

        // Acknowledge each delivery individually (multiple: false) — this is the behaviour under test.
        foreach( var tag in receivedDeliveryTags )
        {
            receivingChannel.BasicAck( tag, false );
        }

        // Close explicitly before the count check below, exactly as the original flow did.
        receivingChannel.Close();
    }

    Console.WriteLine( $"Received and acknowledged: {receivedDeliveryTags.Count} msgs" );

    Thread.Sleep( 1000 );

    var countAfterDelete = sendingChannel.MessageCount( queueName );
    Console.WriteLine( "Count after delete: " + countAfterDelete );

    if( countAfterDelete != 0 )
    {
        // wait a bit more and recheck
        Thread.Sleep( 5000 );

        countAfterDelete = sendingChannel.MessageCount( queueName );
        if( countAfterDelete != 0 )
        {
            Console.WriteLine( $"Not all messages were deleted, {countAfterDelete} remained!" );
            return;
        }

        Console.WriteLine( "False positive, all messages were deleted" );
    }
}