diff options
author | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-05-04 20:31:54 +0100 |
---|---|---|
committer | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-05-04 20:31:54 +0100 |
commit | 3238a4fb8525a7da066528c4f735412d4fabadfb (patch) | |
tree | adb3a5245c870505dbb322536aae5a12da0d5ec1 | |
parent | 107d08b387e3f43aed3e11465c6c0cca64f89b4a (diff) | |
download | rabbitmq-c-github-ask-3238a4fb8525a7da066528c4f735412d4fabadfb.tar.gz |
Split out mid-level API from amqp_socket.c into amqp_api.c
-rw-r--r-- | librabbitmq/Makefile.am | 2 | ||||
-rw-r--r-- | librabbitmq/amqp_api.c | 101 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 90 |
3 files changed, 102 insertions, 91 deletions
diff --git a/librabbitmq/Makefile.am b/librabbitmq/Makefile.am index 95b6796..44b9d78 100644 --- a/librabbitmq/Makefile.am +++ b/librabbitmq/Makefile.am @@ -1,6 +1,6 @@ lib_LTLIBRARIES = librabbitmq.la -librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_debug.c +librabbitmq_la_SOURCES = amqp_mem.c amqp_table.c amqp_connection.c amqp_socket.c amqp_debug.c amqp_api.c nodist_librabbitmq_la_SOURCES = amqp_framing.c include_HEADERS = amqp_framing.h amqp.h noinst_librabbitmq_la_INCLUDES = amqp_private.h diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c new file mode 100644 index 0000000..fc8b56c --- /dev/null +++ b/librabbitmq/amqp_api.c @@ -0,0 +1,101 @@ +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <stdint.h> +#include <errno.h> + +#include "amqp.h" +#include "amqp_framing.h" +#include "amqp_private.h" + +#include <assert.h> + +int amqp_basic_publish(amqp_connection_state_t state, + amqp_bytes_t exchange, + amqp_bytes_t routing_key, + amqp_boolean_t mandatory, + amqp_boolean_t immediate, + amqp_basic_properties_t const *properties, + amqp_bytes_t body) +{ + amqp_frame_t f; + size_t body_offset; + size_t usable_body_payload_size = state->frame_max - (HEADER_SIZE + FOOTER_SIZE); + + amqp_basic_publish_t m = + (amqp_basic_publish_t) { + .exchange = exchange, + .routing_key = routing_key, + .mandatory = mandatory, + .immediate = immediate + }; + + amqp_basic_properties_t default_properties; + + AMQP_CHECK_RESULT(amqp_send_method(state, 1, AMQP_BASIC_PUBLISH_METHOD, &m)); + + if (properties == NULL) { + memset(&default_properties, 0, sizeof(default_properties)); + properties = &default_properties; + } + + f.frame_type = AMQP_FRAME_HEADER; + f.channel = 1; + f.payload.properties.class_id = AMQP_BASIC_CLASS; + f.payload.properties.body_size = body.len; + f.payload.properties.decoded = (void *) properties; + AMQP_CHECK_RESULT(amqp_send_frame(state, &f)); + + body_offset = 0; + while (1) { + int remaining = body.len - body_offset; + assert(remaining >= 0); + + if (remaining == 0) + break; + + f.frame_type = AMQP_FRAME_BODY; + f.channel = 1; + f.payload.body_fragment.bytes = BUF_AT(body, body_offset); + if (remaining >= usable_body_payload_size) { + f.payload.body_fragment.len = usable_body_payload_size; + } else { + f.payload.body_fragment.len = remaining; + } + + body_offset += f.payload.body_fragment.len; + AMQP_CHECK_RESULT(amqp_send_frame(state, &f)); + } + + return 0; +} + +amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, int code) { + amqp_channel_close_t s = + (amqp_channel_close_t) { + .reply_code = code, + .reply_text = {.len = 0, .bytes = NULL}, + .class_id = 0, + .method_id = 0 + }; + return amqp_simple_rpc(state, + 1, + AMQP_CHANNEL_CLOSE_METHOD, + AMQP_CHANNEL_CLOSE_OK_METHOD, + &s); +} + +amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code) { + amqp_connection_close_t s = + (amqp_connection_close_t) { + .reply_code = code, + .reply_text = {.len = 0, .bytes = NULL}, + .class_id = 0, + .method_id = 0 + }; + return amqp_simple_rpc(state, + 0, + AMQP_CONNECTION_CLOSE_METHOD, + AMQP_CONNECTION_CLOSE_OK_METHOD, + &s); +} diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 2162eb4..8d0bcb9 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -362,93 +362,3 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, result.library_errno = 0; return result; } - -int amqp_basic_publish(amqp_connection_state_t state, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_boolean_t mandatory, - amqp_boolean_t immediate, - amqp_basic_properties_t const *properties, - amqp_bytes_t body) -{ - amqp_frame_t f; - size_t body_offset; - size_t usable_body_payload_size = state->frame_max - (HEADER_SIZE + FOOTER_SIZE); - - amqp_basic_publish_t m = - (amqp_basic_publish_t) { - .exchange = exchange, - .routing_key = routing_key, - .mandatory = mandatory, - .immediate = immediate - }; - - amqp_basic_properties_t default_properties; - - AMQP_CHECK_RESULT(amqp_send_method(state, 1, AMQP_BASIC_PUBLISH_METHOD, &m)); - - if (properties == NULL) { - memset(&default_properties, 0, sizeof(default_properties)); - properties = &default_properties; - } - - f.frame_type = AMQP_FRAME_HEADER; - f.channel = 1; - f.payload.properties.class_id = AMQP_BASIC_CLASS; - f.payload.properties.body_size = body.len; - f.payload.properties.decoded = (void *) properties; - AMQP_CHECK_RESULT(amqp_send_frame(state, &f)); - - body_offset = 0; - while (1) { - int remaining = body.len - body_offset; - assert(remaining >= 0); - - if (remaining == 0) - break; - - f.frame_type = AMQP_FRAME_BODY; - f.channel = 1; - f.payload.body_fragment.bytes = BUF_AT(body, body_offset); - if (remaining >= usable_body_payload_size) { - f.payload.body_fragment.len = usable_body_payload_size; - } else { - f.payload.body_fragment.len = remaining; - } - - body_offset += f.payload.body_fragment.len; - AMQP_CHECK_RESULT(amqp_send_frame(state, &f)); - } - - return 0; -} - -amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, int code) { - amqp_channel_close_t s = - (amqp_channel_close_t) { - .reply_code = code, - .reply_text = {.len = 0, .bytes = NULL}, - .class_id = 0, - .method_id = 0 - }; - return amqp_simple_rpc(state, - 1, - AMQP_CHANNEL_CLOSE_METHOD, - AMQP_CHANNEL_CLOSE_OK_METHOD, - &s); -} - -amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code) { - amqp_connection_close_t s = - (amqp_connection_close_t) { - .reply_code = code, - .reply_text = {.len = 0, .bytes = NULL}, - .class_id = 0, - .method_id = 0 - }; - return amqp_simple_rpc(state, - 0, - AMQP_CONNECTION_CLOSE_METHOD, - AMQP_CONNECTION_CLOSE_OK_METHOD, - &s); -} |