Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix for expired inflight PUBREL not getting removed from the queue #417

Merged
merged 3 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static com.hivemq.configuration.entity.mqtt.MqttConfigurationDefaults.MAX_EXPIRY_INTERVAL_DEFAULT;
import static com.hivemq.persistence.local.xodus.EnvironmentUtil.GCType;

/**
Expand Down Expand Up @@ -469,4 +470,5 @@ public class InternalConfigurations {
public static boolean EXPIRE_INFLIGHT_MESSAGES_ENABLED = false;
public static boolean EXPIRE_INFLIGHT_PUBRELS_ENABLED = false;
DC2-DanielKrueger marked this conversation as resolved.
Show resolved Hide resolved

public static long MAXIMUM_INFLIGHT_PUBREL_EXPIRY = MAX_EXPIRY_INTERVAL_DEFAULT;
}
22 changes: 11 additions & 11 deletions src/main/java/com/hivemq/mqtt/message/pubrel/PUBREL.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.hivemq.mqtt.message.reason.Mqtt5PubRelReasonCode;
import com.hivemq.util.ObjectMemoryEstimation;

import java.util.concurrent.TimeUnit;

/**
* @since 1.4
*/
Expand Down Expand Up @@ -98,25 +100,23 @@ public void setPublishTimestamp(final @Nullable Long publishTimestamp) {
this.publishTimestamp = publishTimestamp;
}

public long getRemainingExpiry() {
if (isExpiryDisabled()) {
return PUBLISH.MESSAGE_EXPIRY_INTERVAL_NOT_SET;
}
final long waitingSeconds = (System.currentTimeMillis() - publishTimestamp) / 1000;
return Math.max(0, messageExpiryInterval - waitingSeconds);
}

public boolean isExpiryDisabled() {
return (messageExpiryInterval == MqttConfigurationDefaults.TTL_DISABLED) ||
(messageExpiryInterval == PUBLISH.MESSAGE_EXPIRY_INTERVAL_NOT_SET);
}

public boolean hasExpired() {
public boolean hasExpired(final long maximalPubRelExpiry) {
if ((publishTimestamp == null) || (messageExpiryInterval == null)) {
return false;
}

return getRemainingExpiry() == 0;
if (messageExpiryInterval == MqttConfigurationDefaults.TTL_DISABLED ||
messageExpiryInterval == PUBLISH.MESSAGE_EXPIRY_INTERVAL_NOT_SET) {
return false;
}
final long waitingSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - publishTimestamp);
final long actualMessageExpiry = Math.min(messageExpiryInterval, maximalPubRelExpiry);
final long remainingTime = actualMessageExpiry - waitingSeconds;
return remainingTime < 1;
}

public static @NotNull PUBREL from(final @NotNull PubrelPacketImpl packet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,10 @@ private boolean setPayloadIfExistingElseDrop(
return packetId != ClientQueuePersistenceSerializer.NO_PACKET_ID;
});
if (!packetIdFound[0]) {
if (InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) {
pubrel.setMessageExpiryInterval(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY);
pubrel.setPublishTimestamp(System.currentTimeMillis());
}
getOrPutQueueSize(key, bucketIndex).incrementAndGet();
final ByteIterable serializedPubRel = serializer.serializePubRel(pubrel, false);
bucket.getStore().put(txn, serializer.serializeUnknownPubRelKey(key), serializedPubRel);
Expand Down Expand Up @@ -1041,10 +1045,7 @@ private void cleanExpiredMessages(final @NotNull Key key, final int bucketIndex)
if (!InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) {
return true;
}
if (pubrel.getMessageExpiryInterval() == null || pubrel.getPublishTimestamp() == null) {
return true;
}
if (!pubrel.hasExpired()) {
if (!pubrel.hasExpired(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY)) {
return true;
}
getOrPutQueueSize(key, bucketIndex).decrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,10 @@ private void addQos0Publish(
if (packetIdFound) {
messages.qos1Or2Messages.set(messageIndexInQueue, pubrelWithRetained);
} else {
if (InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) {
pubrelWithRetained.setMessageExpiryInterval(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY);
pubrelWithRetained.setPublishTimestamp(System.currentTimeMillis());
}
// Ensure unknown PUBRELs are always first in queue
messages.qos1Or2Messages.addFirst(pubrelWithRetained);
}
Expand Down Expand Up @@ -755,10 +759,7 @@ private void cleanExpiredMessages(final @NotNull Messages messages) {
if (!InternalConfigurations.EXPIRE_INFLIGHT_PUBRELS_ENABLED) {
continue;
}
if (pubrel.getMessageExpiryInterval() == null || pubrel.getPublishTimestamp() == null) {
continue;
}
if (!pubrel.hasExpired()) {
if (!pubrel.hasExpired(InternalConfigurations.MAXIMUM_INFLIGHT_PUBREL_EXPIRY)) {
continue;
}
if (pubrel.retained) {
Expand Down
Loading