Skip to content

Commit

Permalink
Pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tgracchus committed Jun 17, 2024
1 parent 96525be commit 7327679
Showing 1 changed file with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

@SuppressWarnings("NullabilityAnnotations")
Expand Down Expand Up @@ -235,6 +236,7 @@ public void test_qos_1_messages_is_dup() {
channel.writeInbound(publish);

assertEquals(true, channel.outboundMessages().isEmpty());
verifyNoMoreInteractions(incomingMessageFlowPersistence);
}

@Test
Expand Down Expand Up @@ -293,7 +295,10 @@ public void test_qos_2_messages_is_dup_ignored() {
channel.writeInbound(publish);
channel.writeInbound(publish);

assertEquals(false, channel.outboundMessages().isEmpty());
assertEquals(1, channel.outboundMessages().size());
verify(incomingMessageFlowPersistence, times(1)).addOrReplace(CLIENT_ID,
publish.getPacketIdentifier(),
publish);
}

@Test
Expand All @@ -314,7 +319,7 @@ public void test_qos_2_messages_is_dup() {
channel.writeInbound(publish);
channel.writeInbound(publish);

assertEquals(false, channel.outboundMessages().isEmpty());
assertEquals(1, channel.outboundMessages().size());
verify(incomingMessageFlowPersistence, times(1)).addOrReplace(CLIENT_ID,
publish.getPacketIdentifier(),
publish);
Expand Down Expand Up @@ -353,6 +358,7 @@ public void test_acknowledge_qos_1_message() {

assertNotNull(pubackOut);
assertEquals(puback.getPacketIdentifier(), pubackOut.getPacketIdentifier());
verifyNoMoreInteractions(incomingMessageFlowPersistence);
}

@Test
Expand Down Expand Up @@ -786,7 +792,7 @@ public void test_qos2_return_publish_status_on_pubcomp() throws Exception {
}

@Test(timeout = 5000)
public void test_max_inflight_window() throws Exception {
public void test_max_inflight_window() {

ClientConnection.of(channel).setClientReceiveMaximum(50);
InternalConfigurations.MAX_INFLIGHT_WINDOW_SIZE_MESSAGES = 3;
Expand All @@ -812,7 +818,7 @@ public void test_max_inflight_window() throws Exception {
}

@Test()
public void test_Qos2AndQos1PublishDoNotInterfereWithEachOther() throws Exception {
public void test_Qos2AndQos1PublishDoNotInterfereWithEachOther() {
InternalConfigurations.MAX_INFLIGHT_WINDOW_SIZE_MESSAGES = 50;
channel = new EmbeddedChannel(new PublishFlowHandler(publishPollService,
new IncomingMessageFlowPersistenceImpl(new IncomingMessageFlowInMemoryLocalPersistence()),
Expand All @@ -824,15 +830,14 @@ public void test_Qos2AndQos1PublishDoNotInterfereWithEachOther() throws Exceptio
channel.attr(ClientConnectionContext.CHANNEL_ATTRIBUTE_NAME).set(clientConnection);
ClientConnection.of(channel).setClientId(CLIENT_ID);


final PUBLISH publishQoS1 = createPublish("topic", 100, QoS.AT_LEAST_ONCE);
final PUBLISH publishQoS2 = createPublish("topic", 100, QoS.EXACTLY_ONCE);

final PUBLISH publishQoS2_1000 = createPublish("topic", 100, QoS.EXACTLY_ONCE);
final PUBLISH publishQoS2Duplicate = createPublish("topic", 100, QoS.EXACTLY_ONCE);

channel.pipeline().fireChannelRead(publishQoS2);
channel.pipeline().fireChannelRead(publishQoS1);
channel.pipeline().fireChannelRead(publishQoS2_1000);
channel.pipeline().fireChannelRead(publishQoS2Duplicate);

assertEquals(0, orderedTopicService.queue.size());
assertEquals(0, orderedTopicService.unacknowledgedMessages().size());
Expand All @@ -843,7 +848,7 @@ public void test_Qos2AndQos1PublishDoNotInterfereWithEachOther() throws Exceptio
eq(publishQoS2),
eq(CLIENT_ID));
verify(incomingPublishHandler, times(0)).interceptOrDelegate(any(ChannelHandlerContext.class),
eq(publishQoS2_1000),
eq(publishQoS2Duplicate),
eq(CLIENT_ID));
}

Expand Down

0 comments on commit 7327679

Please sign in to comment.