Skip to content

Commit

Permalink
ttee
Browse files Browse the repository at this point in the history
Signed-off-by: jaylin <[email protected]>
  • Loading branch information
JaylinYu committed Aug 28, 2024
1 parent bb59b4e commit 26b1e4c
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 18 deletions.
23 changes: 10 additions & 13 deletions demo/mqttv5_scram/mqttv5_scram.c
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,8 @@ main(const int argc, const char **argv)
nng_socket sock;

const char *exe = argv[0];

int rc;
int ch;
const char *cmd;
int istls = 0;
const char *ca = NULL, *cert = NULL, *key = NULL, *pwd = NULL;
Expand All @@ -620,8 +621,7 @@ main(const int argc, const char **argv)
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
int rc;
int ch;

rc = MQTTAsync_createWithOptions(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
if (rc != MQTTASYNC_SUCCESS)
{
Expand Down Expand Up @@ -674,21 +674,18 @@ main(const int argc, const char **argv)

MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_sendMessage(client, "msg", &pubmsg, &pub_opts);
nng_msleep(3600); // neither pause() nor sleep() portable

disc_opts.onSuccess = onDisconnect;
disc_opts.onFailure = onDisconnectFailure;
// if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
// {
// printf("Failed to start disconnect, return code %d\n", rc);
// rc = EXIT_FAILURE;
// goto destroy_exit;
// }
for (;;) {
nng_msleep(
3600000); // neither pause() nor sleep() portable
if ((MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start disconnect, return code %d\n", rc);
rc = EXIT_FAILURE;
goto destroy_exit;
}
destroy_exit:
// MQTTAsync_destroy(&client);
MQTTAsync_destroy(&client);
exit:
return;
if (5 == argc && 0 == strcmp(argv[1], SUBSCRIBE)) {
Expand Down
39 changes: 39 additions & 0 deletions include/nng/mqtt/mqtt_paho.h
Original file line number Diff line number Diff line change
Expand Up @@ -1725,6 +1725,45 @@ NNG_DECL int MQTTAsync_setConnected(
*/
NNG_DECL int MQTTAsync_reconnect(MQTTAsync handle);

/**
* This function attempts to disconnect the client from the MQTT
* server. In order to allow the client time to complete handling of messages
* that are in-flight when this function is called, a timeout period is
* specified. When the timeout period has expired, the client disconnects even
* if there are still outstanding message acknowledgements.
* The next time the client connects to the same server, any QoS 1 or 2
* messages which have not completed will be retried depending on the
* cleansession settings for both the previous and the new connection (see
* MQTTAsync_connectOptions.cleansession and MQTTAsync_connect()).
* @param handle A valid client handle from a successful call to
* MQTTAsync_create().
* @param options The client delays disconnection for up to this time (in
* milliseconds) in order to allow in-flight message transfers to complete.
* @return ::MQTTASYNC_SUCCESS if the client successfully disconnects from
* the server. An error code is returned if the client was unable to disconnect
* from the server
*/
NNG_DECL int MQTTAsync_disconnect(
MQTTAsync handle, const MQTTAsync_disconnectOptions *options);

/**
* This function allows the client application to test whether or not a
* client is currently connected to the MQTT server.
* @param handle A valid client handle from a successful call to
* MQTTAsync_create().
* @return Boolean true if the client is connected, otherwise false.
*/
NNG_DECL int MQTTAsync_isConnected(MQTTAsync handle);

/**
* This function frees the memory allocated to an MQTT client (see
* MQTTAsync_create()). It should be called when the client is no longer
* required.
* @param handle A pointer to the handle referring to the ::MQTTAsync
* structure to be freed.
*/
NNG_DECL void MQTTAsync_destroy(MQTTAsync *handle);

#ifdef __cplusplus
}
#endif
Expand Down
56 changes: 55 additions & 1 deletion src/supplemental/mqtt/mqtt_nano_paho.c
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ connect_cb(nng_pipe p, nng_pipe_ev ev, void *arg)
// nng_pipe_get_ptr(p, NNG_OPT_MQTT_CONNECT_PROPERTY, &prop);
printf("%s: connected! RC [%d] \n", __FUNCTION__, reason);
nng_log_warn("Connected!", "This is also fine");
m->shouldBeConnected = 1;
if (m->connect.onSuccess) {
MQTTAsync_successData data;
memset(&data, '\0', sizeof(data));
Expand Down Expand Up @@ -522,6 +523,7 @@ disconnect_cb(nng_pipe p, nng_pipe_ev ev, void *arg)
// nng_pipe_get_ptr(p, NNG_OPT_MQTT_DISCONNECT_PROPERTY, &prop);
// nng_socket_get?
printf("%s: disconnected! RC [%d] \n", __FUNCTION__, reason);
m->shouldBeConnected = 0;
(void) ev;
if (m->cl)
{
Expand Down Expand Up @@ -913,7 +915,7 @@ int MQTTAsync_send(MQTTAsync handle, const char *destinationName, int payloadlen
m->publish.context = response->context;
response->token = m->publish.token;
if (m->MQTTVersion >= MQTTVERSION_5) {
nng_log_warn("Paho", "Publish property with PAHO API is not support");
nng_log_warn("Paho", "Publish property with PAHO API is not support");
// pub->command.properties = MQTTProperties_copy(&response->properties);
// MQTTProperties_free(&response->properties);
}
Expand Down Expand Up @@ -962,3 +964,55 @@ int MQTTAsync_sendMessage(MQTTAsync handle, const char *destinationName, const M
exit:
return rc;
}

int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions *options)
{
MQTTAsyncs *m = handle;
nng_mqtt_client *client = m->nanosdk_client;
int rc = MQTTASYNC_SUCCESS;

if (options != NULL && (strncmp(options->struct_id, "MQTD", 4) != 0 || options->struct_version < 0 || options->struct_version > 1))
return MQTTASYNC_BAD_STRUCTURE;
if (m == NULL)
{
return MQTTASYNC_FAILURE;
}
if (m->shouldBeConnected == 0)
{
return MQTTASYNC_DISCONNECTED;
}
if (options)
{
m->disconnect.onSuccess = options->onSuccess;
m->disconnect.onFailure = options->onFailure;
m->disconnect.onSuccess5 = options->onSuccess5;
m->disconnect.onFailure5 = options->onFailure5;
m->disconnect.context = options->context;
m->disconnect.details.dis.timeout = options->timeout;
if (m->MQTTVersion >= MQTTVERSION_5 && options->struct_version >= 1)
{
nng_log_warn("Paho", "Disconnect property with PAHO API is not support");
// dis->command.properties = MQTTProperties_copy(&options->properties);
// dis->command.details.dis.reasonCode = options->reasonCode;
}
}
m->disconnect.type = DISCONNECT;
m->disconnect.details.dis.internal = 0; // ???
nng_mqtt_disconnect(client, options->reasonCode, NULL);
m->shouldBeConnected = 0;
}

void MQTTAsync_destroy(MQTTAsync *handle)
{
MQTTAsyncs *m = *handle;
nng_socket *sock = m->sock;
if (m->shouldBeConnected = 1)
nng_mqtt_disconnect(m->sock, MQTTREASONCODE_NORMAL_DISCONNECTION, NULL);
nng_close(*sock);

// free objects
nng_mqtt_client_free(m->nanosdk_client, true);
nng_free(m->sock, sizeof(nng_socket));
nng_free(m->dialer, sizeof(nng_dialer));
nng_free(m, sizeof(MQTTAsyncs));
}
7 changes: 7 additions & 0 deletions src/supplemental/mqtt/mqtt_nano_paho.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ extern "C" {

#define PAHO_MEMORY_ERROR -99

enum msgTypes
{
CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL,
PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK,
PINGREQ, PINGRESP, DISCONNECT, AUTH
};

#define MQTTAsync_successData5_initializer \
{ \
{'M', 'Q', 'S', 'D'}, 0, 0, MQTTREASONCODE_SUCCESS, MQTTProperties_initializer, \
Expand Down
12 changes: 8 additions & 4 deletions src/supplemental/mqtt/mqtt_public.c
Original file line number Diff line number Diff line change
Expand Up @@ -930,9 +930,13 @@ void nng_mqtt_client_free(nng_mqtt_client *client, bool is_async)
nni_aio_close(client->send_aio);
if (client) {
if (is_async) {
nni_aio_close(client->recv_aio);
nni_aio_stop(client->send_aio);
nni_aio_stop(client->recv_aio);
nng_aio_free(client->send_aio);
nni_lmq_fini((nni_lmq *) client->msgq);
nng_free(client->msgq, sizeof(nni_lmq));
nng_aio_free(client->recv_aio);
}
NNI_FREE_STRUCT(client);
}
Expand Down Expand Up @@ -1068,11 +1072,11 @@ nng_mqtt_disconnect(nng_socket *sock, uint8_t reason_code, property *pl)
}

if ((rv = nng_sendmsg(*sock, disconnmsg, 0)) != 0) {
nng_msg_free(disconnmsg);
nng_log_warn("Disconnect", "sending disconnect failed");
}

nng_close(*sock);

// we only send a disconnect packet to remote
// and wait for a passive socket close
// however nanosdk will reconnect again.
return (rv);
}

Expand Down

0 comments on commit 26b1e4c

Please sign in to comment.