Skip to content

Commit

Permalink
* MDF [SDK/MQTT+QUIC] switch uint64_t hash API
Browse files Browse the repository at this point in the history
Signed-off-by: jaylin <[email protected]>
  • Loading branch information
JaylinYu committed Jan 2, 2024
1 parent 14877e8 commit f5d5d1b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 58 deletions.
75 changes: 37 additions & 38 deletions src/mqtt/protocol/mqtt/mqtt_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ static void mqtt_quic_ctx_fini(void *arg);
static void mqtt_quic_ctx_recv(void *arg, nni_aio *aio);
static void mqtt_quic_ctx_send(void *arg, nni_aio *aio);

static int mqtt_sub_stream(mqtt_pipe_t *p, nni_msg *msg, uint16_t packet_id, nni_aio *aio);
static int mqtt_sub_stream(mqtt_pipe_t *p, nni_msg *msg, uint64_t packet_id, nni_aio *aio);

#if defined(NNG_SUPP_SQLITE)
static void *mqtt_quic_sock_get_sqlite_option(mqtt_sock_t *s);
Expand Down Expand Up @@ -109,17 +109,17 @@ static conf_quic config_default = {
.qcongestion_control = 0, // cubic
};

static uint32_t
DJBHashn(char *str, uint16_t len)
static uint64_t
DJBHashn(uint8_t* str, uint32_t len)

Check warning on line 113 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L113

Added line #L113 was not covered by tests
{
unsigned int hash = 5381;
uint16_t i = 0;
while (i < len) {
hash = ((hash << 5) + hash) + (*str++); /* times 33 */
i++;
}
hash &= ~(1U << 31); /* strip the highest bit */
return hash;
uint64_t hash = 5381;
uint64_t i = 0;

Check warning on line 116 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L115-L116

Added lines #L115 - L116 were not covered by tests

for(i = 0; i < len; str++, i++) {
hash = ((hash << 5) + hash) + (*str);

Check warning on line 119 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L118-L119

Added lines #L118 - L119 were not covered by tests
}

return hash;

Check warning on line 122 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L122

Added line #L122 was not covered by tests
}

static int nng_mqtt_quic_set_config(nng_socket *sock, void *node);
Expand Down Expand Up @@ -195,7 +195,7 @@ struct mqtt_pipe_s {
nni_lmq send_inflight; // only used in multi-stream mode
nni_lmq recv_messages; // recv messages queue
uint32_t stream_id; // only for multi-stream
uint16_t rid; // index of resending packet id
uint64_t rid; // index of resending packet id
uint8_t reason_code; // MQTTV5 reason code
};

Expand All @@ -210,7 +210,7 @@ nng_mqtt_quic_open_topic_stream(mqtt_sock_t *mqtt_sock, const char *topic, uint3
{
mqtt_pipe_t *p = mqtt_sock->pipe;
mqtt_pipe_t *new_pipe = NULL;
uint32_t hash;
uint64_t hash;

// create a pipe/stream here
if ((new_pipe = nng_alloc(sizeof(mqtt_pipe_t))) == NULL) {
Expand All @@ -221,7 +221,7 @@ nng_mqtt_quic_open_topic_stream(mqtt_sock_t *mqtt_sock, const char *topic, uint3
log_warn("Failed in open the topic-stream pair.");
return NULL;
}
hash = DJBHashn((char *) topic, len);
hash = DJBHashn((unsigned char *) topic, len);

Check warning on line 224 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L224

Added line #L224 was not covered by tests
nni_id_set(mqtt_sock->streams, hash, new_pipe);
new_pipe->stream_id = hash;
log_debug("create new pipe %p for topic %.*s", new_pipe, len, topic);
Expand All @@ -247,9 +247,10 @@ nng_mqtt_quic_open_topic_stream(mqtt_sock_t *mqtt_sock, const char *topic, uint3
* mapping sub topics (if >1) with the new stream.
*/
static int
mqtt_sub_stream(mqtt_pipe_t *p, nni_msg *msg, uint16_t packet_id, nni_aio *aio)
mqtt_sub_stream(mqtt_pipe_t *p, nni_msg *msg, uint64_t packet_id, nni_aio *aio)

Check warning on line 250 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L250

Added line #L250 was not covered by tests
{
uint32_t count, hash;
uint32_t count;
uint64_t hash;
nni_msg *tmsg;
mqtt_sock_t *sock = p->mqtt_sock;
mqtt_pipe_t *new_pipe = NULL;
Expand All @@ -260,7 +261,7 @@ mqtt_sub_stream(mqtt_pipe_t *p, nni_msg *msg, uint16_t packet_id, nni_aio *aio)
// there is only one topic in Sub msg if multi-stream is enabled
for (uint32_t i = 0; i < count; i++) {
hash = DJBHashn(
(char *) topics[i].topic.buf, topics[i].topic.length);
(unsigned char *) topics[i].topic.buf, topics[i].topic.length);

Check warning on line 264 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L264

Added line #L264 was not covered by tests
if ((new_pipe = nni_id_get(sock->streams, hash)) == NULL) {
// create pipe here & set stream id
log_debug("topic %s qos %d", topics[i].topic.buf, topics[i].qos);
Expand Down Expand Up @@ -338,9 +339,10 @@ mqtt_send_msg(nni_aio *aio, nni_msg *msg, mqtt_sock_t *s)
{
mqtt_pipe_t *p = s->pipe;
nni_msg *tmsg;
uint16_t ptype, packet_id;
uint32_t topic_len = 0;
uint8_t qos = 0;
uint16_t ptype;
uint32_t topic_len = 0;

Check warning on line 344 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L344

Added line #L344 was not covered by tests
uint64_t packet_id;

ptype = nni_mqtt_msg_get_packet_type(msg);
switch (ptype) {
Expand All @@ -366,10 +368,9 @@ mqtt_send_msg(nni_aio *aio, nni_msg *msg, mqtt_sock_t *s)
// check if topic-stream pair exist
mqtt_pipe_t *pub_pipe;

char *topic = (char *) nni_mqtt_msg_get_publish_topic(
msg, &topic_len);
pub_pipe =
nni_id_get(s->streams, DJBHashn(topic, topic_len));
char *topic =(char *) nni_mqtt_msg_get_publish_topic(

Check warning on line 371 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L371

Added line #L371 was not covered by tests
msg, &topic_len);
pub_pipe = nni_id_get(s->streams, DJBHashn((unsigned char *)topic, topic_len));

Check warning on line 373 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L373

Added line #L373 was not covered by tests
if (pub_pipe == NULL) {
pub_pipe = nng_mqtt_quic_open_topic_stream(
s, topic, topic_len);
Expand Down Expand Up @@ -481,7 +482,7 @@ mqtt_pipe_send_msg(nni_aio *aio, nni_msg *msg, mqtt_pipe_t *p, uint16_t packet_i
nni_id_remove(&p->sent_unack, packet_id);
}
nni_msg_clone(msg);
if (0 != nni_id_set(&p->sent_unack, packet_id, msg)) {
if (0 != nni_id_set(&p->sent_unack, (uint64_t)packet_id, msg)) {

Check warning on line 485 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L485

Added line #L485 was not covered by tests
nni_println("Warning! Cache QoS msg failed");
nni_msg_free(msg);
nni_aio_finish_error(aio, UNSPECIFIED_ERROR);
Expand Down Expand Up @@ -565,7 +566,6 @@ mqtt_quic_send_cb(void *arg)
mqtt_sock_t *s = p->mqtt_sock;
nni_msg *msg = NULL;
nni_aio *aio;
int rv;

if (nni_aio_result(&p->send_aio) != 0) {
// We failed to send... clean up and deal with it.
Expand Down Expand Up @@ -672,7 +672,7 @@ mqtt_quic_data_strm_recv_cb(void *arg)

packet_type_t packet_type = nni_mqtt_msg_get_packet_type(msg);

int32_t packet_id;
uint64_t packet_id;
uint8_t qos;
nni_msg *ack;

Expand All @@ -692,7 +692,7 @@ mqtt_quic_data_strm_recv_cb(void *arg)
// FALLTHROUGH
case NNG_MQTT_PUBCOMP:
// we have received a PUBCOMP, successful delivery of a QoS 2
packet_id = nni_mqtt_msg_get_packet_id(msg);
packet_id = (uint64_t)nni_mqtt_msg_get_packet_id(msg);

Check warning on line 695 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L695

Added line #L695 was not covered by tests
p->rid = packet_id;
cached_msg = nni_id_get(&p->sent_unack, packet_id);
if (cached_msg != NULL) {
Expand All @@ -718,7 +718,7 @@ mqtt_quic_data_strm_recv_cb(void *arg)
// FALLTHROUGH
case NNG_MQTT_UNSUBACK:
// we have received a UNSUBACK, successful unsubscription
packet_id = nni_mqtt_msg_get_packet_id(msg);
packet_id = (uint64_t)nni_mqtt_msg_get_packet_id(msg);

Check warning on line 721 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L721

Added line #L721 was not covered by tests
p->rid = packet_id;
cached_msg = nni_id_get(&p->sent_unack, packet_id);
if (cached_msg != NULL) {
Expand All @@ -735,7 +735,7 @@ mqtt_quic_data_strm_recv_cb(void *arg)
nni_msg_free(msg);
break;
case NNG_MQTT_PUBREL:
packet_id = nni_mqtt_msg_get_pubrel_packet_id(msg);
packet_id = (uint64_t)nni_mqtt_msg_get_pubrel_packet_id(msg);

Check warning on line 738 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L738

Added line #L738 was not covered by tests
cached_msg = nni_id_get(&p->recv_unack, packet_id);
nni_msg_free(msg);
if (cached_msg == NULL) {
Expand Down Expand Up @@ -902,7 +902,7 @@ mqtt_quic_recv_cb(void *arg)

packet_type_t packet_type = nni_mqtt_msg_get_packet_type(msg);

int32_t packet_id;
uint64_t packet_id;
uint8_t qos;
nni_msg *ack;

Expand Down Expand Up @@ -938,7 +938,7 @@ mqtt_quic_recv_cb(void *arg)
// FALLTHROUGH
case NNG_MQTT_PUBCOMP:
// we have received a PUBCOMP, successful delivery of a QoS 2
packet_id = nni_mqtt_msg_get_packet_id(msg);
packet_id = (uint64_t)nni_mqtt_msg_get_packet_id(msg);

Check warning on line 941 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L941

Added line #L941 was not covered by tests
p->rid = packet_id;
cached_msg = nni_id_get(&p->sent_unack, packet_id);
if (cached_msg != NULL) {
Expand All @@ -964,7 +964,7 @@ mqtt_quic_recv_cb(void *arg)
// FALLTHROUGH
case NNG_MQTT_UNSUBACK:
// we have received a UNSUBACK, successful unsubscription
packet_id = nni_mqtt_msg_get_packet_id(msg);
packet_id = (uint64_t)nni_mqtt_msg_get_packet_id(msg);

Check warning on line 967 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L967

Added line #L967 was not covered by tests
p->rid = packet_id;
cached_msg = nni_id_get(&p->sent_unack, packet_id);
if (cached_msg != NULL) {
Expand All @@ -978,7 +978,7 @@ mqtt_quic_recv_cb(void *arg)
nni_msg_free(msg);
break;
case NNG_MQTT_PUBREL:
packet_id = nni_mqtt_msg_get_pubrel_packet_id(msg);
packet_id = (uint64_t)nni_mqtt_msg_get_pubrel_packet_id(msg);

Check warning on line 981 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L981

Added line #L981 was not covered by tests
cached_msg = nni_id_get(&p->recv_unack, packet_id);
nni_msg_free(msg);
if (cached_msg == NULL) {
Expand Down Expand Up @@ -1020,7 +1020,7 @@ mqtt_quic_recv_cb(void *arg)
uint32_t payload_len;
payload = nng_mqtt_msg_get_publish_payload(msg, &payload_len);
*/
packet_id = nni_mqtt_msg_get_publish_packet_id(msg);
packet_id = (uint64_t)nni_mqtt_msg_get_publish_packet_id(msg);

Check warning on line 1023 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L1023

Added line #L1023 was not covered by tests
nni_mqtt_msg_set_packet_type(ack, NNG_MQTT_PUBACK);
nni_mqtt_msg_set_puback_packet_id(ack, packet_id);
nni_mqtt_msg_encode(ack);
Expand All @@ -1045,7 +1045,7 @@ mqtt_quic_recv_cb(void *arg)
nni_aio_set_msg(user_aio, msg);
break;
} else {
packet_id = nni_mqtt_msg_get_publish_packet_id(msg);
packet_id = (uint64_t)nni_mqtt_msg_get_publish_packet_id(msg);

Check warning on line 1048 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L1048

Added line #L1048 was not covered by tests
if ((cached_msg = nni_id_get(
&p->recv_unack, packet_id)) != NULL) {
// packetid already exists.
Expand Down Expand Up @@ -1077,7 +1077,7 @@ mqtt_quic_recv_cb(void *arg)
return;
case NNG_MQTT_PUBREC:
// return PUBREL
packet_id = nni_mqtt_msg_get_pubrec_packet_id(msg);
packet_id = (uint64_t)nni_mqtt_msg_get_pubrec_packet_id(msg);

Check warning on line 1080 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L1080

Added line #L1080 was not covered by tests
nni_mqtt_msg_alloc(&ack, 0);
nni_mqtt_msg_set_packet_type(ack, NNG_MQTT_PUBREL);
nni_mqtt_msg_set_puback_packet_id(ack, packet_id);
Expand Down Expand Up @@ -1210,7 +1210,6 @@ mqtt_quic_sock_fini(void *arg)
mqtt_sock_t *s = arg;
nni_aio *aio;
nni_msg *msg = NULL;
size_t count = 0;
/*
#if defined(NNG_SUPP_SQLITE) && defined(NNG_HAVE_MQTT_BROKER)
bool is_sqlite = get_persist(s);
Expand Down Expand Up @@ -1742,7 +1741,7 @@ mqtt_quic_ctx_send(void *arg, nni_aio *aio)

if (s->multi_stream &&
ptype == NNG_MQTT_SUBSCRIBE) {
mqtt_sub_stream(p, msg, packet_id, aio);
mqtt_sub_stream(p, msg, (uint64_t)packet_id, aio);

Check warning on line 1744 in src/mqtt/protocol/mqtt/mqtt_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqtt_quic.c#L1744

Added line #L1744 was not covered by tests
} else if ((rv = mqtt_send_msg(aio, msg, s)) >= 0) {
nni_mtx_unlock(&s->mtx);
nni_aio_finish(aio, rv, 0);
Expand Down
40 changes: 20 additions & 20 deletions src/mqtt/protocol/mqtt/mqttv5_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,17 @@ static conf_quic config_default = {
.qcongestion_control = 0, // cubic
};

static uint32_t
DJBHashn(char *str, uint16_t len)
static uint64_t
DJBHashn(uint8_t* str, uint32_t len)

Check warning on line 120 in src/mqtt/protocol/mqtt/mqttv5_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqttv5_quic.c#L120

Added line #L120 was not covered by tests
{
unsigned int hash = 5381;
uint16_t i = 0;
while (i < len) {
hash = ((hash << 5) + hash) + (*str++); /* times 33 */
i++;
}
hash &= ~(1U << 31); /* strip the highest bit */
return hash;
uint64_t hash = 5381;
uint64_t i = 0;

Check warning on line 123 in src/mqtt/protocol/mqtt/mqttv5_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqttv5_quic.c#L122-L123

Added lines #L122 - L123 were not covered by tests

for(i = 0; i < len; str++, i++) {
hash = ((hash << 5) + hash) + (*str);

Check warning on line 126 in src/mqtt/protocol/mqtt/mqttv5_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqttv5_quic.c#L125-L126

Added lines #L125 - L126 were not covered by tests
}

return hash;

Check warning on line 129 in src/mqtt/protocol/mqtt/mqttv5_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqttv5_quic.c#L129

Added line #L129 was not covered by tests
}

static int nng_mqtt_quic_set_config(nng_socket *sock, void *node);
Expand Down Expand Up @@ -204,7 +204,7 @@ struct mqtt_pipe_s {
nni_aio rep_aio; // aio for resending qos msg and PINGREQ
nni_lmq send_inflight; // only used in multi-stream mode
nni_lmq recv_messages; // recv messages queue
uint32_t stream_id; // only for multi-stream
uint64_t stream_id; // only for multi-stream
uint16_t rid; // index of resending packet id
uint8_t reason_code; // MQTTV5 reason code
};
Expand All @@ -220,7 +220,7 @@ nng_mqtt_quic_open_topic_stream(mqtt_sock_t *mqtt_sock, const char *topic, uint3
{
mqtt_pipe_t *p = mqtt_sock->pipe;
mqtt_pipe_t *new_pipe = NULL;
uint32_t hash;
uint64_t hash;

// create a pipe/stream here
if ((new_pipe = nng_alloc(sizeof(mqtt_pipe_t))) == NULL) {
Expand All @@ -231,7 +231,7 @@ nng_mqtt_quic_open_topic_stream(mqtt_sock_t *mqtt_sock, const char *topic, uint3
log_warn("Failed in open the topic-stream pair.");
return NULL;
}
hash = DJBHashn((char *) topic, len);
hash = DJBHashn((unsigned char *) topic, len);

Check warning on line 234 in src/mqtt/protocol/mqtt/mqttv5_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqttv5_quic.c#L234

Added line #L234 was not covered by tests
nni_id_set(mqtt_sock->streams, hash, new_pipe);
new_pipe->stream_id = hash;
log_debug("create new pipe %p for topic %.*s", new_pipe, len, topic);
Expand Down Expand Up @@ -259,7 +259,8 @@ nng_mqtt_quic_open_topic_stream(mqtt_sock_t *mqtt_sock, const char *topic, uint3
static int
mqtt_sub_stream(mqtt_pipe_t *p, nni_msg *msg, uint16_t packet_id, nni_aio *aio)
{
uint32_t count, hash;
uint32_t count;
uint64_t hash;
nni_msg *tmsg;
mqtt_sock_t *sock = p->mqtt_sock;
mqtt_pipe_t *new_pipe = NULL;
Expand All @@ -270,7 +271,7 @@ mqtt_sub_stream(mqtt_pipe_t *p, nni_msg *msg, uint16_t packet_id, nni_aio *aio)
// there is only one topic in Sub msg if multi-stream is enabled
for (uint32_t i = 0; i < count; i++) {
hash = DJBHashn(
(char *) topics[i].topic.buf, topics[i].topic.length);
(unsigned char *) topics[i].topic.buf, topics[i].topic.length);

Check warning on line 274 in src/mqtt/protocol/mqtt/mqttv5_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqttv5_quic.c#L274

Added line #L274 was not covered by tests
if ((new_pipe = nni_id_get(sock->streams, hash)) == NULL) {
// create pipe here & set stream id
log_debug("topic %s qos %d", topics[i].topic.buf, topics[i].qos);
Expand Down Expand Up @@ -315,7 +316,7 @@ mqtt_sub_stream(mqtt_pipe_t *p, nni_msg *msg, uint16_t packet_id, nni_aio *aio)
nni_id_remove(&new_pipe->sent_unack, packet_id);
}
nni_msg_clone(msg);
if (0 != nni_id_set(&new_pipe->sent_unack, packet_id, msg)) {
if (0 != nni_id_set(&new_pipe->sent_unack, (uint64_t)packet_id, msg)) {

Check warning on line 319 in src/mqtt/protocol/mqtt/mqttv5_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqttv5_quic.c#L319

Added line #L319 was not covered by tests
nni_println("Warning! Cache QoS msg failed");
nni_msg_free(msg);
nni_aio_finish_error(aio, UNSPECIFIED_ERROR);
Expand Down Expand Up @@ -379,7 +380,7 @@ mqtt_send_msg(nni_aio *aio, nni_msg *msg, mqtt_sock_t *s)
char *topic = (char *) nni_mqtt_msg_get_publish_topic(
msg, &topic_len);
pub_pipe =
nni_id_get(s->streams, DJBHashn(topic, topic_len));
nni_id_get(s->streams, DJBHashn((unsigned char *)topic, topic_len));

Check warning on line 383 in src/mqtt/protocol/mqtt/mqttv5_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqttv5_quic.c#L383

Added line #L383 was not covered by tests
if (pub_pipe == NULL) {
pub_pipe = nng_mqtt_quic_open_topic_stream(
s, topic, topic_len);
Expand Down Expand Up @@ -409,7 +410,7 @@ mqtt_send_msg(nni_aio *aio, nni_msg *msg, mqtt_sock_t *s)
nni_id_remove(&p->sent_unack, packet_id);
}
nni_msg_clone(msg);
if (0 != nni_id_set(&p->sent_unack, packet_id, msg)) {
if (0 != nni_id_set(&p->sent_unack, (uint64_t)packet_id, msg)) {

Check warning on line 413 in src/mqtt/protocol/mqtt/mqttv5_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqttv5_quic.c#L413

Added line #L413 was not covered by tests
nni_println("Warning! Cache QoS msg failed");
nni_msg_free(msg);
nni_aio_finish_error(aio, UNSPECIFIED_ERROR);
Expand Down Expand Up @@ -1216,8 +1217,7 @@ mqtt_quic_sock_fini(void *arg)
{
mqtt_sock_t *s = arg;
nni_aio *aio;
nni_msg *tmsg = NULL, *msg = NULL;
size_t count = 0;
nni_msg *msg = NULL;

Check warning on line 1220 in src/mqtt/protocol/mqtt/mqttv5_quic.c

View check run for this annotation

Codecov / codecov/patch

src/mqtt/protocol/mqtt/mqttv5_quic.c#L1220

Added line #L1220 was not covered by tests
/*
#if defined(NNG_SUPP_SQLITE) && defined(NNG_HAVE_MQTT_BROKER)
bool is_sqlite = get_persist(s);
Expand Down

0 comments on commit f5d5d1b

Please sign in to comment.