summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-02-04 22:46:32 +0000
committerTed Ross <tross@apache.org>2013-02-04 22:46:32 +0000
commitcffa3637701472511da91eeee1e9c989bfc02f7a (patch)
treec27389e84f14facbc2f4862b46bfdd0b64c05dba
parent0b78470bcc369a5e3702c846d47a0c575730cc8f (diff)
downloadqpid-python-cffa3637701472511da91eeee1e9c989bfc02f7a.tar.gz
QPID-4538
Work-in-progress - Major cleanup in the "message.h" API git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1442413 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--extras/nexus/CMakeLists.txt1
-rw-r--r--extras/nexus/include/qpid/nexus/alloc.h6
-rw-r--r--extras/nexus/include/qpid/nexus/buffer.h75
-rw-r--r--extras/nexus/include/qpid/nexus/ctools.h96
-rw-r--r--extras/nexus/include/qpid/nexus/message.h145
-rw-r--r--extras/nexus/src/buffer.c76
-rw-r--r--extras/nexus/src/container.c4
-rw-r--r--extras/nexus/src/iterator.c2
-rw-r--r--extras/nexus/src/message.c614
-rw-r--r--extras/nexus/src/message_private.h94
-rw-r--r--extras/nexus/src/timer.c38
-rw-r--r--extras/nexus/tests/alloc_test.c2
-rw-r--r--extras/nexus/tests/message_test.c40
-rw-r--r--extras/nexus/tests/timer_test.c2
14 files changed, 657 insertions, 538 deletions
diff --git a/extras/nexus/CMakeLists.txt b/extras/nexus/CMakeLists.txt
index 04c33c35e2..e1cbcd3cdd 100644
--- a/extras/nexus/CMakeLists.txt
+++ b/extras/nexus/CMakeLists.txt
@@ -65,6 +65,7 @@ set(CATCH_UNDEFINED "-Wl,--no-undefined")
set(server_SOURCES
src/alloc.c
src/auth.c
+ src/buffer.c
src/container.c
src/hash.c
src/iterator.c
diff --git a/extras/nexus/include/qpid/nexus/alloc.h b/extras/nexus/include/qpid/nexus/alloc.h
index a0c832c069..30450db641 100644
--- a/extras/nexus/include/qpid/nexus/alloc.h
+++ b/extras/nexus/include/qpid/nexus/alloc.h
@@ -57,14 +57,14 @@ void nx_dealloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool, void *p);
T *new_##T(); \
void free_##T(T *p)
-#define ALLOC_DEFINE_CONFIG(T,C) \
- nx_alloc_type_desc_t __desc_##T = {#T, sizeof(T), C, 0, 0, 0}; \
+#define ALLOC_DEFINE_CONFIG(T,S,C) \
+ nx_alloc_type_desc_t __desc_##T = {#T, S, C, 0, 0, 0}; \
__thread nx_alloc_pool_t *__local_pool_##T = 0; \
T *new_##T() { return (T*) nx_alloc(&__desc_##T, &__local_pool_##T); } \
void free_##T(T *p) { nx_dealloc(&__desc_##T, &__local_pool_##T, (void*) p); } \
nx_alloc_stats_t *alloc_stats_##T() { return __desc_##T.stats; }
-#define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, 0)
+#define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0)
#endif
diff --git a/extras/nexus/include/qpid/nexus/buffer.h b/extras/nexus/include/qpid/nexus/buffer.h
new file mode 100644
index 0000000000..6fb471c7f3
--- /dev/null
+++ b/extras/nexus/include/qpid/nexus/buffer.h
@@ -0,0 +1,75 @@
+#ifndef __nexus_buffer_h__
+#define __nexus_buffer_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/nexus/ctools.h>
+
+typedef struct nx_buffer_t nx_buffer_t;
+
+DEQ_DECLARE(nx_buffer_t, nx_buffer_list_t);
+
+struct nx_buffer_t {
+ DEQ_LINKS(nx_buffer_t);
+ unsigned int size;
+};
+
+/**
+ */
+nx_buffer_t *nx_allocate_buffer(void);
+
+/**
+ * @param buf A pointer to an allocated buffer
+ */
+void nx_free_buffer(nx_buffer_t *buf);
+
+/**
+ * @param buf A pointer to an allocated buffer
+ * @return A pointer to the first octet in the buffer
+ */
+unsigned char *nx_buffer_base(nx_buffer_t *buf);
+
+/**
+ * @param buf A pointer to an allocated buffer
+ * @return A pointer to the first free octet in the buffer, the insert point for new data.
+ */
+unsigned char *nx_buffer_cursor(nx_buffer_t *buf);
+
+/**
+ * @param buf A pointer to an allocated buffer
+ * @return The number of octets in the buffer's free space, how many octets may be inserted.
+ */
+size_t nx_buffer_capacity(nx_buffer_t *buf);
+
+/**
+ * @param buf A pointer to an allocated buffer
+ * @return The number of octets of data in the buffer
+ */
+size_t nx_buffer_size(nx_buffer_t *buf);
+
+/**
+ * Notify the buffer that octets have been inserted at the buffer's cursor. This will advance the
+ * cursor by len octets.
+ *
+ * @param buf A pointer to an allocated buffer
+ * @param len The number of octets that have been appended to the buffer
+ */
+void nx_buffer_insert(nx_buffer_t *buf, size_t len);
+
+#endif
diff --git a/extras/nexus/include/qpid/nexus/ctools.h b/extras/nexus/include/qpid/nexus/ctools.h
index 6b8f072b75..67298f3782 100644
--- a/extras/nexus/include/qpid/nexus/ctools.h
+++ b/extras/nexus/include/qpid/nexus/ctools.h
@@ -37,11 +37,11 @@
#define DEQ_LINKS(t) t *prev; t *next
-#define DEQ_INIT(d) do { d.head = 0; d.tail = 0; d.scratch = 0; d.size = 0; } while (0)
+#define DEQ_INIT(d) do { (d).head = 0; (d).tail = 0; (d).scratch = 0; (d).size = 0; } while (0)
#define DEQ_ITEM_INIT(i) do { (i)->next = 0; (i)->prev = 0; } while(0)
-#define DEQ_HEAD(d) (d.head)
-#define DEQ_TAIL(d) (d.tail)
-#define DEQ_SIZE(d) (d.size)
+#define DEQ_HEAD(d) ((d).head)
+#define DEQ_TAIL(d) ((d).tail)
+#define DEQ_SIZE(d) ((d).size)
#define DEQ_NEXT(i) (i)->next
#define DEQ_PREV(i) (i)->prev
@@ -49,67 +49,67 @@
do { \
CT_ASSERT((i)->next == 0); \
CT_ASSERT((i)->prev == 0); \
- if (d.head) { \
- (i)->next = d.head; \
- d.head->prev = i; \
+ if ((d).head) { \
+ (i)->next = (d).head; \
+ (d).head->prev = i; \
} else { \
- d.tail = i; \
+ (d).tail = i; \
(i)->next = 0; \
- CT_ASSERT(d.size == 0); \
+ CT_ASSERT((d).size == 0); \
} \
(i)->prev = 0; \
- d.head = i; \
- d.size++; \
+ (d).head = i; \
+ (d).size++; \
} while (0)
#define DEQ_INSERT_TAIL(d,i) \
do { \
CT_ASSERT((i)->next == 0); \
CT_ASSERT((i)->prev == 0); \
- if (d.tail) { \
- (i)->prev = d.tail; \
- d.tail->next = i; \
+ if ((d).tail) { \
+ (i)->prev = (d).tail; \
+ (d).tail->next = i; \
} else { \
- d.head = i; \
+ (d).head = i; \
(i)->prev = 0; \
- CT_ASSERT(d.size == 0); \
+ CT_ASSERT((d).size == 0); \
} \
(i)->next = 0; \
- d.tail = i; \
- d.size++; \
+ (d).tail = i; \
+ (d).size++; \
} while (0)
-#define DEQ_REMOVE_HEAD(d) \
-do { \
- CT_ASSERT(d.head); \
- if (d.head) { \
- d.scratch = d.head; \
- d.head = d.head->next; \
- if (d.head == 0) { \
- d.tail = 0; \
- CT_ASSERT(d.size == 1); \
+#define DEQ_REMOVE_HEAD(d) \
+do { \
+ CT_ASSERT((d).head); \
+ if ((d).head) { \
+ (d).scratch = (d).head; \
+ (d).head = (d).head->next; \
+ if ((d).head == 0) { \
+ (d).tail = 0; \
+ CT_ASSERT((d).size == 1); \
} else \
- d.head->prev = 0; \
- d.size--; \
- d.scratch->next = 0; \
- d.scratch->prev = 0; \
+ (d).head->prev = 0; \
+ (d).size--; \
+ (d).scratch->next = 0; \
+ (d).scratch->prev = 0; \
} \
} while (0)
#define DEQ_REMOVE_TAIL(d) \
do { \
- CT_ASSERT(d.tail); \
- if (d.tail) { \
- d.scratch = d.tail; \
- d.tail = d.tail->prev; \
- if (d.tail == 0) { \
- d.head = 0; \
- CT_ASSERT(d.size == 1); \
+ CT_ASSERT((d).tail); \
+ if ((d).tail) { \
+ (d).scratch = (d).tail; \
+ (d).tail = (d).tail->prev; \
+ if ((d).tail == 0) { \
+ (d).head = 0; \
+ CT_ASSERT((d).size == 1); \
} else \
- d.tail->next = 0; \
- d.size--; \
- d.scratch->next = 0; \
- d.scratch->prev = 0; \
+ (d).tail->next = 0; \
+ (d).size--; \
+ (d).scratch->next = 0; \
+ (d).scratch->prev = 0; \
} \
} while (0)
@@ -120,11 +120,11 @@ do { \
if ((a)->next) \
(a)->next->prev = (i); \
else \
- d.tail = (i); \
+ (d).tail = (i); \
(i)->next = (a)->next; \
(i)->prev = (a); \
(a)->next = (i); \
- d.size++; \
+ (d).size++; \
} while (0)
#define DEQ_REMOVE(d,i) \
@@ -132,15 +132,15 @@ do { \
if ((i)->next) \
(i)->next->prev = (i)->prev; \
else \
- d.tail = (i)->prev; \
+ (d).tail = (i)->prev; \
if ((i)->prev) \
(i)->prev->next = (i)->next; \
else \
- d.head = (i)->next; \
- d.size--; \
+ (d).head = (i)->next; \
+ (d).size--; \
(i)->next = 0; \
(i)->prev = 0; \
- CT_ASSERT(d.size || (!d.head && !d.tail)); \
+ CT_ASSERT((d).size || (!(d).head && !(d).tail)); \
} while (0)
#endif
diff --git a/extras/nexus/include/qpid/nexus/message.h b/extras/nexus/include/qpid/nexus/message.h
index 3bb6b950ea..ade0525666 100644
--- a/extras/nexus/include/qpid/nexus/message.h
+++ b/extras/nexus/include/qpid/nexus/message.h
@@ -21,99 +21,105 @@
#include <proton/engine.h>
#include <qpid/nexus/ctools.h>
+#include <qpid/nexus/alloc.h>
#include <qpid/nexus/iterator.h>
+#include <qpid/nexus/buffer.h>
+
+// Callback for status change (confirmed persistent, loaded-in-memory, etc.)
typedef struct nx_message_t nx_message_t;
-typedef struct nx_buffer_t nx_buffer_t;
-DEQ_DECLARE(nx_buffer_t, nx_buffer_list_t);
DEQ_DECLARE(nx_message_t, nx_message_list_t);
-typedef struct {
- nx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present
- size_t offset; // Offset in the buffer to the first octet
- size_t length; // Length of the field or zero if unneeded
- int parsed; // non-zero iff the buffer chain has been parsed to find this field
-} nx_field_location_t;
-
-
-// TODO - consider using pointers to nx_field_location_t below to save memory
struct nx_message_t {
DEQ_LINKS(nx_message_t);
- nx_buffer_list_t buffers; // The buffer chain containing the message
- pn_delivery_t *in_delivery; // The delivery on which the message arrived
- pn_delivery_t *out_delivery; // The delivery on which the message was last sent
- nx_field_location_t section_message_header; // The message header list
- nx_field_location_t section_delivery_annotation; // The delivery annotation map
- nx_field_location_t section_message_annotation; // The message annotation map
- nx_field_location_t section_message_properties; // The message properties list
- nx_field_location_t section_application_properties; // The application properties list
- nx_field_location_t section_body; // The message body: Data
- nx_field_location_t section_footer; // The footer
- nx_field_location_t field_user_id; // The string value of the user-id
- nx_field_location_t field_to; // The string value of the to field
- nx_field_location_t body; // The body of the message
- nx_field_location_t compose_length;
- nx_field_location_t compose_count;
- uint32_t length;
- uint32_t count;
+ // Private members not listed here.
};
-struct nx_buffer_t {
- DEQ_LINKS(nx_buffer_t);
- unsigned int size;
-};
-
-typedef struct {
- size_t buffer_size;
- unsigned long buffer_preallocation_count;
- unsigned long buffer_rebalancing_batch_count;
- unsigned long buffer_local_storage_max;
- unsigned long buffer_free_list_max;
- unsigned long message_allocation_batch_count;
- unsigned long message_rebalancing_batch_count;
- unsigned long message_local_storage_max;
-} nx_allocator_config_t;
-
-const nx_allocator_config_t *nx_allocator_default_config(void);
-
-void nx_allocator_initialize(const nx_allocator_config_t *config);
-void nx_allocator_finalize(void);
-
-//
-// Functions for per-thread allocators.
-//
-nx_message_t *nx_allocate_message(void);
-nx_buffer_t *nx_allocate_buffer(void);
-void nx_free_message(nx_message_t *msg);
-void nx_free_buffer(nx_buffer_t *buf);
-
-
typedef enum {
NX_DEPTH_NONE,
NX_DEPTH_HEADER,
NX_DEPTH_DELIVERY_ANNOTATIONS,
NX_DEPTH_MESSAGE_ANNOTATIONS,
- NX_DEPTH_MESSAGE_PROPERTIES, // Needed for 'user-id' and 'to'
+ NX_DEPTH_PROPERTIES,
NX_DEPTH_APPLICATION_PROPERTIES,
NX_DEPTH_BODY,
NX_DEPTH_ALL
} nx_message_depth_t;
+
+typedef enum {
+ //
+ // Message Sections
+ //
+ NX_FIELD_HEADER,
+ NX_FIELD_DELIVERY_ANNOTATION,
+ NX_FIELD_MESSAGE_ANNOTATION,
+ NX_FIELD_PROPERTIES,
+ NX_FIELD_APPLICATION_PROPERTIES,
+ NX_FIELD_BODY,
+ NX_FIELD_FOOTER,
+
+ //
+ // Fields of the Header Section
+ //
+ NX_FIELD_DURABLE,
+ NX_FIELD_PRIORITY,
+ NX_FIELD_TTL,
+ NX_FIELD_FIRST_ACQUIRER,
+ NX_FIELD_DELIVERY_COUNT,
+
+ //
+ // Fields of the Properties Section
+ //
+ NX_FIELD_MESSAGE_ID,
+ NX_FIELD_USER_ID,
+ NX_FIELD_TO,
+ NX_FIELD_SUBJECT,
+ NX_FIELD_REPLY_TO,
+ NX_FIELD_CORRELATION_ID,
+ NX_FIELD_CONTENT_TYPE,
+ NX_FIELD_CONTENT_ENCODING,
+ NX_FIELD_ABSOLUTE_EXPIRY_TIME,
+ NX_FIELD_CREATION_TIME,
+ NX_FIELD_GROUP_ID,
+ NX_FIELD_GROUP_SEQUENCE,
+ NX_FIELD_REPLY_TO_GROUP_ID
+} nx_message_field_t;
+
+//
+// Functions for allocation
+//
+nx_message_t *nx_allocate_message(void);
+void nx_free_message(nx_message_t *qm);
+nx_message_t *nx_message_copy(nx_message_t *qm);
+int nx_message_persistent(nx_message_t *qm);
+int nx_message_in_memory(nx_message_t *qm);
+
+void nx_message_set_out_delivery(nx_message_t *msg, pn_delivery_t *delivery);
+pn_delivery_t *nx_message_out_delivery(nx_message_t *msg);
+void nx_message_set_in_delivery(nx_message_t *msg, pn_delivery_t *delivery);
+pn_delivery_t *nx_message_in_delivery(nx_message_t *msg);
+
//
// Functions for received messages
//
nx_message_t *nx_message_receive(pn_delivery_t *delivery);
+void nx_message_send(nx_message_t *msg, pn_link_t *link);
+
int nx_message_check(nx_message_t *msg, nx_message_depth_t depth);
-nx_field_iterator_t *nx_message_field_to(nx_message_t *msg);
-nx_field_iterator_t *nx_message_body(nx_message_t *msg);
+nx_field_iterator_t *nx_message_field(nx_message_t *msg, nx_message_field_t field);
+
+pn_delivery_t *nx_message_inbound_delivery(nx_message_t *qm);
//
// Functions for composed messages
//
// Convenience Functions
-void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_t *buf_chain);
+void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_list_t *buffers);
+void nx_message_copy_header(nx_message_t *msg); // Copy received header into send-header (prior to adding annotations)
+void nx_message_copy_message_annotations(nx_message_t *msg);
// Raw Functions
void nx_message_begin_header(nx_message_t *msg);
@@ -131,7 +137,7 @@ void nx_message_end_message_properties(nx_message_t *msg);
void nx_message_begin_application_properties(nx_message_t *msg);
void nx_message_end_application_properties(nx_message_t *msg);
-void nx_message_append_body_data(nx_message_t *msg, nx_buffer_t *buf_chain);
+void nx_message_append_body_data(nx_message_t *msg, nx_buffer_list_t *buffers);
void nx_message_begin_body_sequence(nx_message_t *msg);
void nx_message_end_body_sequence(nx_message_t *msg);
@@ -149,14 +155,9 @@ void nx_message_insert_string(nx_message_t *msg, const char *start);
void nx_message_insert_uuid(nx_message_t *msg, const uint8_t *value);
void nx_message_insert_symbol(nx_message_t *msg, const char *start, size_t len);
void nx_message_insert_timestamp(nx_message_t *msg, uint64_t value);
-
-//
-// Functions for buffers
-//
-unsigned char *nx_buffer_base(nx_buffer_t *buf); // Pointer to the first octet in the buffer
-unsigned char *nx_buffer_cursor(nx_buffer_t *buf); // Pointer to the first free octet in the buffer
-size_t nx_buffer_capacity(nx_buffer_t *buf); // Size of free space in the buffer in octets
-size_t nx_buffer_size(nx_buffer_t *buf); // Number of octets in the buffer
-void nx_buffer_insert(nx_buffer_t *buf, size_t len); // Notify the buffer that 'len' octets were written at cursor
+void nx_message_begin_list(nx_message_t* msg);
+void nx_message_end_list(nx_message_t* msg);
+void nx_message_begin_map(nx_message_t* msg);
+void nx_message_end_map(nx_message_t* msg);
#endif
diff --git a/extras/nexus/src/buffer.c b/extras/nexus/src/buffer.c
new file mode 100644
index 0000000000..10f32b9541
--- /dev/null
+++ b/extras/nexus/src/buffer.c
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/nexus/buffer.h>
+#include <qpid/nexus/alloc.h>
+
+#ifndef NEXUS_BUFFER_SIZE
+#define NEXUS_BUFFER_SIZE 512
+#endif
+
+ALLOC_DECLARE(nx_buffer_t);
+ALLOC_DEFINE_CONFIG(nx_buffer_t, sizeof(nx_buffer_t) + NEXUS_BUFFER_SIZE, 0);
+
+
+nx_buffer_t *nx_allocate_buffer(void)
+{
+ nx_buffer_t *buf = new_nx_buffer_t();
+
+ DEQ_ITEM_INIT(buf);
+ buf->size = 0;
+ return buf;
+}
+
+
+void nx_free_buffer(nx_buffer_t *buf)
+{
+ free_nx_buffer_t(buf);
+}
+
+
+unsigned char *nx_buffer_base(nx_buffer_t *buf)
+{
+ return (unsigned char*) &buf[1];
+}
+
+
+unsigned char *nx_buffer_cursor(nx_buffer_t *buf)
+{
+ return ((unsigned char*) &buf[1]) + buf->size;
+}
+
+
+size_t nx_buffer_capacity(nx_buffer_t *buf)
+{
+ return NEXUS_BUFFER_SIZE - buf->size;
+}
+
+
+size_t nx_buffer_size(nx_buffer_t *buf)
+{
+ return buf->size;
+}
+
+
+void nx_buffer_insert(nx_buffer_t *buf, size_t len)
+{
+ buf->size += len;
+ assert(buf->size <= NEXUS_BUFFER_SIZE);
+}
+
diff --git a/extras/nexus/src/container.c b/extras/nexus/src/container.c
index 3d57a8f21d..fdf610db93 100644
--- a/extras/nexus/src/container.c
+++ b/extras/nexus/src/container.c
@@ -399,10 +399,6 @@ void nx_container_initialize(void)
{
nx_log(module, LOG_TRACE, "Container Initializing");
- // TODO - move allocator init to server?
- const nx_allocator_config_t *alloc_config = nx_allocator_default_config();
- nx_allocator_initialize(alloc_config);
-
node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4
node_map = hash(10, 32, 0); // 1K buckets, item batches of 32
lock = sys_mutex();
diff --git a/extras/nexus/src/iterator.c b/extras/nexus/src/iterator.c
index d03590e851..668629d83a 100644
--- a/extras/nexus/src/iterator.c
+++ b/extras/nexus/src/iterator.c
@@ -18,9 +18,9 @@
*/
#include <qpid/nexus/iterator.h>
-#include <qpid/nexus/message.h>
#include <qpid/nexus/ctools.h>
#include <qpid/nexus/alloc.h>
+#include "message_private.h"
#include <stdio.h>
#include <string.h>
diff --git a/extras/nexus/src/message.c b/extras/nexus/src/message.c
index 11f58a1474..c9f7c1a581 100644
--- a/extras/nexus/src/message.c
+++ b/extras/nexus/src/message.c
@@ -17,51 +17,14 @@
* under the License.
*/
-#include <qpid/nexus/message.h>
#include <qpid/nexus/ctools.h>
#include <qpid/nexus/threading.h>
+#include "message_private.h"
#include <string.h>
#include <stdio.h>
-
-//
-// Per-Thread allocator
-//
-typedef struct nx_allocator_t {
- nx_message_list_t message_free_list;
- nx_buffer_list_t buffer_free_list;
-} nx_allocator_t;
-
-//
-// Global allocator (protected by a global lock)
-//
-typedef struct {
- nx_message_list_t message_free_list;
- nx_buffer_list_t buffer_free_list;
- sys_mutex_t *lock;
-} nx_global_allocator_t;
-
-static nx_global_allocator_t global;
-static nx_allocator_config_t default_config;
-static const nx_allocator_config_t *config;
-
-
-static nx_allocator_t *nx_get_allocator(void)
-{
- static __thread nx_allocator_t *alloc = 0;
-
- if (!alloc) {
- alloc = NEW(nx_allocator_t);
-
- if (!alloc)
- return 0;
-
- DEQ_INIT(alloc->message_free_list);
- DEQ_INIT(alloc->buffer_free_list);
- }
-
- return alloc;
-}
+ALLOC_DEFINE_CONFIG(nx_message_t, sizeof(nx_message_pvt_t), 0);
+ALLOC_DEFINE(nx_message_content_t);
static void advance(unsigned char **cursor, nx_buffer_t **buffer, int consume)
@@ -288,7 +251,7 @@ static int nx_check_and_advance(nx_buffer_t **buffer,
}
-static void nx_insert(nx_message_t *msg, const uint8_t *seq, size_t len)
+static void nx_insert(nx_message_content_t *msg, const uint8_t *seq, size_t len)
{
nx_buffer_t *buf = DEQ_TAIL(msg->buffers);
@@ -312,13 +275,13 @@ static void nx_insert(nx_message_t *msg, const uint8_t *seq, size_t len)
}
-static void nx_insert_8(nx_message_t *msg, uint8_t value)
+static void nx_insert_8(nx_message_content_t *msg, uint8_t value)
{
nx_insert(msg, &value, 1);
}
-static void nx_insert_32(nx_message_t *msg, uint32_t value)
+static void nx_insert_32(nx_message_content_t *msg, uint32_t value)
{
uint8_t buf[4];
buf[0] = (uint8_t) ((value & 0xFF000000) >> 24);
@@ -329,7 +292,7 @@ static void nx_insert_32(nx_message_t *msg, uint32_t value)
}
-static void nx_insert_64(nx_message_t *msg, uint64_t value)
+static void nx_insert_64(nx_message_content_t *msg, uint64_t value)
{
uint8_t buf[8];
buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56);
@@ -371,7 +334,7 @@ static void nx_overwrite_32(nx_field_location_t *field, uint32_t value)
}
-static void nx_start_list_performative(nx_message_t *msg, uint8_t code)
+static void nx_start_list_performative(nx_message_content_t *msg, uint8_t code)
{
//
// Insert the short-form performative tag
@@ -409,219 +372,116 @@ static void nx_start_list_performative(nx_message_t *msg, uint8_t code)
}
-static void nx_end_list(nx_message_t *msg)
+static void nx_end_list(nx_message_content_t *msg)
{
nx_overwrite_32(&msg->compose_length, msg->length);
nx_overwrite_32(&msg->compose_count, msg->count);
}
-const nx_allocator_config_t *nx_allocator_default_config(void)
+nx_message_t *nx_allocate_message()
{
- default_config.buffer_size = 1024;
- default_config.buffer_preallocation_count = 512;
- default_config.buffer_rebalancing_batch_count = 16;
- default_config.buffer_local_storage_max = 64;
- default_config.buffer_free_list_max = 1000000;
- default_config.message_allocation_batch_count = 256;
- default_config.message_rebalancing_batch_count = 64;
- default_config.message_local_storage_max = 256;
-
- return &default_config;
-}
-
+ nx_message_pvt_t *msg = (nx_message_pvt_t*) new_nx_message_t();
+ if (!msg)
+ return 0;
-void nx_allocator_initialize(const nx_allocator_config_t *c)
-{
- config = c;
+ DEQ_ITEM_INIT(msg);
+ msg->content = new_nx_message_content_t();
+ msg->out_delivery = 0;
- // Initialize the fields in the global structure.
- DEQ_INIT(global.message_free_list);
- DEQ_INIT(global.buffer_free_list);
- global.lock = sys_mutex();
+ if (msg->content == 0) {
+ free_nx_message_t((nx_message_t*) msg);
+ return 0;
+ }
- // Pre-allocate buffers according to the configuration
- int i;
- nx_buffer_t *buf;
+ memset(msg->content, 0, sizeof(nx_message_content_t));
+ msg->content->lock = sys_mutex();
+ msg->content->ref_count = 1;
- for (i = 0; i < config->buffer_preallocation_count; i++) {
- buf = (nx_buffer_t*) malloc (sizeof(nx_buffer_t) + config->buffer_size);
- DEQ_ITEM_INIT(buf);
- DEQ_INSERT_TAIL(global.buffer_free_list, buf);
- }
+ return (nx_message_t*) msg;
}
-void nx_allocator_finalize(void)
+void nx_free_message(nx_message_t *in_msg)
{
- // TODO - Free buffers and messages
-}
+ uint32_t rc;
+ nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg;
+ nx_message_content_t *content = msg->content;
+ sys_mutex_lock(content->lock);
+ rc = --content->ref_count;
+ sys_mutex_unlock(content->lock);
-nx_message_t *nx_allocate_message(void)
-{
- nx_allocator_t *alloc = nx_get_allocator();
- nx_message_t *msg;
- int i;
+ if (rc == 0) {
+ nx_buffer_t *buf = DEQ_HEAD(content->buffers);
- if (DEQ_SIZE(alloc->message_free_list) == 0) {
- //
- // The local free list is empty, rebalance a batch of objects from the global
- // free list.
- //
- sys_mutex_lock(global.lock);
- if (DEQ_SIZE(global.message_free_list) >= config->message_rebalancing_batch_count) {
- for (i = 0; i < config->message_rebalancing_batch_count; i++) {
- msg = DEQ_HEAD(global.message_free_list);
- DEQ_REMOVE_HEAD(global.message_free_list);
- DEQ_INSERT_TAIL(alloc->message_free_list, msg);
- }
+ while (buf) {
+ DEQ_REMOVE_HEAD(content->buffers);
+ nx_free_buffer(buf);
+ buf = DEQ_HEAD(content->buffers);
}
- sys_mutex_unlock(global.lock);
- }
- if (DEQ_SIZE(alloc->message_free_list) == 0) {
- //
- // The local free list is still empty. This means there were not enough objects on the
- // global free list to make up a batch. Allocate new objects from the heap and store
- // them in the local free list.
- //
- nx_message_t *batch = NEW_ARRAY(nx_message_t, config->message_allocation_batch_count);
- memset(batch, 0, sizeof(nx_message_t) * config->message_allocation_batch_count);
- for (i = 0; i < config->message_allocation_batch_count; i++) {
- DEQ_INSERT_TAIL(alloc->message_free_list, &batch[i]);
- }
+ sys_mutex_free(content->lock);
+ free_nx_message_content_t(content);
}
- //
- // If the local free list is still empty, we're out of memory.
- //
- if (DEQ_SIZE(alloc->message_free_list) == 0)
- return 0;
-
- msg = DEQ_HEAD(alloc->message_free_list);
- DEQ_REMOVE_HEAD(alloc->message_free_list);
-
- DEQ_INIT(msg->buffers);
- msg->in_delivery = NULL;
- msg->out_delivery = NULL;
- msg->section_message_header.buffer = 0;
- msg->section_message_header.parsed = 0;
- msg->section_delivery_annotation.buffer = 0;
- msg->section_delivery_annotation.parsed = 0;
- msg->section_message_annotation.buffer = 0;
- msg->section_message_annotation.parsed = 0;
- msg->section_message_properties.buffer = 0;
- msg->section_message_properties.parsed = 0;
- msg->section_application_properties.buffer = 0;
- msg->section_application_properties.parsed = 0;
- msg->section_body.buffer = 0;
- msg->section_body.parsed = 0;
- msg->section_footer.buffer = 0;
- msg->section_footer.parsed = 0;
- msg->field_user_id.buffer = 0;
- msg->field_user_id.parsed = 0;
- msg->field_to.buffer = 0;
- msg->field_to.parsed = 0;
- msg->body.buffer = 0;
- msg->body.parsed = 0;
- return msg;
+ free_nx_message_t((nx_message_t*) msg);
}
-nx_buffer_t *nx_allocate_buffer(void)
+nx_message_t *nx_message_copy(nx_message_t *in_msg)
{
- nx_allocator_t *alloc = nx_get_allocator();
- nx_buffer_t *buf;
- int i;
-
- if (DEQ_SIZE(alloc->buffer_free_list) == 0) {
- sys_mutex_lock(global.lock);
- if (DEQ_SIZE(global.buffer_free_list) >= config->buffer_rebalancing_batch_count) {
- // Rebalance a batch of free descriptors to the local free list.
- for (i = 0; i < config->buffer_rebalancing_batch_count; i++) {
- buf = DEQ_HEAD(global.buffer_free_list);
- DEQ_REMOVE_HEAD(global.buffer_free_list);
- DEQ_INSERT_TAIL(alloc->buffer_free_list, buf);
- }
- }
- sys_mutex_unlock(global.lock);
- }
+ nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg;
+ nx_message_content_t *content = msg->content;
+ nx_message_pvt_t *copy = (nx_message_pvt_t*) new_nx_message_t();
- if (DEQ_SIZE(alloc->buffer_free_list) == 0) {
- // Allocate a buffer from the heap
- buf = (nx_buffer_t*) malloc (sizeof(nx_buffer_t) + config->buffer_size);
- DEQ_ITEM_INIT(buf);
- DEQ_INSERT_TAIL(alloc->buffer_free_list, buf);
- }
-
- if (DEQ_SIZE(alloc->buffer_free_list) == 0)
+ if (!copy)
return 0;
- buf = DEQ_HEAD(alloc->buffer_free_list);
- DEQ_REMOVE_HEAD(alloc->buffer_free_list);
+ DEQ_ITEM_INIT(copy);
+ copy->content = content;
+ copy->out_delivery = 0;
- buf->size = 0;
+ sys_mutex_lock(content->lock);
+ content->ref_count++;
+ sys_mutex_unlock(content->lock);
- return buf;
+ return (nx_message_t*) copy;
}
-void nx_free_message(nx_message_t *msg)
+void nx_message_set_out_delivery(nx_message_t *msg, pn_delivery_t *delivery)
{
- nx_allocator_t *alloc = nx_get_allocator();
+ ((nx_message_pvt_t*) msg)->out_delivery = delivery;
+}
- // Free any buffers in the message
- int i;
- nx_buffer_t *buf = DEQ_HEAD(msg->buffers);
- while (buf) {
- DEQ_REMOVE_HEAD(msg->buffers);
- nx_free_buffer(buf);
- buf = DEQ_HEAD(msg->buffers);
- }
- DEQ_INSERT_TAIL(alloc->message_free_list, msg);
- if (DEQ_SIZE(alloc->message_free_list) > config->message_local_storage_max) {
- //
- // The local free list has exceeded the threshold for local storage.
- // Rebalance a batch of free objects to the global free list.
- //
- sys_mutex_lock(global.lock);
- for (i = 0; i < config->message_rebalancing_batch_count; i++) {
- msg = DEQ_HEAD(alloc->message_free_list);
- DEQ_REMOVE_HEAD(alloc->message_free_list);
- DEQ_INSERT_TAIL(global.message_free_list, msg);
- }
- sys_mutex_unlock(global.lock);
- }
+pn_delivery_t *nx_message_out_delivery(nx_message_t *msg)
+{
+ return ((nx_message_pvt_t*) msg)->out_delivery;
}
-void nx_free_buffer(nx_buffer_t *buf)
+void nx_message_set_in_delivery(nx_message_t *msg, pn_delivery_t *delivery)
{
- nx_allocator_t *alloc = nx_get_allocator();
- int i;
-
- DEQ_INSERT_TAIL(alloc->buffer_free_list, buf);
- if (DEQ_SIZE(alloc->buffer_free_list) > config->buffer_local_storage_max) {
- // Rebalance a batch of free descriptors to the global free list.
- sys_mutex_lock(global.lock);
- for (i = 0; i < config->buffer_rebalancing_batch_count; i++) {
- buf = DEQ_HEAD(alloc->buffer_free_list);
- DEQ_REMOVE_HEAD(alloc->buffer_free_list);
- DEQ_INSERT_TAIL(global.buffer_free_list, buf);
- }
- sys_mutex_unlock(global.lock);
- }
+ nx_message_content_t *content = MSG_CONTENT(msg);
+ content->in_delivery = delivery;
+}
+
+
+pn_delivery_t *nx_message_in_delivery(nx_message_t *msg)
+{
+ nx_message_content_t *content = MSG_CONTENT(msg);
+ return content->in_delivery;
}
nx_message_t *nx_message_receive(pn_delivery_t *delivery)
{
- pn_link_t *link = pn_delivery_link(delivery);
- nx_message_t *msg = (nx_message_t*) pn_delivery_get_context(delivery);
- ssize_t rc;
- nx_buffer_t *buf;
+ pn_link_t *link = pn_delivery_link(delivery);
+ nx_message_pvt_t *msg = (nx_message_pvt_t*) pn_delivery_get_context(delivery);
+ ssize_t rc;
+ nx_buffer_t *buf;
//
// If there is no message associated with the delivery, this is the first time
@@ -629,15 +489,16 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery)
// link it and the delivery together.
//
if (!msg) {
- msg = nx_allocate_message();
+ msg = (nx_message_pvt_t*) nx_allocate_message();
pn_delivery_set_context(delivery, (void*) msg);
//
// Record the incoming delivery only if it is not settled. If it is
- // settled, there's no need to propagate disposition back to the sender.
+ // settled, it should not be recorded as no future operations on it are
+ // permitted.
//
if (!pn_delivery_settled(delivery))
- msg->in_delivery = delivery;
+ msg->content->in_delivery = delivery;
}
//
@@ -645,10 +506,10 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery)
// we will store incoming message data. If there is no buffer in the message, allocate
// an empty one and add it to the message.
//
- buf = DEQ_TAIL(msg->buffers);
+ buf = DEQ_TAIL(msg->content->buffers);
if (!buf) {
buf = nx_allocate_buffer();
- DEQ_INSERT_TAIL(msg->buffers, buf);
+ DEQ_INSERT_TAIL(msg->content->buffers, buf);
}
while (1) {
@@ -667,10 +528,10 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery)
// of the buffer size.
//
if (nx_buffer_size(buf) == 0) {
- DEQ_REMOVE_TAIL(msg->buffers);
+ DEQ_REMOVE_TAIL(msg->content->buffers);
nx_free_buffer(buf);
}
- return msg;
+ return (nx_message_t*) msg;
}
if (rc > 0) {
@@ -686,7 +547,7 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery)
//
if (nx_buffer_capacity(buf) == 0) {
buf = nx_allocate_buffer();
- DEQ_INSERT_TAIL(msg->buffers, buf);
+ DEQ_INSERT_TAIL(msg->content->buffers, buf);
}
} else
//
@@ -697,11 +558,24 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery)
break;
}
- return NULL;
+ return 0;
+}
+
+
+void nx_message_send(nx_message_t *in_msg, pn_link_t *link)
+{
+ nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg;
+ nx_buffer_t *buf = DEQ_HEAD(msg->content->buffers);
+
+ // TODO - Handle cases where annotations have been added or modified
+ while (buf) {
+ pn_link_send(link, (char*) nx_buffer_base(buf), nx_buffer_size(buf));
+ buf = DEQ_NEXT(buf);
+ }
}
-int nx_message_check(nx_message_t *msg, nx_message_depth_t depth)
+int nx_message_check(nx_message_t *in_msg, nx_message_depth_t depth)
{
#define LONG 10
@@ -712,8 +586,8 @@ int nx_message_check(nx_message_t *msg, nx_message_depth_t depth)
#define DELIVERY_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x71"
#define MESSAGE_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72"
#define MESSAGE_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x72"
-#define MESSAGE_PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73"
-#define MESSAGE_PROPERTIES_SHORT (unsigned char*) "\x00\x53\x73"
+#define PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73"
+#define PROPERTIES_SHORT (unsigned char*) "\x00\x53\x73"
#define APPLICATION_PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74"
#define APPLICATION_PROPERTIES_SHORT (unsigned char*) "\x00\x53\x74"
#define BODY_DATA_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75"
@@ -726,8 +600,10 @@ int nx_message_check(nx_message_t *msg, nx_message_depth_t depth)
#define TAGS_MAP (unsigned char*) "\xc1\xd1"
#define TAGS_BINARY (unsigned char*) "\xa0\xb0"
- nx_buffer_t *buffer = DEQ_HEAD(msg->buffers);
- unsigned char *cursor;
+ nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg;
+ nx_message_content_t *content = msg->content;
+ nx_buffer_t *buffer = DEQ_HEAD(content->buffers);
+ unsigned char *cursor;
if (!buffer)
return 0; // Invalid - No data in the message
@@ -740,9 +616,9 @@ int nx_message_check(nx_message_t *msg, nx_message_depth_t depth)
//
// MESSAGE HEADER
//
- if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG, LONG, TAGS_LIST, &msg->section_message_header))
+ if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG, LONG, TAGS_LIST, &content->section_message_header))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &msg->section_message_header))
+ if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &content->section_message_header))
return 0;
if (depth == NX_DEPTH_HEADER)
@@ -751,9 +627,9 @@ int nx_message_check(nx_message_t *msg, nx_message_depth_t depth)
//
// DELIVERY ANNOTATION
//
- if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG, LONG, TAGS_MAP, &msg->section_delivery_annotation))
+ if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_delivery_annotation))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP, &msg->section_delivery_annotation))
+ if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_delivery_annotation))
return 0;
if (depth == NX_DEPTH_DELIVERY_ANNOTATIONS)
@@ -762,31 +638,31 @@ int nx_message_check(nx_message_t *msg, nx_message_depth_t depth)
//
// MESSAGE ANNOTATION
//
- if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG, LONG, TAGS_MAP, &msg->section_message_annotation))
+ if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_message_annotation))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP, &msg->section_message_annotation))
+ if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_message_annotation))
return 0;
if (depth == NX_DEPTH_MESSAGE_ANNOTATIONS)
return 1;
//
- // MESSAGE PROPERTIES
+ // PROPERTIES
//
- if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_PROPERTIES_LONG, LONG, TAGS_LIST, &msg->section_message_properties))
+ if (0 == nx_check_and_advance(&buffer, &cursor, PROPERTIES_LONG, LONG, TAGS_LIST, &content->section_message_properties))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_PROPERTIES_SHORT, SHORT, TAGS_LIST, &msg->section_message_properties))
+ if (0 == nx_check_and_advance(&buffer, &cursor, PROPERTIES_SHORT, SHORT, TAGS_LIST, &content->section_message_properties))
return 0;
- if (depth == NX_DEPTH_MESSAGE_PROPERTIES)
+ if (depth == NX_DEPTH_PROPERTIES)
return 1;
//
// APPLICATION PROPERTIES
//
- if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG, LONG, TAGS_MAP, &msg->section_application_properties))
+ if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG, LONG, TAGS_MAP, &content->section_application_properties))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &msg->section_application_properties))
+ if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &content->section_application_properties))
return 0;
if (depth == NX_DEPTH_APPLICATION_PROPERTIES)
@@ -795,13 +671,13 @@ int nx_message_check(nx_message_t *msg, nx_message_depth_t depth)
//
// BODY (Note that this function expects a single data section or a single AMQP sequence)
//
- if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG, LONG, TAGS_BINARY, &msg->section_body))
+ if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG, LONG, TAGS_BINARY, &content->section_body))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT, SHORT, TAGS_BINARY, &msg->section_body))
+ if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT, SHORT, TAGS_BINARY, &content->section_body))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG, LONG, TAGS_LIST, &msg->section_body))
+ if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG, LONG, TAGS_LIST, &content->section_body))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST, &msg->section_body))
+ if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST, &content->section_body))
return 0;
if (depth == NX_DEPTH_BODY)
@@ -810,67 +686,72 @@ int nx_message_check(nx_message_t *msg, nx_message_depth_t depth)
//
// FOOTER
//
- if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_LONG, LONG, TAGS_MAP, &msg->section_footer))
+ if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_LONG, LONG, TAGS_MAP, &content->section_footer))
return 0;
- if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &msg->section_footer))
+ if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &content->section_footer))
return 0;
return 1;
}
-nx_field_iterator_t *nx_message_field_to(nx_message_t *msg)
+nx_field_iterator_t *nx_message_field(nx_message_t *msg, nx_message_field_t field)
{
- while (1) {
- if (msg->field_to.parsed)
- return nx_field_iterator_buffer(msg->field_to.buffer, msg->field_to.offset, msg->field_to.length, ITER_VIEW_ALL);
+ nx_message_content_t *content = MSG_CONTENT(msg);
- if (msg->section_message_properties.parsed == 0)
- break;
+ switch (field) {
+ case NX_FIELD_TO:
+ while (1) {
+ if (content->field_to.parsed)
+ return nx_field_iterator_buffer(content->field_to.buffer, content->field_to.offset, content->field_to.length, ITER_VIEW_ALL);
- nx_buffer_t *buffer = msg->section_message_properties.buffer;
- unsigned char *cursor = nx_buffer_base(buffer) + msg->section_message_properties.offset;
+ if (content->section_message_properties.parsed == 0)
+ break;
- int count = start_list(&cursor, &buffer);
- int result;
+ nx_buffer_t *buffer = content->section_message_properties.buffer;
+ unsigned char *cursor = nx_buffer_base(buffer) + content->section_message_properties.offset;
- if (count < 3)
- break;
+ int count = start_list(&cursor, &buffer);
+ int result;
- result = traverse_field(&cursor, &buffer, 0); // message_id
- if (!result) return 0;
- result = traverse_field(&cursor, &buffer, 0); // user_id
- if (!result) return 0;
- result = traverse_field(&cursor, &buffer, &msg->field_to); // to
- if (!result) return 0;
- }
+ if (count < 3)
+ break;
- return 0;
-}
+ result = traverse_field(&cursor, &buffer, 0); // message_id
+ if (!result) return 0;
+ result = traverse_field(&cursor, &buffer, 0); // user_id
+ if (!result) return 0;
+ result = traverse_field(&cursor, &buffer, &content->field_to); // to
+ if (!result) return 0;
+ }
+ break;
+ case NX_FIELD_BODY:
+ while (1) {
+ if (content->body.parsed)
+ return nx_field_iterator_buffer(content->body.buffer, content->body.offset, content->body.length, ITER_VIEW_ALL);
-nx_field_iterator_t *nx_message_body(nx_message_t *msg)
-{
- while (1) {
- if (msg->body.parsed)
- return nx_field_iterator_buffer(msg->body.buffer, msg->body.offset, msg->body.length, ITER_VIEW_ALL);
+ if (content->section_body.parsed == 0)
+ break;
- if (msg->section_body.parsed == 0)
- break;
+ nx_buffer_t *buffer = content->section_body.buffer;
+ unsigned char *cursor = nx_buffer_base(buffer) + content->section_body.offset;
+ int result;
- nx_buffer_t *buffer = msg->section_body.buffer;
- unsigned char *cursor = nx_buffer_base(buffer) + msg->section_body.offset;
- int result;
+ result = traverse_field(&cursor, &buffer, &content->body);
+ if (!result) return 0;
+ }
+ break;
- result = traverse_field(&cursor, &buffer, &msg->body);
- if (!result) return 0;
+ default:
+ break;
}
return 0;
}
-void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_t *buf_chain)
+void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_list_t *buffers)
{
nx_message_begin_header(msg);
nx_message_insert_boolean(msg, 0); // durable
@@ -896,20 +777,20 @@ void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_t *buf_ch
//nx_message_insert_null(msg); // reply-to-group-id
nx_message_end_message_properties(msg);
- if (buf_chain)
- nx_message_append_body_data(msg, buf_chain);
+ if (buffers)
+ nx_message_append_body_data(msg, buffers);
}
void nx_message_begin_header(nx_message_t *msg)
{
- nx_start_list_performative(msg, 0x70);
+ nx_start_list_performative(MSG_CONTENT(msg), 0x70);
}
void nx_message_end_header(nx_message_t *msg)
{
- nx_end_list(msg);
+ nx_end_list(MSG_CONTENT(msg));
}
@@ -939,13 +820,13 @@ void nx_message_end_message_annotations(nx_message_t *msg)
void nx_message_begin_message_properties(nx_message_t *msg)
{
- nx_start_list_performative(msg, 0x73);
+ nx_start_list_performative(MSG_CONTENT(msg), 0x73);
}
void nx_message_end_message_properties(nx_message_t *msg)
{
- nx_end_list(msg);
+ nx_end_list(MSG_CONTENT(msg));
}
@@ -961,34 +842,40 @@ void nx_message_end_application_properties(nx_message_t *msg)
}
-void nx_message_append_body_data(nx_message_t *msg, nx_buffer_t *buf_chain)
+void nx_message_append_body_data(nx_message_t *msg, nx_buffer_list_t *buffers)
{
- uint32_t len = 0;
- nx_buffer_t *buf = buf_chain;
- nx_buffer_t *last = 0;
- size_t count = 0;
+ nx_message_content_t *content = MSG_CONTENT(msg);
+ nx_buffer_t *buf = DEQ_HEAD(*buffers);
+ uint32_t len = 0;
+ //
+ // Calculate the size of the body to be appended.
+ //
while (buf) {
len += nx_buffer_size(buf);
- count++;
- last = buf;
buf = DEQ_NEXT(buf);
}
- nx_insert(msg, (const uint8_t*) "\x00\x53\x75", 3);
+ //
+ // Insert a DATA section performative header.
+ //
+ nx_insert(content, (const uint8_t*) "\x00\x53\x75", 3);
if (len < 256) {
- nx_insert_8(msg, 0xa0); // vbin8
- nx_insert_8(msg, (uint8_t) len);
+ nx_insert_8(content, 0xa0); // vbin8
+ nx_insert_8(content, (uint8_t) len);
} else {
- nx_insert_8(msg, 0xb0); // vbin32
- nx_insert_32(msg, len);
+ nx_insert_8(content, 0xb0); // vbin32
+ nx_insert_32(content, len);
}
- if (len > 0) {
- buf_chain->prev = msg->buffers.tail;
- msg->buffers.tail->next = buf_chain;
- msg->buffers.tail = last;
- msg->buffers.size += count;
+ //
+ // Move the supplied buffers to the tail of the message's buffer list.
+ //
+ buf = DEQ_HEAD(*buffers);
+ while (buf) {
+ DEQ_REMOVE_HEAD(*buffers);
+ DEQ_INSERT_TAIL(content->buffers, buf);
+ buf = DEQ_HEAD(*buffers);
}
}
@@ -1017,148 +904,151 @@ void nx_message_end_footer(nx_message_t *msg)
void nx_message_insert_null(nx_message_t *msg)
{
- nx_insert_8(msg, 0x40);
- msg->count++;
+ nx_message_content_t *content = MSG_CONTENT(msg);
+ nx_insert_8(content, 0x40);
+ content->count++;
}
void nx_message_insert_boolean(nx_message_t *msg, int value)
{
+ nx_message_content_t *content = MSG_CONTENT(msg);
if (value)
- nx_insert(msg, (const uint8_t*) "\x56\x01", 2);
+ nx_insert(content, (const uint8_t*) "\x56\x01", 2);
else
- nx_insert(msg, (const uint8_t*) "\x56\x00", 2);
- msg->count++;
+ nx_insert(content, (const uint8_t*) "\x56\x00", 2);
+ content->count++;
}
void nx_message_insert_ubyte(nx_message_t *msg, uint8_t value)
{
- nx_insert_8(msg, 0x50);
- nx_insert_8(msg, value);
- msg->count++;
+ nx_message_content_t *content = MSG_CONTENT(msg);
+ nx_insert_8(content, 0x50);
+ nx_insert_8(content, value);
+ content->count++;
}
void nx_message_insert_uint(nx_message_t *msg, uint32_t value)
{
+ nx_message_content_t *content = MSG_CONTENT(msg);
if (value == 0) {
- nx_insert_8(msg, 0x43); // uint0
+ nx_insert_8(content, 0x43); // uint0
} else if (value < 256) {
- nx_insert_8(msg, 0x52); // smalluint
- nx_insert_8(msg, (uint8_t) value);
+ nx_insert_8(content, 0x52); // smalluint
+ nx_insert_8(content, (uint8_t) value);
} else {
- nx_insert_8(msg, 0x70); // uint
- nx_insert_32(msg, value);
+ nx_insert_8(content, 0x70); // uint
+ nx_insert_32(content, value);
}
- msg->count++;
+ content->count++;
}
void nx_message_insert_ulong(nx_message_t *msg, uint64_t value)
{
+ nx_message_content_t *content = MSG_CONTENT(msg);
if (value == 0) {
- nx_insert_8(msg, 0x44); // ulong0
+ nx_insert_8(content, 0x44); // ulong0
} else if (value < 256) {
- nx_insert_8(msg, 0x53); // smallulong
- nx_insert_8(msg, (uint8_t) value);
+ nx_insert_8(content, 0x53); // smallulong
+ nx_insert_8(content, (uint8_t) value);
} else {
- nx_insert_8(msg, 0x80); // ulong
- nx_insert_64(msg, value);
+ nx_insert_8(content, 0x80); // ulong
+ nx_insert_64(content, value);
}
- msg->count++;
+ content->count++;
}
void nx_message_insert_binary(nx_message_t *msg, const uint8_t *start, size_t len)
{
+ nx_message_content_t *content = MSG_CONTENT(msg);
if (len < 256) {
- nx_insert_8(msg, 0xa0); // vbin8
- nx_insert_8(msg, (uint8_t) len);
+ nx_insert_8(content, 0xa0); // vbin8
+ nx_insert_8(content, (uint8_t) len);
} else {
- nx_insert_8(msg, 0xb0); // vbin32
- nx_insert_32(msg, len);
+ nx_insert_8(content, 0xb0); // vbin32
+ nx_insert_32(content, len);
}
- nx_insert(msg, start, len);
- msg->count++;
+ nx_insert(content, start, len);
+ content->count++;
}
void nx_message_insert_string(nx_message_t *msg, const char *start)
{
+ nx_message_content_t *content = MSG_CONTENT(msg);
uint32_t len = strlen(start);
if (len < 256) {
- nx_insert_8(msg, 0xa1); // str8-utf8
- nx_insert_8(msg, (uint8_t) len);
- nx_insert(msg, (const uint8_t*) start, len);
+ nx_insert_8(content, 0xa1); // str8-utf8
+ nx_insert_8(content, (uint8_t) len);
+ nx_insert(content, (const uint8_t*) start, len);
} else {
- nx_insert_8(msg, 0xb1); // str32-utf8
- nx_insert_32(msg, len);
- nx_insert(msg, (const uint8_t*) start, len);
+ nx_insert_8(content, 0xb1); // str32-utf8
+ nx_insert_32(content, len);
+ nx_insert(content, (const uint8_t*) start, len);
}
- msg->count++;
+ content->count++;
}
void nx_message_insert_uuid(nx_message_t *msg, const uint8_t *value)
{
- nx_insert_8(msg, 0x98); // uuid
- nx_insert(msg, value, 16);
- msg->count++;
+ nx_message_content_t *content = MSG_CONTENT(msg);
+ nx_insert_8(content, 0x98); // uuid
+ nx_insert(content, value, 16);
+ content->count++;
}
void nx_message_insert_symbol(nx_message_t *msg, const char *start, size_t len)
{
+ nx_message_content_t *content = MSG_CONTENT(msg);
if (len < 256) {
- nx_insert_8(msg, 0xa3); // sym8
- nx_insert_8(msg, (uint8_t) len);
- nx_insert(msg, (const uint8_t*) start, len);
+ nx_insert_8(content, 0xa3); // sym8
+ nx_insert_8(content, (uint8_t) len);
+ nx_insert(content, (const uint8_t*) start, len);
} else {
- nx_insert_8(msg, 0xb3); // sym32
- nx_insert_32(msg, len);
- nx_insert(msg, (const uint8_t*) start, len);
+ nx_insert_8(content, 0xb3); // sym32
+ nx_insert_32(content, len);
+ nx_insert(content, (const uint8_t*) start, len);
}
- msg->count++;
+ content->count++;
}
void nx_message_insert_timestamp(nx_message_t *msg, uint64_t value)
{
- nx_insert_8(msg, 0x83); // timestamp
- nx_insert_64(msg, value);
- msg->count++;
-}
-
-
-unsigned char *nx_buffer_base(nx_buffer_t *buf)
-{
- return (unsigned char*) &buf[1];
+ nx_message_content_t *content = MSG_CONTENT(msg);
+ nx_insert_8(content, 0x83); // timestamp
+ nx_insert_64(content, value);
+ content->count++;
}
-unsigned char *nx_buffer_cursor(nx_buffer_t *buf)
+void nx_message_begin_list(nx_message_t* msg)
{
- return ((unsigned char*) &buf[1]) + buf->size;
+ assert(0); // Not Implemented
}
-size_t nx_buffer_capacity(nx_buffer_t *buf)
+void nx_message_end_list(nx_message_t* msg)
{
- return config->buffer_size - buf->size;
+ assert(0); // Not Implemented
}
-size_t nx_buffer_size(nx_buffer_t *buf)
+void nx_message_begin_map(nx_message_t* msg)
{
- return buf->size;
+ assert(0); // Not Implemented
}
-void nx_buffer_insert(nx_buffer_t *buf, size_t len)
+void nx_message_end_map(nx_message_t* msg)
{
- buf->size += len;
- assert(buf->size <= config->buffer_size);
+ assert(0); // Not Implemented
}
diff --git a/extras/nexus/src/message_private.h b/extras/nexus/src/message_private.h
new file mode 100644
index 0000000000..0e20332357
--- /dev/null
+++ b/extras/nexus/src/message_private.h
@@ -0,0 +1,94 @@
+#ifndef __message_private_h__
+#define __message_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/nexus/message.h>
+#include <qpid/nexus/alloc.h>
+#include <qpid/nexus/threading.h>
+
+/**
+ * Architecture of the message module:
+ *
+ * +--------------+ +----------------------+
+ * | | | |
+ * | nx_message_t |----------->| nx_message_content_t |
+ * | | +----->| |
+ * +--------------+ | +----------------------+
+ * | |
+ * +--------------+ | | +-------------+ +-------------+ +-------------+
+ * | | | +--->| nx_buffer_t |-->| nx_buffer_t |-->| nx_buffer_t |--/
+ * | nx_message_t |-----+ +-------------+ +-------------+ +-------------+
+ * | |
+ * +--------------+
+ *
+ * The message module provides chained-fixed-sized-buffer storage of message content with multiple
+ * references. If a message is received and is to be queued for multiple destinations, there is only
+ * one copy of the message content in memory but multiple lightweight references to the content.
+ *
+ */
+
+typedef struct {
+ nx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present
+ size_t offset; // Offset in the buffer to the first octet
+ size_t length; // Length of the field or zero if unneeded
+ int parsed; // non-zero iff the buffer chain has been parsed to find this field
+} nx_field_location_t;
+
+
+// TODO - consider using pointers to nx_field_location_t below to save memory
+// TODO - we need a second buffer list for modified annotations and header
+// There are three message scenarios:
+// 1) Received message is held and forwarded unmodified - single buffer list
+// 2) Received message is held and modified before forwarding - two buffer lists
+// 3) Message is composed internally - single buffer list
+
+typedef struct {
+ sys_mutex_t *lock;
+ uint32_t ref_count; // The number of qmessages referencing this
+ nx_buffer_list_t buffers; // The buffer chain containing the message
+ pn_delivery_t *in_delivery; // The delivery on which the message arrived
+ nx_field_location_t section_message_header; // The message header list
+ nx_field_location_t section_delivery_annotation; // The delivery annotation map
+ nx_field_location_t section_message_annotation; // The message annotation map
+ nx_field_location_t section_message_properties; // The message properties list
+ nx_field_location_t section_application_properties; // The application properties list
+ nx_field_location_t section_body; // The message body: Data
+ nx_field_location_t section_footer; // The footer
+ nx_field_location_t field_user_id; // The string value of the user-id
+ nx_field_location_t field_to; // The string value of the to field
+ nx_field_location_t body; // The body of the message
+ nx_field_location_t compose_length;
+ nx_field_location_t compose_count;
+ uint32_t length;
+ uint32_t count;
+} nx_message_content_t;
+
+typedef struct {
+ DEQ_LINKS(nx_message_t); // Deq linkage that overlays the nx_message_t
+ nx_message_content_t *content;
+ pn_delivery_t *out_delivery;
+} nx_message_pvt_t;
+
+ALLOC_DECLARE(nx_message_t);
+ALLOC_DECLARE(nx_message_content_t);
+
+#define MSG_CONTENT(m) (((nx_message_pvt_t*) m)->content)
+
+#endif
diff --git a/extras/nexus/src/timer.c b/extras/nexus/src/timer.c
index 81b531305d..f6d0071a77 100644
--- a/extras/nexus/src/timer.c
+++ b/extras/nexus/src/timer.c
@@ -21,15 +21,17 @@
#include "server_private.h"
#include <qpid/nexus/ctools.h>
#include <qpid/nexus/threading.h>
+#include <qpid/nexus/alloc.h>
#include <assert.h>
#include <stdio.h>
static sys_mutex_t *lock;
-static nx_timer_list_t free_list;
static nx_timer_list_t idle_timers;
static nx_timer_list_t scheduled_timers;
static long time_base;
+ALLOC_DECLARE(nx_timer_t);
+ALLOC_DEFINE(nx_timer_t);
//=========================================================================
// Private static functions
@@ -67,27 +69,21 @@ static void nx_timer_cancel_LH(nx_timer_t *timer)
nx_timer_t *nx_timer(nx_timer_cb_t cb, void* context)
{
- nx_timer_t *timer;
+ nx_timer_t *timer = new_nx_timer_t();
+ if (!timer)
+ return 0;
- sys_mutex_lock(lock);
-
- timer = DEQ_HEAD(free_list);
- if (timer) {
- DEQ_REMOVE_HEAD(free_list);
- } else {
- timer = NEW(nx_timer_t);
- DEQ_ITEM_INIT(timer);
- }
+ DEQ_ITEM_INIT(timer);
- if (timer) {
- timer->handler = cb;
- timer->context = context;
- timer->delta_time = 0;
- timer->state = TIMER_IDLE;
- DEQ_INSERT_TAIL(idle_timers, timer);
- }
+ timer->handler = cb;
+ timer->context = context;
+ timer->delta_time = 0;
+ timer->state = TIMER_IDLE;
+ sys_mutex_lock(lock);
+ DEQ_INSERT_TAIL(idle_timers, timer);
sys_mutex_unlock(lock);
+
return timer;
}
@@ -97,9 +93,10 @@ void nx_timer_free(nx_timer_t *timer)
sys_mutex_lock(lock);
nx_timer_cancel_LH(timer);
DEQ_REMOVE(idle_timers, timer);
- DEQ_INSERT_TAIL(free_list, timer);
- timer->state = TIMER_FREE;
sys_mutex_unlock(lock);
+
+ timer->state = TIMER_FREE;
+ free_nx_timer_t(timer);
}
@@ -180,7 +177,6 @@ void nx_timer_cancel(nx_timer_t *timer)
void nx_timer_initialize(sys_mutex_t *server_lock)
{
lock = server_lock;
- DEQ_INIT(free_list);
DEQ_INIT(idle_timers);
DEQ_INIT(scheduled_timers);
time_base = 0;
diff --git a/extras/nexus/tests/alloc_test.c b/extras/nexus/tests/alloc_test.c
index 02f48af7e7..b5eac74aa0 100644
--- a/extras/nexus/tests/alloc_test.c
+++ b/extras/nexus/tests/alloc_test.c
@@ -30,7 +30,7 @@ typedef struct {
nx_alloc_config_t config = {3, 7, 10};
ALLOC_DECLARE(object_t);
-ALLOC_DEFINE_CONFIG(object_t, &config);
+ALLOC_DEFINE_CONFIG(object_t, sizeof(object_t), &config);
static char* check_stats(nx_alloc_stats_t *stats, uint64_t ah, uint64_t fh, uint64_t ht, uint64_t rt, uint64_t rg)
diff --git a/extras/nexus/tests/message_test.c b/extras/nexus/tests/message_test.c
index e9a4f01636..c00b0c055f 100644
--- a/extras/nexus/tests/message_test.c
+++ b/extras/nexus/tests/message_test.c
@@ -20,26 +20,18 @@
#include "test_case.h"
#include <stdio.h>
#include <string.h>
-#include <qpid/nexus/message.h>
+#include "message_private.h"
#include <qpid/nexus/iterator.h>
#include <proton/message.h>
-static char* test_init(void *context)
-{
- nx_allocator_initialize(nx_allocator_default_config());
- nx_allocator_finalize();
- return 0;
-}
-
-
static char* test_send_to_messenger(void *context)
{
- nx_allocator_initialize(nx_allocator_default_config());
+ nx_message_t *msg = nx_allocate_message();
+ nx_message_content_t *content = MSG_CONTENT(msg);
- nx_message_t *msg = nx_allocate_message();
nx_message_compose_1(msg, "test_addr_0", 0);
- nx_buffer_t *buf = DEQ_HEAD(msg->buffers);
+ nx_buffer_t *buf = DEQ_HEAD(content->buffers);
if (buf == 0) return "Expected a buffer in the test message";
pn_message_t *pn_msg = pn_message();
@@ -52,15 +44,12 @@ static char* test_send_to_messenger(void *context)
pn_message_free(pn_msg);
nx_free_message(msg);
- nx_allocator_finalize();
return 0;
}
static char* test_receive_from_messenger(void *context)
{
- nx_allocator_initialize(nx_allocator_default_config());
-
pn_message_t *pn_msg = pn_message();
pn_message_set_address(pn_msg, "test_addr_1");
@@ -70,12 +59,14 @@ static char* test_receive_from_messenger(void *context)
if (result != 0) return "Error in pn_message_encode";
nx_buffer_insert(buf, size);
- nx_message_t *msg = nx_allocate_message();
- DEQ_INSERT_TAIL(msg->buffers, buf);
+ nx_message_t *msg = nx_allocate_message();
+ nx_message_content_t *content = MSG_CONTENT(msg);
+
+ DEQ_INSERT_TAIL(content->buffers, buf);
int valid = nx_message_check(msg, NX_DEPTH_ALL);
if (!valid) return "nx_message_check returns 'invalid'";
- nx_field_iterator_t *iter = nx_message_field_to(msg);
+ nx_field_iterator_t *iter = nx_message_field(msg, NX_FIELD_TO);
if (iter == 0) return "Expected an iterator for the 'to' field";
if (!nx_field_iterator_equal(iter, (unsigned char*) "test_addr_1"))
@@ -84,15 +75,12 @@ static char* test_receive_from_messenger(void *context)
pn_message_free(pn_msg);
nx_free_message(msg);
- nx_allocator_finalize();
return 0;
}
static char* test_insufficient_check_depth(void *context)
{
- nx_allocator_initialize(nx_allocator_default_config());
-
pn_message_t *pn_msg = pn_message();
pn_message_set_address(pn_msg, "test_addr_2");
@@ -102,17 +90,18 @@ static char* test_insufficient_check_depth(void *context)
if (result != 0) return "Error in pn_message_encode";
nx_buffer_insert(buf, size);
- nx_message_t *msg = nx_allocate_message();
- DEQ_INSERT_TAIL(msg->buffers, buf);
+ nx_message_t *msg = nx_allocate_message();
+ nx_message_content_t *content = MSG_CONTENT(msg);
+
+ DEQ_INSERT_TAIL(content->buffers, buf);
int valid = nx_message_check(msg, NX_DEPTH_DELIVERY_ANNOTATIONS);
if (!valid) return "nx_message_check returns 'invalid'";
- nx_field_iterator_t *iter = nx_message_field_to(msg);
+ nx_field_iterator_t *iter = nx_message_field(msg, NX_FIELD_TO);
if (iter) return "Expected no iterator for the 'to' field";
nx_free_message(msg);
- nx_allocator_finalize();
return 0;
}
@@ -121,7 +110,6 @@ int message_tests(void)
{
int result = 0;
- TEST_CASE(test_init, 0);
TEST_CASE(test_send_to_messenger, 0);
TEST_CASE(test_receive_from_messenger, 0);
TEST_CASE(test_insufficient_check_depth, 0);
diff --git a/extras/nexus/tests/timer_test.c b/extras/nexus/tests/timer_test.c
index f50f9367ea..09be21f4b6 100644
--- a/extras/nexus/tests/timer_test.c
+++ b/extras/nexus/tests/timer_test.c
@@ -19,6 +19,7 @@
#include <stdio.h>
#include <qpid/nexus/timer.h>
+#include "alloc_private.h"
#include "timer_private.h"
#include "test_case.h"
#include <qpid/nexus/threading.h>
@@ -341,6 +342,7 @@ static char* test_big(void *context)
int timer_tests(void)
{
int result = 0;
+ nx_alloc_initialize();
fire_mask = 0;
DEQ_INIT(pending_timers);