Skip to content

Commit

Permalink
expired inflight PUBREL gets default values for publish timestamp and…
Browse files Browse the repository at this point in the history
… message expiry to avoid stuck message queue; added inflight PUBREL expiry config
  • Loading branch information
Remit committed Jul 26, 2023
1 parent 0db4659 commit c3996aa
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static com.hivemq.configuration.entity.mqtt.MqttConfigurationDefaults.MAX_EXPIRY_INTERVAL_DEFAULT;
import static com.hivemq.persistence.local.xodus.EnvironmentUtil.GCType;

/**
Expand Down Expand Up @@ -469,4 +470,5 @@ public class InternalConfigurations {
public static boolean EXPIRE_INFLIGHT_MESSAGES_ENABLED = false;
public static boolean EXPIRE_INFLIGHT_PUBRELS_ENABLED = false;

public static long MAXIMUM_INFLIGHT_PUBREL_EXPIRY = MAX_EXPIRY_INTERVAL_DEFAULT;
}
20 changes: 9 additions & 11 deletions src/main/java/com/hivemq/mqtt/message/pubrel/PUBREL.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,25 +98,23 @@ public void setPublishTimestamp(final @Nullable Long publishTimestamp) {
this.publishTimestamp = publishTimestamp;
}

public long getRemainingExpiry() {
if (isExpiryDisabled()) {
return PUBLISH.MESSAGE_EXPIRY_INTERVAL_NOT_SET;
}
final long waitingSeconds = (System.currentTimeMillis() - publishTimestamp) / 1000;
return Math.max(0, messageExpiryInterval - waitingSeconds);
}

public boolean isExpiryDisabled() {
return (messageExpiryInterval == MqttConfigurationDefaults.TTL_DISABLED) ||
(messageExpiryInterval == PUBLISH.MESSAGE_EXPIRY_INTERVAL_NOT_SET);
}

public boolean hasExpired() {
public boolean hasExpired(final long maximalPubRelExpiry) {
if ((publishTimestamp == null) || (messageExpiryInterval == null)) {
return false;
}

return getRemainingExpiry() == 0;
if (messageExpiryInterval == MqttConfigurationDefaults.TTL_DISABLED ||
messageExpiryInterval == PUBLISH.MESSAGE_EXPIRY_INTERVAL_NOT_SET) {
return false;
}
final long waitingSeconds = (System.currentTimeMillis() - publishTimestamp) / 1000;
final long actualMessageExpiry = Math.min(messageExpiryInterval, maximalPubRelExpiry);
final long remainingTime = actualMessageExpiry - waitingSeconds;
return remainingTime < 1;
}

public static @NotNull PUBREL from(final @NotNull PubrelPacketImpl packet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,10 @@ private boolean setPayloadIfExistingElseDrop(
return packetId != ClientQueuePersistenceSerializer.NO_PACKET_ID;
});
if (!packetIdFound[0]) {
if (InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) {
pubrel.setMessageExpiryInterval(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY);
pubrel.setPublishTimestamp(System.currentTimeMillis());
}
getOrPutQueueSize(key, bucketIndex).incrementAndGet();
final ByteIterable serializedPubRel = serializer.serializePubRel(pubrel, false);
bucket.getStore().put(txn, serializer.serializeUnknownPubRelKey(key), serializedPubRel);
Expand Down Expand Up @@ -1038,13 +1042,7 @@ private void cleanExpiredMessages(final @NotNull Key key, final int bucketIndex)
final MessageWithID message = serializer.deserializeValue(serializedValue);
if (message instanceof PUBREL) {
final PUBREL pubrel = (PUBREL) message;
if (!InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) {
return true;
}
if (pubrel.getMessageExpiryInterval() == null || pubrel.getPublishTimestamp() == null) {
return true;
}
if (!pubrel.hasExpired()) {
if (!pubrel.hasExpired(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY)) {
return true;
}
getOrPutQueueSize(key, bucketIndex).decrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,10 @@ private void addQos0Publish(
if (packetIdFound) {
messages.qos1Or2Messages.set(messageIndexInQueue, pubrelWithRetained);
} else {
if (InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) {
pubrelWithRetained.setMessageExpiryInterval(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY);
pubrelWithRetained.setPublishTimestamp(System.currentTimeMillis());
}
// Ensure unknown PUBRELs are always first in queue
messages.qos1Or2Messages.addFirst(pubrelWithRetained);
}
Expand Down Expand Up @@ -752,13 +756,7 @@ private void cleanExpiredMessages(final @NotNull Messages messages) {
final MessageWithID messageWithID = qos12iterator.next();
if (messageWithID instanceof PubrelWithRetained) {
final PubrelWithRetained pubrel = (PubrelWithRetained) messageWithID;
if (!InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) {
continue;
}
if (pubrel.getMessageExpiryInterval() == null || pubrel.getPublishTimestamp() == null) {
continue;
}
if (!pubrel.hasExpired()) {
if (!pubrel.hasExpired(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY)) {
continue;
}
if (pubrel.retained) {
Expand Down

0 comments on commit c3996aa

Please sign in to comment.