diff options
author | Alan Antonuk <aega@.med.umich.edu> | 2012-01-13 18:17:07 -0500 |
---|---|---|
committer | Alan Antonuk <aega@.med.umich.edu> | 2012-01-31 14:45:51 -0500 |
commit | 14009a5fd9bee456d9aa8f863512ef674cb1e369 (patch) | |
tree | 7035a1a7142ee0e82b4ee3fdbd9698c3d033d09a | |
parent | a40de904f235baec9fa532063e0a7db71125df65 (diff) | |
download | rabbitmq-c-github-ask-14009a5fd9bee456d9aa8f863512ef674cb1e369.tar.gz |
Reworking internal memory management so that memory can be released per-channel
Also added unit tests
-rw-r--r-- | CMakeLists.txt | 13 | ||||
-rw-r--r-- | librabbitmq/CMakeLists.txt | 29 | ||||
-rw-r--r-- | librabbitmq/amqp.h | 4 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 77 | ||||
-rw-r--r-- | librabbitmq/amqp_mem.c | 280 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 179 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 25 | ||||
-rw-r--r-- | tests/CMakeLists.txt | 21 | ||||
-rw-r--r-- | tests/test_hashtable.cpp | 102 | ||||
-rw-r--r-- | tests/test_pools.cpp | 193 | ||||
-rw-r--r-- | tests/test_tables.c | 2 |
11 files changed, 893 insertions, 32 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 34256cd..5acbda6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -65,6 +65,8 @@ set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} ${CMAKE_SOURCE_DIR}/cmake) find_package(POPT) +option(BUILD_SHARED_LIBS "Build rabbitmq as a shared library" ON) + add_subdirectory(librabbitmq) add_subdirectory(examples) @@ -72,4 +74,15 @@ if (POPT_FOUND) add_subdirectory(tools) endif (POPT_FOUND) +option(BUILD_TESTING "Enable unit tests" OFF) + +if (BUILD_TESTING) + if (BUILD_SHARED_LIBS) + message(FATAL_ERROR "Can only test when compiled statically, disable BUILD_SHARED_LIBS to enable testing") + endif (BUILD_SHARED_LIBS) + + enable_testing() + add_subdirectory(tests) +endif (BUILD_TESTING) + add_subdirectory(third-party) diff --git a/librabbitmq/CMakeLists.txt b/librabbitmq/CMakeLists.txt index 80632ed..5a01d28 100644 --- a/librabbitmq/CMakeLists.txt +++ b/librabbitmq/CMakeLists.txt @@ -1,7 +1,5 @@ project(librabbitmq "C") -set(CMAKE_INCLUDE_CURRENT_DIR ON) - # Stuff dealing with code generation set(AMQP_CODEGEN_PY "${CMAKE_CURRENT_BINARY_DIR}/amqp_codegen.py") set(CODEGEN_PY "${CMAKE_CURRENT_BINARY_DIR}/codegen.py") @@ -43,17 +41,34 @@ SET(CONFIG_CONTENTS "#define VERSION \"0.0.1\" file(WRITE "${CMAKE_CURRENT_BINARY_DIR}/config.h" ${CONFIG_CONTENTS}) if(WIN32) - set(SOCKET_IMPL "windows") + set(SOCKET_IMPL "${CMAKE_CURRENT_SOURCE_DIR}/windows") else(WIN32) - set(SOCKET_IMPL "unix") + set(SOCKET_IMPL "${CMAKE_CURRENT_SOURCE_DIR}/unix") endif(WIN32) if(MSVC) set(MSINTTYPES_INCLUDE "${CMAKE_CURRENT_SOURCE_DIR}/../msinttypes") endif(MSVC) -include_directories(${SOCKET_IMPL} ${MSINTTYPES_INCLUDE}) -add_definitions(-DBUILDING_LIBRABBITMQ) +set(LIBRABBITMQ_INCLUDE_DIRS + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_BINARY_DIR} + ${SOCKET_IMPL} + ${MSINTTYPES_INCLUDE} + ) + +include_directories(${LIBRABBITMQ_INCLUDE_DIRS}) + +set(LIBRABBITMQ_INCLUDE_DIRS + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_BINARY_DIR} + ${SOCKET_IMPL} + ${MSINTTYPES_INCLUDE} + PARENT_SCOPE) + +if (BUILD_SHARED_LIBS) + add_definitions(-DBUILDING_LIBRABBITMQ) +endif (BUILD_SHARED_LIBS) set(RABBITMQ_SOURCES ${CMAKE_CURRENT_BINARY_DIR}/amqp_framing.h @@ -64,7 +79,7 @@ set(RABBITMQ_SOURCES ${SOCKET_IMPL}/socket.h ${SOCKET_IMPL}/socket.c ) -add_library(rabbitmq SHARED ${RABBITMQ_SOURCES}) +add_library(rabbitmq ${RABBITMQ_SOURCES}) if(WIN32) target_link_libraries(rabbitmq ws2_32) diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index b756025..6d37091 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -181,6 +181,8 @@ typedef struct amqp_pool_t_ { int next_page; char *alloc_block; size_t alloc_used; + + struct amqp_pool_t_ *next; } amqp_pool_t; typedef struct amqp_method_t_ { @@ -277,6 +279,8 @@ RABBITMQ_EXPORT void amqp_release_buffers(amqp_connection_state_t state); RABBITMQ_EXPORT void amqp_maybe_release_buffers(amqp_connection_state_t state); +RABBITMQ_EXPORT void amqp_maybe_release_buffers_for_channel(amqp_connection_state_t state, amqp_channel_t channel); + RABBITMQ_EXPORT int amqp_send_frame(amqp_connection_state_t state, amqp_frame_t const *frame); diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 886805d..b3d490e 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -46,9 +46,6 @@ #include "amqp_framing.h" #include "amqp_private.h" -#define INITIAL_FRAME_POOL_PAGE_SIZE 65536 -#define INITIAL_DECODING_POOL_PAGE_SIZE 131072 -#define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072 #define ENFORCE_STATE(statevec, statenum) \ { \ @@ -61,19 +58,23 @@ } amqp_connection_state_t amqp_new_connection(void) { + amqp_pool_t* frame_pool = NULL; amqp_connection_state_t state = (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_)); if (state == NULL) return NULL; - init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE); - init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE); + amqp_init_all_pools(state); if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0) goto out_nomem; - state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len); + frame_pool = amqp_get_frame_pool(state); + if (NULL == frame_pool) + goto out_nomem; + + state->inbound_buffer.bytes = amqp_pool_alloc(frame_pool, state->inbound_buffer.len); if (state->inbound_buffer.bytes == NULL) goto out_nomem; @@ -92,8 +93,7 @@ amqp_connection_state_t amqp_new_connection(void) { out_nomem: free(state->sock_inbound_buffer.bytes); - empty_amqp_pool(&state->frame_pool); - empty_amqp_pool(&state->decoding_pool); + amqp_destroy_all_pools(state); free(state); return NULL; } @@ -121,9 +121,6 @@ int amqp_tune_connection(amqp_connection_state_t state, state->frame_max = frame_max; state->heartbeat = heartbeat; - empty_amqp_pool(&state->frame_pool); - init_amqp_pool(&state->frame_pool, frame_max); - state->inbound_buffer.len = frame_max; state->outbound_buffer.len = frame_max; newbuf = realloc(state->outbound_buffer.bytes, frame_max); @@ -143,8 +140,8 @@ int amqp_get_channel_max(amqp_connection_state_t state) { int amqp_destroy_connection(amqp_connection_state_t state) { int s = state->sockfd; - empty_amqp_pool(&state->frame_pool); - empty_amqp_pool(&state->decoding_pool); + amqp_destroy_all_pools(state); + free(state->outbound_buffer.bytes); free(state->sock_inbound_buffer.bytes); free(state); @@ -185,6 +182,8 @@ int amqp_handle_input(amqp_connection_state_t state, { size_t bytes_consumed; void *raw_frame; + amqp_pool_t* frame_pool = NULL; + amqp_pool_t* decoding_pool = NULL; /* Returning frame_type of zero indicates either insufficient input, or a complete, ignored frame was read. */ @@ -194,7 +193,17 @@ int amqp_handle_input(amqp_connection_state_t state, return 0; if (state->state == CONNECTION_STATE_IDLE) { - state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, + frame_pool = amqp_get_frame_pool(state); + if (state->frame_pool == NULL) { + return -ERROR_NO_MEMORY; + } + /* We can recycle here because: + - Methods data is copied to the decoding_pool + - Property data is copied to the decoding_pool + - Body data, the pool is moved to the decoding_pools structure + */ + recycle_amqp_pool(frame_pool); + state->inbound_buffer.bytes = amqp_pool_alloc(frame_pool, state->inbound_buffer.len); if (state->inbound_buffer.bytes == NULL) /* state->inbound_buffer.len is always nonzero, because it @@ -269,8 +278,12 @@ int amqp_handle_input(amqp_connection_state_t state, encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 4); encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE; + decoding_pool = amqp_get_decoding_pool(state, decoded_frame->channel); + if (decoding_pool == NULL) + return -ERROR_NO_MEMORY; + res = amqp_decode_method(decoded_frame->payload.method.id, - &state->decoding_pool, encoded, + decoding_pool, encoded, &decoded_frame->payload.method.decoded); if (res < 0) return res; @@ -287,8 +300,12 @@ int amqp_handle_input(amqp_connection_state_t state, encoded.len = state->target_size - HEADER_SIZE - 12 - FOOTER_SIZE; decoded_frame->payload.properties.raw = encoded; + decoding_pool = amqp_get_decoding_pool(state, decoded_frame->channel); + if (decoding_pool == NULL) + return -ERROR_NO_MEMORY; + res = amqp_decode_properties(decoded_frame->payload.properties.class_id, - &state->decoding_pool, encoded, + decoding_pool, encoded, &decoded_frame->payload.properties.decoded); if (res < 0) return res; @@ -300,6 +317,11 @@ int amqp_handle_input(amqp_connection_state_t state, = state->target_size - HEADER_SIZE - FOOTER_SIZE; decoded_frame->payload.body_fragment.bytes = amqp_offset(raw_frame, HEADER_SIZE); + + /* Move ownership of the pool to the channel hashmap */ + if (0 != amqp_move_frame_pool(state, decoded_frame->channel)) + return -ERROR_NO_MEMORY; + break; case AMQP_FRAME_HEARTBEAT: @@ -331,8 +353,7 @@ void amqp_release_buffers(amqp_connection_state_t state) { if (state->first_queued_frame) amqp_abort("Programming error: attempt to amqp_release_buffers while waiting events enqueued"); - recycle_amqp_pool(&state->frame_pool); - recycle_amqp_pool(&state->decoding_pool); + amqp_recycle_all_decoding_pools(state); } void amqp_maybe_release_buffers(amqp_connection_state_t state) { @@ -341,6 +362,26 @@ void amqp_maybe_release_buffers(amqp_connection_state_t state) { } } +void amqp_maybe_release_buffers_for_channel(amqp_connection_state_t state, amqp_channel_t channel) { + // To release buffers for a channel + if (state->state == CONNECTION_STATE_IDLE) + { + amqp_link_t* last_frame_ptr = state->first_queued_frame; + + last_frame_ptr = state->first_queued_frame; + while (last_frame_ptr != NULL) + { + amqp_frame_t* frame = (amqp_frame_t*)last_frame_ptr->data; + if (frame->channel == channel) + { + return; + } + last_frame_ptr = last_frame_ptr->next; + } + amqp_recycle_decoding_pool(state, channel); + } +} + int amqp_send_frame(amqp_connection_state_t state, const amqp_frame_t *frame) { diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c index 85be08c..2538bef 100644 --- a/librabbitmq/amqp_mem.c +++ b/librabbitmq/amqp_mem.c @@ -44,8 +44,11 @@ #include <assert.h> #include "amqp.h" +#include "amqp_private.h" #include "config.h" +#define INITIAL_DECODING_POOL_PAGE_SIZE 131072 + char const *amqp_version(void) { return VERSION; /* defined in config.h */ } @@ -62,6 +65,8 @@ void init_amqp_pool(amqp_pool_t *pool, size_t pagesize) { pool->next_page = 0; pool->alloc_block = NULL; pool->alloc_used = 0; + + pool->next = NULL; } static void empty_blocklist(amqp_pool_blocklist_t *x) { @@ -186,3 +191,278 @@ amqp_bytes_t amqp_bytes_malloc(size_t amount) { void amqp_bytes_free(amqp_bytes_t bytes) { free(bytes.bytes); } + +amqp_pool_t* amqp_get_frame_pool(amqp_connection_state_t state) +{ + int page_size = (0 == state->frame_max ? INITIAL_FRAME_POOL_PAGE_SIZE : state->frame_max); + + assert(NULL != state); + + if (NULL != state->frame_pool) + return state->frame_pool; + + if (NULL != state->frame_pool_cache) + { + state->frame_pool = state->frame_pool_cache; + state->frame_pool_cache = state->frame_pool->next; + state->frame_pool->next = NULL; + + if (state->frame_pool->pagesize != page_size) + { + empty_amqp_pool(state->frame_pool); + init_amqp_pool(state->frame_pool, page_size); + } + return state->frame_pool; + } + + state->frame_pool = (amqp_pool_t*)amqp_pool_alloc(&state->pool_cache_pool, sizeof(amqp_pool_t)); + + if (NULL == state->frame_pool) + return NULL; + + init_amqp_pool(state->frame_pool, page_size); + + return state->frame_pool; +} + +int amqp_move_frame_pool(amqp_connection_state_t state, amqp_channel_t channel) +{ + amqp_pool_t* decoding_pool = NULL; + amqp_pool_t* frame_pool = NULL; + + decoding_pool = amqp_get_decoding_pool(state, channel); + if (NULL == decoding_pool) + { + return -ERROR_NO_MEMORY; + } + + frame_pool = amqp_get_frame_pool(state); + if (NULL == frame_pool) + { + return -ERROR_NO_MEMORY; + } + + state->frame_pool = NULL; + + frame_pool->next = decoding_pool->next; + decoding_pool->next = frame_pool; + + return 0; +} + +void amqp_destroy_all_frame_pools(amqp_connection_state_t state) +{ + amqp_pool_t* frame_pool = NULL; + + assert(NULL != state); + + if (NULL != state->frame_pool) + { + empty_amqp_pool(state->frame_pool); + } + + for (frame_pool = state->frame_pool_cache; + NULL != frame_pool; + frame_pool = frame_pool->next) + { + empty_amqp_pool(frame_pool); + } + + empty_amqp_pool(&state->pool_cache_pool); +} + +int amqp_hashtable_channel_hash(amqp_channel_t channel) +{ + return channel % AMQP_HASHTABLE_SIZE; +} + +void amqp_hashtable_init(amqp_hashtable_t* table) +{ + assert(NULL != table); + + memset(table, 0, sizeof(amqp_hashtable_t)); +} + +amqp_pool_t* amqp_hashtable_get_pool(amqp_hashtable_t* table, amqp_channel_t channel) +{ + amqp_hashtable_entry_t* entry = NULL; + int index = 0; + + assert(NULL != table); + + index = amqp_hashtable_channel_hash(channel); + + assert(AMQP_HASHTABLE_SIZE > index); + entry = table->entries[index]; + + while (NULL != entry) + { + if (channel == entry->key) + { + return &entry->data; + } + + entry = entry->next; + } + + return NULL; +} + +amqp_pool_t* amqp_hashtable_add_pool(amqp_hashtable_t* table, amqp_channel_t channel) +{ + amqp_hashtable_entry_t* new_entry = NULL; + int index = 0; + + assert(NULL != table); + + if (NULL != amqp_hashtable_get_pool(table, channel)) + { + return NULL; + } + + index = amqp_hashtable_channel_hash(channel); + assert(AMQP_HASHTABLE_SIZE > index); + + new_entry = (amqp_hashtable_entry_t*)malloc(sizeof(amqp_hashtable_entry_t)); + if (NULL == new_entry) + { + return NULL; + } + memset(new_entry, 0, sizeof(amqp_hashtable_entry_t)); + new_entry->key = channel; + new_entry->next = table->entries[index]; + table->entries[index] = new_entry; + + return &new_entry->data; +} + +amqp_pool_t* amqp_get_decoding_pool(amqp_connection_state_t state, amqp_channel_t channel) +{ + amqp_pool_t* pool = NULL; + + assert(NULL != state); + + // Try to get the decoding pool + pool = amqp_hashtable_get_pool(&state->decoding_pools, channel); + + if (NULL == pool) + { + pool = amqp_hashtable_add_pool(&state->decoding_pools, channel); + init_amqp_pool(pool, INITIAL_DECODING_POOL_PAGE_SIZE); + } + return pool; +} + +void amqp_recycle_decoding_pool_inner(amqp_connection_state_t state, amqp_pool_t* decoding_pool) +{ + amqp_pool_t* frame_pool = NULL; + amqp_pool_t* last_frame_pool = NULL; + + assert(NULL != state); + assert(NULL != decoding_pool); + + recycle_amqp_pool(decoding_pool); + + + if (NULL != decoding_pool->next) + { + frame_pool = decoding_pool->next; + for (frame_pool = decoding_pool->next; + NULL != frame_pool; + last_frame_pool = frame_pool, frame_pool = frame_pool->next) + { + recycle_amqp_pool(frame_pool); + } + + last_frame_pool->next = state->frame_pool_cache; + state->frame_pool_cache = decoding_pool->next; + decoding_pool->next = NULL; + } +} + +void amqp_recycle_decoding_pool(amqp_connection_state_t state, amqp_channel_t channel) +{ + amqp_pool_t* decoding_pool = NULL; + + assert(NULL != state); + + decoding_pool = amqp_get_decoding_pool(state, channel); + + if (NULL == decoding_pool) + { + return; + } + + amqp_recycle_decoding_pool_inner(state, decoding_pool); +} + +void amqp_recycle_all_decoding_pools(amqp_connection_state_t state) +{ + int i = 0; + assert(NULL != state); + + for (i = 0; i < AMQP_HASHTABLE_SIZE; ++i) + { + if (NULL != state->decoding_pools.entries[i]) + { + amqp_hashtable_entry_t* entry = state->decoding_pools.entries[i]; + for ( ; entry != NULL; entry = entry->next) + { + amqp_recycle_decoding_pool_inner(state, &entry->data); + } + } + } +} + +void amqp_destroy_all_decoding_pools(amqp_connection_state_t state) +{ + int i = 0; + assert(NULL != state); + for (i = 0; i < AMQP_HASHTABLE_SIZE; ++i) + { + if (NULL != state->decoding_pools.entries[i]) + { + amqp_hashtable_entry_t* entry = state->decoding_pools.entries[i]; + while (entry != NULL) + { + amqp_hashtable_entry_t* last_entry = NULL; + amqp_pool_t* decoding_pool = &entry->data; + amqp_pool_t* last_frame_pool = NULL; + + empty_amqp_pool(decoding_pool); + + if (NULL != decoding_pool->next) + { + last_frame_pool = decoding_pool->next; + + for (last_frame_pool = decoding_pool->next; + NULL != last_frame_pool->next; + last_frame_pool = last_frame_pool->next) { /* empty */ } + + last_frame_pool->next = state->frame_pool_cache; + state->frame_pool_cache = decoding_pool->next; + decoding_pool->next = NULL; + } + + last_entry = entry->next; + free(entry); + entry = last_entry; + } + } + } +} + +void amqp_init_all_pools(amqp_connection_state_t state) +{ + state->frame_pool = NULL; + state->frame_pool_cache = NULL; + state->frame_max = 0; + init_amqp_pool(&state->pool_cache_pool, sizeof(amqp_pool_t)*16); + amqp_hashtable_init(&state->decoding_pools); +} + +void amqp_destroy_all_pools(amqp_connection_state_t state) +{ + amqp_destroy_all_decoding_pools(state); + amqp_destroy_all_frame_pools(state); +} diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index e019598..0301733 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -39,6 +39,7 @@ * ***** END LICENSE BLOCK ***** */ +#include "amqp.h" #include "config.h" /* Error numbering: Because of differences in error numbering on @@ -63,9 +64,14 @@ #define ERROR_BAD_AMQP_URL 8 #define ERROR_MAX 8 +#include "socket.h" + +#ifdef __cplusplus +extern "C" { +#endif + extern char *amqp_os_error_string(int err); -#include "socket.h" /* * Connection states: XXX FIX THIS @@ -102,14 +108,70 @@ typedef enum amqp_connection_state_enum_ { #define AMQP_PSEUDOFRAME_PROTOCOL_HEADER 'A' +#define INITIAL_FRAME_POOL_PAGE_SIZE 65536 +#define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072 + typedef struct amqp_link_t_ { struct amqp_link_t_ *next; void *data; } amqp_link_t; +#define AMQP_HASHTABLE_SIZE 256 + +typedef struct amqp_hashtable_entry_t_ { + struct amqp_hashtable_entry_t_ *next; + amqp_pool_t data; + amqp_channel_t key; +} amqp_hashtable_entry_t; + +typedef struct amqp_hashtable_t_ { + amqp_hashtable_entry_t* entries[AMQP_HASHTABLE_SIZE]; +} amqp_hashtable_t; + +/** + * Memory allocation scheme for amqp_connection_state_t: + * + * Most memory allocation in the library is done using an amqp_pool_t + * Since pools can only be recycled when everything in them is no + * longer being used we separate out usage so that it has an affinity + * to the channel. + * + * When receiving data on a socket, the library allocates a buffer + * to hold the data. This data comes from a 'frame_pool'. Once a complete + * frame is received it is decoded. If the frame contains a METHOD or + * HEADER frame it is decoded, memory is allocated from a 'decoding_pool'. + * Since there is little processing done for a BODY frame, its data is NOT + * copied to a 'decoding_pool' allocated memory block, instead the frame_pool + * that is responsible for the memory allocated to it is associated with a + * channel, the pool is returned when the memory for a channel is recycled. + * + * Relevant amqp_connection_state_t elements: + * - pool_cache_pool - this is where frame_pools are allocated from. We assume + * frame pools will last the lifetime of the connection object + * - frame_pool - this points to the 'current' frame_pool. This is NULL if there isn't + * a 'current' frame pool. To get the current frame pool one + * should use the amqp_get_frame_pool() + * - frame_pool_cache - this is a stack of the frame_pools that are not in use. + * a frame_pool will end up here (in a recycled state) on + * when a channel's pools are recycled + * - decoding_pools - this is a chained hashtable of the decoding_pools. They are + * created as needed by using the amqp_get_decoding_pool(). + * Each amqp_pool_t also contains a next pointer which frame_pools + * maybe chained to the decoding pool. The frame_pools are + * returned to the frame_pool_cache when associated decoding_pool + * is recycled or destroyed using one of: + * amqp_recycle_decoding_pool + * amqp_recycle_all_decoding_pools + * amqp_destroy_all_decoding_pools + */ + struct amqp_connection_state_t_ { - amqp_pool_t frame_pool; - amqp_pool_t decoding_pool; + amqp_pool_t pool_cache_pool; + + amqp_pool_t *frame_pool; + amqp_pool_t *frame_pool_cache; + + amqp_hashtable_t decoding_pools; amqp_connection_state_enum state; @@ -261,4 +323,115 @@ static inline int amqp_decode_bytes(amqp_bytes_t encoded, size_t *offset, extern void amqp_abort(const char *fmt, ...); +/** + * Hashing function for the decoding pools table. + * + * We assume people will use channels in monotoic increasing order + * thus just doing key % table size will lead to perfect utilization + */ +int amqp_hashtable_channel_hash(amqp_channel_t channel); + +/** + * Initializes the hashtable + * + * Just sets everything to zero so we don't deal with junk values + */ +void amqp_hashtable_init(amqp_hashtable_t* table); + +/** + * Adds a pool to the hashtable with the given channel as a key. + * + * NOTE: this is an internal function, you should probably use amqp_get_decoding_pool instead + * + * Will return a pointer to the new amqp_pool_t on success. + * Memory is owned by the hashtable and will be freed when amqp_destroy_all_decoding_pools is called + * Will return NULL if: + * - the amqp_channel_t already exists in the hashtable + * - memory allocation fails + */ +amqp_pool_t* amqp_hashtable_add_pool(amqp_hashtable_t* table, amqp_channel_t channel); + +/** + * Gets a pool from the hashtable with the given channel as a key + * + * NOTE: this is an internal function, you should probably use amqp_get_decoding_pool instead + * + * Will return a pointer to the amqp_pool_t on success. + * Memory is owned by the hashtable and will be freed when amqp_destroy_all_decoding_pools is called + * Will return NULL if: + * - the key does not exist in the hash table + */ +amqp_pool_t* amqp_hashtable_get_pool(amqp_hashtable_t* table, amqp_channel_t channel); + +void amqp_recycle_decoding_pool_inner(amqp_connection_state_t state, amqp_pool_t* decoding_pool); + +/** + * Gets the decoding pool for a specified channel, allocating if it doesn't exist + * + * Will return a pointer to the amqp_pool_t associated with the channe on success + * Memory is owned by the state object and will be freed with amqp_destroy_all_decoding_pools is called + * The state object owns the decoding pool. + * The pool should be recycled using the amqp_recycle_channel_pool or amqp_recycle_all_channel_pools function + * All pools associated with the decoding pool structure can be destroyed with amqp_destroy_all_channel_pools + */ +amqp_pool_t* amqp_get_decoding_pool(amqp_connection_state_t state, amqp_channel_t channel); + +/** + * Recycles the decoding amqp_pool_t associated the channel, and recycles and returns any frame pools to the frame_pool_cache + * + * Recycles the pool by calling recycle_amqp_pool + */ +void amqp_recycle_decoding_pool(amqp_connection_state_t state, amqp_channel_t channel); + +/** + * Recycles all of the decoding amqp_pool_t s, and recycles and returns any frame pools to the frame_pool_cache + * + * Recycles the pools by calling recycle_amqp_pool + */ +void amqp_recycle_all_decoding_pools(amqp_connection_state_t state); + +/** + * Destroys all of the decoding amqp_pool_t's, and returns any frame pools to the frame_pool_cache WITHOUT destroying them + * + * Emptys theh pools by calling empty_amqp_pool + */ +void amqp_destroy_all_decoding_pools(amqp_connection_state_t state); + +/* Gets the current frame pool. + * + * If there's already a 'current' frame pool it returns that + * If there isn't a current frame pool, it attempts to get one from the cache + * If the cache is empty it creates a new one + * Will return NULL on a out of memory condition + */ +amqp_pool_t* amqp_get_frame_pool(amqp_connection_state_t state); + +/** + * Moves gets the current frame_pool and associates it with the decoding pool for the specified channel + * + * This function creates pools if they do not already exist + * Returns 0 on success, non-zero on error, -ERROR_NO_MEMORY on allocation failure + */ +int amqp_move_frame_pool(amqp_connection_state_t state, amqp_channel_t channel); + + +/** + * Destroys all of the frame pools, both current, and cached, then emptys the pool_frame_pool + */ +void amqp_destroy_all_frame_pools(amqp_connection_state_t state); + +/** + * Initializes both the frame and decoding pool structures + */ +void amqp_init_all_pools(amqp_connection_state_t state); + + +/** + * Destroys the frame and decoding pool structures + */ +void amqp_destroy_all_pools(amqp_connection_state_t state); + +#ifdef __cplusplus +} +#endif #endif diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 75bd8a4..06ac5dd 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -306,8 +306,19 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, ((frame.channel == 0) && (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD)) ) )) { - amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t)); - amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t)); + amqp_frame_t *frame_copy = NULL; + amqp_link_t *link = NULL; + amqp_pool_t* decoding_pool = amqp_get_decoding_pool(state, frame.channel); + + if (decoding_pool == NULL) + { + result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + result.library_error = ERROR_NO_MEMORY; + return result; + } + + frame_copy = amqp_pool_alloc(decoding_pool, sizeof(amqp_frame_t)); + link = amqp_pool_alloc(decoding_pool, sizeof(amqp_link_t)); if (frame_copy == NULL || link == NULL) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; @@ -400,8 +411,14 @@ static int amqp_login_inner(amqp_connection_state_t state, { amqp_table_entry_t properties[2]; amqp_connection_start_ok_t s; - amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, - sasl_method, vl); + + amqp_bytes_t response_bytes; + + amqp_pool_t* pool = amqp_get_decoding_pool(state, 0); + if (NULL == pool) + return -ERROR_NO_MEMORY; + + response_bytes = sasl_response(pool, sasl_method, vl); if (response_bytes.bytes == NULL) return -ERROR_NO_MEMORY; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt new file mode 100644 index 0000000..fe39744 --- /dev/null +++ b/tests/CMakeLists.txt @@ -0,0 +1,21 @@ + +include_directories(${LIBRABBITMQ_INCLUDE_DIRS}) + +include_directories(${CMAKE_SOURCE_DIR}/third-party/gtest-1.6.0) + +add_executable(test_parse_url test_parse_url.c) +target_link_libraries(test_parse_url rabbitmq) +add_test(parse_url test_parse_url) + +add_executable(test_tables test_tables.c) +target_link_libraries(test_tables rabbitmq) +add_test(tables test_tables) +set_tests_properties(tables PROPERTIES ENVIRONMENT srcdir=${CMAKE_CURRENT_SOURCE_DIR}) + +add_executable(test_hashtable test_hashtable.cpp) +target_link_libraries(test_hashtable rabbitmq gtest_main gtest) +add_test(hashtable test_hashtable) + +add_executable(test_pools test_pools.cpp) +target_link_libraries(test_pools rabbitmq gtest_main gtest) +add_test(pools test_pools) diff --git a/tests/test_hashtable.cpp b/tests/test_hashtable.cpp new file mode 100644 index 0000000..6417def --- /dev/null +++ b/tests/test_hashtable.cpp @@ -0,0 +1,102 @@ +#include <gtest/gtest.h> + +#include "amqp.h" +#include "amqp_private.h" + +class test_hashtable : public ::testing::Test +{ + public: + test_hashtable() + { + amqp_hashtable_init(&state.decoding_pools); + } + + ~test_hashtable() + { + amqp_destroy_all_decoding_pools(&state); + } + + struct amqp_connection_state_t_ state; +}; + +TEST_F(test_hashtable, init) +{ + // The entire table should be zeroed + for (int i = 0; i < AMQP_HASHTABLE_SIZE; ++i) + { + EXPECT_EQ(0, state.decoding_pools.entries[i]); + } +} + +TEST_F(test_hashtable, add_pool) +{ + amqp_pool_t* pool = amqp_hashtable_add_pool(&state.decoding_pools, 5); + EXPECT_NE((amqp_pool_t*)NULL, pool); +} + +TEST_F(test_hashtable, add_pool_duplicate) +{ + amqp_pool_t* pool = amqp_hashtable_add_pool(&state.decoding_pools, 0); + EXPECT_NE((amqp_pool_t*)NULL, pool); + + pool = amqp_hashtable_add_pool(&state.decoding_pools, 0); + EXPECT_EQ((amqp_pool_t*)NULL, pool); +} + +TEST_F(test_hashtable, add_two_keys) +{ + const amqp_channel_t key1 = 2; + const amqp_channel_t key2 = 3; + + EXPECT_NE(amqp_hashtable_channel_hash(key1), amqp_hashtable_channel_hash(key2)); + + amqp_pool_t* pool1 = amqp_hashtable_add_pool(&state.decoding_pools, key1); + amqp_pool_t* pool2 = amqp_hashtable_add_pool(&state.decoding_pools, key2); + + EXPECT_NE(pool1, pool2); + + amqp_pool_t* pool1a = amqp_hashtable_get_pool(&state.decoding_pools, key1); + amqp_pool_t* pool2a = amqp_hashtable_get_pool(&state.decoding_pools, key2); + + EXPECT_EQ(pool1, pool1a); + EXPECT_EQ(pool2, pool2a); +} + +TEST_F(test_hashtable, add_key_collision) +{ + const amqp_channel_t key1 = 0; + const amqp_channel_t key2 = key1 + AMQP_HASHTABLE_SIZE; + + ASSERT_EQ(amqp_hashtable_channel_hash(key1), amqp_hashtable_channel_hash(key2)); + + amqp_pool_t* pool1 = amqp_hashtable_add_pool(&state.decoding_pools, key1); + amqp_pool_t* pool2 = amqp_hashtable_add_pool(&state.decoding_pools, key2); + + EXPECT_NE(pool1, pool2); + + amqp_pool_t* pool1a = amqp_hashtable_get_pool(&state.decoding_pools, key1); + amqp_pool_t* pool2a = amqp_hashtable_get_pool(&state.decoding_pools, key2); + + EXPECT_EQ(pool1, pool1a); + EXPECT_EQ(pool2, pool2a); +} + +TEST_F(test_hashtable, get_key_not_exist) +{ + amqp_pool_t* not_exist = amqp_hashtable_get_pool(&state.decoding_pools, 10); + EXPECT_EQ((amqp_pool_t*)NULL, not_exist); +} + +TEST_F(test_hashtable, get_key_not_exist_chained) +{ + amqp_channel_t key_exist = 24; + amqp_channel_t key_not_exist = key_exist + AMQP_HASHTABLE_SIZE; + + ASSERT_EQ(amqp_hashtable_channel_hash(key_exist), amqp_hashtable_channel_hash(key_not_exist)); + + amqp_hashtable_add_pool(&state.decoding_pools, key_exist); + + amqp_pool_t* pool = amqp_hashtable_get_pool(&state.decoding_pools, key_not_exist); + EXPECT_EQ((amqp_pool_t*)NULL, pool); +} + diff --git a/tests/test_pools.cpp b/tests/test_pools.cpp new file mode 100644 index 0000000..3cfbb6c --- /dev/null +++ b/tests/test_pools.cpp @@ -0,0 +1,193 @@ +#include "amqp_private.h" + +#include <gtest/gtest.h> + +#include <numeric> + + +class test_pools : public ::testing::Test +{ + public: + test_pools() + { + amqp_init_all_pools(&state); + } + + ~test_pools() + { + amqp_destroy_all_pools(&state); + } + + struct amqp_connection_state_t_ state; +}; + + +TEST_F(test_pools, initialization) +{ + EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool); + EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool_cache); +} + +TEST_F(test_pools, get_frame_pool) +{ + amqp_pool_t* frame_pool = amqp_get_frame_pool(&state); + EXPECT_NE((amqp_pool_t*)NULL, frame_pool); + EXPECT_EQ((amqp_pool_t*)NULL, frame_pool->next); + + EXPECT_EQ(frame_pool, state.frame_pool); + EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool_cache); + + + amqp_pool_t* same_pool = amqp_get_frame_pool(&state); + EXPECT_EQ(frame_pool, same_pool); + EXPECT_EQ((amqp_pool_t*)NULL, same_pool->next); + + EXPECT_EQ(same_pool, state.frame_pool); + EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool_cache); +} + +TEST_F(test_pools, move_frame_pool) +{ + amqp_channel_t channel = 38; + + amqp_pool_t* decoding_pool = amqp_get_decoding_pool(&state, channel); + amqp_pool_t* frame_pool = amqp_get_frame_pool(&state); + + EXPECT_EQ(0, amqp_move_frame_pool(&state, channel)); + + EXPECT_EQ(frame_pool, decoding_pool->next); + EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool); + EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool_cache); +} + +TEST_F(test_pools, move_frame_pool_reuse) +{ + amqp_channel_t channel = 42; + amqp_pool_t* frame_pool = amqp_get_frame_pool(&state); + + EXPECT_EQ(0, amqp_move_frame_pool(&state, channel)); + + amqp_recycle_decoding_pool(&state, channel); + + EXPECT_EQ(frame_pool, state.frame_pool_cache); + EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool); + + + EXPECT_EQ(frame_pool, amqp_get_frame_pool(&state)); +} + +TEST_F(test_pools, test_frame_pool_reuse_one) +{ + amqp_channel_t channel1 = 174; + amqp_channel_t channel2 = 231; + amqp_pool_t* decoding_pool1 = amqp_get_decoding_pool(&state, channel1); + amqp_pool_t* decoding_pool2 = amqp_get_decoding_pool(&state, channel2); + EXPECT_NE(decoding_pool1, decoding_pool2); + + amqp_pool_t* frame_pool1 = amqp_get_frame_pool(&state); + EXPECT_NE((amqp_pool_t*)NULL, frame_pool1); + EXPECT_EQ(0, amqp_move_frame_pool(&state, channel1)); + + amqp_pool_t* frame_pool2 = amqp_get_frame_pool(&state); + EXPECT_NE((amqp_pool_t*)NULL, frame_pool2); + EXPECT_EQ(0, amqp_move_frame_pool(&state, channel2)); + + amqp_recycle_decoding_pool(&state, channel1); + EXPECT_EQ((amqp_pool_t*)NULL, decoding_pool1->next); + EXPECT_EQ(frame_pool1, state.frame_pool_cache); + EXPECT_EQ(frame_pool1, amqp_get_frame_pool(&state)); + EXPECT_EQ(frame_pool1, state.frame_pool); + + EXPECT_EQ(frame_pool2, decoding_pool2->next); + + EXPECT_EQ(decoding_pool1, amqp_get_decoding_pool(&state, channel1)); + EXPECT_EQ(decoding_pool2, amqp_get_decoding_pool(&state, channel2)); +} + +TEST_F(test_pools, test_frame_pool_reuse_chain) +{ + amqp_channel_t channel = 6; + amqp_pool_t* decoding_pool = amqp_get_decoding_pool(&state, channel); + + amqp_pool_t* frame_pool1 = amqp_get_frame_pool(&state); + EXPECT_EQ(0, amqp_move_frame_pool(&state, channel)); + + amqp_pool_t* frame_pool2 = amqp_get_frame_pool(&state); + EXPECT_NE(frame_pool1, frame_pool2); + EXPECT_EQ(0, amqp_move_frame_pool(&state, channel)); + + amqp_recycle_decoding_pool(&state, channel); + EXPECT_EQ((amqp_pool_t*)NULL, decoding_pool->next); + EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool); + EXPECT_NE((amqp_pool_t*)NULL, state.frame_pool_cache); + EXPECT_NE((amqp_pool_t*)NULL, state.frame_pool_cache->next); + EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool_cache->next->next); + + EXPECT_NE((amqp_pool_t*)NULL, amqp_get_frame_pool(&state)); + EXPECT_NE((amqp_pool_t*)NULL, state.frame_pool); + EXPECT_NE((amqp_pool_t*)NULL, state.frame_pool_cache); + EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool_cache->next); +} + + +TEST_F(test_pools, test_frame_pool_reuse_all) +{ + amqp_channel_t channel1 = 25; + amqp_channel_t channel2 = 91; + amqp_pool_t* decoding_pool1 = amqp_get_decoding_pool(&state, channel1); + amqp_pool_t* decoding_pool2 = amqp_get_decoding_pool(&state, channel2); + EXPECT_NE(decoding_pool1, decoding_pool2); + + amqp_pool_t* frame_pool1 = amqp_get_frame_pool(&state); + EXPECT_NE((amqp_pool_t*)NULL, frame_pool1); + EXPECT_EQ(0, amqp_move_frame_pool(&state, channel1)); + + amqp_pool_t* frame_pool2 = amqp_get_frame_pool(&state); + EXPECT_NE((amqp_pool_t*)NULL, frame_pool2); + EXPECT_EQ(0, amqp_move_frame_pool(&state, channel2)); + + amqp_recycle_all_decoding_pools(&state); + EXPECT_EQ((amqp_pool_t*)NULL, decoding_pool1->next); + EXPECT_EQ((amqp_pool_t*)NULL, decoding_pool2->next); + EXPECT_NE((amqp_pool_t*)NULL, state.frame_pool_cache); + EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool); + + amqp_pool_t* frame_pool_recycled = amqp_get_frame_pool(&state); + EXPECT_TRUE(frame_pool_recycled == frame_pool1 || frame_pool_recycled == frame_pool2); + EXPECT_EQ((amqp_pool_t*)NULL, frame_pool_recycled->next); + EXPECT_EQ(frame_pool_recycled, state.frame_pool); + EXPECT_NE((amqp_pool_t*)NULL, state.frame_pool_cache); + EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool_cache->next); + + EXPECT_EQ(decoding_pool1, amqp_get_decoding_pool(&state, channel1)); + EXPECT_EQ(decoding_pool2, amqp_get_decoding_pool(&state, channel2)); +} + +TEST_F(test_pools, get_decoding_pool) +{ + amqp_channel_t channel = 32; + amqp_pool_t* decoding_pool = amqp_get_decoding_pool(&state, channel); + EXPECT_NE((amqp_pool_t*)NULL, decoding_pool); + EXPECT_EQ((amqp_pool_t*)NULL, decoding_pool->next); + + amqp_pool_t* decoding_pool_same = amqp_get_decoding_pool(&state, channel); + EXPECT_EQ(decoding_pool, decoding_pool_same); + EXPECT_EQ((amqp_pool_t*)NULL, decoding_pool_same->next); +} + +TEST_F(test_pools, get_decoding_pool_differs) +{ + amqp_channel_t channel = 0; + amqp_pool_t* decoding_pool = amqp_get_decoding_pool(&state, channel); + + for (int i = 1; i <= std::numeric_limits<amqp_channel_t>::max(); ++i) + { + amqp_pool_t* other_pool = amqp_get_decoding_pool(&state, i); + EXPECT_NE(decoding_pool, other_pool); + } + + amqp_pool_t* decoding_pool_same = amqp_get_decoding_pool(&state, channel); + EXPECT_EQ(decoding_pool, decoding_pool_same); +} + + diff --git a/tests/test_tables.c b/tests/test_tables.c index 4effd6e..6c80647 100644 --- a/tests/test_tables.c +++ b/tests/test_tables.c @@ -58,7 +58,9 @@ void die(const char *fmt, ...) abort(); } +#ifndef M_PI #define M_PI 3.14159265358979323846264338327 +#endif static void dump_indent(int indent, FILE *out) { |