From c3996aa4f29a053d13aec4299ed08de300fde1f0 Mon Sep 17 00:00:00 2001 From: Remit Date: Wed, 26 Jul 2023 11:23:24 +0200 Subject: [PATCH 1/3] expired inflight PUBREL gets default values for publish timestamp and message expiry to avoid stuck message queue; added inflight PUBREL expiry config --- .../service/InternalConfigurations.java | 2 ++ .../hivemq/mqtt/message/pubrel/PUBREL.java | 20 +++++++++---------- .../ClientQueueXodusLocalPersistence.java | 12 +++++------ .../ClientQueueMemoryLocalPersistence.java | 12 +++++------ 4 files changed, 21 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/hivemq/configuration/service/InternalConfigurations.java b/src/main/java/com/hivemq/configuration/service/InternalConfigurations.java index ac078b20a..5ae88734d 100644 --- a/src/main/java/com/hivemq/configuration/service/InternalConfigurations.java +++ b/src/main/java/com/hivemq/configuration/service/InternalConfigurations.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static com.hivemq.configuration.entity.mqtt.MqttConfigurationDefaults.MAX_EXPIRY_INTERVAL_DEFAULT; import static com.hivemq.persistence.local.xodus.EnvironmentUtil.GCType; /** @@ -469,4 +470,5 @@ public class InternalConfigurations { public static boolean EXPIRE_INFLIGHT_MESSAGES_ENABLED = false; public static boolean EXPIRE_INFLIGHT_PUBRELS_ENABLED = false; + public static long MAXIMUM_INFLIGHT_PUBREL_EXPIRY = MAX_EXPIRY_INTERVAL_DEFAULT; } diff --git a/src/main/java/com/hivemq/mqtt/message/pubrel/PUBREL.java b/src/main/java/com/hivemq/mqtt/message/pubrel/PUBREL.java index 6e5fee215..5ccd7ecba 100644 --- a/src/main/java/com/hivemq/mqtt/message/pubrel/PUBREL.java +++ b/src/main/java/com/hivemq/mqtt/message/pubrel/PUBREL.java @@ -98,25 +98,23 @@ public void setPublishTimestamp(final @Nullable Long publishTimestamp) { this.publishTimestamp = publishTimestamp; } - public long getRemainingExpiry() { - if (isExpiryDisabled()) { - return PUBLISH.MESSAGE_EXPIRY_INTERVAL_NOT_SET; - } - final long waitingSeconds = (System.currentTimeMillis() - publishTimestamp) / 1000; - return Math.max(0, messageExpiryInterval - waitingSeconds); - } - public boolean isExpiryDisabled() { return (messageExpiryInterval == MqttConfigurationDefaults.TTL_DISABLED) || (messageExpiryInterval == PUBLISH.MESSAGE_EXPIRY_INTERVAL_NOT_SET); } - public boolean hasExpired() { + public boolean hasExpired(final long maximalPubRelExpiry) { if ((publishTimestamp == null) || (messageExpiryInterval == null)) { return false; } - - return getRemainingExpiry() == 0; + if (messageExpiryInterval == MqttConfigurationDefaults.TTL_DISABLED || + messageExpiryInterval == PUBLISH.MESSAGE_EXPIRY_INTERVAL_NOT_SET) { + return false; + } + final long waitingSeconds = (System.currentTimeMillis() - publishTimestamp) / 1000; + final long actualMessageExpiry = Math.min(messageExpiryInterval, maximalPubRelExpiry); + final long remainingTime = actualMessageExpiry - waitingSeconds; + return remainingTime < 1; } public static @NotNull PUBREL from(final @NotNull PubrelPacketImpl packet) { diff --git a/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueXodusLocalPersistence.java b/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueXodusLocalPersistence.java index 346f972bf..f1bdd13f3 100644 --- a/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueXodusLocalPersistence.java +++ b/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueXodusLocalPersistence.java @@ -795,6 +795,10 @@ private boolean setPayloadIfExistingElseDrop( return packetId != ClientQueuePersistenceSerializer.NO_PACKET_ID; }); if (!packetIdFound[0]) { + if (InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) { + pubrel.setMessageExpiryInterval(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY); + pubrel.setPublishTimestamp(System.currentTimeMillis()); + } getOrPutQueueSize(key, bucketIndex).incrementAndGet(); final ByteIterable serializedPubRel = serializer.serializePubRel(pubrel, false); bucket.getStore().put(txn, serializer.serializeUnknownPubRelKey(key), serializedPubRel); @@ -1038,13 +1042,7 @@ private void cleanExpiredMessages(final @NotNull Key key, final int bucketIndex) final MessageWithID message = serializer.deserializeValue(serializedValue); if (message instanceof PUBREL) { final PUBREL pubrel = (PUBREL) message; - if (!InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) { - return true; - } - if (pubrel.getMessageExpiryInterval() == null || pubrel.getPublishTimestamp() == null) { - return true; - } - if (!pubrel.hasExpired()) { + if (!pubrel.hasExpired(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY)) { return true; } getOrPutQueueSize(key, bucketIndex).decrementAndGet(); diff --git a/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java b/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java index 213b47fef..4c309bf61 100644 --- a/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java +++ b/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java @@ -445,6 +445,10 @@ private void addQos0Publish( if (packetIdFound) { messages.qos1Or2Messages.set(messageIndexInQueue, pubrelWithRetained); } else { + if (InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) { + pubrelWithRetained.setMessageExpiryInterval(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY); + pubrelWithRetained.setPublishTimestamp(System.currentTimeMillis()); + } // Ensure unknown PUBRELs are always first in queue messages.qos1Or2Messages.addFirst(pubrelWithRetained); } @@ -752,13 +756,7 @@ private void cleanExpiredMessages(final @NotNull Messages messages) { final MessageWithID messageWithID = qos12iterator.next(); if (messageWithID instanceof PubrelWithRetained) { final PubrelWithRetained pubrel = (PubrelWithRetained) messageWithID; - if (!InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) { - continue; - } - if (pubrel.getMessageExpiryInterval() == null || pubrel.getPublishTimestamp() == null) { - continue; - } - if (!pubrel.hasExpired()) { + if (!pubrel.hasExpired(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY)) { continue; } if (pubrel.retained) { From 7f11ecbe4cd3323a71b9ab6bcb7038b82479cc6a Mon Sep 17 00:00:00 2001 From: Remit Date: Wed, 26 Jul 2023 11:45:30 +0200 Subject: [PATCH 2/3] Fixed missing check for option value. --- .../clientqueue/ClientQueueXodusLocalPersistence.java | 3 +++ .../local/memory/ClientQueueMemoryLocalPersistence.java | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueXodusLocalPersistence.java b/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueXodusLocalPersistence.java index f1bdd13f3..e6f533fc7 100644 --- a/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueXodusLocalPersistence.java +++ b/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueXodusLocalPersistence.java @@ -1042,6 +1042,9 @@ private void cleanExpiredMessages(final @NotNull Key key, final int bucketIndex) final MessageWithID message = serializer.deserializeValue(serializedValue); if (message instanceof PUBREL) { final PUBREL pubrel = (PUBREL) message; + if (!InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) { + return true; + } if (!pubrel.hasExpired(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY)) { return true; } diff --git a/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java b/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java index 4c309bf61..bad09235e 100644 --- a/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java +++ b/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java @@ -756,6 +756,9 @@ private void cleanExpiredMessages(final @NotNull Messages messages) { final MessageWithID messageWithID = qos12iterator.next(); if (messageWithID instanceof PubrelWithRetained) { final PubrelWithRetained pubrel = (PubrelWithRetained) messageWithID; + if (!InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) { + continue; + } if (!pubrel.hasExpired(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY)) { continue; } From 32281ef60269f62a4573f34d272caa1d8171cee6 Mon Sep 17 00:00:00 2001 From: Remit Date: Thu, 27 Jul 2023 12:18:00 +0200 Subject: [PATCH 3/3] Fixed race condition in the IT. Swapped hardcoded division for time conversion for lib call. --- src/main/java/com/hivemq/mqtt/message/pubrel/PUBREL.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/hivemq/mqtt/message/pubrel/PUBREL.java b/src/main/java/com/hivemq/mqtt/message/pubrel/PUBREL.java index 5ccd7ecba..aae749294 100644 --- a/src/main/java/com/hivemq/mqtt/message/pubrel/PUBREL.java +++ b/src/main/java/com/hivemq/mqtt/message/pubrel/PUBREL.java @@ -27,6 +27,8 @@ import com.hivemq.mqtt.message.reason.Mqtt5PubRelReasonCode; import com.hivemq.util.ObjectMemoryEstimation; +import java.util.concurrent.TimeUnit; + /** * @since 1.4 */ @@ -111,7 +113,7 @@ public boolean hasExpired(final long maximalPubRelExpiry) { messageExpiryInterval == PUBLISH.MESSAGE_EXPIRY_INTERVAL_NOT_SET) { return false; } - final long waitingSeconds = (System.currentTimeMillis() - publishTimestamp) / 1000; + final long waitingSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - publishTimestamp); final long actualMessageExpiry = Math.min(messageExpiryInterval, maximalPubRelExpiry); final long remainingTime = actualMessageExpiry - waitingSeconds; return remainingTime < 1;