diff options
| author | Ted Ross <tross@apache.org> | 2013-02-04 22:46:32 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-02-04 22:46:32 +0000 |
| commit | cffa3637701472511da91eeee1e9c989bfc02f7a (patch) | |
| tree | c27389e84f14facbc2f4862b46bfdd0b64c05dba | |
| parent | 0b78470bcc369a5e3702c846d47a0c575730cc8f (diff) | |
| download | qpid-python-cffa3637701472511da91eeee1e9c989bfc02f7a.tar.gz | |
QPID-4538
Work-in-progress - Major cleanup in the "message.h" API
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1442413 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | extras/nexus/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | extras/nexus/include/qpid/nexus/alloc.h | 6 | ||||
| -rw-r--r-- | extras/nexus/include/qpid/nexus/buffer.h | 75 | ||||
| -rw-r--r-- | extras/nexus/include/qpid/nexus/ctools.h | 96 | ||||
| -rw-r--r-- | extras/nexus/include/qpid/nexus/message.h | 145 | ||||
| -rw-r--r-- | extras/nexus/src/buffer.c | 76 | ||||
| -rw-r--r-- | extras/nexus/src/container.c | 4 | ||||
| -rw-r--r-- | extras/nexus/src/iterator.c | 2 | ||||
| -rw-r--r-- | extras/nexus/src/message.c | 614 | ||||
| -rw-r--r-- | extras/nexus/src/message_private.h | 94 | ||||
| -rw-r--r-- | extras/nexus/src/timer.c | 38 | ||||
| -rw-r--r-- | extras/nexus/tests/alloc_test.c | 2 | ||||
| -rw-r--r-- | extras/nexus/tests/message_test.c | 40 | ||||
| -rw-r--r-- | extras/nexus/tests/timer_test.c | 2 |
14 files changed, 657 insertions, 538 deletions
diff --git a/extras/nexus/CMakeLists.txt b/extras/nexus/CMakeLists.txt index 04c33c35e2..e1cbcd3cdd 100644 --- a/extras/nexus/CMakeLists.txt +++ b/extras/nexus/CMakeLists.txt @@ -65,6 +65,7 @@ set(CATCH_UNDEFINED "-Wl,--no-undefined") set(server_SOURCES src/alloc.c src/auth.c + src/buffer.c src/container.c src/hash.c src/iterator.c diff --git a/extras/nexus/include/qpid/nexus/alloc.h b/extras/nexus/include/qpid/nexus/alloc.h index a0c832c069..30450db641 100644 --- a/extras/nexus/include/qpid/nexus/alloc.h +++ b/extras/nexus/include/qpid/nexus/alloc.h @@ -57,14 +57,14 @@ void nx_dealloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool, void *p); T *new_##T(); \ void free_##T(T *p) -#define ALLOC_DEFINE_CONFIG(T,C) \ - nx_alloc_type_desc_t __desc_##T = {#T, sizeof(T), C, 0, 0, 0}; \ +#define ALLOC_DEFINE_CONFIG(T,S,C) \ + nx_alloc_type_desc_t __desc_##T = {#T, S, C, 0, 0, 0}; \ __thread nx_alloc_pool_t *__local_pool_##T = 0; \ T *new_##T() { return (T*) nx_alloc(&__desc_##T, &__local_pool_##T); } \ void free_##T(T *p) { nx_dealloc(&__desc_##T, &__local_pool_##T, (void*) p); } \ nx_alloc_stats_t *alloc_stats_##T() { return __desc_##T.stats; } -#define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, 0) +#define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0) #endif diff --git a/extras/nexus/include/qpid/nexus/buffer.h b/extras/nexus/include/qpid/nexus/buffer.h new file mode 100644 index 0000000000..6fb471c7f3 --- /dev/null +++ b/extras/nexus/include/qpid/nexus/buffer.h @@ -0,0 +1,75 @@ +#ifndef __nexus_buffer_h__ +#define __nexus_buffer_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/nexus/ctools.h> + +typedef struct nx_buffer_t nx_buffer_t; + +DEQ_DECLARE(nx_buffer_t, nx_buffer_list_t); + +struct nx_buffer_t { + DEQ_LINKS(nx_buffer_t); + unsigned int size; +}; + +/** + */ +nx_buffer_t *nx_allocate_buffer(void); + +/** + * @param buf A pointer to an allocated buffer + */ +void nx_free_buffer(nx_buffer_t *buf); + +/** + * @param buf A pointer to an allocated buffer + * @return A pointer to the first octet in the buffer + */ +unsigned char *nx_buffer_base(nx_buffer_t *buf); + +/** + * @param buf A pointer to an allocated buffer + * @return A pointer to the first free octet in the buffer, the insert point for new data. + */ +unsigned char *nx_buffer_cursor(nx_buffer_t *buf); + +/** + * @param buf A pointer to an allocated buffer + * @return The number of octets in the buffer's free space, how many octets may be inserted. + */ +size_t nx_buffer_capacity(nx_buffer_t *buf); + +/** + * @param buf A pointer to an allocated buffer + * @return The number of octets of data in the buffer + */ +size_t nx_buffer_size(nx_buffer_t *buf); + +/** + * Notify the buffer that octets have been inserted at the buffer's cursor. This will advance the + * cursor by len octets. + * + * @param buf A pointer to an allocated buffer + * @param len The number of octets that have been appended to the buffer + */ +void nx_buffer_insert(nx_buffer_t *buf, size_t len); + +#endif diff --git a/extras/nexus/include/qpid/nexus/ctools.h b/extras/nexus/include/qpid/nexus/ctools.h index 6b8f072b75..67298f3782 100644 --- a/extras/nexus/include/qpid/nexus/ctools.h +++ b/extras/nexus/include/qpid/nexus/ctools.h @@ -37,11 +37,11 @@ #define DEQ_LINKS(t) t *prev; t *next -#define DEQ_INIT(d) do { d.head = 0; d.tail = 0; d.scratch = 0; d.size = 0; } while (0) +#define DEQ_INIT(d) do { (d).head = 0; (d).tail = 0; (d).scratch = 0; (d).size = 0; } while (0) #define DEQ_ITEM_INIT(i) do { (i)->next = 0; (i)->prev = 0; } while(0) -#define DEQ_HEAD(d) (d.head) -#define DEQ_TAIL(d) (d.tail) -#define DEQ_SIZE(d) (d.size) +#define DEQ_HEAD(d) ((d).head) +#define DEQ_TAIL(d) ((d).tail) +#define DEQ_SIZE(d) ((d).size) #define DEQ_NEXT(i) (i)->next #define DEQ_PREV(i) (i)->prev @@ -49,67 +49,67 @@ do { \ CT_ASSERT((i)->next == 0); \ CT_ASSERT((i)->prev == 0); \ - if (d.head) { \ - (i)->next = d.head; \ - d.head->prev = i; \ + if ((d).head) { \ + (i)->next = (d).head; \ + (d).head->prev = i; \ } else { \ - d.tail = i; \ + (d).tail = i; \ (i)->next = 0; \ - CT_ASSERT(d.size == 0); \ + CT_ASSERT((d).size == 0); \ } \ (i)->prev = 0; \ - d.head = i; \ - d.size++; \ + (d).head = i; \ + (d).size++; \ } while (0) #define DEQ_INSERT_TAIL(d,i) \ do { \ CT_ASSERT((i)->next == 0); \ CT_ASSERT((i)->prev == 0); \ - if (d.tail) { \ - (i)->prev = d.tail; \ - d.tail->next = i; \ + if ((d).tail) { \ + (i)->prev = (d).tail; \ + (d).tail->next = i; \ } else { \ - d.head = i; \ + (d).head = i; \ (i)->prev = 0; \ - CT_ASSERT(d.size == 0); \ + CT_ASSERT((d).size == 0); \ } \ (i)->next = 0; \ - d.tail = i; \ - d.size++; \ + (d).tail = i; \ + (d).size++; \ } while (0) -#define DEQ_REMOVE_HEAD(d) \ -do { \ - CT_ASSERT(d.head); \ - if (d.head) { \ - d.scratch = d.head; \ - d.head = d.head->next; \ - if (d.head == 0) { \ - d.tail = 0; \ - CT_ASSERT(d.size == 1); \ +#define DEQ_REMOVE_HEAD(d) \ +do { \ + CT_ASSERT((d).head); \ + if ((d).head) { \ + (d).scratch = (d).head; \ + (d).head = (d).head->next; \ + if ((d).head == 0) { \ + (d).tail = 0; \ + CT_ASSERT((d).size == 1); \ } else \ - d.head->prev = 0; \ - d.size--; \ - d.scratch->next = 0; \ - d.scratch->prev = 0; \ + (d).head->prev = 0; \ + (d).size--; \ + (d).scratch->next = 0; \ + (d).scratch->prev = 0; \ } \ } while (0) #define DEQ_REMOVE_TAIL(d) \ do { \ - CT_ASSERT(d.tail); \ - if (d.tail) { \ - d.scratch = d.tail; \ - d.tail = d.tail->prev; \ - if (d.tail == 0) { \ - d.head = 0; \ - CT_ASSERT(d.size == 1); \ + CT_ASSERT((d).tail); \ + if ((d).tail) { \ + (d).scratch = (d).tail; \ + (d).tail = (d).tail->prev; \ + if ((d).tail == 0) { \ + (d).head = 0; \ + CT_ASSERT((d).size == 1); \ } else \ - d.tail->next = 0; \ - d.size--; \ - d.scratch->next = 0; \ - d.scratch->prev = 0; \ + (d).tail->next = 0; \ + (d).size--; \ + (d).scratch->next = 0; \ + (d).scratch->prev = 0; \ } \ } while (0) @@ -120,11 +120,11 @@ do { \ if ((a)->next) \ (a)->next->prev = (i); \ else \ - d.tail = (i); \ + (d).tail = (i); \ (i)->next = (a)->next; \ (i)->prev = (a); \ (a)->next = (i); \ - d.size++; \ + (d).size++; \ } while (0) #define DEQ_REMOVE(d,i) \ @@ -132,15 +132,15 @@ do { \ if ((i)->next) \ (i)->next->prev = (i)->prev; \ else \ - d.tail = (i)->prev; \ + (d).tail = (i)->prev; \ if ((i)->prev) \ (i)->prev->next = (i)->next; \ else \ - d.head = (i)->next; \ - d.size--; \ + (d).head = (i)->next; \ + (d).size--; \ (i)->next = 0; \ (i)->prev = 0; \ - CT_ASSERT(d.size || (!d.head && !d.tail)); \ + CT_ASSERT((d).size || (!(d).head && !(d).tail)); \ } while (0) #endif diff --git a/extras/nexus/include/qpid/nexus/message.h b/extras/nexus/include/qpid/nexus/message.h index 3bb6b950ea..ade0525666 100644 --- a/extras/nexus/include/qpid/nexus/message.h +++ b/extras/nexus/include/qpid/nexus/message.h @@ -21,99 +21,105 @@ #include <proton/engine.h> #include <qpid/nexus/ctools.h> +#include <qpid/nexus/alloc.h> #include <qpid/nexus/iterator.h> +#include <qpid/nexus/buffer.h> + +// Callback for status change (confirmed persistent, loaded-in-memory, etc.) typedef struct nx_message_t nx_message_t; -typedef struct nx_buffer_t nx_buffer_t; -DEQ_DECLARE(nx_buffer_t, nx_buffer_list_t); DEQ_DECLARE(nx_message_t, nx_message_list_t); -typedef struct { - nx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present - size_t offset; // Offset in the buffer to the first octet - size_t length; // Length of the field or zero if unneeded - int parsed; // non-zero iff the buffer chain has been parsed to find this field -} nx_field_location_t; - - -// TODO - consider using pointers to nx_field_location_t below to save memory struct nx_message_t { DEQ_LINKS(nx_message_t); - nx_buffer_list_t buffers; // The buffer chain containing the message - pn_delivery_t *in_delivery; // The delivery on which the message arrived - pn_delivery_t *out_delivery; // The delivery on which the message was last sent - nx_field_location_t section_message_header; // The message header list - nx_field_location_t section_delivery_annotation; // The delivery annotation map - nx_field_location_t section_message_annotation; // The message annotation map - nx_field_location_t section_message_properties; // The message properties list - nx_field_location_t section_application_properties; // The application properties list - nx_field_location_t section_body; // The message body: Data - nx_field_location_t section_footer; // The footer - nx_field_location_t field_user_id; // The string value of the user-id - nx_field_location_t field_to; // The string value of the to field - nx_field_location_t body; // The body of the message - nx_field_location_t compose_length; - nx_field_location_t compose_count; - uint32_t length; - uint32_t count; + // Private members not listed here. }; -struct nx_buffer_t { - DEQ_LINKS(nx_buffer_t); - unsigned int size; -}; - -typedef struct { - size_t buffer_size; - unsigned long buffer_preallocation_count; - unsigned long buffer_rebalancing_batch_count; - unsigned long buffer_local_storage_max; - unsigned long buffer_free_list_max; - unsigned long message_allocation_batch_count; - unsigned long message_rebalancing_batch_count; - unsigned long message_local_storage_max; -} nx_allocator_config_t; - -const nx_allocator_config_t *nx_allocator_default_config(void); - -void nx_allocator_initialize(const nx_allocator_config_t *config); -void nx_allocator_finalize(void); - -// -// Functions for per-thread allocators. -// -nx_message_t *nx_allocate_message(void); -nx_buffer_t *nx_allocate_buffer(void); -void nx_free_message(nx_message_t *msg); -void nx_free_buffer(nx_buffer_t *buf); - - typedef enum { NX_DEPTH_NONE, NX_DEPTH_HEADER, NX_DEPTH_DELIVERY_ANNOTATIONS, NX_DEPTH_MESSAGE_ANNOTATIONS, - NX_DEPTH_MESSAGE_PROPERTIES, // Needed for 'user-id' and 'to' + NX_DEPTH_PROPERTIES, NX_DEPTH_APPLICATION_PROPERTIES, NX_DEPTH_BODY, NX_DEPTH_ALL } nx_message_depth_t; + +typedef enum { + // + // Message Sections + // + NX_FIELD_HEADER, + NX_FIELD_DELIVERY_ANNOTATION, + NX_FIELD_MESSAGE_ANNOTATION, + NX_FIELD_PROPERTIES, + NX_FIELD_APPLICATION_PROPERTIES, + NX_FIELD_BODY, + NX_FIELD_FOOTER, + + // + // Fields of the Header Section + // + NX_FIELD_DURABLE, + NX_FIELD_PRIORITY, + NX_FIELD_TTL, + NX_FIELD_FIRST_ACQUIRER, + NX_FIELD_DELIVERY_COUNT, + + // + // Fields of the Properties Section + // + NX_FIELD_MESSAGE_ID, + NX_FIELD_USER_ID, + NX_FIELD_TO, + NX_FIELD_SUBJECT, + NX_FIELD_REPLY_TO, + NX_FIELD_CORRELATION_ID, + NX_FIELD_CONTENT_TYPE, + NX_FIELD_CONTENT_ENCODING, + NX_FIELD_ABSOLUTE_EXPIRY_TIME, + NX_FIELD_CREATION_TIME, + NX_FIELD_GROUP_ID, + NX_FIELD_GROUP_SEQUENCE, + NX_FIELD_REPLY_TO_GROUP_ID +} nx_message_field_t; + +// +// Functions for allocation +// +nx_message_t *nx_allocate_message(void); +void nx_free_message(nx_message_t *qm); +nx_message_t *nx_message_copy(nx_message_t *qm); +int nx_message_persistent(nx_message_t *qm); +int nx_message_in_memory(nx_message_t *qm); + +void nx_message_set_out_delivery(nx_message_t *msg, pn_delivery_t *delivery); +pn_delivery_t *nx_message_out_delivery(nx_message_t *msg); +void nx_message_set_in_delivery(nx_message_t *msg, pn_delivery_t *delivery); +pn_delivery_t *nx_message_in_delivery(nx_message_t *msg); + // // Functions for received messages // nx_message_t *nx_message_receive(pn_delivery_t *delivery); +void nx_message_send(nx_message_t *msg, pn_link_t *link); + int nx_message_check(nx_message_t *msg, nx_message_depth_t depth); -nx_field_iterator_t *nx_message_field_to(nx_message_t *msg); -nx_field_iterator_t *nx_message_body(nx_message_t *msg); +nx_field_iterator_t *nx_message_field(nx_message_t *msg, nx_message_field_t field); + +pn_delivery_t *nx_message_inbound_delivery(nx_message_t *qm); // // Functions for composed messages // // Convenience Functions -void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_t *buf_chain); +void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_list_t *buffers); +void nx_message_copy_header(nx_message_t *msg); // Copy received header into send-header (prior to adding annotations) +void nx_message_copy_message_annotations(nx_message_t *msg); // Raw Functions void nx_message_begin_header(nx_message_t *msg); @@ -131,7 +137,7 @@ void nx_message_end_message_properties(nx_message_t *msg); void nx_message_begin_application_properties(nx_message_t *msg); void nx_message_end_application_properties(nx_message_t *msg); -void nx_message_append_body_data(nx_message_t *msg, nx_buffer_t *buf_chain); +void nx_message_append_body_data(nx_message_t *msg, nx_buffer_list_t *buffers); void nx_message_begin_body_sequence(nx_message_t *msg); void nx_message_end_body_sequence(nx_message_t *msg); @@ -149,14 +155,9 @@ void nx_message_insert_string(nx_message_t *msg, const char *start); void nx_message_insert_uuid(nx_message_t *msg, const uint8_t *value); void nx_message_insert_symbol(nx_message_t *msg, const char *start, size_t len); void nx_message_insert_timestamp(nx_message_t *msg, uint64_t value); - -// -// Functions for buffers -// -unsigned char *nx_buffer_base(nx_buffer_t *buf); // Pointer to the first octet in the buffer -unsigned char *nx_buffer_cursor(nx_buffer_t *buf); // Pointer to the first free octet in the buffer -size_t nx_buffer_capacity(nx_buffer_t *buf); // Size of free space in the buffer in octets -size_t nx_buffer_size(nx_buffer_t *buf); // Number of octets in the buffer -void nx_buffer_insert(nx_buffer_t *buf, size_t len); // Notify the buffer that 'len' octets were written at cursor +void nx_message_begin_list(nx_message_t* msg); +void nx_message_end_list(nx_message_t* msg); +void nx_message_begin_map(nx_message_t* msg); +void nx_message_end_map(nx_message_t* msg); #endif diff --git a/extras/nexus/src/buffer.c b/extras/nexus/src/buffer.c new file mode 100644 index 0000000000..10f32b9541 --- /dev/null +++ b/extras/nexus/src/buffer.c @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/nexus/buffer.h> +#include <qpid/nexus/alloc.h> + +#ifndef NEXUS_BUFFER_SIZE +#define NEXUS_BUFFER_SIZE 512 +#endif + +ALLOC_DECLARE(nx_buffer_t); +ALLOC_DEFINE_CONFIG(nx_buffer_t, sizeof(nx_buffer_t) + NEXUS_BUFFER_SIZE, 0); + + +nx_buffer_t *nx_allocate_buffer(void) +{ + nx_buffer_t *buf = new_nx_buffer_t(); + + DEQ_ITEM_INIT(buf); + buf->size = 0; + return buf; +} + + +void nx_free_buffer(nx_buffer_t *buf) +{ + free_nx_buffer_t(buf); +} + + +unsigned char *nx_buffer_base(nx_buffer_t *buf) +{ + return (unsigned char*) &buf[1]; +} + + +unsigned char *nx_buffer_cursor(nx_buffer_t *buf) +{ + return ((unsigned char*) &buf[1]) + buf->size; +} + + +size_t nx_buffer_capacity(nx_buffer_t *buf) +{ + return NEXUS_BUFFER_SIZE - buf->size; +} + + +size_t nx_buffer_size(nx_buffer_t *buf) +{ + return buf->size; +} + + +void nx_buffer_insert(nx_buffer_t *buf, size_t len) +{ + buf->size += len; + assert(buf->size <= NEXUS_BUFFER_SIZE); +} + diff --git a/extras/nexus/src/container.c b/extras/nexus/src/container.c index 3d57a8f21d..fdf610db93 100644 --- a/extras/nexus/src/container.c +++ b/extras/nexus/src/container.c @@ -399,10 +399,6 @@ void nx_container_initialize(void) { nx_log(module, LOG_TRACE, "Container Initializing"); - // TODO - move allocator init to server? - const nx_allocator_config_t *alloc_config = nx_allocator_default_config(); - nx_allocator_initialize(alloc_config); - node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4 node_map = hash(10, 32, 0); // 1K buckets, item batches of 32 lock = sys_mutex(); diff --git a/extras/nexus/src/iterator.c b/extras/nexus/src/iterator.c index d03590e851..668629d83a 100644 --- a/extras/nexus/src/iterator.c +++ b/extras/nexus/src/iterator.c @@ -18,9 +18,9 @@ */ #include <qpid/nexus/iterator.h> -#include <qpid/nexus/message.h> #include <qpid/nexus/ctools.h> #include <qpid/nexus/alloc.h> +#include "message_private.h" #include <stdio.h> #include <string.h> diff --git a/extras/nexus/src/message.c b/extras/nexus/src/message.c index 11f58a1474..c9f7c1a581 100644 --- a/extras/nexus/src/message.c +++ b/extras/nexus/src/message.c @@ -17,51 +17,14 @@ * under the License. */ -#include <qpid/nexus/message.h> #include <qpid/nexus/ctools.h> #include <qpid/nexus/threading.h> +#include "message_private.h" #include <string.h> #include <stdio.h> - -// -// Per-Thread allocator -// -typedef struct nx_allocator_t { - nx_message_list_t message_free_list; - nx_buffer_list_t buffer_free_list; -} nx_allocator_t; - -// -// Global allocator (protected by a global lock) -// -typedef struct { - nx_message_list_t message_free_list; - nx_buffer_list_t buffer_free_list; - sys_mutex_t *lock; -} nx_global_allocator_t; - -static nx_global_allocator_t global; -static nx_allocator_config_t default_config; -static const nx_allocator_config_t *config; - - -static nx_allocator_t *nx_get_allocator(void) -{ - static __thread nx_allocator_t *alloc = 0; - - if (!alloc) { - alloc = NEW(nx_allocator_t); - - if (!alloc) - return 0; - - DEQ_INIT(alloc->message_free_list); - DEQ_INIT(alloc->buffer_free_list); - } - - return alloc; -} +ALLOC_DEFINE_CONFIG(nx_message_t, sizeof(nx_message_pvt_t), 0); +ALLOC_DEFINE(nx_message_content_t); static void advance(unsigned char **cursor, nx_buffer_t **buffer, int consume) @@ -288,7 +251,7 @@ static int nx_check_and_advance(nx_buffer_t **buffer, } -static void nx_insert(nx_message_t *msg, const uint8_t *seq, size_t len) +static void nx_insert(nx_message_content_t *msg, const uint8_t *seq, size_t len) { nx_buffer_t *buf = DEQ_TAIL(msg->buffers); @@ -312,13 +275,13 @@ static void nx_insert(nx_message_t *msg, const uint8_t *seq, size_t len) } -static void nx_insert_8(nx_message_t *msg, uint8_t value) +static void nx_insert_8(nx_message_content_t *msg, uint8_t value) { nx_insert(msg, &value, 1); } -static void nx_insert_32(nx_message_t *msg, uint32_t value) +static void nx_insert_32(nx_message_content_t *msg, uint32_t value) { uint8_t buf[4]; buf[0] = (uint8_t) ((value & 0xFF000000) >> 24); @@ -329,7 +292,7 @@ static void nx_insert_32(nx_message_t *msg, uint32_t value) } -static void nx_insert_64(nx_message_t *msg, uint64_t value) +static void nx_insert_64(nx_message_content_t *msg, uint64_t value) { uint8_t buf[8]; buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56); @@ -371,7 +334,7 @@ static void nx_overwrite_32(nx_field_location_t *field, uint32_t value) } -static void nx_start_list_performative(nx_message_t *msg, uint8_t code) +static void nx_start_list_performative(nx_message_content_t *msg, uint8_t code) { // // Insert the short-form performative tag @@ -409,219 +372,116 @@ static void nx_start_list_performative(nx_message_t *msg, uint8_t code) } -static void nx_end_list(nx_message_t *msg) +static void nx_end_list(nx_message_content_t *msg) { nx_overwrite_32(&msg->compose_length, msg->length); nx_overwrite_32(&msg->compose_count, msg->count); } -const nx_allocator_config_t *nx_allocator_default_config(void) +nx_message_t *nx_allocate_message() { - default_config.buffer_size = 1024; - default_config.buffer_preallocation_count = 512; - default_config.buffer_rebalancing_batch_count = 16; - default_config.buffer_local_storage_max = 64; - default_config.buffer_free_list_max = 1000000; - default_config.message_allocation_batch_count = 256; - default_config.message_rebalancing_batch_count = 64; - default_config.message_local_storage_max = 256; - - return &default_config; -} - + nx_message_pvt_t *msg = (nx_message_pvt_t*) new_nx_message_t(); + if (!msg) + return 0; -void nx_allocator_initialize(const nx_allocator_config_t *c) -{ - config = c; + DEQ_ITEM_INIT(msg); + msg->content = new_nx_message_content_t(); + msg->out_delivery = 0; - // Initialize the fields in the global structure. - DEQ_INIT(global.message_free_list); - DEQ_INIT(global.buffer_free_list); - global.lock = sys_mutex(); + if (msg->content == 0) { + free_nx_message_t((nx_message_t*) msg); + return 0; + } - // Pre-allocate buffers according to the configuration - int i; - nx_buffer_t *buf; + memset(msg->content, 0, sizeof(nx_message_content_t)); + msg->content->lock = sys_mutex(); + msg->content->ref_count = 1; - for (i = 0; i < config->buffer_preallocation_count; i++) { - buf = (nx_buffer_t*) malloc (sizeof(nx_buffer_t) + config->buffer_size); - DEQ_ITEM_INIT(buf); - DEQ_INSERT_TAIL(global.buffer_free_list, buf); - } + return (nx_message_t*) msg; } -void nx_allocator_finalize(void) +void nx_free_message(nx_message_t *in_msg) { - // TODO - Free buffers and messages -} + uint32_t rc; + nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg; + nx_message_content_t *content = msg->content; + sys_mutex_lock(content->lock); + rc = --content->ref_count; + sys_mutex_unlock(content->lock); -nx_message_t *nx_allocate_message(void) -{ - nx_allocator_t *alloc = nx_get_allocator(); - nx_message_t *msg; - int i; + if (rc == 0) { + nx_buffer_t *buf = DEQ_HEAD(content->buffers); - if (DEQ_SIZE(alloc->message_free_list) == 0) { - // - // The local free list is empty, rebalance a batch of objects from the global - // free list. - // - sys_mutex_lock(global.lock); - if (DEQ_SIZE(global.message_free_list) >= config->message_rebalancing_batch_count) { - for (i = 0; i < config->message_rebalancing_batch_count; i++) { - msg = DEQ_HEAD(global.message_free_list); - DEQ_REMOVE_HEAD(global.message_free_list); - DEQ_INSERT_TAIL(alloc->message_free_list, msg); - } + while (buf) { + DEQ_REMOVE_HEAD(content->buffers); + nx_free_buffer(buf); + buf = DEQ_HEAD(content->buffers); } - sys_mutex_unlock(global.lock); - } - if (DEQ_SIZE(alloc->message_free_list) == 0) { - // - // The local free list is still empty. This means there were not enough objects on the - // global free list to make up a batch. Allocate new objects from the heap and store - // them in the local free list. - // - nx_message_t *batch = NEW_ARRAY(nx_message_t, config->message_allocation_batch_count); - memset(batch, 0, sizeof(nx_message_t) * config->message_allocation_batch_count); - for (i = 0; i < config->message_allocation_batch_count; i++) { - DEQ_INSERT_TAIL(alloc->message_free_list, &batch[i]); - } + sys_mutex_free(content->lock); + free_nx_message_content_t(content); } - // - // If the local free list is still empty, we're out of memory. - // - if (DEQ_SIZE(alloc->message_free_list) == 0) - return 0; - - msg = DEQ_HEAD(alloc->message_free_list); - DEQ_REMOVE_HEAD(alloc->message_free_list); - - DEQ_INIT(msg->buffers); - msg->in_delivery = NULL; - msg->out_delivery = NULL; - msg->section_message_header.buffer = 0; - msg->section_message_header.parsed = 0; - msg->section_delivery_annotation.buffer = 0; - msg->section_delivery_annotation.parsed = 0; - msg->section_message_annotation.buffer = 0; - msg->section_message_annotation.parsed = 0; - msg->section_message_properties.buffer = 0; - msg->section_message_properties.parsed = 0; - msg->section_application_properties.buffer = 0; - msg->section_application_properties.parsed = 0; - msg->section_body.buffer = 0; - msg->section_body.parsed = 0; - msg->section_footer.buffer = 0; - msg->section_footer.parsed = 0; - msg->field_user_id.buffer = 0; - msg->field_user_id.parsed = 0; - msg->field_to.buffer = 0; - msg->field_to.parsed = 0; - msg->body.buffer = 0; - msg->body.parsed = 0; - return msg; + free_nx_message_t((nx_message_t*) msg); } -nx_buffer_t *nx_allocate_buffer(void) +nx_message_t *nx_message_copy(nx_message_t *in_msg) { - nx_allocator_t *alloc = nx_get_allocator(); - nx_buffer_t *buf; - int i; - - if (DEQ_SIZE(alloc->buffer_free_list) == 0) { - sys_mutex_lock(global.lock); - if (DEQ_SIZE(global.buffer_free_list) >= config->buffer_rebalancing_batch_count) { - // Rebalance a batch of free descriptors to the local free list. - for (i = 0; i < config->buffer_rebalancing_batch_count; i++) { - buf = DEQ_HEAD(global.buffer_free_list); - DEQ_REMOVE_HEAD(global.buffer_free_list); - DEQ_INSERT_TAIL(alloc->buffer_free_list, buf); - } - } - sys_mutex_unlock(global.lock); - } + nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg; + nx_message_content_t *content = msg->content; + nx_message_pvt_t *copy = (nx_message_pvt_t*) new_nx_message_t(); - if (DEQ_SIZE(alloc->buffer_free_list) == 0) { - // Allocate a buffer from the heap - buf = (nx_buffer_t*) malloc (sizeof(nx_buffer_t) + config->buffer_size); - DEQ_ITEM_INIT(buf); - DEQ_INSERT_TAIL(alloc->buffer_free_list, buf); - } - - if (DEQ_SIZE(alloc->buffer_free_list) == 0) + if (!copy) return 0; - buf = DEQ_HEAD(alloc->buffer_free_list); - DEQ_REMOVE_HEAD(alloc->buffer_free_list); + DEQ_ITEM_INIT(copy); + copy->content = content; + copy->out_delivery = 0; - buf->size = 0; + sys_mutex_lock(content->lock); + content->ref_count++; + sys_mutex_unlock(content->lock); - return buf; + return (nx_message_t*) copy; } -void nx_free_message(nx_message_t *msg) +void nx_message_set_out_delivery(nx_message_t *msg, pn_delivery_t *delivery) { - nx_allocator_t *alloc = nx_get_allocator(); + ((nx_message_pvt_t*) msg)->out_delivery = delivery; +} - // Free any buffers in the message - int i; - nx_buffer_t *buf = DEQ_HEAD(msg->buffers); - while (buf) { - DEQ_REMOVE_HEAD(msg->buffers); - nx_free_buffer(buf); - buf = DEQ_HEAD(msg->buffers); - } - DEQ_INSERT_TAIL(alloc->message_free_list, msg); - if (DEQ_SIZE(alloc->message_free_list) > config->message_local_storage_max) { - // - // The local free list has exceeded the threshold for local storage. - // Rebalance a batch of free objects to the global free list. - // - sys_mutex_lock(global.lock); - for (i = 0; i < config->message_rebalancing_batch_count; i++) { - msg = DEQ_HEAD(alloc->message_free_list); - DEQ_REMOVE_HEAD(alloc->message_free_list); - DEQ_INSERT_TAIL(global.message_free_list, msg); - } - sys_mutex_unlock(global.lock); - } +pn_delivery_t *nx_message_out_delivery(nx_message_t *msg) +{ + return ((nx_message_pvt_t*) msg)->out_delivery; } -void nx_free_buffer(nx_buffer_t *buf) +void nx_message_set_in_delivery(nx_message_t *msg, pn_delivery_t *delivery) { - nx_allocator_t *alloc = nx_get_allocator(); - int i; - - DEQ_INSERT_TAIL(alloc->buffer_free_list, buf); - if (DEQ_SIZE(alloc->buffer_free_list) > config->buffer_local_storage_max) { - // Rebalance a batch of free descriptors to the global free list. - sys_mutex_lock(global.lock); - for (i = 0; i < config->buffer_rebalancing_batch_count; i++) { - buf = DEQ_HEAD(alloc->buffer_free_list); - DEQ_REMOVE_HEAD(alloc->buffer_free_list); - DEQ_INSERT_TAIL(global.buffer_free_list, buf); - } - sys_mutex_unlock(global.lock); - } + nx_message_content_t *content = MSG_CONTENT(msg); + content->in_delivery = delivery; +} + + +pn_delivery_t *nx_message_in_delivery(nx_message_t *msg) +{ + nx_message_content_t *content = MSG_CONTENT(msg); + return content->in_delivery; } nx_message_t *nx_message_receive(pn_delivery_t *delivery) { - pn_link_t *link = pn_delivery_link(delivery); - nx_message_t *msg = (nx_message_t*) pn_delivery_get_context(delivery); - ssize_t rc; - nx_buffer_t *buf; + pn_link_t *link = pn_delivery_link(delivery); + nx_message_pvt_t *msg = (nx_message_pvt_t*) pn_delivery_get_context(delivery); + ssize_t rc; + nx_buffer_t *buf; // // If there is no message associated with the delivery, this is the first time @@ -629,15 +489,16 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery) // link it and the delivery together. // if (!msg) { - msg = nx_allocate_message(); + msg = (nx_message_pvt_t*) nx_allocate_message(); pn_delivery_set_context(delivery, (void*) msg); // // Record the incoming delivery only if it is not settled. If it is - // settled, there's no need to propagate disposition back to the sender. + // settled, it should not be recorded as no future operations on it are + // permitted. // if (!pn_delivery_settled(delivery)) - msg->in_delivery = delivery; + msg->content->in_delivery = delivery; } // @@ -645,10 +506,10 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery) // we will store incoming message data. If there is no buffer in the message, allocate // an empty one and add it to the message. // - buf = DEQ_TAIL(msg->buffers); + buf = DEQ_TAIL(msg->content->buffers); if (!buf) { buf = nx_allocate_buffer(); - DEQ_INSERT_TAIL(msg->buffers, buf); + DEQ_INSERT_TAIL(msg->content->buffers, buf); } while (1) { @@ -667,10 +528,10 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery) // of the buffer size. // if (nx_buffer_size(buf) == 0) { - DEQ_REMOVE_TAIL(msg->buffers); + DEQ_REMOVE_TAIL(msg->content->buffers); nx_free_buffer(buf); } - return msg; + return (nx_message_t*) msg; } if (rc > 0) { @@ -686,7 +547,7 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery) // if (nx_buffer_capacity(buf) == 0) { buf = nx_allocate_buffer(); - DEQ_INSERT_TAIL(msg->buffers, buf); + DEQ_INSERT_TAIL(msg->content->buffers, buf); } } else // @@ -697,11 +558,24 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery) break; } - return NULL; + return 0; +} + + +void nx_message_send(nx_message_t *in_msg, pn_link_t *link) +{ + nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg; + nx_buffer_t *buf = DEQ_HEAD(msg->content->buffers); + + // TODO - Handle cases where annotations have been added or modified + while (buf) { + pn_link_send(link, (char*) nx_buffer_base(buf), nx_buffer_size(buf)); + buf = DEQ_NEXT(buf); + } } -int nx_message_check(nx_message_t *msg, nx_message_depth_t depth) +int nx_message_check(nx_message_t *in_msg, nx_message_depth_t depth) { #define LONG 10 @@ -712,8 +586,8 @@ int nx_message_check(nx_message_t *msg, nx_message_depth_t depth) #define DELIVERY_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x71" #define MESSAGE_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72" #define MESSAGE_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x72" -#define MESSAGE_PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73" -#define MESSAGE_PROPERTIES_SHORT (unsigned char*) "\x00\x53\x73" +#define PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73" +#define PROPERTIES_SHORT (unsigned char*) "\x00\x53\x73" #define APPLICATION_PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74" #define APPLICATION_PROPERTIES_SHORT (unsigned char*) "\x00\x53\x74" #define BODY_DATA_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75" @@ -726,8 +600,10 @@ int nx_message_check(nx_message_t *msg, nx_message_depth_t depth) #define TAGS_MAP (unsigned char*) "\xc1\xd1" #define TAGS_BINARY (unsigned char*) "\xa0\xb0" - nx_buffer_t *buffer = DEQ_HEAD(msg->buffers); - unsigned char *cursor; + nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg; + nx_message_content_t *content = msg->content; + nx_buffer_t *buffer = DEQ_HEAD(content->buffers); + unsigned char *cursor; if (!buffer) return 0; // Invalid - No data in the message @@ -740,9 +616,9 @@ int nx_message_check(nx_message_t *msg, nx_message_depth_t depth) // // MESSAGE HEADER // - if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG, LONG, TAGS_LIST, &msg->section_message_header)) + if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG, LONG, TAGS_LIST, &content->section_message_header)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &msg->section_message_header)) + if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &content->section_message_header)) return 0; if (depth == NX_DEPTH_HEADER) @@ -751,9 +627,9 @@ int nx_message_check(nx_message_t *msg, nx_message_depth_t depth) // // DELIVERY ANNOTATION // - if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG, LONG, TAGS_MAP, &msg->section_delivery_annotation)) + if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_delivery_annotation)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP, &msg->section_delivery_annotation)) + if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_delivery_annotation)) return 0; if (depth == NX_DEPTH_DELIVERY_ANNOTATIONS) @@ -762,31 +638,31 @@ int nx_message_check(nx_message_t *msg, nx_message_depth_t depth) // // MESSAGE ANNOTATION // - if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG, LONG, TAGS_MAP, &msg->section_message_annotation)) + if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_message_annotation)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP, &msg->section_message_annotation)) + if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_message_annotation)) return 0; if (depth == NX_DEPTH_MESSAGE_ANNOTATIONS) return 1; // - // MESSAGE PROPERTIES + // PROPERTIES // - if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_PROPERTIES_LONG, LONG, TAGS_LIST, &msg->section_message_properties)) + if (0 == nx_check_and_advance(&buffer, &cursor, PROPERTIES_LONG, LONG, TAGS_LIST, &content->section_message_properties)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_PROPERTIES_SHORT, SHORT, TAGS_LIST, &msg->section_message_properties)) + if (0 == nx_check_and_advance(&buffer, &cursor, PROPERTIES_SHORT, SHORT, TAGS_LIST, &content->section_message_properties)) return 0; - if (depth == NX_DEPTH_MESSAGE_PROPERTIES) + if (depth == NX_DEPTH_PROPERTIES) return 1; // // APPLICATION PROPERTIES // - if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG, LONG, TAGS_MAP, &msg->section_application_properties)) + if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG, LONG, TAGS_MAP, &content->section_application_properties)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &msg->section_application_properties)) + if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &content->section_application_properties)) return 0; if (depth == NX_DEPTH_APPLICATION_PROPERTIES) @@ -795,13 +671,13 @@ int nx_message_check(nx_message_t *msg, nx_message_depth_t depth) // // BODY (Note that this function expects a single data section or a single AMQP sequence) // - if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG, LONG, TAGS_BINARY, &msg->section_body)) + if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG, LONG, TAGS_BINARY, &content->section_body)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT, SHORT, TAGS_BINARY, &msg->section_body)) + if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT, SHORT, TAGS_BINARY, &content->section_body)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG, LONG, TAGS_LIST, &msg->section_body)) + if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG, LONG, TAGS_LIST, &content->section_body)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST, &msg->section_body)) + if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST, &content->section_body)) return 0; if (depth == NX_DEPTH_BODY) @@ -810,67 +686,72 @@ int nx_message_check(nx_message_t *msg, nx_message_depth_t depth) // // FOOTER // - if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_LONG, LONG, TAGS_MAP, &msg->section_footer)) + if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_LONG, LONG, TAGS_MAP, &content->section_footer)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &msg->section_footer)) + if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &content->section_footer)) return 0; return 1; } -nx_field_iterator_t *nx_message_field_to(nx_message_t *msg) +nx_field_iterator_t *nx_message_field(nx_message_t *msg, nx_message_field_t field) { - while (1) { - if (msg->field_to.parsed) - return nx_field_iterator_buffer(msg->field_to.buffer, msg->field_to.offset, msg->field_to.length, ITER_VIEW_ALL); + nx_message_content_t *content = MSG_CONTENT(msg); - if (msg->section_message_properties.parsed == 0) - break; + switch (field) { + case NX_FIELD_TO: + while (1) { + if (content->field_to.parsed) + return nx_field_iterator_buffer(content->field_to.buffer, content->field_to.offset, content->field_to.length, ITER_VIEW_ALL); - nx_buffer_t *buffer = msg->section_message_properties.buffer; - unsigned char *cursor = nx_buffer_base(buffer) + msg->section_message_properties.offset; + if (content->section_message_properties.parsed == 0) + break; - int count = start_list(&cursor, &buffer); - int result; + nx_buffer_t *buffer = content->section_message_properties.buffer; + unsigned char *cursor = nx_buffer_base(buffer) + content->section_message_properties.offset; - if (count < 3) - break; + int count = start_list(&cursor, &buffer); + int result; - result = traverse_field(&cursor, &buffer, 0); // message_id - if (!result) return 0; - result = traverse_field(&cursor, &buffer, 0); // user_id - if (!result) return 0; - result = traverse_field(&cursor, &buffer, &msg->field_to); // to - if (!result) return 0; - } + if (count < 3) + break; - return 0; -} + result = traverse_field(&cursor, &buffer, 0); // message_id + if (!result) return 0; + result = traverse_field(&cursor, &buffer, 0); // user_id + if (!result) return 0; + result = traverse_field(&cursor, &buffer, &content->field_to); // to + if (!result) return 0; + } + break; + case NX_FIELD_BODY: + while (1) { + if (content->body.parsed) + return nx_field_iterator_buffer(content->body.buffer, content->body.offset, content->body.length, ITER_VIEW_ALL); -nx_field_iterator_t *nx_message_body(nx_message_t *msg) -{ - while (1) { - if (msg->body.parsed) - return nx_field_iterator_buffer(msg->body.buffer, msg->body.offset, msg->body.length, ITER_VIEW_ALL); + if (content->section_body.parsed == 0) + break; - if (msg->section_body.parsed == 0) - break; + nx_buffer_t *buffer = content->section_body.buffer; + unsigned char *cursor = nx_buffer_base(buffer) + content->section_body.offset; + int result; - nx_buffer_t *buffer = msg->section_body.buffer; - unsigned char *cursor = nx_buffer_base(buffer) + msg->section_body.offset; - int result; + result = traverse_field(&cursor, &buffer, &content->body); + if (!result) return 0; + } + break; - result = traverse_field(&cursor, &buffer, &msg->body); - if (!result) return 0; + default: + break; } return 0; } -void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_t *buf_chain) +void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_list_t *buffers) { nx_message_begin_header(msg); nx_message_insert_boolean(msg, 0); // durable @@ -896,20 +777,20 @@ void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_t *buf_ch //nx_message_insert_null(msg); // reply-to-group-id nx_message_end_message_properties(msg); - if (buf_chain) - nx_message_append_body_data(msg, buf_chain); + if (buffers) + nx_message_append_body_data(msg, buffers); } void nx_message_begin_header(nx_message_t *msg) { - nx_start_list_performative(msg, 0x70); + nx_start_list_performative(MSG_CONTENT(msg), 0x70); } void nx_message_end_header(nx_message_t *msg) { - nx_end_list(msg); + nx_end_list(MSG_CONTENT(msg)); } @@ -939,13 +820,13 @@ void nx_message_end_message_annotations(nx_message_t *msg) void nx_message_begin_message_properties(nx_message_t *msg) { - nx_start_list_performative(msg, 0x73); + nx_start_list_performative(MSG_CONTENT(msg), 0x73); } void nx_message_end_message_properties(nx_message_t *msg) { - nx_end_list(msg); + nx_end_list(MSG_CONTENT(msg)); } @@ -961,34 +842,40 @@ void nx_message_end_application_properties(nx_message_t *msg) } -void nx_message_append_body_data(nx_message_t *msg, nx_buffer_t *buf_chain) +void nx_message_append_body_data(nx_message_t *msg, nx_buffer_list_t *buffers) { - uint32_t len = 0; - nx_buffer_t *buf = buf_chain; - nx_buffer_t *last = 0; - size_t count = 0; + nx_message_content_t *content = MSG_CONTENT(msg); + nx_buffer_t *buf = DEQ_HEAD(*buffers); + uint32_t len = 0; + // + // Calculate the size of the body to be appended. + // while (buf) { len += nx_buffer_size(buf); - count++; - last = buf; buf = DEQ_NEXT(buf); } - nx_insert(msg, (const uint8_t*) "\x00\x53\x75", 3); + // + // Insert a DATA section performative header. + // + nx_insert(content, (const uint8_t*) "\x00\x53\x75", 3); if (len < 256) { - nx_insert_8(msg, 0xa0); // vbin8 - nx_insert_8(msg, (uint8_t) len); + nx_insert_8(content, 0xa0); // vbin8 + nx_insert_8(content, (uint8_t) len); } else { - nx_insert_8(msg, 0xb0); // vbin32 - nx_insert_32(msg, len); + nx_insert_8(content, 0xb0); // vbin32 + nx_insert_32(content, len); } - if (len > 0) { - buf_chain->prev = msg->buffers.tail; - msg->buffers.tail->next = buf_chain; - msg->buffers.tail = last; - msg->buffers.size += count; + // + // Move the supplied buffers to the tail of the message's buffer list. + // + buf = DEQ_HEAD(*buffers); + while (buf) { + DEQ_REMOVE_HEAD(*buffers); + DEQ_INSERT_TAIL(content->buffers, buf); + buf = DEQ_HEAD(*buffers); } } @@ -1017,148 +904,151 @@ void nx_message_end_footer(nx_message_t *msg) void nx_message_insert_null(nx_message_t *msg) { - nx_insert_8(msg, 0x40); - msg->count++; + nx_message_content_t *content = MSG_CONTENT(msg); + nx_insert_8(content, 0x40); + content->count++; } void nx_message_insert_boolean(nx_message_t *msg, int value) { + nx_message_content_t *content = MSG_CONTENT(msg); if (value) - nx_insert(msg, (const uint8_t*) "\x56\x01", 2); + nx_insert(content, (const uint8_t*) "\x56\x01", 2); else - nx_insert(msg, (const uint8_t*) "\x56\x00", 2); - msg->count++; + nx_insert(content, (const uint8_t*) "\x56\x00", 2); + content->count++; } void nx_message_insert_ubyte(nx_message_t *msg, uint8_t value) { - nx_insert_8(msg, 0x50); - nx_insert_8(msg, value); - msg->count++; + nx_message_content_t *content = MSG_CONTENT(msg); + nx_insert_8(content, 0x50); + nx_insert_8(content, value); + content->count++; } void nx_message_insert_uint(nx_message_t *msg, uint32_t value) { + nx_message_content_t *content = MSG_CONTENT(msg); if (value == 0) { - nx_insert_8(msg, 0x43); // uint0 + nx_insert_8(content, 0x43); // uint0 } else if (value < 256) { - nx_insert_8(msg, 0x52); // smalluint - nx_insert_8(msg, (uint8_t) value); + nx_insert_8(content, 0x52); // smalluint + nx_insert_8(content, (uint8_t) value); } else { - nx_insert_8(msg, 0x70); // uint - nx_insert_32(msg, value); + nx_insert_8(content, 0x70); // uint + nx_insert_32(content, value); } - msg->count++; + content->count++; } void nx_message_insert_ulong(nx_message_t *msg, uint64_t value) { + nx_message_content_t *content = MSG_CONTENT(msg); if (value == 0) { - nx_insert_8(msg, 0x44); // ulong0 + nx_insert_8(content, 0x44); // ulong0 } else if (value < 256) { - nx_insert_8(msg, 0x53); // smallulong - nx_insert_8(msg, (uint8_t) value); + nx_insert_8(content, 0x53); // smallulong + nx_insert_8(content, (uint8_t) value); } else { - nx_insert_8(msg, 0x80); // ulong - nx_insert_64(msg, value); + nx_insert_8(content, 0x80); // ulong + nx_insert_64(content, value); } - msg->count++; + content->count++; } void nx_message_insert_binary(nx_message_t *msg, const uint8_t *start, size_t len) { + nx_message_content_t *content = MSG_CONTENT(msg); if (len < 256) { - nx_insert_8(msg, 0xa0); // vbin8 - nx_insert_8(msg, (uint8_t) len); + nx_insert_8(content, 0xa0); // vbin8 + nx_insert_8(content, (uint8_t) len); } else { - nx_insert_8(msg, 0xb0); // vbin32 - nx_insert_32(msg, len); + nx_insert_8(content, 0xb0); // vbin32 + nx_insert_32(content, len); } - nx_insert(msg, start, len); - msg->count++; + nx_insert(content, start, len); + content->count++; } void nx_message_insert_string(nx_message_t *msg, const char *start) { + nx_message_content_t *content = MSG_CONTENT(msg); uint32_t len = strlen(start); if (len < 256) { - nx_insert_8(msg, 0xa1); // str8-utf8 - nx_insert_8(msg, (uint8_t) len); - nx_insert(msg, (const uint8_t*) start, len); + nx_insert_8(content, 0xa1); // str8-utf8 + nx_insert_8(content, (uint8_t) len); + nx_insert(content, (const uint8_t*) start, len); } else { - nx_insert_8(msg, 0xb1); // str32-utf8 - nx_insert_32(msg, len); - nx_insert(msg, (const uint8_t*) start, len); + nx_insert_8(content, 0xb1); // str32-utf8 + nx_insert_32(content, len); + nx_insert(content, (const uint8_t*) start, len); } - msg->count++; + content->count++; } void nx_message_insert_uuid(nx_message_t *msg, const uint8_t *value) { - nx_insert_8(msg, 0x98); // uuid - nx_insert(msg, value, 16); - msg->count++; + nx_message_content_t *content = MSG_CONTENT(msg); + nx_insert_8(content, 0x98); // uuid + nx_insert(content, value, 16); + content->count++; } void nx_message_insert_symbol(nx_message_t *msg, const char *start, size_t len) { + nx_message_content_t *content = MSG_CONTENT(msg); if (len < 256) { - nx_insert_8(msg, 0xa3); // sym8 - nx_insert_8(msg, (uint8_t) len); - nx_insert(msg, (const uint8_t*) start, len); + nx_insert_8(content, 0xa3); // sym8 + nx_insert_8(content, (uint8_t) len); + nx_insert(content, (const uint8_t*) start, len); } else { - nx_insert_8(msg, 0xb3); // sym32 - nx_insert_32(msg, len); - nx_insert(msg, (const uint8_t*) start, len); + nx_insert_8(content, 0xb3); // sym32 + nx_insert_32(content, len); + nx_insert(content, (const uint8_t*) start, len); } - msg->count++; + content->count++; } void nx_message_insert_timestamp(nx_message_t *msg, uint64_t value) { - nx_insert_8(msg, 0x83); // timestamp - nx_insert_64(msg, value); - msg->count++; -} - - -unsigned char *nx_buffer_base(nx_buffer_t *buf) -{ - return (unsigned char*) &buf[1]; + nx_message_content_t *content = MSG_CONTENT(msg); + nx_insert_8(content, 0x83); // timestamp + nx_insert_64(content, value); + content->count++; } -unsigned char *nx_buffer_cursor(nx_buffer_t *buf) +void nx_message_begin_list(nx_message_t* msg) { - return ((unsigned char*) &buf[1]) + buf->size; + assert(0); // Not Implemented } -size_t nx_buffer_capacity(nx_buffer_t *buf) +void nx_message_end_list(nx_message_t* msg) { - return config->buffer_size - buf->size; + assert(0); // Not Implemented } -size_t nx_buffer_size(nx_buffer_t *buf) +void nx_message_begin_map(nx_message_t* msg) { - return buf->size; + assert(0); // Not Implemented } -void nx_buffer_insert(nx_buffer_t *buf, size_t len) +void nx_message_end_map(nx_message_t* msg) { - buf->size += len; - assert(buf->size <= config->buffer_size); + assert(0); // Not Implemented } diff --git a/extras/nexus/src/message_private.h b/extras/nexus/src/message_private.h new file mode 100644 index 0000000000..0e20332357 --- /dev/null +++ b/extras/nexus/src/message_private.h @@ -0,0 +1,94 @@ +#ifndef __message_private_h__ +#define __message_private_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/nexus/message.h> +#include <qpid/nexus/alloc.h> +#include <qpid/nexus/threading.h> + +/** + * Architecture of the message module: + * + * +--------------+ +----------------------+ + * | | | | + * | nx_message_t |----------->| nx_message_content_t | + * | | +----->| | + * +--------------+ | +----------------------+ + * | | + * +--------------+ | | +-------------+ +-------------+ +-------------+ + * | | | +--->| nx_buffer_t |-->| nx_buffer_t |-->| nx_buffer_t |--/ + * | nx_message_t |-----+ +-------------+ +-------------+ +-------------+ + * | | + * +--------------+ + * + * The message module provides chained-fixed-sized-buffer storage of message content with multiple + * references. If a message is received and is to be queued for multiple destinations, there is only + * one copy of the message content in memory but multiple lightweight references to the content. + * + */ + +typedef struct { + nx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present + size_t offset; // Offset in the buffer to the first octet + size_t length; // Length of the field or zero if unneeded + int parsed; // non-zero iff the buffer chain has been parsed to find this field +} nx_field_location_t; + + +// TODO - consider using pointers to nx_field_location_t below to save memory +// TODO - we need a second buffer list for modified annotations and header +// There are three message scenarios: +// 1) Received message is held and forwarded unmodified - single buffer list +// 2) Received message is held and modified before forwarding - two buffer lists +// 3) Message is composed internally - single buffer list + +typedef struct { + sys_mutex_t *lock; + uint32_t ref_count; // The number of qmessages referencing this + nx_buffer_list_t buffers; // The buffer chain containing the message + pn_delivery_t *in_delivery; // The delivery on which the message arrived + nx_field_location_t section_message_header; // The message header list + nx_field_location_t section_delivery_annotation; // The delivery annotation map + nx_field_location_t section_message_annotation; // The message annotation map + nx_field_location_t section_message_properties; // The message properties list + nx_field_location_t section_application_properties; // The application properties list + nx_field_location_t section_body; // The message body: Data + nx_field_location_t section_footer; // The footer + nx_field_location_t field_user_id; // The string value of the user-id + nx_field_location_t field_to; // The string value of the to field + nx_field_location_t body; // The body of the message + nx_field_location_t compose_length; + nx_field_location_t compose_count; + uint32_t length; + uint32_t count; +} nx_message_content_t; + +typedef struct { + DEQ_LINKS(nx_message_t); // Deq linkage that overlays the nx_message_t + nx_message_content_t *content; + pn_delivery_t *out_delivery; +} nx_message_pvt_t; + +ALLOC_DECLARE(nx_message_t); +ALLOC_DECLARE(nx_message_content_t); + +#define MSG_CONTENT(m) (((nx_message_pvt_t*) m)->content) + +#endif diff --git a/extras/nexus/src/timer.c b/extras/nexus/src/timer.c index 81b531305d..f6d0071a77 100644 --- a/extras/nexus/src/timer.c +++ b/extras/nexus/src/timer.c @@ -21,15 +21,17 @@ #include "server_private.h" #include <qpid/nexus/ctools.h> #include <qpid/nexus/threading.h> +#include <qpid/nexus/alloc.h> #include <assert.h> #include <stdio.h> static sys_mutex_t *lock; -static nx_timer_list_t free_list; static nx_timer_list_t idle_timers; static nx_timer_list_t scheduled_timers; static long time_base; +ALLOC_DECLARE(nx_timer_t); +ALLOC_DEFINE(nx_timer_t); //========================================================================= // Private static functions @@ -67,27 +69,21 @@ static void nx_timer_cancel_LH(nx_timer_t *timer) nx_timer_t *nx_timer(nx_timer_cb_t cb, void* context) { - nx_timer_t *timer; + nx_timer_t *timer = new_nx_timer_t(); + if (!timer) + return 0; - sys_mutex_lock(lock); - - timer = DEQ_HEAD(free_list); - if (timer) { - DEQ_REMOVE_HEAD(free_list); - } else { - timer = NEW(nx_timer_t); - DEQ_ITEM_INIT(timer); - } + DEQ_ITEM_INIT(timer); - if (timer) { - timer->handler = cb; - timer->context = context; - timer->delta_time = 0; - timer->state = TIMER_IDLE; - DEQ_INSERT_TAIL(idle_timers, timer); - } + timer->handler = cb; + timer->context = context; + timer->delta_time = 0; + timer->state = TIMER_IDLE; + sys_mutex_lock(lock); + DEQ_INSERT_TAIL(idle_timers, timer); sys_mutex_unlock(lock); + return timer; } @@ -97,9 +93,10 @@ void nx_timer_free(nx_timer_t *timer) sys_mutex_lock(lock); nx_timer_cancel_LH(timer); DEQ_REMOVE(idle_timers, timer); - DEQ_INSERT_TAIL(free_list, timer); - timer->state = TIMER_FREE; sys_mutex_unlock(lock); + + timer->state = TIMER_FREE; + free_nx_timer_t(timer); } @@ -180,7 +177,6 @@ void nx_timer_cancel(nx_timer_t *timer) void nx_timer_initialize(sys_mutex_t *server_lock) { lock = server_lock; - DEQ_INIT(free_list); DEQ_INIT(idle_timers); DEQ_INIT(scheduled_timers); time_base = 0; diff --git a/extras/nexus/tests/alloc_test.c b/extras/nexus/tests/alloc_test.c index 02f48af7e7..b5eac74aa0 100644 --- a/extras/nexus/tests/alloc_test.c +++ b/extras/nexus/tests/alloc_test.c @@ -30,7 +30,7 @@ typedef struct { nx_alloc_config_t config = {3, 7, 10}; ALLOC_DECLARE(object_t); -ALLOC_DEFINE_CONFIG(object_t, &config); +ALLOC_DEFINE_CONFIG(object_t, sizeof(object_t), &config); static char* check_stats(nx_alloc_stats_t *stats, uint64_t ah, uint64_t fh, uint64_t ht, uint64_t rt, uint64_t rg) diff --git a/extras/nexus/tests/message_test.c b/extras/nexus/tests/message_test.c index e9a4f01636..c00b0c055f 100644 --- a/extras/nexus/tests/message_test.c +++ b/extras/nexus/tests/message_test.c @@ -20,26 +20,18 @@ #include "test_case.h" #include <stdio.h> #include <string.h> -#include <qpid/nexus/message.h> +#include "message_private.h" #include <qpid/nexus/iterator.h> #include <proton/message.h> -static char* test_init(void *context) -{ - nx_allocator_initialize(nx_allocator_default_config()); - nx_allocator_finalize(); - return 0; -} - - static char* test_send_to_messenger(void *context) { - nx_allocator_initialize(nx_allocator_default_config()); + nx_message_t *msg = nx_allocate_message(); + nx_message_content_t *content = MSG_CONTENT(msg); - nx_message_t *msg = nx_allocate_message(); nx_message_compose_1(msg, "test_addr_0", 0); - nx_buffer_t *buf = DEQ_HEAD(msg->buffers); + nx_buffer_t *buf = DEQ_HEAD(content->buffers); if (buf == 0) return "Expected a buffer in the test message"; pn_message_t *pn_msg = pn_message(); @@ -52,15 +44,12 @@ static char* test_send_to_messenger(void *context) pn_message_free(pn_msg); nx_free_message(msg); - nx_allocator_finalize(); return 0; } static char* test_receive_from_messenger(void *context) { - nx_allocator_initialize(nx_allocator_default_config()); - pn_message_t *pn_msg = pn_message(); pn_message_set_address(pn_msg, "test_addr_1"); @@ -70,12 +59,14 @@ static char* test_receive_from_messenger(void *context) if (result != 0) return "Error in pn_message_encode"; nx_buffer_insert(buf, size); - nx_message_t *msg = nx_allocate_message(); - DEQ_INSERT_TAIL(msg->buffers, buf); + nx_message_t *msg = nx_allocate_message(); + nx_message_content_t *content = MSG_CONTENT(msg); + + DEQ_INSERT_TAIL(content->buffers, buf); int valid = nx_message_check(msg, NX_DEPTH_ALL); if (!valid) return "nx_message_check returns 'invalid'"; - nx_field_iterator_t *iter = nx_message_field_to(msg); + nx_field_iterator_t *iter = nx_message_field(msg, NX_FIELD_TO); if (iter == 0) return "Expected an iterator for the 'to' field"; if (!nx_field_iterator_equal(iter, (unsigned char*) "test_addr_1")) @@ -84,15 +75,12 @@ static char* test_receive_from_messenger(void *context) pn_message_free(pn_msg); nx_free_message(msg); - nx_allocator_finalize(); return 0; } static char* test_insufficient_check_depth(void *context) { - nx_allocator_initialize(nx_allocator_default_config()); - pn_message_t *pn_msg = pn_message(); pn_message_set_address(pn_msg, "test_addr_2"); @@ -102,17 +90,18 @@ static char* test_insufficient_check_depth(void *context) if (result != 0) return "Error in pn_message_encode"; nx_buffer_insert(buf, size); - nx_message_t *msg = nx_allocate_message(); - DEQ_INSERT_TAIL(msg->buffers, buf); + nx_message_t *msg = nx_allocate_message(); + nx_message_content_t *content = MSG_CONTENT(msg); + + DEQ_INSERT_TAIL(content->buffers, buf); int valid = nx_message_check(msg, NX_DEPTH_DELIVERY_ANNOTATIONS); if (!valid) return "nx_message_check returns 'invalid'"; - nx_field_iterator_t *iter = nx_message_field_to(msg); + nx_field_iterator_t *iter = nx_message_field(msg, NX_FIELD_TO); if (iter) return "Expected no iterator for the 'to' field"; nx_free_message(msg); - nx_allocator_finalize(); return 0; } @@ -121,7 +110,6 @@ int message_tests(void) { int result = 0; - TEST_CASE(test_init, 0); TEST_CASE(test_send_to_messenger, 0); TEST_CASE(test_receive_from_messenger, 0); TEST_CASE(test_insufficient_check_depth, 0); diff --git a/extras/nexus/tests/timer_test.c b/extras/nexus/tests/timer_test.c index f50f9367ea..09be21f4b6 100644 --- a/extras/nexus/tests/timer_test.c +++ b/extras/nexus/tests/timer_test.c @@ -19,6 +19,7 @@ #include <stdio.h> #include <qpid/nexus/timer.h> +#include "alloc_private.h" #include "timer_private.h" #include "test_case.h" #include <qpid/nexus/threading.h> @@ -341,6 +342,7 @@ static char* test_big(void *context) int timer_tests(void) { int result = 0; + nx_alloc_initialize(); fire_mask = 0; DEQ_INIT(pending_timers); |
