diff options
author | Ask Solem <ask@celeryproject.org> | 2014-10-24 12:35:01 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2014-10-24 12:35:01 +0100 |
commit | 185ce081e3efc846b476995b7da7297bb0eec82c (patch) | |
tree | 98adf852d24384b34b5a462907c2dce732c6821b /librabbitmq/amqp_socket.c | |
parent | be3000b4c84d7503f5ef4067de44ff16d060d158 (diff) | |
parent | 9626dd5cd5f78894f1416a1afd2d624ddd4904ae (diff) | |
download | rabbitmq-c-github-ask-master.tar.gz |
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r-- | librabbitmq/amqp_socket.c | 74 |
1 files changed, 54 insertions, 20 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 79a7696..29a7389 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -3,7 +3,7 @@ * ***** BEGIN LICENSE BLOCK ***** * Version: MIT * - * Portions created by Alan Antonuk are Copyright (c) 2012-2013 + * Portions created by Alan Antonuk are Copyright (c) 2012-2014 * Alan Antonuk. All Rights Reserved. * * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. @@ -42,6 +42,7 @@ #include "amqp_timer.h" #include <assert.h> +#include <limits.h> #include <stdarg.h> #include <stdint.h> #include <stdio.h> @@ -64,9 +65,14 @@ # include <netdb.h> # include <sys/uio.h> # include <fcntl.h> +# include <poll.h> # include <unistd.h> #endif +#ifdef _WIN32 +# define poll(fdarray, nfds, timeout) WSAPoll(fdarray, nfds, timeout) +#endif + static int amqp_os_socket_init(void) { @@ -275,7 +281,9 @@ int amqp_open_socket_noblock(char const *hostname, AMQP_INIT_TIMER(timer) - if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) { + if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 || + INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S + + (uint64_t)timeout->tv_usec / AMQP_US_PER_MS))) { return AMQP_STATUS_INVALID_PARAMETER; } @@ -300,7 +308,6 @@ int amqp_open_socket_noblock(char const *hostname, for (addr = address_list; addr; addr = addr->ai_next) { if (-1 != sockfd) { amqp_os_socket_close(sockfd); - sockfd = -1; } sockfd = amqp_os_socket_socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); @@ -349,14 +356,12 @@ int amqp_open_socket_noblock(char const *hostname, #endif while(1) { - fd_set write_fd; - fd_set except_fd; + struct pollfd pfd; + int timeout_ms; - FD_ZERO(&write_fd); - FD_SET(sockfd, &write_fd); - - FD_ZERO(&except_fd); - FD_SET(sockfd, &except_fd); + pfd.fd = sockfd; + pfd.events = POLLERR | POLLOUT; + pfd.revents = 0; timer_error = amqp_timer_update(&timer, timeout); @@ -365,11 +370,13 @@ int amqp_open_socket_noblock(char const *hostname, break; } + timeout_ms = timer.tv.tv_sec * AMQP_MS_PER_S + + timer.tv.tv_usec / AMQP_US_PER_MS; /* Win32 requires except_fds to be passed to detect connection * failure. Other platforms only need write_fds, passing except_fds * seems to be harmless otherwise */ - res = select(sockfd+1, NULL, &write_fd, &except_fd, &timer.tv); + res = poll(&pfd, 1, timeout_ms); if (res > 0) { int result; @@ -547,22 +554,29 @@ static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, stru if (timeout) { int fd; - fd_set read_fd; - fd_set except_fd; fd = amqp_get_sockfd(state); if (-1 == fd) { return AMQP_STATUS_CONNECTION_CLOSED; } + if (INT_MAX < (uint64_t)timeout->tv_sec * AMQP_MS_PER_S + + (uint64_t)timeout->tv_usec / AMQP_US_PER_MS) { + return AMQP_STATUS_INVALID_PARAMETER; + } + while (1) { - FD_ZERO(&read_fd); - FD_SET(fd, &read_fd); + struct pollfd pfd; + int timeout_ms; + + pfd.fd = fd; + pfd.events = POLLIN; + pfd.revents = 0; - FD_ZERO(&except_fd); - FD_SET(fd, &except_fd); + timeout_ms = timeout->tv_sec * AMQP_MS_PER_S + + timeout->tv_usec / AMQP_US_PER_MS; - res = select(fd + 1, &read_fd, NULL, &except_fd, timeout); + res = poll(&pfd, 1, timeout_ms); if (0 < res) { break; @@ -1144,7 +1158,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, } { - amqp_table_entry_t default_properties[2]; + amqp_table_entry_t default_properties[5]; amqp_table_t default_table; amqp_connection_start_ok_t s; amqp_pool_t *channel_pool; @@ -1168,9 +1182,27 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, default_properties[0].value.value.bytes = amqp_cstring_bytes("rabbitmq-c"); - default_properties[1].key = amqp_cstring_bytes("information"); + /* version */ + default_properties[1].key = amqp_cstring_bytes("version"); default_properties[1].value.kind = AMQP_FIELD_KIND_UTF8; default_properties[1].value.value.bytes = + amqp_cstring_bytes(AMQP_VERSION_STRING); + + /* platform */ + default_properties[2].key = amqp_cstring_bytes("platform"); + default_properties[2].value.kind = AMQP_FIELD_KIND_UTF8; + default_properties[2].value.value.bytes = + amqp_cstring_bytes(AMQ_PLATFORM); + + /* copyright */ + default_properties[3].key = amqp_cstring_bytes("copyright"); + default_properties[3].value.kind = AMQP_FIELD_KIND_UTF8; + default_properties[3].value.value.bytes = + amqp_cstring_bytes(AMQ_COPYRIGHT); + + default_properties[4].key = amqp_cstring_bytes("information"); + default_properties[4].value.kind = AMQP_FIELD_KIND_UTF8; + default_properties[4].value.value.bytes = amqp_cstring_bytes("See https://github.com/alanxz/rabbitmq-c"); default_table.entries = default_properties; @@ -1245,6 +1277,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, if (server_channel_max != 0 && server_channel_max < channel_max) { channel_max = server_channel_max; + } else if (server_channel_max == 0 && channel_max == 0) { + channel_max = UINT16_MAX; } if (server_frame_max != 0 && server_frame_max < frame_max) { |