Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ranged IDs implementation for packet IDs #426

Merged
merged 6 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/main/java/com/hivemq/bootstrap/ClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
import com.hivemq.mqtt.message.ProtocolVersion;
import com.hivemq.mqtt.message.connect.CONNECT;
import com.hivemq.mqtt.message.mqtt5.Mqtt5UserProperties;
import com.hivemq.mqtt.message.pool.MessageIDPool;
import com.hivemq.mqtt.message.pool.SequentialMessageIDPoolImpl;
import com.hivemq.mqtt.message.pool.FreeIdRanges;
import com.hivemq.security.auth.SslClientCertificate;
import io.netty.channel.Channel;

Expand All @@ -50,9 +49,11 @@

public class ClientConnection implements ClientConnectionContext {

public static final int MAX_MESSAGE_ID = 65_535;

private final @NotNull Channel channel;
private final @NotNull PublishFlushHandler publishFlushHandler;
private final @NotNull MessageIDPool messageIDPool = new SequentialMessageIDPoolImpl();
private final @NotNull FreeIdRanges messageIDPool = new FreeIdRanges(1, MAX_MESSAGE_ID);
private final @NotNull Listener connectedListener;
private volatile @NotNull ClientState clientState;

Expand Down Expand Up @@ -375,7 +376,7 @@ public void setQueueSizeMaximum(final @Nullable Long queueSizeMaximum) {
this.queueSizeMaximum = queueSizeMaximum;
}

public @NotNull MessageIDPool getMessageIDPool() {
public @NotNull FreeIdRanges getMessageIDPool() {
return messageIDPool;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.hivemq.bootstrap.ClientConnection;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.mqtt.handler.publish.PublishStatus;
import com.hivemq.mqtt.message.pool.MessageIDPool;
import com.hivemq.mqtt.message.pool.FreeIdRanges;
import com.hivemq.mqtt.message.publish.PUBLISH;
import com.hivemq.mqtt.services.PublishPollService;
import com.hivemq.persistence.util.FutureUtils;
Expand All @@ -38,7 +38,7 @@ public class PublishStatusFutureCallback implements FutureCallback<PublishStatus
private final boolean sharedSubscription;
private final @NotNull String queueId;
private final @NotNull PUBLISH publish;
private final @NotNull MessageIDPool messageIDPool;
private final @NotNull FreeIdRanges messageIDPool;
private final int packetIdentifier;
private final @NotNull Channel channel;
private final @NotNull String client;
Expand All @@ -48,7 +48,7 @@ public PublishStatusFutureCallback(
final boolean sharedSubscription,
final @NotNull String queueId,
final @NotNull PUBLISH publish,
final @NotNull MessageIDPool messageIDPool,
final @NotNull FreeIdRanges messageIDPool,
final @NotNull Channel channel,
final @NotNull String client) {
this.publishPollService = publishPollService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.hivemq.mqtt.event.PubrelDroppedEvent;
import com.hivemq.mqtt.message.MessageWithID;
import com.hivemq.mqtt.message.QoS;
import com.hivemq.mqtt.message.pool.MessageIDPool;
import com.hivemq.mqtt.message.pool.FreeIdRanges;
import com.hivemq.mqtt.message.puback.PUBACK;
import com.hivemq.mqtt.message.pubcomp.PUBCOMP;
import com.hivemq.mqtt.message.publish.PUBLISH;
Expand Down Expand Up @@ -316,7 +316,7 @@ private void returnMessageId(

//Such a message ID must never be zero, but better be safe than sorry
if (messageId > 0) {
final MessageIDPool messageIDPool = ClientConnection.of(channel).getMessageIDPool();
final FreeIdRanges messageIDPool = ClientConnection.of(channel).getMessageIDPool();
messageIDPool.returnId(messageId);
if (log.isTraceEnabled()) {
log.trace("Returning Message ID {} for client {} because of a {} message was received",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.hivemq.mqtt.handler.publish.PublishStatus;
import com.hivemq.mqtt.handler.publish.PublishWriteFailedListener;
import com.hivemq.mqtt.message.QoS;
import com.hivemq.mqtt.message.pool.MessageIDPool;
import com.hivemq.mqtt.message.pool.FreeIdRanges;
import com.hivemq.mqtt.message.publish.PUBLISH;
import com.hivemq.mqtt.message.publish.PUBLISHFactory;
import com.hivemq.mqtt.message.publish.PublishWithFuture;
Expand Down Expand Up @@ -243,7 +243,7 @@ public void onSuccess(final @Nullable PublishStatus status) {
}

if (qos0Publish.getPacketIdentifier() != 0) {
final MessageIDPool messageIDPool = ClientConnection.of(channel).getMessageIDPool();
final FreeIdRanges messageIDPool = ClientConnection.of(channel).getMessageIDPool();
messageIDPool.returnId(qos0Publish.getPacketIdentifier());
}
}
Expand Down
198 changes: 198 additions & 0 deletions src/main/java/com/hivemq/mqtt/message/pool/FreeIdRanges.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright 2019-present HiveMQ GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hivemq.mqtt.message.pool;

import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.mqtt.message.pool.exception.NoMessageIdAvailableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The purpose of this class is to reduce packet IDs allocation time for each independent client
* and to reduce the memory footprint of keeping track of allocated IDs.
* <p>
* This is achieved by keeping a list of {@link Range} objects each of which represents a contiguous interval of
* integer ids that are NOT currently assigned to any object using this instance of {@link FreeIdRanges}.
* The lower end of the {@link Range} instance is included whereas the upper end is excluded from the interval.
* That is, the {@link Range} instance with start 42 and end 42 contains only one ID - 42.
* The {@link Range} instance with start 10 and end 12 contains only two IDs: 10 and 11.
* <p>
* Initially, there is only one contiguous {@link Range} of IDs. The IDs are assigned starting from the lowest one in
* that interval.
* Upon assignment, the interval's lower end is incremented (interval size reduces from below).
* When the ID is returned, it either joins one of the existing {@link Range} intervals in the list (if it is adjacent)
* or it forms a new {@link Range} that is added to the list.
* <p>
* This class is NOT thread-safe.
* <a
* href="https://github.com/hivemq/hivemq-mqtt-client/blob/master/src/main/java/com/hivemq/client/internal/util/Ranges.java">The
* original implementation in the HiveMQ Java Client.</a>
*/
public class FreeIdRanges {

private static final @NotNull Logger log = LoggerFactory.getLogger(FreeIdRanges.class);

final int minAllowedId;
final int maxAllowedId;
Remit marked this conversation as resolved.
Show resolved Hide resolved

private @NotNull Range rootRange;

public FreeIdRanges(final int minId, final int maxId) {
minAllowedId = minId;
maxAllowedId = maxId;
rootRange = new Range(minId, maxId + 1);
}

/**
* Provides a new ID that is not currently allocated.
*
* @return a new ID if available in any of the ranges or {@link NoMessageIdAvailableException} if ran out of IDs.
*/
public int takeNextId() throws NoMessageIdAvailableException {
if (rootRange.start == rootRange.end) {
throw new NoMessageIdAvailableException();
}

final int id = rootRange.start;
rootRange.start++;
if ((rootRange.start == rootRange.end) && (rootRange.next != null)) {
rootRange = rootRange.next;
}
return id;
}

/**
* Provides the requested ID if it is available or some other ID otherwise.
*
* @param id an ID that the caller attempts to take.
* @return the requested {@param id} if it is available in one of the {@link Range} intervals or otherwise some
* other free ID.
*/
public int takeIfAvailable(final int id) throws NoMessageIdAvailableException {
if (id < minAllowedId || id > maxAllowedId) {
log.warn("Attempting to take an ID {} that is outside the valid range [{}, {}], will try taking another ID.",
id,
minAllowedId,
maxAllowedId);
}
Remit marked this conversation as resolved.
Show resolved Hide resolved

Range current = rootRange;
Range prev = null;

while (current != null) {
if (id >= current.start && id < current.end) {

final int prevCurStart = current.start;
current.start = id + 1;
final Range lowerRange = prevCurStart == id ? null : new Range(prevCurStart, id, current);

if (lowerRange != null) {
if (prev != null) {
prev.next = lowerRange;
} else {
rootRange = lowerRange;
}
}

return id;
}

prev = current;
current = current.next;
}

return takeNextId();
Remit marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Returns the {@param id} into one of the ranges of free IDs.
*
* @param id an ID that the caller attempts to return (to free).
*/
public void returnId(final int id) {
if (id < minAllowedId || id > maxAllowedId) {
log.warn("The returned ID {} is outside the valid range [{}, {}], ignoring.",
id,
minAllowedId,
maxAllowedId);
return;
}
Remit marked this conversation as resolved.
Show resolved Hide resolved

Range current = rootRange;
if (id < current.start - 1) { // at least one element is between the returned and the next range
rootRange = new Range(id, id + 1, current);
return;
}
Range prev = current;
current = returnId(current, id);
while (current != null) {
if (id < current.start - 1) {
prev.next = new Range(id, id + 1, current);
return;
}
prev = current;
current = returnId(current, id);
}
}

private @Nullable Range returnId(final @NotNull Range range, final int id) throws IllegalStateException {
if (id == range.start - 1) { // if the returned element is directly adjacent to the range (from below)
range.start = id;
return null;
}

if (id < range.end) { // the returned element is within the range, i.e. it has been freed already
return null;
}

final Range next = range.next;
if (id == range.end) {
if (next == null) {
throw new IllegalStateException("The id is greater than maxId. This must not happen and is a bug.");
}
Remit marked this conversation as resolved.
Show resolved Hide resolved
range.end++;
if (range.end == next.start) {
range.end = next.end;
range.next = next.next;
}
return null;
}
if (next == null) {
throw new IllegalStateException("The id is greater than maxId. This must not happen and is a bug.");
}
Remit marked this conversation as resolved.
Show resolved Hide resolved
return next;
}

private static class Range {

int start;
int end;
@Nullable Range next;

Range(final int start, final int end) {
this.start = start;
this.end = end;
}

Range(final int start, final int end, final @NotNull Range next) {
this.start = start;
this.end = end;
this.next = next;
}
}
}
77 changes: 0 additions & 77 deletions src/main/java/com/hivemq/mqtt/message/pool/MessageIDPool.java

This file was deleted.

Loading
Loading