diff options
Diffstat (limited to 'librabbitmq/amqp_connection.c')
-rw-r--r-- | librabbitmq/amqp_connection.c | 50 |
1 files changed, 29 insertions, 21 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 8623eed..63af96a 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -52,17 +52,13 @@ #include <stdio.h> #include <string.h> #include <stdint.h> -#include <errno.h> - -#include <unistd.h> -#include <sys/uio.h> -#include <sys/types.h> +#include <assert.h> #include "amqp.h" #include "amqp_framing.h" #include "amqp_private.h" -#include <assert.h> +#include "socket.h" #define INITIAL_FRAME_POOL_PAGE_SIZE 65536 #define INITIAL_DECODING_POOL_PAGE_SIZE 131072 @@ -151,7 +147,7 @@ int amqp_tune_connection(amqp_connection_state_t state, newbuf = realloc(state->outbound_buffer.bytes, frame_max); if (newbuf == NULL) { amqp_destroy_connection(state); - return -ENOMEM; + return -ERROR_NO_MEMORY; } state->outbound_buffer.bytes = newbuf; @@ -170,6 +166,15 @@ void amqp_destroy_connection(amqp_connection_state_t state) { free(state); } +int amqp_end_connection(amqp_connection_state_t state) { + int s = state->sockfd; + amqp_destroy_connection(state); + if (socket_close(s) < 0) + return -encoded_socket_errno(); + else + return 0; +} + static void return_to_idle(amqp_connection_state_t state) { state->inbound_buffer.bytes = NULL; state->inbound_offset = 0; @@ -199,7 +204,7 @@ int amqp_handle_input(amqp_connection_state_t state, /* state->inbound_buffer.len is always nonzero, because it corresponds to frame_max, which is not permitted to be less than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */ - return -ENOMEM; + return -ERROR_NO_MEMORY; } state->state = CONNECTION_STATE_WAITING_FOR_HEADER; } @@ -246,7 +251,7 @@ int amqp_handle_input(amqp_connection_state_t state, /* Check frame end marker (footer) */ if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) { - return -EINVAL; + return -ERROR_BAD_AMQP_DATA; } decoded_frame->channel = D_16(state->inbound_buffer, 1); @@ -392,7 +397,7 @@ static int inner_send_frame(amqp_connection_state_t state, break; default: - return -EINVAL; + abort(); } E_32(state->outbound_buffer, 3, *payload_len); @@ -419,16 +424,14 @@ int amqp_send_frame(amqp_connection_state_t state, amqp_frame_t const *frame) { amqp_bytes_t encoded; - int payload_len; - int separate_body; + int payload_len, res; - separate_body = inner_send_frame(state, frame, &encoded, &payload_len); - switch (separate_body) { + res = inner_send_frame(state, frame, &encoded, &payload_len); + switch (res) { case 0: - AMQP_CHECK_RESULT(write(state->sockfd, - state->outbound_buffer.bytes, - payload_len + (HEADER_SIZE + FOOTER_SIZE))); - return 0; + res = socket_write(state->sockfd, state->outbound_buffer.bytes, + payload_len + (HEADER_SIZE + FOOTER_SIZE)); + break; case 1: { struct iovec iov[3]; @@ -440,13 +443,18 @@ int amqp_send_frame(amqp_connection_state_t state, iov[2].iov_base = &frame_end_byte; assert(FOOTER_SIZE == 1); iov[2].iov_len = FOOTER_SIZE; - AMQP_CHECK_RESULT(writev(state->sockfd, &iov[0], 3)); - return 0; + res = socket_writev(state->sockfd, &iov[0], 3); + break; } default: - return separate_body; + return res; } + + if (res < 0) + return -encoded_socket_errno(); + else + return 0; } int amqp_send_frame_to(amqp_connection_state_t state, |