diff --git a/Makefile.am b/Makefile.am index 38109fa..cdf3e1a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -31,7 +31,7 @@ tricklectl_LDADD = @ERRO@ $(LIBOBJS) AM_CFLAGS = -Wall -Icompat @EVENTINC@ overloaddir = $(libdir) -overload_DATA = libtrickle.so +overload_DATA = libtrickle.so: trickle-overload.c atomicio.c $(overload_DATA): diff --git a/trickle-overload.c b/trickle-overload.c index 0b0ca18..b37593b 100644 --- a/trickle-overload.c +++ b/trickle-overload.c @@ -50,6 +50,7 @@ #endif /* HAVE_STDINT_H */ #ifdef HAVE_PTHREAD #include <pthread.h> +#include <signal.h> #endif #include "bwstat.h" #include "trickle.h" @@ -108,26 +109,6 @@ static uint lsmooth/* , latency */; static int trickled, initialized, initializing; /* XXX initializing - volatile? */ -#ifdef HAVE_PTHREAD -static pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER; - -static void trickle_lock() -{ - pthread_mutex_lock(&global_lock); -} - -static void trickle_unlock() -{ - pthread_mutex_unlock(&global_lock); -} - -#else - -static void trickle_lock() {} -static void trickle_unlock() {} - -#endif - #define DECLARE(name, ret, args) static ret (*libc_##name) args DECLARE(socket, int, (int, int, int)); @@ -194,6 +175,59 @@ void safe_printv(int, const char *, ...); errx(0, "[trickle] Failed to get " #x "() address"); \ } while (0) +#ifdef HAVE_PTHREAD + +#define LOCK_VAR sigset_t oset +#define TRICKLE_LOCK trickle_lock(&oset) +#define TRICKLE_UNLOCK trickle_unlock(&oset) + +DECLARE(pthread_sigmask, int, (int, const sigset_t *, sigset_t *)); + +static pthread_mutex_t global_lock = PTHREAD_MUTEX_INITIALIZER; + +/* + * Because select(), poll(), read(), write() and many other overloaded functions are + * normally reentrant from signal handlers, we need to block all signals while we hold + * the mutex or else we can deadlock. + */ +static void trickle_lock(sigset_t *oset) +{ + sigset_t mask; + void *dh; + sigfillset(&mask); + /* + * This hack is not nice but I haven't found a better solution. 
+ * You need to load pthread_sigmask() dynamically and you must synchronize + * threads around trickle_init(). trickle_init() must be executed only once; + * if more than one thread enters trickle-overload while initialization is performed, + * they must wait until initialization is completed before continuing. Hence the + * only place that I have found we could load pthread_sigmask() is here. + */ + if (!libc_pthread_sigmask) { + if ((dh = dlopen("libpthread.so.0", RTLD_LAZY)) == NULL) + errx(1, "[trickle] Failed to open libpthread"); + GETADDR(pthread_sigmask); + } + (*libc_pthread_sigmask)(SIG_SETMASK,&mask,oset); + pthread_mutex_lock(&global_lock); +} + +static void trickle_unlock(sigset_t *oset) +{ + pthread_mutex_unlock(&global_lock); + /* Always restore the mask saved by trickle_lock(); otherwise every + * signal stays blocked in the calling thread after the first call. */ + (*libc_pthread_sigmask)(SIG_SETMASK,oset,NULL); +} + +#else + +#define LOCK_VAR +#define TRICKLE_LOCK +#define TRICKLE_UNLOCK + +#endif + static void trickle_init(void) { @@ -314,9 +348,10 @@ socket(int domain, int type, int protocol) { int sock; struct sockdesc *sd; - trickle_lock(); + LOCK_VAR; + TRICKLE_LOCK; INIT; - trickle_unlock(); + TRICKLE_UNLOCK; sock = (*libc_socket)(domain, type, protocol); @@ -328,9 +363,9 @@ socket(int domain, int type, int protocol) if (sock != -1 && domain == AF_INET && type == SOCK_STREAM) { if ((sd = calloc(1, sizeof(*sd))) == NULL) return (-1); - trickle_lock(); + TRICKLE_LOCK; if ((sd->stat = bwstat_new()) == NULL) { - trickle_unlock(); + TRICKLE_UNLOCK; free(sd); return (-1); } @@ -341,7 +376,7 @@ socket(int domain, int type, int protocol) sd->sock = sock; TAILQ_INSERT_TAIL(&sdhead, sd, next); - trickle_unlock(); + TRICKLE_UNLOCK; } return (sock); @@ -351,8 +386,9 @@ int close(int fd) { struct sockdesc *sd, *next; + LOCK_VAR; - trickle_lock(); + TRICKLE_LOCK; INIT; #ifdef DEBUG @@ -373,7 +409,7 @@ close(int fd) trickled_close(&trickled); trickled_open(&trickled); } - trickle_unlock(); + TRICKLE_UNLOCK; return ((*libc_close)(fd)); } @@ -460,6 +496,7 @@ select(int nfds, 
fd_set *rfds, fd_set *wfds, fd_set *efds, struct delayhead dhead; struct delay *d, *_d; int ret; + LOCK_VAR; #ifdef DEBUG safe_printv(0, "[DEBUG] select(%d)", nfds); @@ -473,7 +510,7 @@ select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *efds, timeout = &_timeout; } - trickle_lock(); + TRICKLE_LOCK; INIT; /* @@ -517,9 +554,9 @@ select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *efds, #ifdef DEBUG safe_printv(0, "[DEBUG] IN select(%d)", nfds); #endif /* DEBUG */ - trickle_unlock(); + TRICKLE_UNLOCK; ret = (*libc_select)(nfds, rfds, wfds, efds, selecttv); - trickle_lock(); + TRICKLE_LOCK; #ifdef DEBUG safe_printv(0, "[DEBUG] OUT select(%d) = %d", nfds, ret); @@ -543,7 +580,7 @@ select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *efds, TAILQ_REMOVE(&dhead, d, next); free(d); } - trickle_unlock(); + TRICKLE_UNLOCK; return (ret); } @@ -569,6 +606,7 @@ poll(struct pollfd *fds, int nfds, int __timeout) struct timeval inittv, curtv, _timeout, *timeout = NULL, *delaytv, *polltv, difftv; struct delayhead dhead; + LOCK_VAR; #if defined(DEBUG) || defined(DEBUG_POLL) safe_printv(0, "[DEBUG] poll(*, %d, %d)", nfds, __timeout); @@ -582,7 +620,7 @@ poll(struct pollfd *fds, int nfds, int __timeout) TAILQ_INIT(&dhead); - trickle_lock(); + TRICKLE_LOCK; INIT; for (i = 0; i < nfds; i++) { @@ -638,13 +676,13 @@ poll(struct pollfd *fds, int nfds, int __timeout) #if defined(DEBUG) || defined(DEBUG_POLL) safe_printv(0, "[DEBUG] IN poll(*, %d, %d)", nfds, polltimeout); #endif /* DEBUG */ - trickle_unlock(); + TRICKLE_UNLOCK; ret = (*libc_poll)((struct pollfd *)fds, (int)nfds, (int)polltimeout); #if defined(DEBUG) || defined(DEBUG_POLL) safe_printv(0, "[DEBUG] OUT poll(%d) = %d", nfds, ret); #endif /* DEBUG */ - trickle_lock(); + TRICKLE_LOCK; if (ret == 0 && delaytv != NULL && polltv == delaytv) { _d = select_shift(&dhead, &inittv, &delaytv); while ((d = TAILQ_FIRST(&dhead)) != NULL && d != _d) { @@ -664,7 +702,7 @@ poll(struct pollfd *fds, int nfds, int __timeout) TAILQ_REMOVE(&dhead, 
d, next); free(d); } - trickle_unlock(); + TRICKLE_UNLOCK; return (ret); } @@ -674,14 +712,15 @@ read(int fd, void *buf, size_t nbytes) ssize_t ret = -1; size_t xnbytes = nbytes; int eagain; + LOCK_VAR; - trickle_lock(); + TRICKLE_LOCK; INIT; if (!(eagain = delay(fd, &xnbytes, TRICKLE_RECV) == TRICKLE_WOULDBLOCK)) { - trickle_unlock(); + TRICKLE_UNLOCK; ret = (*libc_read)(fd, buf, xnbytes); - trickle_lock(); + TRICKLE_LOCK; #ifdef DEBUG safe_printv(0, "[DEBUG] read(%d, *, %d) = %d", fd, xnbytes, ret); } else { @@ -690,7 +729,7 @@ read(int fd, void *buf, size_t nbytes) } update(fd, ret, TRICKLE_RECV); - trickle_unlock(); + TRICKLE_UNLOCK; if (eagain) { ret = -1; @@ -709,17 +748,18 @@ readv(int fd, const struct iovec *iov, int iovcnt) size_t len = 0; ssize_t ret = -1; int i, eagain; + LOCK_VAR; for (i = 0; i < iovcnt; i++) len += iov[i].iov_len; - trickle_lock(); + TRICKLE_LOCK; INIT; if (!(eagain = delay(fd, &len, TRICKLE_RECV) == TRICKLE_WOULDBLOCK)) { - trickle_unlock(); + TRICKLE_UNLOCK; ret = (*libc_readv)(fd, iov, iovcnt); - trickle_lock(); + TRICKLE_LOCK; #ifdef DEBUG safe_printv(0, "[DEBUG] readv(%d, *, %d) = %d", fd, iovcnt, ret); } else { @@ -728,7 +768,7 @@ readv(int fd, const struct iovec *iov, int iovcnt) } update(fd, ret, TRICKLE_RECV); - trickle_unlock(); + TRICKLE_UNLOCK; if (eagain) { errno = EAGAIN; @@ -745,14 +785,15 @@ recv(int sock, void *buf, size_t len, int flags) ssize_t ret = -1; size_t xlen = len; int eagain; + LOCK_VAR; - trickle_lock(); + TRICKLE_LOCK; INIT; if (!(eagain = delay(sock, &xlen, TRICKLE_RECV) == TRICKLE_WOULDBLOCK)) { - trickle_unlock(); + TRICKLE_UNLOCK; ret = (*libc_recv)(sock, buf, xlen, flags); - trickle_lock(); + TRICKLE_LOCK; #ifdef DEBUG safe_printv(0, "[DEBUG] recv(%d, *, %d, %d) = %d", sock, len, flags, ret); @@ -762,7 +803,7 @@ recv(int sock, void *buf, size_t len, int flags) } update(sock, ret, TRICKLE_RECV); - trickle_unlock(); + TRICKLE_UNLOCK; if (eagain) { errno = EAGAIN; @@ -786,14 +827,15 @@ recvfrom(int sock, 
void *buf, size_t len, int flags, struct sockaddr *from, ssize_t ret = -1; size_t xlen = len; int eagain; + LOCK_VAR; - trickle_lock(); + TRICKLE_LOCK; INIT; if (!(eagain = delay(sock, &xlen, TRICKLE_RECV) == TRICKLE_WOULDBLOCK)) { - trickle_unlock(); + TRICKLE_UNLOCK; ret = (*libc_recvfrom)(sock, buf, xlen, flags, from, fromlen); - trickle_lock(); + TRICKLE_LOCK; #ifdef DEBUG safe_printv(0, "[DEBUG] recvfrom(%d, *, %d, %d) = %d", sock, len, flags, ret); @@ -804,7 +846,7 @@ recvfrom(int sock, void *buf, size_t len, int flags, struct sockaddr *from, } update(sock, ret, TRICKLE_RECV); - trickle_unlock(); + TRICKLE_UNLOCK; if (eagain) { errno = EAGAIN; @@ -820,14 +862,15 @@ write(int fd, const void *buf, size_t len) ssize_t ret = -1; size_t xlen = len; int eagain; + LOCK_VAR; - trickle_lock(); + TRICKLE_LOCK; INIT; if (!(eagain = delay(fd, &xlen, TRICKLE_SEND) == TRICKLE_WOULDBLOCK)) { - trickle_unlock(); + TRICKLE_UNLOCK; ret = (*libc_write)(fd, buf, xlen); - trickle_lock(); + TRICKLE_LOCK; #ifdef DEBUG safe_printv(0, "[DEBUG] write(%d, *, %d) = %d", fd, len, ret); } else { @@ -836,7 +879,7 @@ write(int fd, const void *buf, size_t len) } update(fd, ret, TRICKLE_SEND); - trickle_unlock(); + TRICKLE_UNLOCK; if (eagain) { errno = EAGAIN; @@ -855,17 +898,18 @@ writev(int fd, const struct iovec *iov, int iovcnt) ssize_t ret = -1; size_t len = 0; int i, eagain; + LOCK_VAR; for (i = 0; i < iovcnt; i++) len += iov[i].iov_len; - trickle_lock(); + TRICKLE_LOCK; INIT; if (!(eagain = delay(fd, &len, TRICKLE_SEND) == TRICKLE_WOULDBLOCK)) { - trickle_unlock(); + TRICKLE_UNLOCK; ret = (*libc_writev)(fd, iov, iovcnt); - trickle_lock(); + TRICKLE_LOCK; #ifdef DEBUG safe_printv(0, "[DEBUG] writev(%d, *, %d) = %d", fd, iovcnt, ret); @@ -875,7 +919,7 @@ writev(int fd, const struct iovec *iov, int iovcnt) } update(fd, ret, TRICKLE_SEND); - trickle_unlock(); + TRICKLE_UNLOCK; if (eagain) { errno = EAGAIN; @@ -892,14 +936,15 @@ send(int sock, const void *buf, size_t len, int flags) ssize_t 
ret = -1; size_t xlen = len; int eagain; + LOCK_VAR; - trickle_lock(); + TRICKLE_LOCK; INIT; if (!(eagain = delay(sock, &xlen, TRICKLE_SEND) == TRICKLE_WOULDBLOCK)) { - trickle_unlock(); + TRICKLE_UNLOCK; ret = (*libc_send)(sock, buf, xlen, flags); - trickle_lock(); + TRICKLE_LOCK; #ifdef DEBUG safe_printv(0, "[DEBUG] send(%d, *, %d, %d) = %d", sock, len, flags, ret); @@ -910,7 +955,7 @@ send(int sock, const void *buf, size_t len, int flags) } update(sock, ret, TRICKLE_SEND); - trickle_unlock(); + TRICKLE_UNLOCK; if (eagain) { errno = EAGAIN; @@ -928,14 +973,15 @@ sendto(int sock, const void *buf, size_t len, int flags, const struct sockaddr * ssize_t ret = -1; size_t xlen = len; int eagain; + LOCK_VAR; - trickle_lock(); + TRICKLE_LOCK; INIT; if (!(eagain = delay(sock, &xlen, TRICKLE_SEND) == TRICKLE_WOULDBLOCK)) { - trickle_unlock(); + TRICKLE_UNLOCK; ret = (*libc_sendto)(sock, buf, xlen, flags, to, tolen); - trickle_lock(); + TRICKLE_LOCK; #ifdef DEBUG safe_printv(0, "[DEBUG] sendto(%d, *, %d) = %d", sock, len, ret); } else { @@ -944,7 +990,7 @@ sendto(int sock, const void *buf, size_t len, int flags, const struct sockaddr * } update(sock, ret, TRICKLE_SEND); - trickle_unlock(); + TRICKLE_UNLOCK; if (eagain) { errno = EAGAIN; @@ -971,10 +1017,11 @@ dup(int oldfd) { int newfd; struct sockdesc *sd, *nsd; + LOCK_VAR; - trickle_lock(); + TRICKLE_LOCK; INIT; - trickle_unlock(); + TRICKLE_UNLOCK; newfd = (*libc_dup)(oldfd); @@ -982,14 +1029,14 @@ dup(int oldfd) safe_printv(0, "[DEBUG] dup(%d) = %d", oldfd, newfd); #endif /* DEBUG */ - trickle_lock(); + TRICKLE_LOCK; TAILQ_FOREACH(sd, &sdhead, next) if (oldfd == sd->sock) break; if (sd != NULL && newfd != -1) { if ((nsd = malloc(sizeof(*nsd))) == NULL) { - trickle_unlock(); + TRICKLE_UNLOCK; (*libc_close)(newfd); return (-1); } @@ -997,7 +1044,7 @@ dup(int oldfd) memcpy(nsd, sd, sizeof(*nsd)); TAILQ_INSERT_TAIL(&sdhead, nsd, next); } - trickle_unlock(); + TRICKLE_UNLOCK; return (newfd); } @@ -1007,10 +1054,11 @@ 
dup2(int oldfd, int newfd) { struct sockdesc *sd, *nsd; int ret; + LOCK_VAR; - trickle_lock(); + TRICKLE_LOCK; INIT; - trickle_unlock(); + TRICKLE_UNLOCK; ret = (*libc_dup2)(oldfd, newfd); @@ -1018,21 +1066,21 @@ dup2(int oldfd, int newfd) safe_printv(0, "[DEBUG] dup2(%d, %d) = %d", oldfd, newfd, ret); #endif /* DEBUG */ - trickle_lock(); + TRICKLE_LOCK; TAILQ_FOREACH(sd, &sdhead, next) if (oldfd == sd->sock) break; if (sd != NULL && ret != -1) { if ((nsd = malloc(sizeof(*nsd))) == NULL) { - trickle_unlock(); + TRICKLE_UNLOCK; return (-1); } sd->sock = newfd; memcpy(nsd, sd, sizeof(*nsd)); TAILQ_INSERT_TAIL(&sdhead, nsd, next); } - trickle_unlock(); + TRICKLE_UNLOCK; return (ret); } @@ -1047,10 +1095,11 @@ accept(int sock, struct sockaddr *addr, socklen_t *addrlen) { int ret; struct sockdesc *sd; + LOCK_VAR; - trickle_lock(); + TRICKLE_LOCK; INIT; - trickle_unlock(); + TRICKLE_UNLOCK; ret = (*libc_accept)(sock, addr, addrlen); @@ -1061,9 +1110,9 @@ accept(int sock, struct sockaddr *addr, socklen_t *addrlen) if (ret != -1) { if ((sd = calloc(1, sizeof(*sd))) == NULL) return (ret); - trickle_lock(); + TRICKLE_LOCK; if ((sd->stat = bwstat_new()) == NULL) { - trickle_unlock(); + TRICKLE_UNLOCK; free(sd); return (ret); } @@ -1072,7 +1121,7 @@ accept(int sock, struct sockaddr *addr, socklen_t *addrlen) sd->stat->lsmooth = lsmooth; sd->stat->tsmooth = tsmooth; TAILQ_INSERT_TAIL(&sdhead, sd, next); - trickle_unlock(); + TRICKLE_UNLOCK; } return (ret); @@ -1084,8 +1133,9 @@ sendfile(int out_fd, int in_fd, off_t *offset, size_t count) { size_t inbytes = count, outbytes = count, bytes; ssize_t ret = 0; + LOCK_VAR; - trickle_lock(); + TRICKLE_LOCK; INIT; /* in_fd = recv, out_fd = send */ @@ -1093,7 +1143,7 @@ sendfile(int out_fd, int in_fd, off_t *offset, size_t count) /* We should never get TRICKLE_WOULDBLOCK here */ delay(in_fd, &inbytes, TRICKLE_RECV); delay(out_fd, &outbytes, TRICKLE_SEND); - trickle_unlock(); + TRICKLE_UNLOCK; /* This is a slightly ugly hack. 
*/ bytes = MIN(inbytes, outbytes);