Skip to content

Commit

Permalink
* NEW [demo] Allow client switch urls after disconnected.
Browse files Browse the repository at this point in the history
* NEW [demo] Add roundrobin option for switching urls.

Signed-off-by: wanghaemq <[email protected]>
  • Loading branch information
wanghaEMQ committed Aug 8, 2024
1 parent 52e79ed commit a99c073
Showing 1 changed file with 34 additions and 11 deletions.
45 changes: 34 additions & 11 deletions demo/multiurls_switch/multiurls_switch.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "nng/nng.h"
#include "nng/supplemental/util/platform.h"

#define ROUND_ROBIN true

void
print_helper()
{
Expand All @@ -49,6 +51,9 @@ intHandler(int dummy)
exit(0);
}

static nng_cv *switch_cv;
static nng_mtx *switch_mtx;

static void
disconnect_cb(nng_pipe p, nng_pipe_ev ev, void *arg)
{
Expand All @@ -59,6 +64,11 @@ disconnect_cb(nng_pipe p, nng_pipe_ev ev, void *arg)
// nng_pipe_get_ptr(p, NNG_OPT_MQTT_DISCONNECT_PROPERTY, &prop);
// nng_socket_get?
printf("%s: disconnected!\n", __FUNCTION__);

// Wake to reconnect
nng_mtx_lock(switch_mtx);
nng_cv_wake(switch_cv);
nng_mtx_unlock(switch_mtx);
(void) ev;
(void) arg;
}
Expand Down Expand Up @@ -101,29 +111,37 @@ client_connect(const char **urls, int len)

int cnt = -1;

nng_socket sock;
if ((rv = nng_mqtt_client_open(&sock)) != 0) {
fatal("nng_socket", rv);
}
nng_mqtt_set_connect_cb(sock, connect_cb, (void *)&sock);
nng_mqtt_set_disconnect_cb(sock, disconnect_cb, connmsg);

while (1) {
nng_dialer dialer;
nng_socket sock;
if ((rv = nng_mqtt_client_open(&sock)) != 0) {
fatal("nng_socket", rv);
}
nng_mqtt_set_connect_cb(sock, connect_cb, (void *)&sock);
nng_mqtt_set_disconnect_cb(sock, disconnect_cb, connmsg);

cnt = (cnt + 1) % len;
const char *url = urls[cnt];

nng_dialer dialer;
if ((rv = nng_dialer_create(&dialer, sock, url)) != 0) {
fatal("nng_dialer_create", rv);
}
nng_dialer_set_ptr(dialer, NNG_OPT_MQTT_CONNMSG, connmsg);

printf("Connecting to server %s ...\n", url);
printf("Connecting to server [%d]%s ...\n", cnt, url);
if ((rv = nng_dialer_start(dialer, NNG_FLAG_ALLOC)) != 0) {
printf("Failed to connect to %s rv%d\n", url, rv);
} else {
break;
continue;
}
// connected
// Wait for disconnect
nng_mtx_lock(switch_mtx);
nng_cv_wait(switch_cv);
nng_mtx_unlock(switch_mtx);
// close socket
nng_close(sock);
if (ROUND_ROBIN == false) {
cnt = -1; // Always from the first url
}
}

Expand All @@ -133,6 +151,7 @@ client_connect(const char **urls, int len)
int
main()
{
int rv;
print_helper();
const char *urls[] = {
"mqtt-tcp://example.io:1883",
Expand All @@ -141,6 +160,10 @@ main()
};
int len = sizeof(urls) / sizeof(char *);

if ((0 != (rv = nng_mtx_alloc(&switch_mtx))) ||
(0 != (rv = nng_cv_alloc(&switch_cv, switch_mtx)))) {
fatal("Failed to init switch mtx or cv", rv);
}
client_connect(urls, len);

signal(SIGINT, intHandler);
Expand Down

0 comments on commit a99c073

Please sign in to comment.