Skip to content

Commit

Permalink
AMQP plugin: prefer the rabbitmq-c/amqp.h header if available.
Browse files Browse the repository at this point in the history
At some point, RabbitMQ has moved their headers to the `rabbitmq-c/`
subdirectory. The old locations still exist but throw an error, saying
the old headers are "deprecated".

This adds appropriate checks for the new headers to the configure script
and uses those if present.

To simplify both the configure script and plugin, support for ancient
versions of the library is removed. This affects versions that don't
have the `amqp_tcp_socket_new()` function yet.
  • Loading branch information
octo committed Nov 24, 2023
1 parent fb0047e commit cd07fc4
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 88 deletions.
51 changes: 11 additions & 40 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -5047,10 +5047,12 @@ if test "x$with_librabbitmq" = "xyes"; then
SAVE_CPPFLAGS="$CPPFLAGS"
CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
AC_CHECK_HEADERS([amqp.h],
[with_librabbitmq="yes"],
[with_librabbitmq="no (amqp.h not found)"]
)
with_librabbitmq="no (amqp.h and rabbitmq-c/amqp.h not found)"
AC_CHECK_HEADERS([rabbitmq-c/amqp.h], [with_librabbitmq="yes"], [])
AC_CHECK_HEADERS([amqp.h], [with_librabbitmq="yes"], [])
AC_CHECK_HEADERS([rabbitmq-c/framing.h rabbitmq-c/ssl_socket.h rabbitmq-c/tcp_socket.h \
amqp_framing.h amqp_ssl_socket.h amqp_tcp_socket.h])
CPPFLAGS="$SAVE_CPPFLAGS"
fi
Expand All @@ -5071,7 +5073,11 @@ if test "x$with_librabbitmq" = "xyes"; then
#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>
#include <amqp.h>
#if HAVE_RABBITMQ_C_AMQP_H
# include <rabbitmq-c/amqp.h>
#else
# include <amqp.h>
#endif
]]
)
CPPFLAGS="$SAVE_CPPFLAGS"
Expand All @@ -5087,41 +5093,6 @@ if test "x$with_librabbitmq" = "xyes"; then
LDFLAGS="$SAVE_LDFLAGS"
fi
if test "x$with_librabbitmq" = "xyes"; then
SAVE_CPPFLAGS="$CPPFLAGS"
SAVE_LDFLAGS="$LDFLAGS"
SAVE_LIBS="$LIBS"
CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags"
LIBS="-lrabbitmq"
AC_CHECK_HEADERS([amqp_tcp_socket.h amqp_socket.h])
AC_CHECK_FUNC([amqp_tcp_socket_new],
[
AC_DEFINE([HAVE_AMQP_TCP_SOCKET], [1],
[Define if librabbitmq provides the new TCP socket interface.])
]
)
AC_CHECK_DECLS([amqp_socket_close],
[],
[],
[[
#include <amqp.h>
#ifdef HAVE_AMQP_TCP_SOCKET_H
# include <amqp_tcp_socket.h>
#endif
#ifdef HAVE_AMQP_SOCKET_H
# include <amqp_socket.h>
#endif
]]
)
CPPFLAGS="$SAVE_CPPFLAGS"
LDFLAGS="$SAVE_LDFLAGS"
LIBS="$SAVE_LIBS"
fi
if test "x$with_librabbitmq" = "xyes"; then
BUILD_WITH_LIBRABBITMQ_CPPFLAGS="$with_librabbitmq_cppflags"
BUILD_WITH_LIBRABBITMQ_LDFLAGS="$with_librabbitmq_ldflags"
Expand Down
76 changes: 28 additions & 48 deletions src/amqp.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,28 @@
#include "utils/format_json/format_json.h"
#include "utils_random.h"

#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/amqp_framing.h>

#ifdef HAVE_AMQP_TCP_SOCKET_H
#include <rabbitmq-c/amqp_ssl_socket.h>
#include <rabbitmq-c/amqp_tcp_socket.h>
#endif
#ifdef HAVE_AMQP_SOCKET_H
#include <rabbitmq-c/amqp_socket.h>
#endif
#ifdef HAVE_AMQP_TCP_SOCKET
#if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE
/* rabbitmq-c does not currently ship amqp_socket.h
* and, thus, does not define this function. */
int amqp_socket_close(amqp_socket_t *);
#endif
#if HAVE_RABBITMQ_C_AMQP_H
# include <rabbitmq-c/amqp.h>
# if HAVE_RABBITMQ_C_FRAMING_H
# include <rabbitmq-c/framing.h>
# endif
# if HAVE_RABBITMQ_C_TCP_SOCKET_H
# include <rabbitmq-c/tcp_socket.h>
# endif
# if HAVE_RABBITMQ_C_SSL_SOCKET_H
# include <rabbitmq-c/ssl_socket.h>
# endif
#elif HAVE_AMQP_H
# include <amqp.h>
# if HAVE_AMQP_FRAMING_H
# include <amqp_framing.h>
# endif
# if HAVE_AMQP_TCP_SOCKET_H
# include <amqp_tcp_socket.h>
# endif
# if HAVE_AMQP_SSL_SOCKET_H
# include <amqp_ssl_socket.h>
# endif
#endif

/* Defines for the delivery mode. I have no idea why they're not defined by the
Expand Down Expand Up @@ -411,14 +417,6 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
{
static time_t last_connect_time;

amqp_rpc_reply_t reply;
int status;
#ifdef HAVE_AMQP_TCP_SOCKET
amqp_socket_t *socket;
#else
int sockfd;
#endif

if (conf->connection != NULL)
return 0;

Expand All @@ -442,13 +440,10 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
char *host = conf->hosts[cdrand_u() % conf->hosts_count];
INFO("amqp plugin: Connecting to %s", host);

#ifdef HAVE_AMQP_TCP_SOCKET
#define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us \
*/

amqp_socket_t *socket = NULL;
if (conf->tls_enabled) {
socket = amqp_ssl_socket_new(conf->connection);
if (!socket) {
if (socket == NULL) {
ERROR("amqp plugin: amqp_ssl_socket_new failed.");
amqp_destroy_connection(conf->connection);
conf->connection = NULL;
Expand All @@ -461,7 +456,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
#endif

if (conf->tls_cacert) {
status = amqp_ssl_socket_set_cacert(socket, conf->tls_cacert);
int status = amqp_ssl_socket_set_cacert(socket, conf->tls_cacert);
if (status < 0) {
ERROR("amqp plugin: amqp_ssl_socket_set_cacert failed: %s",
amqp_error_string2(status));
Expand All @@ -471,7 +466,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
}
}
if (conf->tls_client_cert && conf->tls_client_key) {
status = amqp_ssl_socket_set_key(socket, conf->tls_client_cert,
int status = amqp_ssl_socket_set_key(socket, conf->tls_client_cert,
conf->tls_client_key);
if (status < 0) {
ERROR("amqp plugin: amqp_ssl_socket_set_key failed: %s",
Expand All @@ -491,29 +486,16 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
}
}

status = amqp_socket_open(socket, host, conf->port);
int status = amqp_socket_open(socket, host, conf->port);
if (status < 0) {
ERROR("amqp plugin: amqp_socket_open failed: %s",
amqp_error_string2(status));
amqp_destroy_connection(conf->connection);
conf->connection = NULL;
return status;
}
#else /* HAVE_AMQP_TCP_SOCKET */
#define CLOSE_SOCKET() close(sockfd)
/* this interface is deprecated as of rabbitmq-c 0.4 */
sockfd = amqp_open_socket(host, conf->port);
if (sockfd < 0) {
status = (-1) * sockfd;
ERROR("amqp plugin: amqp_open_socket failed: %s", STRERROR(status));
amqp_destroy_connection(conf->connection);
conf->connection = NULL;
return status;
}
amqp_set_sockfd(conf->connection, sockfd);
#endif

reply = amqp_login(conf->connection, CONF(conf, vhost),
amqp_rpc_reply_t reply = amqp_login(conf->connection, CONF(conf, vhost),
/* channel max = */ 0,
/* frame max = */ 131072,
/* heartbeat = */ 0,
Expand All @@ -523,7 +505,6 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
ERROR("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
CONF(conf, vhost), CONF(conf, user));
amqp_destroy_connection(conf->connection);
CLOSE_SOCKET();
conf->connection = NULL;
return 1;
}
Expand All @@ -535,7 +516,6 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
ERROR("amqp plugin: amqp_channel_open failed.");
amqp_connection_close(conf->connection, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conf->connection);
CLOSE_SOCKET();
conf->connection = NULL;
return 1;
}
Expand Down

0 comments on commit cd07fc4

Please sign in to comment.