Skip to content

Commit

Permalink
sdasd
Browse files Browse the repository at this point in the history
Signed-off-by: jaylin <[email protected]>
  • Loading branch information
JaylinYu committed Aug 28, 2024
1 parent 26b1e4c commit 8710d19
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 188 deletions.
34 changes: 29 additions & 5 deletions demo/mqttv5_scram/mqttv5_scram.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ static int msgarrvd(void* context, char* topicName, int topicLen, MQTTAsync_mess
// }
// }

// MQTTAsync_freeMessage(&message);
// MQTTAsync_free(&topicName);
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);

return iRet;
}
Expand Down Expand Up @@ -602,6 +602,29 @@ void onSendSuccess(void* context, MQTTAsync_successData* response)
// }
}

void connlost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;

printf("\nConnection lost\n");
if (cause)
printf(" cause: %s\n", cause);

printf("Reconnecting\n");
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
}
}

void asyncDeliveryComplete(void* context, MQTTAsync_token token)
{
printf("qos finished!");
}

int
main(const int argc, const char **argv)
{
Expand Down Expand Up @@ -629,7 +652,7 @@ main(const int argc, const char **argv)
rc = EXIT_FAILURE;
return;
}
if ((rc = MQTTAsync_setCallbacks(client, client, NULL, msgarrvd, NULL)) != MQTTASYNC_SUCCESS)
if ((rc = MQTTAsync_setCallbacks(client, client, connlost, msgarrvd, asyncDeliveryComplete)) != MQTTASYNC_SUCCESS)
{
printf("Failed to set callbacks, return code %d\n", rc);
rc = EXIT_FAILURE;
Expand Down Expand Up @@ -673,8 +696,8 @@ main(const int argc, const char **argv)
pubmsg.retained = 0;

MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_sendMessage(client, "msg", &pubmsg, &pub_opts);
nng_msleep(3600); // neither pause() nor sleep() portable
MQTTAsync_sendMessage(client, "test", &pubmsg, &pub_opts);
nng_msleep(5600);

disc_opts.onSuccess = onDisconnect;
disc_opts.onFailure = onDisconnectFailure;
Expand All @@ -684,6 +707,7 @@ main(const int argc, const char **argv)
rc = EXIT_FAILURE;
goto destroy_exit;
}
nng_msleep(15600);
destroy_exit:
MQTTAsync_destroy(&client);
exit:
Expand Down
111 changes: 84 additions & 27 deletions include/nng/mqtt/mqtt_paho.h
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,41 @@ typedef struct MQTTProperties {
MQTTProperty *array; /**< array of properties */
} MQTTProperties;


struct {
enum MQTTPropertyCodes value;
const char* name;
} nameToString[] =
{
{MQTTPROPERTY_CODE_PAYLOAD_FORMAT_INDICATOR, "PAYLOAD_FORMAT_INDICATOR"},
{MQTTPROPERTY_CODE_MESSAGE_EXPIRY_INTERVAL, "MESSAGE_EXPIRY_INTERVAL"},
{MQTTPROPERTY_CODE_CONTENT_TYPE, "CONTENT_TYPE"},
{MQTTPROPERTY_CODE_RESPONSE_TOPIC, "RESPONSE_TOPIC"},
{MQTTPROPERTY_CODE_CORRELATION_DATA, "CORRELATION_DATA"},
{MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIER, "SUBSCRIPTION_IDENTIFIER"},
{MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL, "SESSION_EXPIRY_INTERVAL"},
{MQTTPROPERTY_CODE_ASSIGNED_CLIENT_IDENTIFER, "ASSIGNED_CLIENT_IDENTIFER"},
{MQTTPROPERTY_CODE_SERVER_KEEP_ALIVE, "SERVER_KEEP_ALIVE"},
{MQTTPROPERTY_CODE_AUTHENTICATION_METHOD, "AUTHENTICATION_METHOD"},
{MQTTPROPERTY_CODE_AUTHENTICATION_DATA, "AUTHENTICATION_DATA"},
{MQTTPROPERTY_CODE_REQUEST_PROBLEM_INFORMATION, "REQUEST_PROBLEM_INFORMATION"},
{MQTTPROPERTY_CODE_WILL_DELAY_INTERVAL, "WILL_DELAY_INTERVAL"},
{MQTTPROPERTY_CODE_REQUEST_RESPONSE_INFORMATION, "REQUEST_RESPONSE_INFORMATION"},
{MQTTPROPERTY_CODE_RESPONSE_INFORMATION, "RESPONSE_INFORMATION"},
{MQTTPROPERTY_CODE_SERVER_REFERENCE, "SERVER_REFERENCE"},
{MQTTPROPERTY_CODE_REASON_STRING, "REASON_STRING"},
{MQTTPROPERTY_CODE_RECEIVE_MAXIMUM, "RECEIVE_MAXIMUM"},
{MQTTPROPERTY_CODE_TOPIC_ALIAS_MAXIMUM, "TOPIC_ALIAS_MAXIMUM"},
{MQTTPROPERTY_CODE_TOPIC_ALIAS, "TOPIC_ALIAS"},
{MQTTPROPERTY_CODE_MAXIMUM_QOS, "MAXIMUM_QOS"},
{MQTTPROPERTY_CODE_RETAIN_AVAILABLE, "RETAIN_AVAILABLE"},
{MQTTPROPERTY_CODE_USER_PROPERTY, "USER_PROPERTY"},
{MQTTPROPERTY_CODE_MAXIMUM_PACKET_SIZE, "MAXIMUM_PACKET_SIZE"},
{MQTTPROPERTY_CODE_WILDCARD_SUBSCRIPTION_AVAILABLE, "WILDCARD_SUBSCRIPTION_AVAILABLE"},
{MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIERS_AVAILABLE, "SUBSCRIPTION_IDENTIFIERS_AVAILABLE"},
{MQTTPROPERTY_CODE_SHARED_SUBSCRIPTION_AVAILABLE, "SHARED_SUBSCRIPTION_AVAILABLE"}
};

/** The MQTT V5 one byte reason code */
enum MQTTReasonCodes {
MQTTREASONCODE_SUCCESS = 0,
Expand Down Expand Up @@ -1428,6 +1463,31 @@ typedef void MQTTAsync_connected(void *context, char *cause);
typedef void MQTTAsync_disconnected(void *context, MQTTProperties *properties,
enum MQTTReasonCodes reasonCode);

/** The MQTT V5 subscribe options, apart from QoS which existed before V5. */
typedef struct MQTTSubscribe_options {
/** The eyecatcher for this structure. Must be MQSO. */
char struct_id[4];
/** The version number of this structure. Must be 0.
*/
int struct_version;
/** To not receive our own publications, set to 1.
* 0 is the original MQTT behaviour - all messages matching the
* subscription are received.
*/
unsigned char noLocal;
/** To keep the retain flag as on the original publish message, set
* to 1. If 0, defaults to the original MQTT behaviour where the retain
* flag is only set on publications sent by a broker if in response to
* a subscribe request.
*/
unsigned char retainAsPublished;
/** 0 - send retained messages at the time of the subscribe (original
* MQTT behaviour) 1 - send retained messages on subscribe only if the
* subscription is new 2 - do not send retained messages at all
*/
unsigned char retainHandling;
} MQTTSubscribe_options;

/** Structure to define call options. For MQTT 5.0 there is input data as well
* as that describing the response method. So there is now also a synonym
* ::MQTTAsync_callOptions to better reflect the use. This responseOptions
Expand Down Expand Up @@ -1483,7 +1543,7 @@ typedef struct MQTTAsync_responseOptions {
/*
* MQTT V5 subscribe options, when used with subscribe only.
*/
// MQTTSubscribe_options subscribeOptions;
MQTTSubscribe_options subscribeOptions;
/*
* MQTT V5 subscribe option count, when used with subscribeMany only.
* The number of entries in the subscribe_options_list array.
Expand All @@ -1492,34 +1552,9 @@ typedef struct MQTTAsync_responseOptions {
/*
* MQTT V5 subscribe option array, when used with subscribeMany only.
*/
// MQTTSubscribe_options *subscribeOptionsList;
MQTTSubscribe_options *subscribeOptionsList;
} MQTTAsync_responseOptions;

/** The MQTT V5 subscribe options, apart from QoS which existed before V5. */
typedef struct MQTTSubscribe_options {
/** The eyecatcher for this structure. Must be MQSO. */
char struct_id[4];
/** The version number of this structure. Must be 0.
*/
int struct_version;
/** To not receive our own publications, set to 1.
* 0 is the original MQTT behaviour - all messages matching the
* subscription are received.
*/
unsigned char noLocal;
/** To keep the retain flag as on the original publish message, set
* to 1. If 0, defaults to the original MQTT behaviour where the retain
* flag is only set on publications sent by a broker if in response to
* a subscribe request.
*/
unsigned char retainAsPublished;
/** 0 - send retained messages at the time of the subscribe (original
* MQTT behaviour) 1 - send retained messages on subscribe only if the
* subscription is new 2 - do not send retained messages at all
*/
unsigned char retainHandling;
} MQTTSubscribe_options;

/**
* This function sets the global callback functions for a specific client.
* If your client application doesn't use a particular callback, set the
Expand Down Expand Up @@ -1764,6 +1799,28 @@ NNG_DECL int MQTTAsync_isConnected(MQTTAsync handle);
*/
NNG_DECL void MQTTAsync_destroy(MQTTAsync *handle);

/**
* This function frees memory allocated to an MQTT message, including the
* additional memory allocated to the message payload. The client application
* calls this function when the message has been fully processed. <b>Important
* note:</b> This function does not free the memory allocated to a message
* topic string. It is the responsibility of the client application to free
* this memory using the MQTTAsync_free() library function.
* @param msg The address of a pointer to the ::MQTTAsync_message structure
* to be freed.
*/
NNG_DECL void MQTTAsync_freeMessage(MQTTAsync_message **msg);

/**
* This function frees memory allocated by the MQTT C client library,
* especially the topic name. This is needed on Windows when the client library
* and application program have been compiled with different versions of the C
* compiler. It is thus good policy to always use this function when freeing
* any MQTT C client- allocated memory.
* @param ptr The pointer to the client library storage to be freed.
*/
NNG_DECL void MQTTAsync_free(void *ptr);

#ifdef __cplusplus
}
#endif
Expand Down
51 changes: 15 additions & 36 deletions src/mqtt/transport/tcp/mqtt_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,11 @@ mqtt_tcptran_pipe_fini(void *arg)
nni_mtx_unlock(&ep->mtx);
}

nng_stream_free(p->conn);
nni_aio_free(p->rxaio);
nni_aio_free(p->txaio);
nni_aio_free(p->negoaio);
nni_aio_free(p->rpaio);
nng_stream_free(p->conn);
nni_msg_free(p->rxmsg);
// nni_lmq_fini(&p->rslmq);
nni_mtx_fini(&p->mtx);
Expand Down Expand Up @@ -552,13 +552,6 @@ mqtt_tcptran_pipe_nego_cb(void *arg)
nni_aio_set_iov(p->rpaio, 1, &iov);
nng_stream_send(p->conn, p->rpaio);
}
/* stream closed passively by peer? TODO
nng_stream_close(p->conn);
if ((uaio = ep->useraio) != NULL) {
ep->useraio = NULL;
nni_aio_finish_error(uaio, rv);
}
*/

nni_mtx_unlock(&ep->mtx);

Expand Down Expand Up @@ -633,7 +626,7 @@ mqtt_tcptran_pipe_send_cb(void *arg)

nni_aio_set_msg(aio, NULL);
nni_msg_free(msg);
nni_aio_finish_sync(aio, 0, n);
nni_aio_finish_sync(aio, rv, n);
}

static void
Expand Down Expand Up @@ -719,6 +712,7 @@ mqtt_tcptran_pipe_recv_cb(void *arg)
nni_msg_header_append(p->rxmsg, p->rxlen, pos + 1);
msg = p->rxmsg;
p->rxmsg = NULL;
n = nni_msg_len(msg);
type = p->rxlen[0] & 0xf0;
flags = p->rxlen[0] & 0x0f;

Expand Down Expand Up @@ -891,8 +885,8 @@ mqtt_tcptran_pipe_send_start(mqtt_tcptran_pipe *p)
if (qos > 0)
p->sndmax --;
if (qos > p->qosmax) {
p->qosmax == 1? (*header &= 0XF9) & (*header |= 0X02): NNI_ARG_UNUSED(*header);
p->qosmax == 0? *header &= 0XF9:*header;
p->qosmax == 1? ((*header &= 0XF9), (*header |= 0X02)) : NNI_ARG_UNUSED(*header);
p->qosmax == 0 ? *header &= 0XF9 : NNI_ARG_UNUSED(*header);
}
}
}
Expand All @@ -918,27 +912,6 @@ mqtt_tcptran_pipe_send_start(mqtt_tcptran_pipe *p)
niov++;
}

// int msg_body_len = 30 < nni_msg_len(msg) ? 30 : nni_msg_len(msg);

// char *strheader = nng_alloc(nni_msg_header_len(msg) * 3 + 1);
// char *strbody = nng_alloc(msg_body_len * 3 + 1);
// char *data;

// data = nni_msg_header(msg);
// for (int i = 0; i < nni_msg_header_len(msg); ++i) {
// sprintf(strheader + i * 3, "%02X ", data[i]);
// }
// log_debug("msg header: %s", strheader);

// data = nni_msg_body(msg);
// for (int i = 0; i < msg_body_len; ++i) {
// sprintf(strbody + i * 3, "%02X ", data[i]);
// }
// log_debug("msg body: %s", strbody);

// nng_free(strheader, nni_msg_header_len(msg) * 3 + 1);
// nng_free(strbody, msg_body_len * 3 + 1);

nni_aio_set_iov(txaio, niov, iov);
nng_stream_send(p->conn, txaio);
}
Expand Down Expand Up @@ -1193,6 +1166,8 @@ mqtt_tcptran_ep_fini(void *arg)
}
nni_mtx_unlock(&ep->mtx);

if (ep->connmsg)
nni_msg_free(ep->connmsg);
#ifdef SUPP_SCRAM
if (ep->authmsg)
nni_msg_free(ep->authmsg);
Expand Down Expand Up @@ -1624,8 +1599,10 @@ mqtt_tcptran_ep_get_connmsg(void *arg, void *v, size_t *szp, nni_opt_type t)
{
mqtt_tcptran_ep *ep = arg;
int rv;

rv = nni_copyout_ptr(ep->connmsg, v, szp, t);
if (ep->connmsg != NULL)
rv = nni_copyout_ptr(ep->connmsg, v, szp, t);
else
rv = NNG_EEXIST;

return (rv);
}
Expand All @@ -1635,8 +1612,10 @@ mqtt_tcptran_ep_get_property(void *arg, void *v, size_t *szp, nni_opt_type t)
{
mqtt_tcptran_ep *ep = arg;
int rv;

rv = nni_copyout_ptr(ep->property, v, szp, t);
if (ep->property != NULL)
rv = nni_copyout_ptr(ep->property, v, szp, t);
else
rv = NNG_EEXIST;
return (rv);
}

Expand Down
Loading

0 comments on commit 8710d19

Please sign in to comment.