Skip to content

Commit

Permalink
Merge pull request collectd#4111 from mrunge/rabbitmq
Browse files Browse the repository at this point in the history
Fix compile warning for amqp
  • Loading branch information
octo authored Nov 24, 2023
2 parents b0d7a29 + cd07fc4 commit 73000d8
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 88 deletions.
51 changes: 11 additions & 40 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -5047,10 +5047,12 @@ if test "x$with_librabbitmq" = "xyes"; then
SAVE_CPPFLAGS="$CPPFLAGS"
CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
AC_CHECK_HEADERS([amqp.h],
[with_librabbitmq="yes"],
[with_librabbitmq="no (amqp.h not found)"]
)
with_librabbitmq="no (amqp.h and rabbitmq-c/amqp.h not found)"
AC_CHECK_HEADERS([rabbitmq-c/amqp.h], [with_librabbitmq="yes"], [])
AC_CHECK_HEADERS([amqp.h], [with_librabbitmq="yes"], [])
AC_CHECK_HEADERS([rabbitmq-c/framing.h rabbitmq-c/ssl_socket.h rabbitmq-c/tcp_socket.h \
amqp_framing.h amqp_ssl_socket.h amqp_tcp_socket.h])
CPPFLAGS="$SAVE_CPPFLAGS"
fi
Expand All @@ -5071,7 +5073,11 @@ if test "x$with_librabbitmq" = "xyes"; then
#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>
#include <amqp.h>
#if HAVE_RABBITMQ_C_AMQP_H
# include <rabbitmq-c/amqp.h>
#else
# include <amqp.h>
#endif
]]
)
CPPFLAGS="$SAVE_CPPFLAGS"
Expand All @@ -5087,41 +5093,6 @@ if test "x$with_librabbitmq" = "xyes"; then
LDFLAGS="$SAVE_LDFLAGS"
fi
if test "x$with_librabbitmq" = "xyes"; then
SAVE_CPPFLAGS="$CPPFLAGS"
SAVE_LDFLAGS="$LDFLAGS"
SAVE_LIBS="$LIBS"
CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags"
LIBS="-lrabbitmq"
AC_CHECK_HEADERS([amqp_tcp_socket.h amqp_socket.h])
AC_CHECK_FUNC([amqp_tcp_socket_new],
[
AC_DEFINE([HAVE_AMQP_TCP_SOCKET], [1],
[Define if librabbitmq provides the new TCP socket interface.])
]
)
AC_CHECK_DECLS([amqp_socket_close],
[],
[],
[[
#include <amqp.h>
#ifdef HAVE_AMQP_TCP_SOCKET_H
# include <amqp_tcp_socket.h>
#endif
#ifdef HAVE_AMQP_SOCKET_H
# include <amqp_socket.h>
#endif
]]
)
CPPFLAGS="$SAVE_CPPFLAGS"
LDFLAGS="$SAVE_LDFLAGS"
LIBS="$SAVE_LIBS"
fi
if test "x$with_librabbitmq" = "xyes"; then
BUILD_WITH_LIBRABBITMQ_CPPFLAGS="$with_librabbitmq_cppflags"
BUILD_WITH_LIBRABBITMQ_LDFLAGS="$with_librabbitmq_ldflags"
Expand Down
76 changes: 28 additions & 48 deletions src/amqp.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,28 @@
#include "utils/format_json/format_json.h"
#include "utils_random.h"

#include <amqp.h>
#include <amqp_framing.h>

#ifdef HAVE_AMQP_TCP_SOCKET_H
#include <amqp_ssl_socket.h>
#include <amqp_tcp_socket.h>
#endif
#ifdef HAVE_AMQP_SOCKET_H
#include <amqp_socket.h>
#endif
#ifdef HAVE_AMQP_TCP_SOCKET
#if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE
/* rabbitmq-c does not currently ship amqp_socket.h
* and, thus, does not define this function. */
int amqp_socket_close(amqp_socket_t *);
#endif
#if HAVE_RABBITMQ_C_AMQP_H
# include <rabbitmq-c/amqp.h>
# if HAVE_RABBITMQ_C_FRAMING_H
# include <rabbitmq-c/framing.h>
# endif
# if HAVE_RABBITMQ_C_TCP_SOCKET_H
# include <rabbitmq-c/tcp_socket.h>
# endif
# if HAVE_RABBITMQ_C_SSL_SOCKET_H
# include <rabbitmq-c/ssl_socket.h>
# endif
#elif HAVE_AMQP_H
# include <amqp.h>
# if HAVE_AMQP_FRAMING_H
# include <amqp_framing.h>
# endif
# if HAVE_AMQP_TCP_SOCKET_H
# include <amqp_tcp_socket.h>
# endif
# if HAVE_AMQP_SSL_SOCKET_H
# include <amqp_ssl_socket.h>
# endif
#endif

/* Defines for the delivery mode. I have no idea why they're not defined by the
Expand Down Expand Up @@ -411,14 +417,6 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
{
static time_t last_connect_time;

amqp_rpc_reply_t reply;
int status;
#ifdef HAVE_AMQP_TCP_SOCKET
amqp_socket_t *socket;
#else
int sockfd;
#endif

if (conf->connection != NULL)
return 0;

Expand All @@ -442,13 +440,10 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
char *host = conf->hosts[cdrand_u() % conf->hosts_count];
INFO("amqp plugin: Connecting to %s", host);

#ifdef HAVE_AMQP_TCP_SOCKET
#define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us \
*/

amqp_socket_t *socket = NULL;
if (conf->tls_enabled) {
socket = amqp_ssl_socket_new(conf->connection);
if (!socket) {
if (socket == NULL) {
ERROR("amqp plugin: amqp_ssl_socket_new failed.");
amqp_destroy_connection(conf->connection);
conf->connection = NULL;
Expand All @@ -461,7 +456,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
#endif

if (conf->tls_cacert) {
status = amqp_ssl_socket_set_cacert(socket, conf->tls_cacert);
int status = amqp_ssl_socket_set_cacert(socket, conf->tls_cacert);
if (status < 0) {
ERROR("amqp plugin: amqp_ssl_socket_set_cacert failed: %s",
amqp_error_string2(status));
Expand All @@ -471,7 +466,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
}
}
if (conf->tls_client_cert && conf->tls_client_key) {
status = amqp_ssl_socket_set_key(socket, conf->tls_client_cert,
int status = amqp_ssl_socket_set_key(socket, conf->tls_client_cert,
conf->tls_client_key);
if (status < 0) {
ERROR("amqp plugin: amqp_ssl_socket_set_key failed: %s",
Expand All @@ -491,29 +486,16 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
}
}

status = amqp_socket_open(socket, host, conf->port);
int status = amqp_socket_open(socket, host, conf->port);
if (status < 0) {
ERROR("amqp plugin: amqp_socket_open failed: %s",
amqp_error_string2(status));
amqp_destroy_connection(conf->connection);
conf->connection = NULL;
return status;
}
#else /* HAVE_AMQP_TCP_SOCKET */
#define CLOSE_SOCKET() close(sockfd)
/* this interface is deprecated as of rabbitmq-c 0.4 */
sockfd = amqp_open_socket(host, conf->port);
if (sockfd < 0) {
status = (-1) * sockfd;
ERROR("amqp plugin: amqp_open_socket failed: %s", STRERROR(status));
amqp_destroy_connection(conf->connection);
conf->connection = NULL;
return status;
}
amqp_set_sockfd(conf->connection, sockfd);
#endif

reply = amqp_login(conf->connection, CONF(conf, vhost),
amqp_rpc_reply_t reply = amqp_login(conf->connection, CONF(conf, vhost),
/* channel max = */ 0,
/* frame max = */ 131072,
/* heartbeat = */ 0,
Expand All @@ -523,7 +505,6 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
ERROR("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
CONF(conf, vhost), CONF(conf, user));
amqp_destroy_connection(conf->connection);
CLOSE_SOCKET();
conf->connection = NULL;
return 1;
}
Expand All @@ -535,7 +516,6 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
ERROR("amqp plugin: amqp_channel_open failed.");
amqp_connection_close(conf->connection, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conf->connection);
CLOSE_SOCKET();
conf->connection = NULL;
return 1;
}
Expand Down

0 comments on commit 73000d8

Please sign in to comment.