diff --git a/src/main/java/com/hivemq/configuration/service/InternalConfigurations.java b/src/main/java/com/hivemq/configuration/service/InternalConfigurations.java index ac078b20a..5ae88734d 100644 --- a/src/main/java/com/hivemq/configuration/service/InternalConfigurations.java +++ b/src/main/java/com/hivemq/configuration/service/InternalConfigurations.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static com.hivemq.configuration.entity.mqtt.MqttConfigurationDefaults.MAX_EXPIRY_INTERVAL_DEFAULT; import static com.hivemq.persistence.local.xodus.EnvironmentUtil.GCType; /** @@ -469,4 +470,5 @@ public class InternalConfigurations { public static boolean EXPIRE_INFLIGHT_MESSAGES_ENABLED = false; public static boolean EXPIRE_INFLIGHT_PUBRELS_ENABLED = false; + public static long MAXIMUM_INFLIGHT_PUBREL_EXPIRY = MAX_EXPIRY_INTERVAL_DEFAULT; } diff --git a/src/main/java/com/hivemq/mqtt/message/pubrel/PUBREL.java b/src/main/java/com/hivemq/mqtt/message/pubrel/PUBREL.java index 6e5fee215..aae749294 100644 --- a/src/main/java/com/hivemq/mqtt/message/pubrel/PUBREL.java +++ b/src/main/java/com/hivemq/mqtt/message/pubrel/PUBREL.java @@ -27,6 +27,8 @@ import com.hivemq.mqtt.message.reason.Mqtt5PubRelReasonCode; import com.hivemq.util.ObjectMemoryEstimation; +import java.util.concurrent.TimeUnit; + /** * @since 1.4 */ @@ -98,25 +100,23 @@ public void setPublishTimestamp(final @Nullable Long publishTimestamp) { this.publishTimestamp = publishTimestamp; } - public long getRemainingExpiry() { - if (isExpiryDisabled()) { - return PUBLISH.MESSAGE_EXPIRY_INTERVAL_NOT_SET; - } - final long waitingSeconds = (System.currentTimeMillis() - publishTimestamp) / 1000; - return Math.max(0, messageExpiryInterval - waitingSeconds); - } - public boolean isExpiryDisabled() { return (messageExpiryInterval == MqttConfigurationDefaults.TTL_DISABLED) || (messageExpiryInterval == PUBLISH.MESSAGE_EXPIRY_INTERVAL_NOT_SET); } - public boolean hasExpired() { + public boolean hasExpired(final long maximalPubRelExpiry) { if ((publishTimestamp == null) || (messageExpiryInterval == null)) { return false; } - - return getRemainingExpiry() == 0; + if (messageExpiryInterval == MqttConfigurationDefaults.TTL_DISABLED || + messageExpiryInterval == PUBLISH.MESSAGE_EXPIRY_INTERVAL_NOT_SET) { + return false; + } + final long waitingSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - publishTimestamp); + final long actualMessageExpiry = Math.min(messageExpiryInterval, maximalPubRelExpiry); + final long remainingTime = actualMessageExpiry - waitingSeconds; + return remainingTime < 1; } public static @NotNull PUBREL from(final @NotNull PubrelPacketImpl packet) { diff --git a/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueXodusLocalPersistence.java b/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueXodusLocalPersistence.java index 346f972bf..e6f533fc7 100644 --- a/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueXodusLocalPersistence.java +++ b/src/main/java/com/hivemq/persistence/clientqueue/ClientQueueXodusLocalPersistence.java @@ -795,6 +795,10 @@ private boolean setPayloadIfExistingElseDrop( return packetId != ClientQueuePersistenceSerializer.NO_PACKET_ID; }); if (!packetIdFound[0]) { + if (InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) { + pubrel.setMessageExpiryInterval(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY); + pubrel.setPublishTimestamp(System.currentTimeMillis()); + } getOrPutQueueSize(key, bucketIndex).incrementAndGet(); final ByteIterable serializedPubRel = serializer.serializePubRel(pubrel, false); bucket.getStore().put(txn, serializer.serializeUnknownPubRelKey(key), serializedPubRel); @@ -1041,10 +1045,7 @@ private void cleanExpiredMessages(final @NotNull Key key, final int bucketIndex) if (!InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) { return true; } - if (pubrel.getMessageExpiryInterval() == null || pubrel.getPublishTimestamp() == null) { - return true; - } - if (!pubrel.hasExpired()) { + if (!pubrel.hasExpired(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY)) { return true; } getOrPutQueueSize(key, bucketIndex).decrementAndGet(); diff --git a/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java b/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java index 213b47fef..bad09235e 100644 --- a/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java +++ b/src/main/java/com/hivemq/persistence/local/memory/ClientQueueMemoryLocalPersistence.java @@ -445,6 +445,10 @@ private void addQos0Publish( if (packetIdFound) { messages.qos1Or2Messages.set(messageIndexInQueue, pubrelWithRetained); } else { + if (InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) { + pubrelWithRetained.setMessageExpiryInterval(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY); + pubrelWithRetained.setPublishTimestamp(System.currentTimeMillis()); + } // Ensure unknown PUBRELs are always first in queue messages.qos1Or2Messages.addFirst(pubrelWithRetained); } @@ -755,10 +759,7 @@ private void cleanExpiredMessages(final @NotNull Messages messages) { if (!InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) { continue; } - if (pubrel.getMessageExpiryInterval() == null || pubrel.getPublishTimestamp() == null) { - continue; - } - if (!pubrel.hasExpired()) { + if (!pubrel.hasExpired(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY)) { continue; } if (pubrel.retained) {