summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <aega@.med.umich.edu>2012-01-13 18:17:07 -0500
committerAlan Antonuk <aega@.med.umich.edu>2012-01-31 14:45:51 -0500
commit14009a5fd9bee456d9aa8f863512ef674cb1e369 (patch)
tree7035a1a7142ee0e82b4ee3fdbd9698c3d033d09a
parenta40de904f235baec9fa532063e0a7db71125df65 (diff)
downloadrabbitmq-c-github-ask-14009a5fd9bee456d9aa8f863512ef674cb1e369.tar.gz
Reworking internal memory management so that memory can be released per-channel
Also added unit tests
-rw-r--r--CMakeLists.txt13
-rw-r--r--librabbitmq/CMakeLists.txt29
-rw-r--r--librabbitmq/amqp.h4
-rw-r--r--librabbitmq/amqp_connection.c77
-rw-r--r--librabbitmq/amqp_mem.c280
-rw-r--r--librabbitmq/amqp_private.h179
-rw-r--r--librabbitmq/amqp_socket.c25
-rw-r--r--tests/CMakeLists.txt21
-rw-r--r--tests/test_hashtable.cpp102
-rw-r--r--tests/test_pools.cpp193
-rw-r--r--tests/test_tables.c2
11 files changed, 893 insertions, 32 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 34256cd..5acbda6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -65,6 +65,8 @@ set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} ${CMAKE_SOURCE_DIR}/cmake)
find_package(POPT)
+option(BUILD_SHARED_LIBS "Build rabbitmq as a shared library" ON)
+
add_subdirectory(librabbitmq)
add_subdirectory(examples)
@@ -72,4 +74,15 @@ if (POPT_FOUND)
add_subdirectory(tools)
endif (POPT_FOUND)
+option(BUILD_TESTING "Enable unit tests" OFF)
+
+if (BUILD_TESTING)
+ if (BUILD_SHARED_LIBS)
+ message(FATAL_ERROR "Can only test when compiled statically, disable BUILD_SHARED_LIBS to enable testing")
+ endif (BUILD_SHARED_LIBS)
+
+ enable_testing()
+ add_subdirectory(tests)
+endif (BUILD_TESTING)
+
add_subdirectory(third-party)
diff --git a/librabbitmq/CMakeLists.txt b/librabbitmq/CMakeLists.txt
index 80632ed..5a01d28 100644
--- a/librabbitmq/CMakeLists.txt
+++ b/librabbitmq/CMakeLists.txt
@@ -1,7 +1,5 @@
project(librabbitmq "C")
-set(CMAKE_INCLUDE_CURRENT_DIR ON)
-
# Stuff dealing with code generation
set(AMQP_CODEGEN_PY "${CMAKE_CURRENT_BINARY_DIR}/amqp_codegen.py")
set(CODEGEN_PY "${CMAKE_CURRENT_BINARY_DIR}/codegen.py")
@@ -43,17 +41,34 @@ SET(CONFIG_CONTENTS "#define VERSION \"0.0.1\"
file(WRITE "${CMAKE_CURRENT_BINARY_DIR}/config.h" ${CONFIG_CONTENTS})
if(WIN32)
- set(SOCKET_IMPL "windows")
+ set(SOCKET_IMPL "${CMAKE_CURRENT_SOURCE_DIR}/windows")
else(WIN32)
- set(SOCKET_IMPL "unix")
+ set(SOCKET_IMPL "${CMAKE_CURRENT_SOURCE_DIR}/unix")
endif(WIN32)
if(MSVC)
set(MSINTTYPES_INCLUDE "${CMAKE_CURRENT_SOURCE_DIR}/../msinttypes")
endif(MSVC)
-include_directories(${SOCKET_IMPL} ${MSINTTYPES_INCLUDE})
-add_definitions(-DBUILDING_LIBRABBITMQ)
+set(LIBRABBITMQ_INCLUDE_DIRS
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_BINARY_DIR}
+ ${SOCKET_IMPL}
+ ${MSINTTYPES_INCLUDE}
+ )
+
+include_directories(${LIBRABBITMQ_INCLUDE_DIRS})
+
+set(LIBRABBITMQ_INCLUDE_DIRS
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_BINARY_DIR}
+ ${SOCKET_IMPL}
+ ${MSINTTYPES_INCLUDE}
+ PARENT_SCOPE)
+
+if (BUILD_SHARED_LIBS)
+ add_definitions(-DBUILDING_LIBRABBITMQ)
+endif (BUILD_SHARED_LIBS)
set(RABBITMQ_SOURCES
${CMAKE_CURRENT_BINARY_DIR}/amqp_framing.h
@@ -64,7 +79,7 @@ set(RABBITMQ_SOURCES
${SOCKET_IMPL}/socket.h ${SOCKET_IMPL}/socket.c
)
-add_library(rabbitmq SHARED ${RABBITMQ_SOURCES})
+add_library(rabbitmq ${RABBITMQ_SOURCES})
if(WIN32)
target_link_libraries(rabbitmq ws2_32)
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index b756025..6d37091 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -181,6 +181,8 @@ typedef struct amqp_pool_t_ {
int next_page;
char *alloc_block;
size_t alloc_used;
+
+ struct amqp_pool_t_ *next;
} amqp_pool_t;
typedef struct amqp_method_t_ {
@@ -277,6 +279,8 @@ RABBITMQ_EXPORT void amqp_release_buffers(amqp_connection_state_t state);
RABBITMQ_EXPORT void amqp_maybe_release_buffers(amqp_connection_state_t state);
+RABBITMQ_EXPORT void amqp_maybe_release_buffers_for_channel(amqp_connection_state_t state, amqp_channel_t channel);
+
RABBITMQ_EXPORT int amqp_send_frame(amqp_connection_state_t state,
amqp_frame_t const *frame);
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 886805d..b3d490e 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -46,9 +46,6 @@
#include "amqp_framing.h"
#include "amqp_private.h"
-#define INITIAL_FRAME_POOL_PAGE_SIZE 65536
-#define INITIAL_DECODING_POOL_PAGE_SIZE 131072
-#define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072
#define ENFORCE_STATE(statevec, statenum) \
{ \
@@ -61,19 +58,23 @@
}
amqp_connection_state_t amqp_new_connection(void) {
+ amqp_pool_t* frame_pool = NULL;
amqp_connection_state_t state =
(amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_));
if (state == NULL)
return NULL;
- init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE);
- init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE);
+ amqp_init_all_pools(state);
if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0)
goto out_nomem;
- state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);
+ frame_pool = amqp_get_frame_pool(state);
+ if (NULL == frame_pool)
+ goto out_nomem;
+
+ state->inbound_buffer.bytes = amqp_pool_alloc(frame_pool, state->inbound_buffer.len);
if (state->inbound_buffer.bytes == NULL)
goto out_nomem;
@@ -92,8 +93,7 @@ amqp_connection_state_t amqp_new_connection(void) {
out_nomem:
free(state->sock_inbound_buffer.bytes);
- empty_amqp_pool(&state->frame_pool);
- empty_amqp_pool(&state->decoding_pool);
+ amqp_destroy_all_pools(state);
free(state);
return NULL;
}
@@ -121,9 +121,6 @@ int amqp_tune_connection(amqp_connection_state_t state,
state->frame_max = frame_max;
state->heartbeat = heartbeat;
- empty_amqp_pool(&state->frame_pool);
- init_amqp_pool(&state->frame_pool, frame_max);
-
state->inbound_buffer.len = frame_max;
state->outbound_buffer.len = frame_max;
newbuf = realloc(state->outbound_buffer.bytes, frame_max);
@@ -143,8 +140,8 @@ int amqp_get_channel_max(amqp_connection_state_t state) {
int amqp_destroy_connection(amqp_connection_state_t state) {
int s = state->sockfd;
- empty_amqp_pool(&state->frame_pool);
- empty_amqp_pool(&state->decoding_pool);
+ amqp_destroy_all_pools(state);
+
free(state->outbound_buffer.bytes);
free(state->sock_inbound_buffer.bytes);
free(state);
@@ -185,6 +182,8 @@ int amqp_handle_input(amqp_connection_state_t state,
{
size_t bytes_consumed;
void *raw_frame;
+ amqp_pool_t* frame_pool = NULL;
+ amqp_pool_t* decoding_pool = NULL;
/* Returning frame_type of zero indicates either insufficient input,
or a complete, ignored frame was read. */
@@ -194,7 +193,17 @@ int amqp_handle_input(amqp_connection_state_t state,
return 0;
if (state->state == CONNECTION_STATE_IDLE) {
- state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool,
+ frame_pool = amqp_get_frame_pool(state);
+ if (state->frame_pool == NULL) {
+ return -ERROR_NO_MEMORY;
+ }
+ /* We can recycle here because:
+ - Methods data is copied to the decoding_pool
+ - Property data is copied to the decoding_pool
+ - Body data, the pool is moved to the decoding_pools structure
+ */
+ recycle_amqp_pool(frame_pool);
+ state->inbound_buffer.bytes = amqp_pool_alloc(frame_pool,
state->inbound_buffer.len);
if (state->inbound_buffer.bytes == NULL)
/* state->inbound_buffer.len is always nonzero, because it
@@ -269,8 +278,12 @@ int amqp_handle_input(amqp_connection_state_t state,
encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 4);
encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE;
+ decoding_pool = amqp_get_decoding_pool(state, decoded_frame->channel);
+ if (decoding_pool == NULL)
+ return -ERROR_NO_MEMORY;
+
res = amqp_decode_method(decoded_frame->payload.method.id,
- &state->decoding_pool, encoded,
+ decoding_pool, encoded,
&decoded_frame->payload.method.decoded);
if (res < 0)
return res;
@@ -287,8 +300,12 @@ int amqp_handle_input(amqp_connection_state_t state,
encoded.len = state->target_size - HEADER_SIZE - 12 - FOOTER_SIZE;
decoded_frame->payload.properties.raw = encoded;
+ decoding_pool = amqp_get_decoding_pool(state, decoded_frame->channel);
+ if (decoding_pool == NULL)
+ return -ERROR_NO_MEMORY;
+
res = amqp_decode_properties(decoded_frame->payload.properties.class_id,
- &state->decoding_pool, encoded,
+ decoding_pool, encoded,
&decoded_frame->payload.properties.decoded);
if (res < 0)
return res;
@@ -300,6 +317,11 @@ int amqp_handle_input(amqp_connection_state_t state,
= state->target_size - HEADER_SIZE - FOOTER_SIZE;
decoded_frame->payload.body_fragment.bytes
= amqp_offset(raw_frame, HEADER_SIZE);
+
+ /* Move ownership of the pool to the channel hashmap */
+ if (0 != amqp_move_frame_pool(state, decoded_frame->channel))
+ return -ERROR_NO_MEMORY;
+
break;
case AMQP_FRAME_HEARTBEAT:
@@ -331,8 +353,7 @@ void amqp_release_buffers(amqp_connection_state_t state) {
if (state->first_queued_frame)
amqp_abort("Programming error: attempt to amqp_release_buffers while waiting events enqueued");
- recycle_amqp_pool(&state->frame_pool);
- recycle_amqp_pool(&state->decoding_pool);
+ amqp_recycle_all_decoding_pools(state);
}
void amqp_maybe_release_buffers(amqp_connection_state_t state) {
@@ -341,6 +362,26 @@ void amqp_maybe_release_buffers(amqp_connection_state_t state) {
}
}
+void amqp_maybe_release_buffers_for_channel(amqp_connection_state_t state, amqp_channel_t channel) {
+ // To release buffers for a channel
+ if (state->state == CONNECTION_STATE_IDLE)
+ {
+ amqp_link_t* last_frame_ptr = state->first_queued_frame;
+
+ last_frame_ptr = state->first_queued_frame;
+ while (last_frame_ptr != NULL)
+ {
+ amqp_frame_t* frame = (amqp_frame_t*)last_frame_ptr->data;
+ if (frame->channel == channel)
+ {
+ return;
+ }
+ last_frame_ptr = last_frame_ptr->next;
+ }
+ amqp_recycle_decoding_pool(state, channel);
+ }
+}
+
int amqp_send_frame(amqp_connection_state_t state,
const amqp_frame_t *frame)
{
diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c
index 85be08c..2538bef 100644
--- a/librabbitmq/amqp_mem.c
+++ b/librabbitmq/amqp_mem.c
@@ -44,8 +44,11 @@
#include <assert.h>
#include "amqp.h"
+#include "amqp_private.h"
#include "config.h"
+#define INITIAL_DECODING_POOL_PAGE_SIZE 131072
+
char const *amqp_version(void) {
return VERSION; /* defined in config.h */
}
@@ -62,6 +65,8 @@ void init_amqp_pool(amqp_pool_t *pool, size_t pagesize) {
pool->next_page = 0;
pool->alloc_block = NULL;
pool->alloc_used = 0;
+
+ pool->next = NULL;
}
static void empty_blocklist(amqp_pool_blocklist_t *x) {
@@ -186,3 +191,278 @@ amqp_bytes_t amqp_bytes_malloc(size_t amount) {
void amqp_bytes_free(amqp_bytes_t bytes) {
free(bytes.bytes);
}
+
+amqp_pool_t* amqp_get_frame_pool(amqp_connection_state_t state)
+{
+ int page_size = (0 == state->frame_max ? INITIAL_FRAME_POOL_PAGE_SIZE : state->frame_max);
+
+ assert(NULL != state);
+
+ if (NULL != state->frame_pool)
+ return state->frame_pool;
+
+ if (NULL != state->frame_pool_cache)
+ {
+ state->frame_pool = state->frame_pool_cache;
+ state->frame_pool_cache = state->frame_pool->next;
+ state->frame_pool->next = NULL;
+
+ if (state->frame_pool->pagesize != page_size)
+ {
+ empty_amqp_pool(state->frame_pool);
+ init_amqp_pool(state->frame_pool, page_size);
+ }
+ return state->frame_pool;
+ }
+
+ state->frame_pool = (amqp_pool_t*)amqp_pool_alloc(&state->pool_cache_pool, sizeof(amqp_pool_t));
+
+ if (NULL == state->frame_pool)
+ return NULL;
+
+ init_amqp_pool(state->frame_pool, page_size);
+
+ return state->frame_pool;
+}
+
+int amqp_move_frame_pool(amqp_connection_state_t state, amqp_channel_t channel)
+{
+ amqp_pool_t* decoding_pool = NULL;
+ amqp_pool_t* frame_pool = NULL;
+
+ decoding_pool = amqp_get_decoding_pool(state, channel);
+ if (NULL == decoding_pool)
+ {
+ return -ERROR_NO_MEMORY;
+ }
+
+ frame_pool = amqp_get_frame_pool(state);
+ if (NULL == frame_pool)
+ {
+ return -ERROR_NO_MEMORY;
+ }
+
+ state->frame_pool = NULL;
+
+ frame_pool->next = decoding_pool->next;
+ decoding_pool->next = frame_pool;
+
+ return 0;
+}
+
+void amqp_destroy_all_frame_pools(amqp_connection_state_t state)
+{
+ amqp_pool_t* frame_pool = NULL;
+
+ assert(NULL != state);
+
+ if (NULL != state->frame_pool)
+ {
+ empty_amqp_pool(state->frame_pool);
+ }
+
+ for (frame_pool = state->frame_pool_cache;
+ NULL != frame_pool;
+ frame_pool = frame_pool->next)
+ {
+ empty_amqp_pool(frame_pool);
+ }
+
+ empty_amqp_pool(&state->pool_cache_pool);
+}
+
+int amqp_hashtable_channel_hash(amqp_channel_t channel)
+{
+ return channel % AMQP_HASHTABLE_SIZE;
+}
+
+void amqp_hashtable_init(amqp_hashtable_t* table)
+{
+ assert(NULL != table);
+
+ memset(table, 0, sizeof(amqp_hashtable_t));
+}
+
+amqp_pool_t* amqp_hashtable_get_pool(amqp_hashtable_t* table, amqp_channel_t channel)
+{
+ amqp_hashtable_entry_t* entry = NULL;
+ int index = 0;
+
+ assert(NULL != table);
+
+ index = amqp_hashtable_channel_hash(channel);
+
+ assert(AMQP_HASHTABLE_SIZE > index);
+ entry = table->entries[index];
+
+ while (NULL != entry)
+ {
+ if (channel == entry->key)
+ {
+ return &entry->data;
+ }
+
+ entry = entry->next;
+ }
+
+ return NULL;
+}
+
+amqp_pool_t* amqp_hashtable_add_pool(amqp_hashtable_t* table, amqp_channel_t channel)
+{
+ amqp_hashtable_entry_t* new_entry = NULL;
+ int index = 0;
+
+ assert(NULL != table);
+
+ if (NULL != amqp_hashtable_get_pool(table, channel))
+ {
+ return NULL;
+ }
+
+ index = amqp_hashtable_channel_hash(channel);
+ assert(AMQP_HASHTABLE_SIZE > index);
+
+ new_entry = (amqp_hashtable_entry_t*)malloc(sizeof(amqp_hashtable_entry_t));
+ if (NULL == new_entry)
+ {
+ return NULL;
+ }
+ memset(new_entry, 0, sizeof(amqp_hashtable_entry_t));
+ new_entry->key = channel;
+ new_entry->next = table->entries[index];
+ table->entries[index] = new_entry;
+
+ return &new_entry->data;
+}
+
+amqp_pool_t* amqp_get_decoding_pool(amqp_connection_state_t state, amqp_channel_t channel)
+{
+ amqp_pool_t* pool = NULL;
+
+ assert(NULL != state);
+
+ // Try to get the decoding pool
+ pool = amqp_hashtable_get_pool(&state->decoding_pools, channel);
+
+ if (NULL == pool)
+ {
+ pool = amqp_hashtable_add_pool(&state->decoding_pools, channel);
+ init_amqp_pool(pool, INITIAL_DECODING_POOL_PAGE_SIZE);
+ }
+ return pool;
+}
+
+void amqp_recycle_decoding_pool_inner(amqp_connection_state_t state, amqp_pool_t* decoding_pool)
+{
+ amqp_pool_t* frame_pool = NULL;
+ amqp_pool_t* last_frame_pool = NULL;
+
+ assert(NULL != state);
+ assert(NULL != decoding_pool);
+
+ recycle_amqp_pool(decoding_pool);
+
+
+ if (NULL != decoding_pool->next)
+ {
+ frame_pool = decoding_pool->next;
+ for (frame_pool = decoding_pool->next;
+ NULL != frame_pool;
+ last_frame_pool = frame_pool, frame_pool = frame_pool->next)
+ {
+ recycle_amqp_pool(frame_pool);
+ }
+
+ last_frame_pool->next = state->frame_pool_cache;
+ state->frame_pool_cache = decoding_pool->next;
+ decoding_pool->next = NULL;
+ }
+}
+
+void amqp_recycle_decoding_pool(amqp_connection_state_t state, amqp_channel_t channel)
+{
+ amqp_pool_t* decoding_pool = NULL;
+
+ assert(NULL != state);
+
+ decoding_pool = amqp_get_decoding_pool(state, channel);
+
+ if (NULL == decoding_pool)
+ {
+ return;
+ }
+
+ amqp_recycle_decoding_pool_inner(state, decoding_pool);
+}
+
+void amqp_recycle_all_decoding_pools(amqp_connection_state_t state)
+{
+ int i = 0;
+ assert(NULL != state);
+
+ for (i = 0; i < AMQP_HASHTABLE_SIZE; ++i)
+ {
+ if (NULL != state->decoding_pools.entries[i])
+ {
+ amqp_hashtable_entry_t* entry = state->decoding_pools.entries[i];
+ for ( ; entry != NULL; entry = entry->next)
+ {
+ amqp_recycle_decoding_pool_inner(state, &entry->data);
+ }
+ }
+ }
+}
+
+void amqp_destroy_all_decoding_pools(amqp_connection_state_t state)
+{
+ int i = 0;
+ assert(NULL != state);
+ for (i = 0; i < AMQP_HASHTABLE_SIZE; ++i)
+ {
+ if (NULL != state->decoding_pools.entries[i])
+ {
+ amqp_hashtable_entry_t* entry = state->decoding_pools.entries[i];
+ while (entry != NULL)
+ {
+ amqp_hashtable_entry_t* last_entry = NULL;
+ amqp_pool_t* decoding_pool = &entry->data;
+ amqp_pool_t* last_frame_pool = NULL;
+
+ empty_amqp_pool(decoding_pool);
+
+ if (NULL != decoding_pool->next)
+ {
+ last_frame_pool = decoding_pool->next;
+
+ for (last_frame_pool = decoding_pool->next;
+ NULL != last_frame_pool->next;
+ last_frame_pool = last_frame_pool->next) { /* empty */ }
+
+ last_frame_pool->next = state->frame_pool_cache;
+ state->frame_pool_cache = decoding_pool->next;
+ decoding_pool->next = NULL;
+ }
+
+ last_entry = entry->next;
+ free(entry);
+ entry = last_entry;
+ }
+ }
+ }
+}
+
+void amqp_init_all_pools(amqp_connection_state_t state)
+{
+ state->frame_pool = NULL;
+ state->frame_pool_cache = NULL;
+ state->frame_max = 0;
+ init_amqp_pool(&state->pool_cache_pool, sizeof(amqp_pool_t)*16);
+ amqp_hashtable_init(&state->decoding_pools);
+}
+
+void amqp_destroy_all_pools(amqp_connection_state_t state)
+{
+ amqp_destroy_all_decoding_pools(state);
+ amqp_destroy_all_frame_pools(state);
+}
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index e019598..0301733 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -39,6 +39,7 @@
* ***** END LICENSE BLOCK *****
*/
+#include "amqp.h"
#include "config.h"
/* Error numbering: Because of differences in error numbering on
@@ -63,9 +64,14 @@
#define ERROR_BAD_AMQP_URL 8
#define ERROR_MAX 8
+#include "socket.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
extern char *amqp_os_error_string(int err);
-#include "socket.h"
/*
* Connection states: XXX FIX THIS
@@ -102,14 +108,70 @@ typedef enum amqp_connection_state_enum_ {
#define AMQP_PSEUDOFRAME_PROTOCOL_HEADER 'A'
+#define INITIAL_FRAME_POOL_PAGE_SIZE 65536
+#define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072
+
typedef struct amqp_link_t_ {
struct amqp_link_t_ *next;
void *data;
} amqp_link_t;
+#define AMQP_HASHTABLE_SIZE 256
+
+typedef struct amqp_hashtable_entry_t_ {
+ struct amqp_hashtable_entry_t_ *next;
+ amqp_pool_t data;
+ amqp_channel_t key;
+} amqp_hashtable_entry_t;
+
+typedef struct amqp_hashtable_t_ {
+ amqp_hashtable_entry_t* entries[AMQP_HASHTABLE_SIZE];
+} amqp_hashtable_t;
+
+/**
+ * Memory allocation scheme for amqp_connection_state_t:
+ *
+ * Most memory allocation in the library is done using an amqp_pool_t
+ * Since pools can only be recycled when everything in them is no
+ * longer being used we separate out usage so that it has an affinity
+ * to the channel.
+ *
+ * When receiving data on a socket, the library allocates a buffer
+ * to hold the data. This data comes from a 'frame_pool'. Once a complete
+ * frame is received it is decoded. If the frame contains a METHOD or
+ * HEADER frame it is decoded, memory is allocated from a 'decoding_pool'.
+ * Since there is little processing done for a BODY frame, its data is NOT
+ * copied to a 'decoding_pool' allocated memory block, instead the frame_pool
+ * that is responsible for the memory allocated to it is associated with a
+ * channel, the pool is returned when the memory for a channel is recycled.
+ *
+ * Relevant amqp_connection_state_t elements:
+ * - pool_cache_pool - this is where frame_pools are allocated from. We assume
+ * frame pools will last the lifetime of the connection object
+ * - frame_pool - this points to the 'current' frame_pool. This is NULL if there isn't
+ * a 'current' frame pool. To get the current frame pool one
+ * should use the amqp_get_frame_pool()
+ * - frame_pool_cache - this is a stack of the frame_pools that are not in use.
+ * a frame_pool will end up here (in a recycled state) on
+ * when a channel's pools are recycled
+ * - decoding_pools - this is a chained hashtable of the decoding_pools. They are
+ * created as needed by using the amqp_get_decoding_pool().
+ * Each amqp_pool_t also contains a next pointer which frame_pools
+ * maybe chained to the decoding pool. The frame_pools are
+ * returned to the frame_pool_cache when associated decoding_pool
+ * is recycled or destroyed using one of:
+ * amqp_recycle_decoding_pool
+ * amqp_recycle_all_decoding_pools
+ * amqp_destroy_all_decoding_pools
+ */
+
struct amqp_connection_state_t_ {
- amqp_pool_t frame_pool;
- amqp_pool_t decoding_pool;
+ amqp_pool_t pool_cache_pool;
+
+ amqp_pool_t *frame_pool;
+ amqp_pool_t *frame_pool_cache;
+
+ amqp_hashtable_t decoding_pools;
amqp_connection_state_enum state;
@@ -261,4 +323,115 @@ static inline int amqp_decode_bytes(amqp_bytes_t encoded, size_t *offset,
extern void amqp_abort(const char *fmt, ...);
+/**
+ * Hashing function for the decoding pools table.
+ *
+ * We assume people will use channels in monotoic increasing order
+ * thus just doing key % table size will lead to perfect utilization
+ */
+int amqp_hashtable_channel_hash(amqp_channel_t channel);
+
+/**
+ * Initializes the hashtable
+ *
+ * Just sets everything to zero so we don't deal with junk values
+ */
+void amqp_hashtable_init(amqp_hashtable_t* table);
+
+/**
+ * Adds a pool to the hashtable with the given channel as a key.
+ *
+ * NOTE: this is an internal function, you should probably use amqp_get_decoding_pool instead
+ *
+ * Will return a pointer to the new amqp_pool_t on success.
+ * Memory is owned by the hashtable and will be freed when amqp_destroy_all_decoding_pools is called
+ * Will return NULL if:
+ * - the amqp_channel_t already exists in the hashtable
+ * - memory allocation fails
+ */
+amqp_pool_t* amqp_hashtable_add_pool(amqp_hashtable_t* table, amqp_channel_t channel);
+
+/**
+ * Gets a pool from the hashtable with the given channel as a key
+ *
+ * NOTE: this is an internal function, you should probably use amqp_get_decoding_pool instead
+ *
+ * Will return a pointer to the amqp_pool_t on success.
+ * Memory is owned by the hashtable and will be freed when amqp_destroy_all_decoding_pools is called
+ * Will return NULL if:
+ * - the key does not exist in the hash table
+ */
+amqp_pool_t* amqp_hashtable_get_pool(amqp_hashtable_t* table, amqp_channel_t channel);
+
+void amqp_recycle_decoding_pool_inner(amqp_connection_state_t state, amqp_pool_t* decoding_pool);
+
+/**
+ * Gets the decoding pool for a specified channel, allocating if it doesn't exist
+ *
+ * Will return a pointer to the amqp_pool_t associated with the channe on success
+ * Memory is owned by the state object and will be freed with amqp_destroy_all_decoding_pools is called
+ * The state object owns the decoding pool.
+ * The pool should be recycled using the amqp_recycle_channel_pool or amqp_recycle_all_channel_pools function
+ * All pools associated with the decoding pool structure can be destroyed with amqp_destroy_all_channel_pools
+ */
+amqp_pool_t* amqp_get_decoding_pool(amqp_connection_state_t state, amqp_channel_t channel);
+
+/**
+ * Recycles the decoding amqp_pool_t associated the channel, and recycles and returns any frame pools to the frame_pool_cache
+ *
+ * Recycles the pool by calling recycle_amqp_pool
+ */
+void amqp_recycle_decoding_pool(amqp_connection_state_t state, amqp_channel_t channel);
+
+/**
+ * Recycles all of the decoding amqp_pool_t s, and recycles and returns any frame pools to the frame_pool_cache
+ *
+ * Recycles the pools by calling recycle_amqp_pool
+ */
+void amqp_recycle_all_decoding_pools(amqp_connection_state_t state);
+
+/**
+ * Destroys all of the decoding amqp_pool_t's, and returns any frame pools to the frame_pool_cache WITHOUT destroying them
+ *
+ * Emptys theh pools by calling empty_amqp_pool
+ */
+void amqp_destroy_all_decoding_pools(amqp_connection_state_t state);
+
+/* Gets the current frame pool.
+ *
+ * If there's already a 'current' frame pool it returns that
+ * If there isn't a current frame pool, it attempts to get one from the cache
+ * If the cache is empty it creates a new one
+ * Will return NULL on a out of memory condition
+ */
+amqp_pool_t* amqp_get_frame_pool(amqp_connection_state_t state);
+
+/**
+ * Moves gets the current frame_pool and associates it with the decoding pool for the specified channel
+ *
+ * This function creates pools if they do not already exist
+ * Returns 0 on success, non-zero on error, -ERROR_NO_MEMORY on allocation failure
+ */
+int amqp_move_frame_pool(amqp_connection_state_t state, amqp_channel_t channel);
+
+
+/**
+ * Destroys all of the frame pools, both current, and cached, then emptys the pool_frame_pool
+ */
+void amqp_destroy_all_frame_pools(amqp_connection_state_t state);
+
+/**
+ * Initializes both the frame and decoding pool structures
+ */
+void amqp_init_all_pools(amqp_connection_state_t state);
+
+
+/**
+ * Destroys the frame and decoding pool structures
+ */
+void amqp_destroy_all_pools(amqp_connection_state_t state);
+
+#ifdef __cplusplus
+}
+#endif
#endif
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 75bd8a4..06ac5dd 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -306,8 +306,19 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
((frame.channel == 0) &&
(frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD)) ) ))
{
- amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t));
- amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t));
+ amqp_frame_t *frame_copy = NULL;
+ amqp_link_t *link = NULL;
+ amqp_pool_t* decoding_pool = amqp_get_decoding_pool(state, frame.channel);
+
+ if (decoding_pool == NULL)
+ {
+ result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ result.library_error = ERROR_NO_MEMORY;
+ return result;
+ }
+
+ frame_copy = amqp_pool_alloc(decoding_pool, sizeof(amqp_frame_t));
+ link = amqp_pool_alloc(decoding_pool, sizeof(amqp_link_t));
if (frame_copy == NULL || link == NULL) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
@@ -400,8 +411,14 @@ static int amqp_login_inner(amqp_connection_state_t state,
{
amqp_table_entry_t properties[2];
amqp_connection_start_ok_t s;
- amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool,
- sasl_method, vl);
+
+ amqp_bytes_t response_bytes;
+
+ amqp_pool_t* pool = amqp_get_decoding_pool(state, 0);
+ if (NULL == pool)
+ return -ERROR_NO_MEMORY;
+
+ response_bytes = sasl_response(pool, sasl_method, vl);
if (response_bytes.bytes == NULL)
return -ERROR_NO_MEMORY;
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
new file mode 100644
index 0000000..fe39744
--- /dev/null
+++ b/tests/CMakeLists.txt
@@ -0,0 +1,21 @@
+
+include_directories(${LIBRABBITMQ_INCLUDE_DIRS})
+
+include_directories(${CMAKE_SOURCE_DIR}/third-party/gtest-1.6.0)
+
+add_executable(test_parse_url test_parse_url.c)
+target_link_libraries(test_parse_url rabbitmq)
+add_test(parse_url test_parse_url)
+
+add_executable(test_tables test_tables.c)
+target_link_libraries(test_tables rabbitmq)
+add_test(tables test_tables)
+set_tests_properties(tables PROPERTIES ENVIRONMENT srcdir=${CMAKE_CURRENT_SOURCE_DIR})
+
+add_executable(test_hashtable test_hashtable.cpp)
+target_link_libraries(test_hashtable rabbitmq gtest_main gtest)
+add_test(hashtable test_hashtable)
+
+add_executable(test_pools test_pools.cpp)
+target_link_libraries(test_pools rabbitmq gtest_main gtest)
+add_test(pools test_pools)
diff --git a/tests/test_hashtable.cpp b/tests/test_hashtable.cpp
new file mode 100644
index 0000000..6417def
--- /dev/null
+++ b/tests/test_hashtable.cpp
@@ -0,0 +1,102 @@
+#include <gtest/gtest.h>
+
+#include "amqp.h"
+#include "amqp_private.h"
+
+class test_hashtable : public ::testing::Test
+{
+ public:
+ test_hashtable()
+ {
+ amqp_hashtable_init(&state.decoding_pools);
+ }
+
+ ~test_hashtable()
+ {
+ amqp_destroy_all_decoding_pools(&state);
+ }
+
+ struct amqp_connection_state_t_ state;
+};
+
+TEST_F(test_hashtable, init)
+{
+ // The entire table should be zeroed
+ for (int i = 0; i < AMQP_HASHTABLE_SIZE; ++i)
+ {
+ EXPECT_EQ(0, state.decoding_pools.entries[i]);
+ }
+}
+
+TEST_F(test_hashtable, add_pool)
+{
+ amqp_pool_t* pool = amqp_hashtable_add_pool(&state.decoding_pools, 5);
+ EXPECT_NE((amqp_pool_t*)NULL, pool);
+}
+
+TEST_F(test_hashtable, add_pool_duplicate)
+{
+ amqp_pool_t* pool = amqp_hashtable_add_pool(&state.decoding_pools, 0);
+ EXPECT_NE((amqp_pool_t*)NULL, pool);
+
+ pool = amqp_hashtable_add_pool(&state.decoding_pools, 0);
+ EXPECT_EQ((amqp_pool_t*)NULL, pool);
+}
+
+TEST_F(test_hashtable, add_two_keys)
+{
+ const amqp_channel_t key1 = 2;
+ const amqp_channel_t key2 = 3;
+
+ EXPECT_NE(amqp_hashtable_channel_hash(key1), amqp_hashtable_channel_hash(key2));
+
+ amqp_pool_t* pool1 = amqp_hashtable_add_pool(&state.decoding_pools, key1);
+ amqp_pool_t* pool2 = amqp_hashtable_add_pool(&state.decoding_pools, key2);
+
+ EXPECT_NE(pool1, pool2);
+
+ amqp_pool_t* pool1a = amqp_hashtable_get_pool(&state.decoding_pools, key1);
+ amqp_pool_t* pool2a = amqp_hashtable_get_pool(&state.decoding_pools, key2);
+
+ EXPECT_EQ(pool1, pool1a);
+ EXPECT_EQ(pool2, pool2a);
+}
+
+TEST_F(test_hashtable, add_key_collision)
+{
+ const amqp_channel_t key1 = 0;
+ const amqp_channel_t key2 = key1 + AMQP_HASHTABLE_SIZE;
+
+ ASSERT_EQ(amqp_hashtable_channel_hash(key1), amqp_hashtable_channel_hash(key2));
+
+ amqp_pool_t* pool1 = amqp_hashtable_add_pool(&state.decoding_pools, key1);
+ amqp_pool_t* pool2 = amqp_hashtable_add_pool(&state.decoding_pools, key2);
+
+ EXPECT_NE(pool1, pool2);
+
+ amqp_pool_t* pool1a = amqp_hashtable_get_pool(&state.decoding_pools, key1);
+ amqp_pool_t* pool2a = amqp_hashtable_get_pool(&state.decoding_pools, key2);
+
+ EXPECT_EQ(pool1, pool1a);
+ EXPECT_EQ(pool2, pool2a);
+}
+
+TEST_F(test_hashtable, get_key_not_exist)
+{
+ amqp_pool_t* not_exist = amqp_hashtable_get_pool(&state.decoding_pools, 10);
+ EXPECT_EQ((amqp_pool_t*)NULL, not_exist);
+}
+
+TEST_F(test_hashtable, get_key_not_exist_chained)
+{
+ amqp_channel_t key_exist = 24;
+ amqp_channel_t key_not_exist = key_exist + AMQP_HASHTABLE_SIZE;
+
+ ASSERT_EQ(amqp_hashtable_channel_hash(key_exist), amqp_hashtable_channel_hash(key_not_exist));
+
+ amqp_hashtable_add_pool(&state.decoding_pools, key_exist);
+
+ amqp_pool_t* pool = amqp_hashtable_get_pool(&state.decoding_pools, key_not_exist);
+ EXPECT_EQ((amqp_pool_t*)NULL, pool);
+}
+
diff --git a/tests/test_pools.cpp b/tests/test_pools.cpp
new file mode 100644
index 0000000..3cfbb6c
--- /dev/null
+++ b/tests/test_pools.cpp
@@ -0,0 +1,193 @@
+#include "amqp_private.h"
+
+#include <gtest/gtest.h>
+
+#include <numeric>
+
+
+class test_pools : public ::testing::Test
+{
+ public:
+ test_pools()
+ {
+ amqp_init_all_pools(&state);
+ }
+
+ ~test_pools()
+ {
+ amqp_destroy_all_pools(&state);
+ }
+
+ struct amqp_connection_state_t_ state;
+};
+
+
+TEST_F(test_pools, initialization)
+{
+ EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool);
+ EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool_cache);
+}
+
+TEST_F(test_pools, get_frame_pool)
+{
+ amqp_pool_t* frame_pool = amqp_get_frame_pool(&state);
+ EXPECT_NE((amqp_pool_t*)NULL, frame_pool);
+ EXPECT_EQ((amqp_pool_t*)NULL, frame_pool->next);
+
+ EXPECT_EQ(frame_pool, state.frame_pool);
+ EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool_cache);
+
+
+ amqp_pool_t* same_pool = amqp_get_frame_pool(&state);
+ EXPECT_EQ(frame_pool, same_pool);
+ EXPECT_EQ((amqp_pool_t*)NULL, same_pool->next);
+
+ EXPECT_EQ(same_pool, state.frame_pool);
+ EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool_cache);
+}
+
+TEST_F(test_pools, move_frame_pool)
+{
+ amqp_channel_t channel = 38;
+
+ amqp_pool_t* decoding_pool = amqp_get_decoding_pool(&state, channel);
+ amqp_pool_t* frame_pool = amqp_get_frame_pool(&state);
+
+ EXPECT_EQ(0, amqp_move_frame_pool(&state, channel));
+
+ EXPECT_EQ(frame_pool, decoding_pool->next);
+ EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool);
+ EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool_cache);
+}
+
+TEST_F(test_pools, move_frame_pool_reuse)
+{
+ amqp_channel_t channel = 42;
+ amqp_pool_t* frame_pool = amqp_get_frame_pool(&state);
+
+ EXPECT_EQ(0, amqp_move_frame_pool(&state, channel));
+
+ amqp_recycle_decoding_pool(&state, channel);
+
+ EXPECT_EQ(frame_pool, state.frame_pool_cache);
+ EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool);
+
+
+ EXPECT_EQ(frame_pool, amqp_get_frame_pool(&state));
+}
+
+TEST_F(test_pools, test_frame_pool_reuse_one)
+{
+ amqp_channel_t channel1 = 174;
+ amqp_channel_t channel2 = 231;
+ amqp_pool_t* decoding_pool1 = amqp_get_decoding_pool(&state, channel1);
+ amqp_pool_t* decoding_pool2 = amqp_get_decoding_pool(&state, channel2);
+ EXPECT_NE(decoding_pool1, decoding_pool2);
+
+ amqp_pool_t* frame_pool1 = amqp_get_frame_pool(&state);
+ EXPECT_NE((amqp_pool_t*)NULL, frame_pool1);
+ EXPECT_EQ(0, amqp_move_frame_pool(&state, channel1));
+
+ amqp_pool_t* frame_pool2 = amqp_get_frame_pool(&state);
+ EXPECT_NE((amqp_pool_t*)NULL, frame_pool2);
+ EXPECT_EQ(0, amqp_move_frame_pool(&state, channel2));
+
+ amqp_recycle_decoding_pool(&state, channel1);
+ EXPECT_EQ((amqp_pool_t*)NULL, decoding_pool1->next);
+ EXPECT_EQ(frame_pool1, state.frame_pool_cache);
+ EXPECT_EQ(frame_pool1, amqp_get_frame_pool(&state));
+ EXPECT_EQ(frame_pool1, state.frame_pool);
+
+ EXPECT_EQ(frame_pool2, decoding_pool2->next);
+
+ EXPECT_EQ(decoding_pool1, amqp_get_decoding_pool(&state, channel1));
+ EXPECT_EQ(decoding_pool2, amqp_get_decoding_pool(&state, channel2));
+}
+
+TEST_F(test_pools, test_frame_pool_reuse_chain)
+{
+ amqp_channel_t channel = 6;
+ amqp_pool_t* decoding_pool = amqp_get_decoding_pool(&state, channel);
+
+ amqp_pool_t* frame_pool1 = amqp_get_frame_pool(&state);
+ EXPECT_EQ(0, amqp_move_frame_pool(&state, channel));
+
+ amqp_pool_t* frame_pool2 = amqp_get_frame_pool(&state);
+ EXPECT_NE(frame_pool1, frame_pool2);
+ EXPECT_EQ(0, amqp_move_frame_pool(&state, channel));
+
+ amqp_recycle_decoding_pool(&state, channel);
+ EXPECT_EQ((amqp_pool_t*)NULL, decoding_pool->next);
+ EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool);
+ EXPECT_NE((amqp_pool_t*)NULL, state.frame_pool_cache);
+ EXPECT_NE((amqp_pool_t*)NULL, state.frame_pool_cache->next);
+ EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool_cache->next->next);
+
+ EXPECT_NE((amqp_pool_t*)NULL, amqp_get_frame_pool(&state));
+ EXPECT_NE((amqp_pool_t*)NULL, state.frame_pool);
+ EXPECT_NE((amqp_pool_t*)NULL, state.frame_pool_cache);
+ EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool_cache->next);
+}
+
+
+TEST_F(test_pools, test_frame_pool_reuse_all)
+{
+ amqp_channel_t channel1 = 25;
+ amqp_channel_t channel2 = 91;
+ amqp_pool_t* decoding_pool1 = amqp_get_decoding_pool(&state, channel1);
+ amqp_pool_t* decoding_pool2 = amqp_get_decoding_pool(&state, channel2);
+ EXPECT_NE(decoding_pool1, decoding_pool2);
+
+ amqp_pool_t* frame_pool1 = amqp_get_frame_pool(&state);
+ EXPECT_NE((amqp_pool_t*)NULL, frame_pool1);
+ EXPECT_EQ(0, amqp_move_frame_pool(&state, channel1));
+
+ amqp_pool_t* frame_pool2 = amqp_get_frame_pool(&state);
+ EXPECT_NE((amqp_pool_t*)NULL, frame_pool2);
+ EXPECT_EQ(0, amqp_move_frame_pool(&state, channel2));
+
+ amqp_recycle_all_decoding_pools(&state);
+ EXPECT_EQ((amqp_pool_t*)NULL, decoding_pool1->next);
+ EXPECT_EQ((amqp_pool_t*)NULL, decoding_pool2->next);
+ EXPECT_NE((amqp_pool_t*)NULL, state.frame_pool_cache);
+ EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool);
+
+ amqp_pool_t* frame_pool_recycled = amqp_get_frame_pool(&state);
+ EXPECT_TRUE(frame_pool_recycled == frame_pool1 || frame_pool_recycled == frame_pool2);
+ EXPECT_EQ((amqp_pool_t*)NULL, frame_pool_recycled->next);
+ EXPECT_EQ(frame_pool_recycled, state.frame_pool);
+ EXPECT_NE((amqp_pool_t*)NULL, state.frame_pool_cache);
+ EXPECT_EQ((amqp_pool_t*)NULL, state.frame_pool_cache->next);
+
+ EXPECT_EQ(decoding_pool1, amqp_get_decoding_pool(&state, channel1));
+ EXPECT_EQ(decoding_pool2, amqp_get_decoding_pool(&state, channel2));
+}
+
+TEST_F(test_pools, get_decoding_pool)
+{
+ amqp_channel_t channel = 32;
+ amqp_pool_t* decoding_pool = amqp_get_decoding_pool(&state, channel);
+ EXPECT_NE((amqp_pool_t*)NULL, decoding_pool);
+ EXPECT_EQ((amqp_pool_t*)NULL, decoding_pool->next);
+
+ amqp_pool_t* decoding_pool_same = amqp_get_decoding_pool(&state, channel);
+ EXPECT_EQ(decoding_pool, decoding_pool_same);
+ EXPECT_EQ((amqp_pool_t*)NULL, decoding_pool_same->next);
+}
+
+TEST_F(test_pools, get_decoding_pool_differs)
+{
+ amqp_channel_t channel = 0;
+ amqp_pool_t* decoding_pool = amqp_get_decoding_pool(&state, channel);
+
+ for (int i = 1; i <= std::numeric_limits<amqp_channel_t>::max(); ++i)
+ {
+ amqp_pool_t* other_pool = amqp_get_decoding_pool(&state, i);
+ EXPECT_NE(decoding_pool, other_pool);
+ }
+
+ amqp_pool_t* decoding_pool_same = amqp_get_decoding_pool(&state, channel);
+ EXPECT_EQ(decoding_pool, decoding_pool_same);
+}
+
+
diff --git a/tests/test_tables.c b/tests/test_tables.c
index 4effd6e..6c80647 100644
--- a/tests/test_tables.c
+++ b/tests/test_tables.c
@@ -58,7 +58,9 @@ void die(const char *fmt, ...)
abort();
}
+#ifndef M_PI
#define M_PI 3.14159265358979323846264338327
+#endif
static void dump_indent(int indent, FILE *out)
{