From a78aa8ab906c97dc3f4123048519a2953fa96fb2 Mon Sep 17 00:00:00 2001 From: Alan Antonuk Date: Mon, 17 Feb 2014 19:01:00 -0800 Subject: Use poll(2) for timeouts on socket Use poll(2) instead of select(2) to do timeout operations on sockets. This helps with the situation where the fd is larger than FD_MAXSIZE. Fixes #168 --- librabbitmq/amqp_socket.c | 47 +++++++++++++++++++++++++++++++---------------- librabbitmq/amqp_timer.h | 5 ++++- 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 79a7696..6e54f7e 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -42,6 +42,7 @@ #include "amqp_timer.h" #include +#include #include #include #include @@ -64,9 +65,14 @@ # include # include # include +# include # include #endif +#ifdef _WIN32 +# define poll(fdarray, nfds, timeout) WSAPoll(fdarray, nfds, timeout) +#endif + static int amqp_os_socket_init(void) { @@ -275,7 +281,9 @@ int amqp_open_socket_noblock(char const *hostname, AMQP_INIT_TIMER(timer) - if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) { + if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 || + INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S + + (uint64_t)timeout->tv_usec / AMQP_US_PER_MS))) { return AMQP_STATUS_INVALID_PARAMETER; } @@ -349,14 +357,12 @@ int amqp_open_socket_noblock(char const *hostname, #endif while(1) { - fd_set write_fd; - fd_set except_fd; + struct pollfd pfd; + int timeout_ms; - FD_ZERO(&write_fd); - FD_SET(sockfd, &write_fd); - - FD_ZERO(&except_fd); - FD_SET(sockfd, &except_fd); + pfd.fd = sockfd; + pfd.events = POLLERR | POLLOUT; + pfd.revents = 0; timer_error = amqp_timer_update(&timer, timeout); @@ -365,11 +371,13 @@ int amqp_open_socket_noblock(char const *hostname, break; } + timeout_ms = timer.tv.tv_sec * AMQP_MS_PER_S + + timer.tv.tv_usec / AMQP_US_PER_MS; /* Win32 requires except_fds to be passed to detect connection * failure. Other platforms only need write_fds, passing except_fds * seems to be harmless otherwise */ - res = select(sockfd+1, NULL, &write_fd, &except_fd, &timer.tv); + res = poll(&pfd, 1, timeout_ms); if (res > 0) { int result; @@ -547,22 +555,29 @@ static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, stru if (timeout) { int fd; - fd_set read_fd; - fd_set except_fd; fd = amqp_get_sockfd(state); if (-1 == fd) { return AMQP_STATUS_CONNECTION_CLOSED; } + if (INT_MAX < (uint64_t)timeout->tv_sec * AMQP_MS_PER_S + + (uint64_t)timeout->tv_usec / AMQP_US_PER_MS) { + return AMQP_STATUS_INVALID_PARAMETER; + } + while (1) { - FD_ZERO(&read_fd); - FD_SET(fd, &read_fd); + struct pollfd pfd; + int timeout_ms; + + pfd.fd = fd; + pfd.events = POLLIN; + pfd.revents = 0; - FD_ZERO(&except_fd); - FD_SET(fd, &except_fd); + timeout_ms = timeout->tv_sec * AMQP_MS_PER_S + + timeout->tv_usec * AMQP_US_PER_MS; - res = select(fd + 1, &read_fd, NULL, &except_fd, timeout); + res = poll(&pfd, 1, timeout_ms); if (0 < res) { break; diff --git a/librabbitmq/amqp_timer.h b/librabbitmq/amqp_timer.h index 8ac3de9..10254a5 100644 --- a/librabbitmq/amqp_timer.h +++ b/librabbitmq/amqp_timer.h @@ -37,7 +37,10 @@ # include #endif -#define AMQP_NS_PER_S 1000000000 +#define AMQP_MS_PER_S 1000 +#define AMQP_US_PER_MS 1000 +#define AMQP_NS_PER_S 1000000000 +#define AMQP_NS_PER_MS 1000000 #define AMQP_NS_PER_US 1000 #define AMQP_INIT_TIMER(structure) { \ -- cgit v1.2.1