From aaefd1609e98907bc7399b905ba14a4e3c1dfe7f Mon Sep 17 00:00:00 2001 From: Ulises Andreu Olivenza Yago Date: Wed, 12 Jun 2024 17:12:51 +0200 Subject: [PATCH] Refactor PublushFlowHanderl, so, now we have two clear distinct paths for Qos1 and Qos2, Qos1 is using a map while Qos2 is using the IncomingMessageFlowPersistence --- .../handler/publish/PublishFlowHandler.java | 95 +++++++------------ .../IncomingMessageFlowPersistenceImpl.java | 2 +- .../publish/PublishFlowHandlerTest.java | 63 ++++++++---- 3 files changed, 81 insertions(+), 79 deletions(-) diff --git a/src/main/java/com/hivemq/mqtt/handler/publish/PublishFlowHandler.java b/src/main/java/com/hivemq/mqtt/handler/publish/PublishFlowHandler.java index 32bca18d5..5b767ac6c 100644 --- a/src/main/java/com/hivemq/mqtt/handler/publish/PublishFlowHandler.java +++ b/src/main/java/com/hivemq/mqtt/handler/publish/PublishFlowHandler.java @@ -70,7 +70,7 @@ public class PublishFlowHandler extends ChannelDuplexHandler { private final @NotNull IncomingPublishHandler incomingPublishHandler; private final @NotNull DropOutgoingPublishesHandler dropOutgoingPublishesHandler; - private final @NotNull Map qos1And2AlreadySentMap; + private final @NotNull Map qos1AlreadySentMap; @VisibleForTesting @Inject @@ -83,7 +83,7 @@ public PublishFlowHandler( this.publishPollService = publishPollService; this.persistence = persistence; this.orderedTopicService = orderedTopicService; - this.qos1And2AlreadySentMap = new HashMap<>(); + this.qos1AlreadySentMap = new HashMap<>(); this.incomingPublishHandler = incomingPublishHandler; this.dropOutgoingPublishesHandler = dropOutgoingPublishesHandler; } @@ -117,13 +117,17 @@ public void write( if (msg instanceof PUBACK) { final PUBACK puback = (PUBACK) msg; - final String client = ClientConnection.of(ctx.channel()).getClientId(); final int messageId = puback.getPacketIdentifier(); - persistence.addOrReplace(client, messageId, puback); - promise.addListener(new PUBLISHFlowCompleteListener(messageId, - client, - qos1And2AlreadySentMap, - persistence)); + promise.addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + qos1AlreadySentMap.remove(messageId); + if (log.isTraceEnabled()) { + log.trace("Client '{}' completed a PUBLISH flow with QoS 1 for packet identifier '{}'", + ctx, + messageId); + } + } + }); } final boolean flowComplete = orderedTopicService.handlePublish(ctx.channel(), msg, promise); @@ -183,22 +187,27 @@ private void handlePublish( if (publish.getQoS() == QoS.AT_MOST_ONCE) {// do nothing incomingPublishHandler.interceptOrDelegate(ctx, publish, clientId); - // QoS 1 or 2 duplicate delivery handling - } else { + // QoS 1 delivery handling + } else if (publish.getQoS() == QoS.AT_LEAST_ONCE) { UNACKNOWLEDGED_PUBLISHES_COUNTER.incrementAndGet(); + if (publish.isDuplicateDelivery() && qos1AlreadySentMap.get(publish.getPacketIdentifier()) != null) { + log.debug("Client {} sent a duplicate publish message with id {}. This message is ignored", + clientId, + publish.getPacketIdentifier()); + } else { + final int packetId = publish.getPacketIdentifier(); + qos1AlreadySentMap.put(publish.getPacketIdentifier(), true); + firstPublishForMessageIdReceived(ctx, publish, clientId, packetId); + } + // QoS 2 duplicate delivery handling + } else { final int messageId = publish.getPacketIdentifier(); final MessageWithID savedMessage = persistence.get(clientId, messageId); - - //No PUBLISH message was found in persistence. This is the standard case since we don't know this message yet if (!(savedMessage instanceof PUBLISH)) { + persistence.addOrReplace(clientId, messageId, publish); firstPublishForMessageIdReceived(ctx, publish, clientId, messageId); - //The publish was resent with the DUP flag - } else if (publish.isDuplicateDelivery()) { - resentWithDUPFlag(ctx, publish, clientId); - //The publish was resent without DUP flag! - } else { - resentWithoutDUPFlag(ctx, publish, clientId); - } + } else + ctx.writeAndFlush(new PUBREC(messageId)); } } @@ -207,47 +216,13 @@ private void firstPublishForMessageIdReceived( final @NotNull PUBLISH publish, final @NotNull String client, final int messageId) throws Exception { - persistence.addOrReplace(client, messageId, publish); incomingPublishHandler.interceptOrDelegate(ctx, publish, client); - qos1And2AlreadySentMap.put(messageId, true); log.trace( "Client {} sent a publish message with id {} which was not forwarded before. This message is processed normally", client, messageId); } - private void resentWithDUPFlag( - final @NotNull ChannelHandlerContext ctx, final @NotNull PUBLISH publish, final @NotNull String client) - throws Exception { - final Boolean alreadySent = qos1And2AlreadySentMap.get(publish.getPacketIdentifier()); - if (alreadySent != null && alreadySent) { - - log.debug("Client {} sent a duplicate publish message with id {}. This message is ignored", - client, - publish.getPacketIdentifier()); - } else { - super.channelRead(ctx, publish); - log.debug( - "Client {} sent a duplicate publish message with id {} which was not forwarded before. This message is processed normally", - client, - publish.getPacketIdentifier()); - } - qos1And2AlreadySentMap.put(publish.getPacketIdentifier(), true); - } - - private void resentWithoutDUPFlag( - final @NotNull ChannelHandlerContext ctx, final @NotNull PUBLISH publish, final @NotNull String client) - throws Exception { - log.debug( - "Client {} sent a new PUBLISH with QoS {} and a message identifier which is already in process ({}) by another flow! Starting new flow", - client, - publish.getQoS().getQosNumber(), - publish.getPacketIdentifier()); - persistence.addOrReplace(client, publish.getPacketIdentifier(), publish); - incomingPublishHandler.interceptOrDelegate(ctx, publish, client); - qos1And2AlreadySentMap.put(publish.getPacketIdentifier(), true); - } - private void handlePuback(final @NotNull ChannelHandlerContext ctx, final PUBACK msg) { @@ -291,7 +266,7 @@ private void handlePubrel(final ChannelHandlerContext ctx, final PUBREL pubrel) persistence.addOrReplace(client, messageId, pubrel); ctx.writeAndFlush(new PUBCOMP(messageId)) - .addListener(new PUBLISHFlowCompleteListener(messageId, client, qos1And2AlreadySentMap, persistence)); + .addListener(new PubcompSentListener(messageId, client, persistence)); } private void handlePubcomp(final @NotNull ChannelHandlerContext ctx, @NotNull final PUBCOMP msg) { @@ -329,31 +304,27 @@ private void returnMessageId( } @Immutable - private static class PUBLISHFlowCompleteListener implements ChannelFutureListener { + private static class PubcompSentListener implements ChannelFutureListener { private final int messageId; private final @NotNull String client; - private final @NotNull Map qos1And2AlreadySentMap; private final @NotNull IncomingMessageFlowPersistence persistence; - PUBLISHFlowCompleteListener( + PubcompSentListener( final int messageId, final @NotNull String client, - final @NotNull Map qos1And2AlreadySentMap, final @NotNull IncomingMessageFlowPersistence persistence) { this.messageId = messageId; this.client = client; - this.qos1And2AlreadySentMap = qos1And2AlreadySentMap; this.persistence = persistence; } @Override - public void operationComplete(final ChannelFuture future) throws Exception { + public void operationComplete(final ChannelFuture future) { if (future.isSuccess()) { UNACKNOWLEDGED_PUBLISHES_COUNTER.decrementAndGet(); - qos1And2AlreadySentMap.remove(messageId); persistence.remove(client, messageId); - log.trace("Client '{}' completed a PUBLISH flow with QoS 1 or 2 for packet identifier '{}'", + log.trace("Client '{}' completed a PUBLISH flow with QoS 2 for packet identifier '{}'", client, messageId); } diff --git a/src/main/java/com/hivemq/persistence/qos/IncomingMessageFlowPersistenceImpl.java b/src/main/java/com/hivemq/persistence/qos/IncomingMessageFlowPersistenceImpl.java index 135dfd55c..c40e61eb0 100644 --- a/src/main/java/com/hivemq/persistence/qos/IncomingMessageFlowPersistenceImpl.java +++ b/src/main/java/com/hivemq/persistence/qos/IncomingMessageFlowPersistenceImpl.java @@ -32,7 +32,7 @@ public class IncomingMessageFlowPersistenceImpl implements IncomingMessageFlowPe private final @NotNull IncomingMessageFlowLocalPersistence localPersistence; @Inject - IncomingMessageFlowPersistenceImpl(final @NotNull IncomingMessageFlowLocalPersistence localPersistence) { + public IncomingMessageFlowPersistenceImpl(final @NotNull IncomingMessageFlowLocalPersistence localPersistence) { this.localPersistence = localPersistence; } diff --git a/src/test/java/com/hivemq/mqtt/handler/publish/PublishFlowHandlerTest.java b/src/test/java/com/hivemq/mqtt/handler/publish/PublishFlowHandlerTest.java index c2765f29e..d0083b5eb 100644 --- a/src/test/java/com/hivemq/mqtt/handler/publish/PublishFlowHandlerTest.java +++ b/src/test/java/com/hivemq/mqtt/handler/publish/PublishFlowHandlerTest.java @@ -37,7 +37,10 @@ import com.hivemq.mqtt.message.reason.Mqtt5PubRecReasonCode; import com.hivemq.mqtt.message.reason.Mqtt5PubRelReasonCode; import com.hivemq.mqtt.services.PublishPollService; +import com.hivemq.persistence.local.IncomingMessageFlowInMemoryLocalPersistence; import com.hivemq.persistence.qos.IncomingMessageFlowPersistence; +import com.hivemq.persistence.qos.IncomingMessageFlowPersistenceImpl; +import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; import org.junit.After; @@ -51,6 +54,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.anyInt; @@ -213,7 +217,7 @@ public void test_qos_1_messages_is_dup_ignored() { } @Test - public void test_qos_1_messages_is_not_dup() { + public void test_qos_1_messages_is_dup() { final int messageid = 1; @@ -231,9 +235,6 @@ public void test_qos_1_messages_is_not_dup() { channel.writeInbound(publish); assertEquals(true, channel.outboundMessages().isEmpty()); - verify(incomingMessageFlowPersistence, times(2)).addOrReplace(CLIENT_ID, - publish.getPacketIdentifier(), - publish); } @Test @@ -270,7 +271,7 @@ public void test_qos_2_messages_is_dup_not_forwarded() { channel.writeInbound(publish); //pubcomp is here - assertEquals(1, channel.outboundMessages().size()); + assertEquals(2, channel.outboundMessages().size()); } @Test @@ -292,11 +293,11 @@ public void test_qos_2_messages_is_dup_ignored() { channel.writeInbound(publish); channel.writeInbound(publish); - assertEquals(true, channel.outboundMessages().isEmpty()); + assertEquals(false, channel.outboundMessages().isEmpty()); } @Test - public void test_qos_2_messages_is_not_dup() { + public void test_qos_2_messages_is_dup() { final int messageid = 1; @@ -313,8 +314,8 @@ public void test_qos_2_messages_is_not_dup() { channel.writeInbound(publish); channel.writeInbound(publish); - assertEquals(true, channel.outboundMessages().isEmpty()); - verify(incomingMessageFlowPersistence, times(2)).addOrReplace(CLIENT_ID, + assertEquals(false, channel.outboundMessages().isEmpty()); + verify(incomingMessageFlowPersistence, times(1)).addOrReplace(CLIENT_ID, publish.getPacketIdentifier(), publish); } @@ -352,13 +353,6 @@ public void test_acknowledge_qos_1_message() { assertNotNull(pubackOut); assertEquals(puback.getPacketIdentifier(), pubackOut.getPacketIdentifier()); - - verify(incomingMessageFlowPersistence).addOrReplace(eq("client"), - eq(puback.getPacketIdentifier()), - same(puback)); - - //We have to make sure that the client was actually deleted in the end - verify(incomingMessageFlowPersistence).remove(eq("client"), eq(puback.getPacketIdentifier())); } @Test @@ -817,6 +811,43 @@ public void test_max_inflight_window() throws Exception { assertEquals(3, orderedTopicService.unacknowledgedMessages().size()); } + @Test() + public void test_Qos2AndQos1PublishDoNotInterfereWithEachOther() throws Exception { + InternalConfigurations.MAX_INFLIGHT_WINDOW_SIZE_MESSAGES = 50; + channel = new EmbeddedChannel(new PublishFlowHandler(publishPollService, + new IncomingMessageFlowPersistenceImpl(new IncomingMessageFlowInMemoryLocalPersistence()), + orderedTopicService, + incomingPublishHandler, + mock(DropOutgoingPublishesHandler.class))); + final ClientConnection clientConnection = spy(new DummyClientConnection(channel, null)); + when(clientConnection.getFreePacketIdRanges()).thenReturn(freePacketIdRanges); + channel.attr(ClientConnectionContext.CHANNEL_ATTRIBUTE_NAME).set(clientConnection); + ClientConnection.of(channel).setClientId(CLIENT_ID); + + + final PUBLISH publishQoS1 = createPublish("topic", 100, QoS.AT_LEAST_ONCE); + final PUBLISH publishQoS2 = createPublish("topic", 100, QoS.EXACTLY_ONCE); + + final PUBLISH publishQoS2_1000 = createPublish("topic", 100, QoS.EXACTLY_ONCE); + + channel.pipeline().fireChannelRead(publishQoS2); + channel.pipeline().fireChannelRead(publishQoS1); + channel.pipeline().fireChannelRead(publishQoS2_1000); + + assertEquals(0, orderedTopicService.queue.size()); + assertEquals(0, orderedTopicService.unacknowledgedMessages().size()); + verify(incomingPublishHandler, times(1)).interceptOrDelegate(any(ChannelHandlerContext.class), + eq(publishQoS1), + eq(CLIENT_ID)); + verify(incomingPublishHandler, times(1)).interceptOrDelegate(any(ChannelHandlerContext.class), + eq(publishQoS2), + eq(CLIENT_ID)); + verify(incomingPublishHandler, times(0)).interceptOrDelegate(any(ChannelHandlerContext.class), + eq(publishQoS2_1000), + eq(CLIENT_ID)); + } + + private PUBLISH createPublish(final String topic, final int messageId, final QoS qoS) { return createPublish(topic, messageId, qoS, false); }