summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_connection.c')
-rw-r--r--librabbitmq/amqp_connection.c50
1 files changed, 29 insertions, 21 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 8623eed..63af96a 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -52,17 +52,13 @@
#include <stdio.h>
#include <string.h>
#include <stdint.h>
-#include <errno.h>
-
-#include <unistd.h>
-#include <sys/uio.h>
-#include <sys/types.h>
+#include <assert.h>
#include "amqp.h"
#include "amqp_framing.h"
#include "amqp_private.h"
-#include <assert.h>
+#include "socket.h"
#define INITIAL_FRAME_POOL_PAGE_SIZE 65536
#define INITIAL_DECODING_POOL_PAGE_SIZE 131072
@@ -151,7 +147,7 @@ int amqp_tune_connection(amqp_connection_state_t state,
newbuf = realloc(state->outbound_buffer.bytes, frame_max);
if (newbuf == NULL) {
amqp_destroy_connection(state);
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
state->outbound_buffer.bytes = newbuf;
@@ -170,6 +166,15 @@ void amqp_destroy_connection(amqp_connection_state_t state) {
free(state);
}
+int amqp_end_connection(amqp_connection_state_t state) {
+ int s = state->sockfd;
+ amqp_destroy_connection(state);
+ if (socket_close(s) < 0)
+ return -encoded_socket_errno();
+ else
+ return 0;
+}
+
static void return_to_idle(amqp_connection_state_t state) {
state->inbound_buffer.bytes = NULL;
state->inbound_offset = 0;
@@ -199,7 +204,7 @@ int amqp_handle_input(amqp_connection_state_t state,
/* state->inbound_buffer.len is always nonzero, because it
corresponds to frame_max, which is not permitted to be less
than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */
- return -ENOMEM;
+ return -ERROR_NO_MEMORY;
}
state->state = CONNECTION_STATE_WAITING_FOR_HEADER;
}
@@ -246,7 +251,7 @@ int amqp_handle_input(amqp_connection_state_t state,
/* Check frame end marker (footer) */
if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) {
- return -EINVAL;
+ return -ERROR_BAD_AMQP_DATA;
}
decoded_frame->channel = D_16(state->inbound_buffer, 1);
@@ -392,7 +397,7 @@ static int inner_send_frame(amqp_connection_state_t state,
break;
default:
- return -EINVAL;
+ abort();
}
E_32(state->outbound_buffer, 3, *payload_len);
@@ -419,16 +424,14 @@ int amqp_send_frame(amqp_connection_state_t state,
amqp_frame_t const *frame)
{
amqp_bytes_t encoded;
- int payload_len;
- int separate_body;
+ int payload_len, res;
- separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
- switch (separate_body) {
+ res = inner_send_frame(state, frame, &encoded, &payload_len);
+ switch (res) {
case 0:
- AMQP_CHECK_RESULT(write(state->sockfd,
- state->outbound_buffer.bytes,
- payload_len + (HEADER_SIZE + FOOTER_SIZE)));
- return 0;
+ res = socket_write(state->sockfd, state->outbound_buffer.bytes,
+ payload_len + (HEADER_SIZE + FOOTER_SIZE));
+ break;
case 1: {
struct iovec iov[3];
@@ -440,13 +443,18 @@ int amqp_send_frame(amqp_connection_state_t state,
iov[2].iov_base = &frame_end_byte;
assert(FOOTER_SIZE == 1);
iov[2].iov_len = FOOTER_SIZE;
- AMQP_CHECK_RESULT(writev(state->sockfd, &iov[0], 3));
- return 0;
+ res = socket_writev(state->sockfd, &iov[0], 3);
+ break;
}
default:
- return separate_body;
+ return res;
}
+
+ if (res < 0)
+ return -encoded_socket_errno();
+ else
+ return 0;
}
int amqp_send_frame_to(amqp_connection_state_t state,