summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-02-07 21:12:04 +0000
committerTed Ross <tross@apache.org>2013-02-07 21:12:04 +0000
commit115e0e7e5136fe46b78eadd21d7c54dc49107881 (patch)
tree1441da5c2826f45f57051ca51c3fca8ff8442e60
parent88da48a26100c4ff24e791aeb6a7caf41528be54 (diff)
downloadqpid-python-115e0e7e5136fe46b78eadd21d7c54dc49107881.tar.gz
QPID-4538
- Added iovec access to message fields - Buffer size is now run-time configurable git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1443728 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/extras/nexus/CMakeLists.txt1
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/alloc.h8
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/buffer.h4
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/container.h7
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/iovec.h32
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/iterator.h3
-rw-r--r--qpid/extras/nexus/include/qpid/nexus/message.h4
-rw-r--r--qpid/extras/nexus/src/alloc.c12
-rw-r--r--qpid/extras/nexus/src/buffer.c19
-rw-r--r--qpid/extras/nexus/src/iovec.c81
-rw-r--r--qpid/extras/nexus/src/message.c154
-rw-r--r--qpid/extras/nexus/tests/alloc_test.c2
-rw-r--r--qpid/extras/nexus/tests/message_test.c4
13 files changed, 270 insertions, 61 deletions
diff --git a/qpid/extras/nexus/CMakeLists.txt b/qpid/extras/nexus/CMakeLists.txt
index e1cbcd3cdd..85ac7bef0c 100644
--- a/qpid/extras/nexus/CMakeLists.txt
+++ b/qpid/extras/nexus/CMakeLists.txt
@@ -68,6 +68,7 @@ set(server_SOURCES
src/buffer.c
src/container.c
src/hash.c
+ src/iovec.c
src/iterator.c
src/log.c
src/message.c
diff --git a/qpid/extras/nexus/include/qpid/nexus/alloc.h b/qpid/extras/nexus/include/qpid/nexus/alloc.h
index 30450db641..f4ce896f22 100644
--- a/qpid/extras/nexus/include/qpid/nexus/alloc.h
+++ b/qpid/extras/nexus/include/qpid/nexus/alloc.h
@@ -42,6 +42,8 @@ typedef struct {
typedef struct {
char *type_name;
size_t type_size;
+ size_t *additional_size;
+ size_t total_size;
nx_alloc_config_t *config;
nx_alloc_stats_t *stats;
nx_alloc_pool_t *global_pool;
@@ -57,14 +59,14 @@ void nx_dealloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool, void *p);
T *new_##T(); \
void free_##T(T *p)
-#define ALLOC_DEFINE_CONFIG(T,S,C) \
- nx_alloc_type_desc_t __desc_##T = {#T, S, C, 0, 0, 0}; \
+#define ALLOC_DEFINE_CONFIG(T,S,A,C) \
+ nx_alloc_type_desc_t __desc_##T = {#T, S, A, 0, C, 0, 0, 0}; \
__thread nx_alloc_pool_t *__local_pool_##T = 0; \
T *new_##T() { return (T*) nx_alloc(&__desc_##T, &__local_pool_##T); } \
void free_##T(T *p) { nx_dealloc(&__desc_##T, &__local_pool_##T, (void*) p); } \
nx_alloc_stats_t *alloc_stats_##T() { return __desc_##T.stats; }
-#define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0)
+#define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0, 0)
#endif
diff --git a/qpid/extras/nexus/include/qpid/nexus/buffer.h b/qpid/extras/nexus/include/qpid/nexus/buffer.h
index 6fb471c7f3..8c2e0d8374 100644
--- a/qpid/extras/nexus/include/qpid/nexus/buffer.h
+++ b/qpid/extras/nexus/include/qpid/nexus/buffer.h
@@ -32,6 +32,10 @@ struct nx_buffer_t {
/**
*/
+void nx_buffer_set_size(size_t size);
+
+/**
+ */
nx_buffer_t *nx_allocate_buffer(void);
/**
diff --git a/qpid/extras/nexus/include/qpid/nexus/container.h b/qpid/extras/nexus/include/qpid/nexus/container.h
index f6c9839da0..056c9a5b5e 100644
--- a/qpid/extras/nexus/include/qpid/nexus/container.h
+++ b/qpid/extras/nexus/include/qpid/nexus/container.h
@@ -29,6 +29,9 @@ typedef uint8_t nx_dist_mode_t;
#define NX_DIST_MOVE 0x02
#define NX_DIST_BOTH 0x03
+/**
+ * Node Lifetime Policy (see AMQP 3.5.9)
+ */
typedef enum {
NX_LIFE_PERMANENT,
NX_LIFE_DELETE_CLOSE,
@@ -37,6 +40,10 @@ typedef enum {
NX_LIFE_DELETE_NO_LINKS_MESSAGES
} nx_lifetime_policy_t;
+
+/**
+ * Link Direction
+ */
typedef enum {
NX_INCOMING,
NX_OUTGOING
diff --git a/qpid/extras/nexus/include/qpid/nexus/iovec.h b/qpid/extras/nexus/include/qpid/nexus/iovec.h
new file mode 100644
index 0000000000..33730b9ed3
--- /dev/null
+++ b/qpid/extras/nexus/include/qpid/nexus/iovec.h
@@ -0,0 +1,32 @@
+#ifndef __nexus_iovec_h__
+#define __nexus_iovec_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <sys/uio.h>
+
+typedef struct nx_iovec_t nx_iovec_t;
+
+nx_iovec_t *nx_iovec(int vector_count);
+void nx_iovec_free(nx_iovec_t *iov);
+struct iovec *nx_iovec_array(nx_iovec_t *iov);
+int nx_iovec_count(nx_iovec_t *iov);
+
+
+#endif
diff --git a/qpid/extras/nexus/include/qpid/nexus/iterator.h b/qpid/extras/nexus/include/qpid/nexus/iterator.h
index 9aca3d4795..a98ab257fa 100644
--- a/qpid/extras/nexus/include/qpid/nexus/iterator.h
+++ b/qpid/extras/nexus/include/qpid/nexus/iterator.h
@@ -19,8 +19,7 @@
* under the License.
*/
-
-typedef struct nx_buffer_t nx_buffer_t;
+#include <qpid/nexus/buffer.h>
/**
* The field iterator is used to access fields within a buffer chain.
diff --git a/qpid/extras/nexus/include/qpid/nexus/message.h b/qpid/extras/nexus/include/qpid/nexus/message.h
index ade0525666..3885b09576 100644
--- a/qpid/extras/nexus/include/qpid/nexus/message.h
+++ b/qpid/extras/nexus/include/qpid/nexus/message.h
@@ -24,6 +24,7 @@
#include <qpid/nexus/alloc.h>
#include <qpid/nexus/iterator.h>
#include <qpid/nexus/buffer.h>
+#include <qpid/nexus/iovec.h>
// Callback for status change (confirmed persistent, loaded-in-memory, etc.)
@@ -108,7 +109,8 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery);
void nx_message_send(nx_message_t *msg, pn_link_t *link);
int nx_message_check(nx_message_t *msg, nx_message_depth_t depth);
-nx_field_iterator_t *nx_message_field(nx_message_t *msg, nx_message_field_t field);
+nx_field_iterator_t *nx_message_field_iterator(nx_message_t *msg, nx_message_field_t field);
+nx_iovec_t *nx_message_field_iovec(nx_message_t *msg, nx_message_field_t field);
pn_delivery_t *nx_message_inbound_delivery(nx_message_t *qm);
diff --git a/qpid/extras/nexus/src/alloc.c b/qpid/extras/nexus/src/alloc.c
index 397a7897ac..ae710e53a5 100644
--- a/qpid/extras/nexus/src/alloc.c
+++ b/qpid/extras/nexus/src/alloc.c
@@ -19,6 +19,7 @@
#include <qpid/nexus/alloc.h>
#include <qpid/nexus/ctools.h>
+#include <qpid/nexus/log.h>
#include <memory.h>
#include <stdio.h>
@@ -45,9 +46,16 @@ static void nx_alloc_init(nx_alloc_type_desc_t *desc)
{
sys_mutex_lock(init_lock);
+ desc->total_size = desc->type_size;
+ if (desc->additional_size)
+ desc->total_size += *desc->additional_size;
+
+ nx_log("ALLOC", LOG_TRACE, "Initialized Allocator - type=%s type-size=%d total-size=%d",
+ desc->type_name, desc->type_size, desc->total_size);
+
if (!desc->global_pool) {
if (desc->config == 0)
- desc->config = desc->type_size > 256 ?
+ desc->config = desc->total_size > 256 ?
&nx_alloc_default_config_big : &nx_alloc_default_config_small;
assert (desc->config->local_free_list_max >= desc->config->transfer_batch_size);
@@ -121,7 +129,7 @@ void *nx_alloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool)
// Allocate a full batch from the heap and put it on the thread list.
//
for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
- item = (item_t*) malloc(sizeof(item_t) + desc->type_size);
+ item = (item_t*) malloc(sizeof(item_t) + desc->total_size);
if (item == 0)
break;
DEQ_ITEM_INIT(item);
diff --git a/qpid/extras/nexus/src/buffer.c b/qpid/extras/nexus/src/buffer.c
index 10f32b9541..3c091a4983 100644
--- a/qpid/extras/nexus/src/buffer.c
+++ b/qpid/extras/nexus/src/buffer.c
@@ -20,16 +20,23 @@
#include <qpid/nexus/buffer.h>
#include <qpid/nexus/alloc.h>
-#ifndef NEXUS_BUFFER_SIZE
-#define NEXUS_BUFFER_SIZE 512
-#endif
+static size_t buffer_size = 512;
+static int size_locked = 0;
ALLOC_DECLARE(nx_buffer_t);
-ALLOC_DEFINE_CONFIG(nx_buffer_t, sizeof(nx_buffer_t) + NEXUS_BUFFER_SIZE, 0);
+ALLOC_DEFINE_CONFIG(nx_buffer_t, sizeof(nx_buffer_t), &buffer_size, 0);
+
+
+void nx_buffer_set_size(size_t size)
+{
+ assert(!size_locked);
+ buffer_size = size;
+}
nx_buffer_t *nx_allocate_buffer(void)
{
+ size_locked = 1;
nx_buffer_t *buf = new_nx_buffer_t();
DEQ_ITEM_INIT(buf);
@@ -58,7 +65,7 @@ unsigned char *nx_buffer_cursor(nx_buffer_t *buf)
size_t nx_buffer_capacity(nx_buffer_t *buf)
{
- return NEXUS_BUFFER_SIZE - buf->size;
+ return buffer_size - buf->size;
}
@@ -71,6 +78,6 @@ size_t nx_buffer_size(nx_buffer_t *buf)
void nx_buffer_insert(nx_buffer_t *buf, size_t len)
{
buf->size += len;
- assert(buf->size <= NEXUS_BUFFER_SIZE);
+ assert(buf->size <= buffer_size);
}
diff --git a/qpid/extras/nexus/src/iovec.c b/qpid/extras/nexus/src/iovec.c
new file mode 100644
index 0000000000..4543176bbe
--- /dev/null
+++ b/qpid/extras/nexus/src/iovec.c
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/nexus/iovec.h>
+#include <qpid/nexus/alloc.h>
+#include <string.h>
+
+#define NX_IOVEC_MAX 64
+
+struct nx_iovec_t {
+ struct iovec iov_array[NX_IOVEC_MAX];
+ struct iovec *iov;
+ int iov_count;
+};
+
+
+ALLOC_DECLARE(nx_iovec_t);
+ALLOC_DEFINE(nx_iovec_t);
+
+
+nx_iovec_t *nx_iovec(int vector_count)
+{
+ nx_iovec_t *iov = new_nx_iovec_t();
+ if (!iov)
+ return 0;
+
+ memset(iov, 0, sizeof(nx_iovec_t));
+
+ iov->iov_count = vector_count;
+ if (vector_count > NX_IOVEC_MAX)
+ iov->iov = (struct iovec*) malloc(sizeof(struct iovec) * vector_count);
+ else
+ iov->iov = &iov->iov_array[0];
+
+ return iov;
+}
+
+
+void nx_iovec_free(nx_iovec_t *iov)
+{
+ if (!iov)
+ return;
+
+ if (iov->iov && iov->iov != &iov->iov_array[0])
+ free(iov->iov);
+
+ free_nx_iovec_t(iov);
+}
+
+
+struct iovec *nx_iovec_array(nx_iovec_t *iov)
+{
+ if (!iov)
+ return 0;
+ return iov->iov;
+}
+
+
+int nx_iovec_count(nx_iovec_t *iov)
+{
+ if (!iov)
+ return 0;
+ return iov->iov_count;
+}
+
diff --git a/qpid/extras/nexus/src/message.c b/qpid/extras/nexus/src/message.c
index c9f7c1a581..95e959e745 100644
--- a/qpid/extras/nexus/src/message.c
+++ b/qpid/extras/nexus/src/message.c
@@ -23,7 +23,7 @@
#include <string.h>
#include <stdio.h>
-ALLOC_DEFINE_CONFIG(nx_message_t, sizeof(nx_message_pvt_t), 0);
+ALLOC_DEFINE_CONFIG(nx_message_t, sizeof(nx_message_pvt_t), 0, 0);
ALLOC_DEFINE(nx_message_content_t);
@@ -379,6 +379,62 @@ static void nx_end_list(nx_message_content_t *msg)
}
+static nx_field_location_t *nx_message_field_location(nx_message_t *msg, nx_message_field_t field)
+{
+ nx_message_content_t *content = MSG_CONTENT(msg);
+
+ switch (field) {
+ case NX_FIELD_TO:
+ while (1) {
+ if (content->field_to.parsed)
+ return &content->field_to;
+
+ if (content->section_message_properties.parsed == 0)
+ break;
+
+ nx_buffer_t *buffer = content->section_message_properties.buffer;
+ unsigned char *cursor = nx_buffer_base(buffer) + content->section_message_properties.offset;
+
+ int count = start_list(&cursor, &buffer);
+ int result;
+
+ if (count < 3)
+ break;
+
+ result = traverse_field(&cursor, &buffer, 0); // message_id
+ if (!result) return 0;
+ result = traverse_field(&cursor, &buffer, 0); // user_id
+ if (!result) return 0;
+ result = traverse_field(&cursor, &buffer, &content->field_to); // to
+ if (!result) return 0;
+ }
+ break;
+
+ case NX_FIELD_BODY:
+ while (1) {
+ if (content->body.parsed)
+ return &content->body;
+
+ if (content->section_body.parsed == 0)
+ break;
+
+ nx_buffer_t *buffer = content->section_body.buffer;
+ unsigned char *cursor = nx_buffer_base(buffer) + content->section_body.offset;
+ int result;
+
+ result = traverse_field(&cursor, &buffer, &content->body);
+ if (!result) return 0;
+ }
+ break;
+
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+
nx_message_t *nx_allocate_message()
{
nx_message_pvt_t *msg = (nx_message_pvt_t*) new_nx_message_t();
@@ -695,59 +751,69 @@ int nx_message_check(nx_message_t *in_msg, nx_message_depth_t depth)
}
-nx_field_iterator_t *nx_message_field(nx_message_t *msg, nx_message_field_t field)
+nx_field_iterator_t *nx_message_field_iterator(nx_message_t *msg, nx_message_field_t field)
{
- nx_message_content_t *content = MSG_CONTENT(msg);
-
- switch (field) {
- case NX_FIELD_TO:
- while (1) {
- if (content->field_to.parsed)
- return nx_field_iterator_buffer(content->field_to.buffer, content->field_to.offset, content->field_to.length, ITER_VIEW_ALL);
-
- if (content->section_message_properties.parsed == 0)
- break;
-
- nx_buffer_t *buffer = content->section_message_properties.buffer;
- unsigned char *cursor = nx_buffer_base(buffer) + content->section_message_properties.offset;
-
- int count = start_list(&cursor, &buffer);
- int result;
+ nx_field_location_t *loc = nx_message_field_location(msg, field);
+ if (!loc)
+ return 0;
- if (count < 3)
- break;
+ return nx_field_iterator_buffer(loc->buffer, loc->offset, loc->length, ITER_VIEW_ALL);
+}
- result = traverse_field(&cursor, &buffer, 0); // message_id
- if (!result) return 0;
- result = traverse_field(&cursor, &buffer, 0); // user_id
- if (!result) return 0;
- result = traverse_field(&cursor, &buffer, &content->field_to); // to
- if (!result) return 0;
- }
- break;
- case NX_FIELD_BODY:
- while (1) {
- if (content->body.parsed)
- return nx_field_iterator_buffer(content->body.buffer, content->body.offset, content->body.length, ITER_VIEW_ALL);
+nx_iovec_t *nx_message_field_iovec(nx_message_t *msg, nx_message_field_t field)
+{
+ nx_field_location_t *loc = nx_message_field_location(msg, field);
+ if (!loc)
+ return 0;
- if (content->section_body.parsed == 0)
- break;
+ //
+ // Count the number of buffers this field straddles
+ //
+ int bufcnt = 1;
+ nx_buffer_t *buf = loc->buffer;
+ size_t bufsize = nx_buffer_size(buf) - loc->offset;
+ ssize_t remaining = loc->length - bufsize;
+
+ while (remaining > 0) {
+ bufcnt++;
+ buf = buf->next;
+ if (!buf)
+ return 0;
+ remaining -= nx_buffer_size(buf);
+ }
- nx_buffer_t *buffer = content->section_body.buffer;
- unsigned char *cursor = nx_buffer_base(buffer) + content->section_body.offset;
- int result;
+ //
+ // Allocate an iovec object big enough to hold the number of buffers
+ //
+ nx_iovec_t *iov = nx_iovec(bufcnt);
+ if (!iov)
+ return 0;
- result = traverse_field(&cursor, &buffer, &content->body);
- if (!result) return 0;
+ //
+ // Build out the io vectors with pointers to the segments of the field in buffers
+ //
+ bufcnt = 0;
+ buf = loc->buffer;
+ bufsize = nx_buffer_size(buf) - loc->offset;
+ void *base = nx_buffer_base(buf) + loc->offset;
+ remaining = loc->length;
+
+ while (remaining > 0) {
+ nx_iovec_array(iov)[bufcnt].iov_base = base;
+ nx_iovec_array(iov)[bufcnt].iov_len = bufsize;
+ bufcnt++;
+ remaining -= bufsize;
+ if (remaining > 0) {
+ buf = buf->next;
+ base = nx_buffer_base(buf);
+ bufsize = nx_buffer_size(buf);
+ if (bufsize > remaining)
+ bufsize = remaining;
}
- break;
-
- default:
- break;
}
- return 0;
+ return iov;
}
diff --git a/qpid/extras/nexus/tests/alloc_test.c b/qpid/extras/nexus/tests/alloc_test.c
index b5eac74aa0..c8ea07e1cc 100644
--- a/qpid/extras/nexus/tests/alloc_test.c
+++ b/qpid/extras/nexus/tests/alloc_test.c
@@ -30,7 +30,7 @@ typedef struct {
nx_alloc_config_t config = {3, 7, 10};
ALLOC_DECLARE(object_t);
-ALLOC_DEFINE_CONFIG(object_t, sizeof(object_t), &config);
+ALLOC_DEFINE_CONFIG(object_t, sizeof(object_t), 0, &config);
static char* check_stats(nx_alloc_stats_t *stats, uint64_t ah, uint64_t fh, uint64_t ht, uint64_t rt, uint64_t rg)
diff --git a/qpid/extras/nexus/tests/message_test.c b/qpid/extras/nexus/tests/message_test.c
index c00b0c055f..1d69d30bc0 100644
--- a/qpid/extras/nexus/tests/message_test.c
+++ b/qpid/extras/nexus/tests/message_test.c
@@ -66,7 +66,7 @@ static char* test_receive_from_messenger(void *context)
int valid = nx_message_check(msg, NX_DEPTH_ALL);
if (!valid) return "nx_message_check returns 'invalid'";
- nx_field_iterator_t *iter = nx_message_field(msg, NX_FIELD_TO);
+ nx_field_iterator_t *iter = nx_message_field_iterator(msg, NX_FIELD_TO);
if (iter == 0) return "Expected an iterator for the 'to' field";
if (!nx_field_iterator_equal(iter, (unsigned char*) "test_addr_1"))
@@ -97,7 +97,7 @@ static char* test_insufficient_check_depth(void *context)
int valid = nx_message_check(msg, NX_DEPTH_DELIVERY_ANNOTATIONS);
if (!valid) return "nx_message_check returns 'invalid'";
- nx_field_iterator_t *iter = nx_message_field(msg, NX_FIELD_TO);
+ nx_field_iterator_t *iter = nx_message_field_iterator(msg, NX_FIELD_TO);
if (iter) return "Expected no iterator for the 'to' field";
nx_free_message(msg);