Skip to content

Commit

Permalink
[latest] Refactor PublushFlowHander, so, now we have two clear distin…
Browse files Browse the repository at this point in the history
…ct paths… (#505)

* Refactor PublushFlowHanderl, so, now we have two clear distinct paths for Qos1 and Qos2, Qos1 is using a map while Qos2 is using the IncomingMessageFlowPersistence
  • Loading branch information
tgracchus authored Jun 20, 2024
1 parent 2aeda8a commit d096393
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class PublishFlowHandler extends ChannelDuplexHandler {
private final @NotNull IncomingPublishHandler incomingPublishHandler;
private final @NotNull DropOutgoingPublishesHandler dropOutgoingPublishesHandler;

private final @NotNull Map<Integer, Boolean> qos1And2AlreadySentMap;
private final @NotNull Map<Integer, Boolean> qos1AlreadySentMap;

@VisibleForTesting
@Inject
Expand All @@ -83,7 +83,7 @@ public PublishFlowHandler(
this.publishPollService = publishPollService;
this.persistence = persistence;
this.orderedTopicService = orderedTopicService;
this.qos1And2AlreadySentMap = new HashMap<>();
this.qos1AlreadySentMap = new HashMap<>();
this.incomingPublishHandler = incomingPublishHandler;
this.dropOutgoingPublishesHandler = dropOutgoingPublishesHandler;
}
Expand Down Expand Up @@ -117,13 +117,17 @@ public void write(

if (msg instanceof PUBACK) {
final PUBACK puback = (PUBACK) msg;
final String client = ClientConnection.of(ctx.channel()).getClientId();
final int messageId = puback.getPacketIdentifier();
persistence.addOrReplace(client, messageId, puback);
promise.addListener(new PUBLISHFlowCompleteListener(messageId,
client,
qos1And2AlreadySentMap,
persistence));
promise.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
qos1AlreadySentMap.remove(messageId);
if (log.isTraceEnabled()) {
log.trace("Client '{}' completed a PUBLISH flow with QoS 1 for packet identifier '{}'",
ctx,
messageId);
}
}
});
}

final boolean flowComplete = orderedTopicService.handlePublish(ctx.channel(), msg, promise);
Expand Down Expand Up @@ -183,22 +187,27 @@ private void handlePublish(

if (publish.getQoS() == QoS.AT_MOST_ONCE) {// do nothing
incomingPublishHandler.interceptOrDelegate(ctx, publish, clientId);
// QoS 1 or 2 duplicate delivery handling
} else {
// QoS 1 delivery handling
} else if (publish.getQoS() == QoS.AT_LEAST_ONCE) {
UNACKNOWLEDGED_PUBLISHES_COUNTER.incrementAndGet();
if (publish.isDuplicateDelivery() && qos1AlreadySentMap.get(publish.getPacketIdentifier()) != null) {
log.debug("Client {} sent a duplicate publish message with id {}. This message is ignored",
clientId,
publish.getPacketIdentifier());
} else {
final int packetId = publish.getPacketIdentifier();
qos1AlreadySentMap.put(publish.getPacketIdentifier(), true);
firstPublishForMessageIdReceived(ctx, publish, clientId, packetId);
}
// QoS 2 duplicate delivery handling
} else {
final int messageId = publish.getPacketIdentifier();
final MessageWithID savedMessage = persistence.get(clientId, messageId);

//No PUBLISH message was found in persistence. This is the standard case since we don't know this message yet
if (!(savedMessage instanceof PUBLISH)) {
persistence.addOrReplace(clientId, messageId, publish);
firstPublishForMessageIdReceived(ctx, publish, clientId, messageId);
//The publish was resent with the DUP flag
} else if (publish.isDuplicateDelivery()) {
resentWithDUPFlag(ctx, publish, clientId);
//The publish was resent without DUP flag!
} else {
resentWithoutDUPFlag(ctx, publish, clientId);
}
} else
ctx.writeAndFlush(new PUBREC(messageId));
}
}

Expand All @@ -207,47 +216,13 @@ private void firstPublishForMessageIdReceived(
final @NotNull PUBLISH publish,
final @NotNull String client,
final int messageId) throws Exception {
persistence.addOrReplace(client, messageId, publish);
incomingPublishHandler.interceptOrDelegate(ctx, publish, client);
qos1And2AlreadySentMap.put(messageId, true);
log.trace(
"Client {} sent a publish message with id {} which was not forwarded before. This message is processed normally",
client,
messageId);
}

private void resentWithDUPFlag(
final @NotNull ChannelHandlerContext ctx, final @NotNull PUBLISH publish, final @NotNull String client)
throws Exception {
final Boolean alreadySent = qos1And2AlreadySentMap.get(publish.getPacketIdentifier());
if (alreadySent != null && alreadySent) {

log.debug("Client {} sent a duplicate publish message with id {}. This message is ignored",
client,
publish.getPacketIdentifier());
} else {
super.channelRead(ctx, publish);
log.debug(
"Client {} sent a duplicate publish message with id {} which was not forwarded before. This message is processed normally",
client,
publish.getPacketIdentifier());
}
qos1And2AlreadySentMap.put(publish.getPacketIdentifier(), true);
}

private void resentWithoutDUPFlag(
final @NotNull ChannelHandlerContext ctx, final @NotNull PUBLISH publish, final @NotNull String client)
throws Exception {
log.debug(
"Client {} sent a new PUBLISH with QoS {} and a message identifier which is already in process ({}) by another flow! Starting new flow",
client,
publish.getQoS().getQosNumber(),
publish.getPacketIdentifier());
persistence.addOrReplace(client, publish.getPacketIdentifier(), publish);
incomingPublishHandler.interceptOrDelegate(ctx, publish, client);
qos1And2AlreadySentMap.put(publish.getPacketIdentifier(), true);
}


private void handlePuback(final @NotNull ChannelHandlerContext ctx, final PUBACK msg) {

Expand Down Expand Up @@ -291,7 +266,7 @@ private void handlePubrel(final ChannelHandlerContext ctx, final PUBREL pubrel)

persistence.addOrReplace(client, messageId, pubrel);
ctx.writeAndFlush(new PUBCOMP(messageId))
.addListener(new PUBLISHFlowCompleteListener(messageId, client, qos1And2AlreadySentMap, persistence));
.addListener(new PubcompSentListener(messageId, client, persistence));
}

private void handlePubcomp(final @NotNull ChannelHandlerContext ctx, @NotNull final PUBCOMP msg) {
Expand Down Expand Up @@ -329,31 +304,27 @@ private void returnMessageId(
}

@Immutable
private static class PUBLISHFlowCompleteListener implements ChannelFutureListener {
private static class PubcompSentListener implements ChannelFutureListener {

private final int messageId;
private final @NotNull String client;
private final @NotNull Map<Integer, Boolean> qos1And2AlreadySentMap;
private final @NotNull IncomingMessageFlowPersistence persistence;

PUBLISHFlowCompleteListener(
PubcompSentListener(
final int messageId,
final @NotNull String client,
final @NotNull Map<Integer, Boolean> qos1And2AlreadySentMap,
final @NotNull IncomingMessageFlowPersistence persistence) {
this.messageId = messageId;
this.client = client;
this.qos1And2AlreadySentMap = qos1And2AlreadySentMap;
this.persistence = persistence;
}

@Override
public void operationComplete(final ChannelFuture future) throws Exception {
public void operationComplete(final ChannelFuture future) {
if (future.isSuccess()) {
UNACKNOWLEDGED_PUBLISHES_COUNTER.decrementAndGet();
qos1And2AlreadySentMap.remove(messageId);
persistence.remove(client, messageId);
log.trace("Client '{}' completed a PUBLISH flow with QoS 1 or 2 for packet identifier '{}'",
log.trace("Client '{}' completed a PUBLISH flow with QoS 2 for packet identifier '{}'",
client,
messageId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class IncomingMessageFlowPersistenceImpl implements IncomingMessageFlowPe
private final @NotNull IncomingMessageFlowLocalPersistence localPersistence;

@Inject
IncomingMessageFlowPersistenceImpl(final @NotNull IncomingMessageFlowLocalPersistence localPersistence) {
public IncomingMessageFlowPersistenceImpl(final @NotNull IncomingMessageFlowLocalPersistence localPersistence) {
this.localPersistence = localPersistence;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
import com.hivemq.mqtt.message.reason.Mqtt5PubRecReasonCode;
import com.hivemq.mqtt.message.reason.Mqtt5PubRelReasonCode;
import com.hivemq.mqtt.services.PublishPollService;
import com.hivemq.persistence.local.IncomingMessageFlowInMemoryLocalPersistence;
import com.hivemq.persistence.qos.IncomingMessageFlowPersistence;
import com.hivemq.persistence.qos.IncomingMessageFlowPersistenceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.After;
Expand All @@ -51,6 +54,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.anyInt;
Expand All @@ -60,6 +64,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

@SuppressWarnings("NullabilityAnnotations")
Expand Down Expand Up @@ -213,7 +218,7 @@ public void test_qos_1_messages_is_dup_ignored() {
}

@Test
public void test_qos_1_messages_is_not_dup() {
public void test_qos_1_messages_is_dup() {

final int messageid = 1;

Expand All @@ -231,9 +236,7 @@ public void test_qos_1_messages_is_not_dup() {
channel.writeInbound(publish);

assertEquals(true, channel.outboundMessages().isEmpty());
verify(incomingMessageFlowPersistence, times(2)).addOrReplace(CLIENT_ID,
publish.getPacketIdentifier(),
publish);
verifyNoMoreInteractions(incomingMessageFlowPersistence);
}

@Test
Expand Down Expand Up @@ -270,7 +273,7 @@ public void test_qos_2_messages_is_dup_not_forwarded() {
channel.writeInbound(publish);

//pubcomp is here
assertEquals(1, channel.outboundMessages().size());
assertEquals(2, channel.outboundMessages().size());
}

@Test
Expand All @@ -292,11 +295,14 @@ public void test_qos_2_messages_is_dup_ignored() {
channel.writeInbound(publish);
channel.writeInbound(publish);

assertEquals(true, channel.outboundMessages().isEmpty());
assertEquals(1, channel.outboundMessages().size());
verify(incomingMessageFlowPersistence, times(1)).addOrReplace(CLIENT_ID,
publish.getPacketIdentifier(),
publish);
}

@Test
public void test_qos_2_messages_is_not_dup() {
public void test_qos_2_messages_is_dup() {

final int messageid = 1;

Expand All @@ -313,8 +319,8 @@ public void test_qos_2_messages_is_not_dup() {
channel.writeInbound(publish);
channel.writeInbound(publish);

assertEquals(true, channel.outboundMessages().isEmpty());
verify(incomingMessageFlowPersistence, times(2)).addOrReplace(CLIENT_ID,
assertEquals(1, channel.outboundMessages().size());
verify(incomingMessageFlowPersistence, times(1)).addOrReplace(CLIENT_ID,
publish.getPacketIdentifier(),
publish);
}
Expand Down Expand Up @@ -352,13 +358,7 @@ public void test_acknowledge_qos_1_message() {

assertNotNull(pubackOut);
assertEquals(puback.getPacketIdentifier(), pubackOut.getPacketIdentifier());

verify(incomingMessageFlowPersistence).addOrReplace(eq("client"),
eq(puback.getPacketIdentifier()),
same(puback));

//We have to make sure that the client was actually deleted in the end
verify(incomingMessageFlowPersistence).remove(eq("client"), eq(puback.getPacketIdentifier()));
verifyNoMoreInteractions(incomingMessageFlowPersistence);
}

@Test
Expand Down Expand Up @@ -792,7 +792,7 @@ public void test_qos2_return_publish_status_on_pubcomp() throws Exception {
}

@Test(timeout = 5000)
public void test_max_inflight_window() throws Exception {
public void test_max_inflight_window() {

ClientConnection.of(channel).setClientReceiveMaximum(50);
InternalConfigurations.MAX_INFLIGHT_WINDOW_SIZE_MESSAGES = 3;
Expand All @@ -817,6 +817,42 @@ public void test_max_inflight_window() throws Exception {
assertEquals(3, orderedTopicService.unacknowledgedMessages().size());
}

@Test()
public void test_Qos2AndQos1PublishDoNotInterfereWithEachOther() {
InternalConfigurations.MAX_INFLIGHT_WINDOW_SIZE_MESSAGES = 50;
channel = new EmbeddedChannel(new PublishFlowHandler(publishPollService,
new IncomingMessageFlowPersistenceImpl(new IncomingMessageFlowInMemoryLocalPersistence()),
orderedTopicService,
incomingPublishHandler,
mock(DropOutgoingPublishesHandler.class)));
final ClientConnection clientConnection = spy(new DummyClientConnection(channel, null));
when(clientConnection.getFreePacketIdRanges()).thenReturn(freePacketIdRanges);
channel.attr(ClientConnectionContext.CHANNEL_ATTRIBUTE_NAME).set(clientConnection);
ClientConnection.of(channel).setClientId(CLIENT_ID);

final PUBLISH publishQoS1 = createPublish("topic", 100, QoS.AT_LEAST_ONCE);
final PUBLISH publishQoS2 = createPublish("topic", 100, QoS.EXACTLY_ONCE);

final PUBLISH publishQoS2Duplicate = createPublish("topic", 100, QoS.EXACTLY_ONCE);

channel.pipeline().fireChannelRead(publishQoS2);
channel.pipeline().fireChannelRead(publishQoS1);
channel.pipeline().fireChannelRead(publishQoS2Duplicate);

assertEquals(0, orderedTopicService.queue.size());
assertEquals(0, orderedTopicService.unacknowledgedMessages().size());
verify(incomingPublishHandler, times(1)).interceptOrDelegate(any(ChannelHandlerContext.class),
eq(publishQoS1),
eq(CLIENT_ID));
verify(incomingPublishHandler, times(1)).interceptOrDelegate(any(ChannelHandlerContext.class),
eq(publishQoS2),
eq(CLIENT_ID));
verify(incomingPublishHandler, never()).interceptOrDelegate(any(ChannelHandlerContext.class),
eq(publishQoS2Duplicate),
eq(CLIENT_ID));
}


private PUBLISH createPublish(final String topic, final int messageId, final QoS qoS) {
return createPublish(topic, messageId, qoS, false);
}
Expand Down

0 comments on commit d096393

Please sign in to comment.