diff options
| author | Ted Ross <tross@apache.org> | 2013-02-07 21:12:04 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-02-07 21:12:04 +0000 |
| commit | 115e0e7e5136fe46b78eadd21d7c54dc49107881 (patch) | |
| tree | 1441da5c2826f45f57051ca51c3fca8ff8442e60 | |
| parent | 88da48a26100c4ff24e791aeb6a7caf41528be54 (diff) | |
| download | qpid-python-115e0e7e5136fe46b78eadd21d7c54dc49107881.tar.gz | |
QPID-4538
- Added iovec access to message fields
- Buffer size is now run-time configurable
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1443728 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/extras/nexus/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | qpid/extras/nexus/include/qpid/nexus/alloc.h | 8 | ||||
| -rw-r--r-- | qpid/extras/nexus/include/qpid/nexus/buffer.h | 4 | ||||
| -rw-r--r-- | qpid/extras/nexus/include/qpid/nexus/container.h | 7 | ||||
| -rw-r--r-- | qpid/extras/nexus/include/qpid/nexus/iovec.h | 32 | ||||
| -rw-r--r-- | qpid/extras/nexus/include/qpid/nexus/iterator.h | 3 | ||||
| -rw-r--r-- | qpid/extras/nexus/include/qpid/nexus/message.h | 4 | ||||
| -rw-r--r-- | qpid/extras/nexus/src/alloc.c | 12 | ||||
| -rw-r--r-- | qpid/extras/nexus/src/buffer.c | 19 | ||||
| -rw-r--r-- | qpid/extras/nexus/src/iovec.c | 81 | ||||
| -rw-r--r-- | qpid/extras/nexus/src/message.c | 154 | ||||
| -rw-r--r-- | qpid/extras/nexus/tests/alloc_test.c | 2 | ||||
| -rw-r--r-- | qpid/extras/nexus/tests/message_test.c | 4 |
13 files changed, 270 insertions, 61 deletions
diff --git a/qpid/extras/nexus/CMakeLists.txt b/qpid/extras/nexus/CMakeLists.txt index e1cbcd3cdd..85ac7bef0c 100644 --- a/qpid/extras/nexus/CMakeLists.txt +++ b/qpid/extras/nexus/CMakeLists.txt @@ -68,6 +68,7 @@ set(server_SOURCES src/buffer.c src/container.c src/hash.c + src/iovec.c src/iterator.c src/log.c src/message.c diff --git a/qpid/extras/nexus/include/qpid/nexus/alloc.h b/qpid/extras/nexus/include/qpid/nexus/alloc.h index 30450db641..f4ce896f22 100644 --- a/qpid/extras/nexus/include/qpid/nexus/alloc.h +++ b/qpid/extras/nexus/include/qpid/nexus/alloc.h @@ -42,6 +42,8 @@ typedef struct { typedef struct { char *type_name; size_t type_size; + size_t *additional_size; + size_t total_size; nx_alloc_config_t *config; nx_alloc_stats_t *stats; nx_alloc_pool_t *global_pool; @@ -57,14 +59,14 @@ void nx_dealloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool, void *p); T *new_##T(); \ void free_##T(T *p) -#define ALLOC_DEFINE_CONFIG(T,S,C) \ - nx_alloc_type_desc_t __desc_##T = {#T, S, C, 0, 0, 0}; \ +#define ALLOC_DEFINE_CONFIG(T,S,A,C) \ + nx_alloc_type_desc_t __desc_##T = {#T, S, A, 0, C, 0, 0, 0}; \ __thread nx_alloc_pool_t *__local_pool_##T = 0; \ T *new_##T() { return (T*) nx_alloc(&__desc_##T, &__local_pool_##T); } \ void free_##T(T *p) { nx_dealloc(&__desc_##T, &__local_pool_##T, (void*) p); } \ nx_alloc_stats_t *alloc_stats_##T() { return __desc_##T.stats; } -#define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0) +#define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0, 0) #endif diff --git a/qpid/extras/nexus/include/qpid/nexus/buffer.h b/qpid/extras/nexus/include/qpid/nexus/buffer.h index 6fb471c7f3..8c2e0d8374 100644 --- a/qpid/extras/nexus/include/qpid/nexus/buffer.h +++ b/qpid/extras/nexus/include/qpid/nexus/buffer.h @@ -32,6 +32,10 @@ struct nx_buffer_t { /** */ +void nx_buffer_set_size(size_t size); + +/** + */ nx_buffer_t *nx_allocate_buffer(void); /** diff --git a/qpid/extras/nexus/include/qpid/nexus/container.h b/qpid/extras/nexus/include/qpid/nexus/container.h index f6c9839da0..056c9a5b5e 100644 --- a/qpid/extras/nexus/include/qpid/nexus/container.h +++ b/qpid/extras/nexus/include/qpid/nexus/container.h @@ -29,6 +29,9 @@ typedef uint8_t nx_dist_mode_t; #define NX_DIST_MOVE 0x02 #define NX_DIST_BOTH 0x03 +/** + * Node Lifetime Policy (see AMQP 3.5.9) + */ typedef enum { NX_LIFE_PERMANENT, NX_LIFE_DELETE_CLOSE, @@ -37,6 +40,10 @@ typedef enum { NX_LIFE_DELETE_NO_LINKS_MESSAGES } nx_lifetime_policy_t; + +/** + * Link Direction + */ typedef enum { NX_INCOMING, NX_OUTGOING diff --git a/qpid/extras/nexus/include/qpid/nexus/iovec.h b/qpid/extras/nexus/include/qpid/nexus/iovec.h new file mode 100644 index 0000000000..33730b9ed3 --- /dev/null +++ b/qpid/extras/nexus/include/qpid/nexus/iovec.h @@ -0,0 +1,32 @@ +#ifndef __nexus_iovec_h__ +#define __nexus_iovec_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <sys/uio.h> + +typedef struct nx_iovec_t nx_iovec_t; + +nx_iovec_t *nx_iovec(int vector_count); +void nx_iovec_free(nx_iovec_t *iov); +struct iovec *nx_iovec_array(nx_iovec_t *iov); +int nx_iovec_count(nx_iovec_t *iov); + + +#endif diff --git a/qpid/extras/nexus/include/qpid/nexus/iterator.h b/qpid/extras/nexus/include/qpid/nexus/iterator.h index 9aca3d4795..a98ab257fa 100644 --- a/qpid/extras/nexus/include/qpid/nexus/iterator.h +++ b/qpid/extras/nexus/include/qpid/nexus/iterator.h @@ -19,8 +19,7 @@ * under the License. */ - -typedef struct nx_buffer_t nx_buffer_t; +#include <qpid/nexus/buffer.h> /** * The field iterator is used to access fields within a buffer chain. diff --git a/qpid/extras/nexus/include/qpid/nexus/message.h b/qpid/extras/nexus/include/qpid/nexus/message.h index ade0525666..3885b09576 100644 --- a/qpid/extras/nexus/include/qpid/nexus/message.h +++ b/qpid/extras/nexus/include/qpid/nexus/message.h @@ -24,6 +24,7 @@ #include <qpid/nexus/alloc.h> #include <qpid/nexus/iterator.h> #include <qpid/nexus/buffer.h> +#include <qpid/nexus/iovec.h> // Callback for status change (confirmed persistent, loaded-in-memory, etc.) @@ -108,7 +109,8 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery); void nx_message_send(nx_message_t *msg, pn_link_t *link); int nx_message_check(nx_message_t *msg, nx_message_depth_t depth); -nx_field_iterator_t *nx_message_field(nx_message_t *msg, nx_message_field_t field); +nx_field_iterator_t *nx_message_field_iterator(nx_message_t *msg, nx_message_field_t field); +nx_iovec_t *nx_message_field_iovec(nx_message_t *msg, nx_message_field_t field); pn_delivery_t *nx_message_inbound_delivery(nx_message_t *qm); diff --git a/qpid/extras/nexus/src/alloc.c b/qpid/extras/nexus/src/alloc.c index 397a7897ac..ae710e53a5 100644 --- a/qpid/extras/nexus/src/alloc.c +++ b/qpid/extras/nexus/src/alloc.c @@ -19,6 +19,7 @@ #include <qpid/nexus/alloc.h> #include <qpid/nexus/ctools.h> +#include <qpid/nexus/log.h> #include <memory.h> #include <stdio.h> @@ -45,9 +46,16 @@ static void nx_alloc_init(nx_alloc_type_desc_t *desc) { sys_mutex_lock(init_lock); + desc->total_size = desc->type_size; + if (desc->additional_size) + desc->total_size += *desc->additional_size; + + nx_log("ALLOC", LOG_TRACE, "Initialized Allocator - type=%s type-size=%d total-size=%d", + desc->type_name, desc->type_size, desc->total_size); + if (!desc->global_pool) { if (desc->config == 0) - desc->config = desc->type_size > 256 ? + desc->config = desc->total_size > 256 ? &nx_alloc_default_config_big : &nx_alloc_default_config_small; assert (desc->config->local_free_list_max >= desc->config->transfer_batch_size); @@ -121,7 +129,7 @@ void *nx_alloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool) // Allocate a full batch from the heap and put it on the thread list. // for (idx = 0; idx < desc->config->transfer_batch_size; idx++) { - item = (item_t*) malloc(sizeof(item_t) + desc->type_size); + item = (item_t*) malloc(sizeof(item_t) + desc->total_size); if (item == 0) break; DEQ_ITEM_INIT(item); diff --git a/qpid/extras/nexus/src/buffer.c b/qpid/extras/nexus/src/buffer.c index 10f32b9541..3c091a4983 100644 --- a/qpid/extras/nexus/src/buffer.c +++ b/qpid/extras/nexus/src/buffer.c @@ -20,16 +20,23 @@ #include <qpid/nexus/buffer.h> #include <qpid/nexus/alloc.h> -#ifndef NEXUS_BUFFER_SIZE -#define NEXUS_BUFFER_SIZE 512 -#endif +static size_t buffer_size = 512; +static int size_locked = 0; ALLOC_DECLARE(nx_buffer_t); -ALLOC_DEFINE_CONFIG(nx_buffer_t, sizeof(nx_buffer_t) + NEXUS_BUFFER_SIZE, 0); +ALLOC_DEFINE_CONFIG(nx_buffer_t, sizeof(nx_buffer_t), &buffer_size, 0); + + +void nx_buffer_set_size(size_t size) +{ + assert(!size_locked); + buffer_size = size; +} nx_buffer_t *nx_allocate_buffer(void) { + size_locked = 1; nx_buffer_t *buf = new_nx_buffer_t(); DEQ_ITEM_INIT(buf); @@ -58,7 +65,7 @@ unsigned char *nx_buffer_cursor(nx_buffer_t *buf) size_t nx_buffer_capacity(nx_buffer_t *buf) { - return NEXUS_BUFFER_SIZE - buf->size; + return buffer_size - buf->size; } @@ -71,6 +78,6 @@ size_t nx_buffer_size(nx_buffer_t *buf) void nx_buffer_insert(nx_buffer_t *buf, size_t len) { buf->size += len; - assert(buf->size <= NEXUS_BUFFER_SIZE); + assert(buf->size <= buffer_size); } diff --git a/qpid/extras/nexus/src/iovec.c b/qpid/extras/nexus/src/iovec.c new file mode 100644 index 0000000000..4543176bbe --- /dev/null +++ b/qpid/extras/nexus/src/iovec.c @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/nexus/iovec.h> +#include <qpid/nexus/alloc.h> +#include <string.h> + +#define NX_IOVEC_MAX 64 + +struct nx_iovec_t { + struct iovec iov_array[NX_IOVEC_MAX]; + struct iovec *iov; + int iov_count; +}; + + +ALLOC_DECLARE(nx_iovec_t); +ALLOC_DEFINE(nx_iovec_t); + + +nx_iovec_t *nx_iovec(int vector_count) +{ + nx_iovec_t *iov = new_nx_iovec_t(); + if (!iov) + return 0; + + memset(iov, 0, sizeof(nx_iovec_t)); + + iov->iov_count = vector_count; + if (vector_count > NX_IOVEC_MAX) + iov->iov = (struct iovec*) malloc(sizeof(struct iovec) * vector_count); + else + iov->iov = &iov->iov_array[0]; + + return iov; +} + + +void nx_iovec_free(nx_iovec_t *iov) +{ + if (!iov) + return; + + if (iov->iov && iov->iov != &iov->iov_array[0]) + free(iov->iov); + + free_nx_iovec_t(iov); +} + + +struct iovec *nx_iovec_array(nx_iovec_t *iov) +{ + if (!iov) + return 0; + return iov->iov; +} + + +int nx_iovec_count(nx_iovec_t *iov) +{ + if (!iov) + return 0; + return iov->iov_count; +} + diff --git a/qpid/extras/nexus/src/message.c b/qpid/extras/nexus/src/message.c index c9f7c1a581..95e959e745 100644 --- a/qpid/extras/nexus/src/message.c +++ b/qpid/extras/nexus/src/message.c @@ -23,7 +23,7 @@ #include <string.h> #include <stdio.h> -ALLOC_DEFINE_CONFIG(nx_message_t, sizeof(nx_message_pvt_t), 0); +ALLOC_DEFINE_CONFIG(nx_message_t, sizeof(nx_message_pvt_t), 0, 0); ALLOC_DEFINE(nx_message_content_t); @@ -379,6 +379,62 @@ static void nx_end_list(nx_message_content_t *msg) } +static nx_field_location_t *nx_message_field_location(nx_message_t *msg, nx_message_field_t field) +{ + nx_message_content_t *content = MSG_CONTENT(msg); + + switch (field) { + case NX_FIELD_TO: + while (1) { + if (content->field_to.parsed) + return &content->field_to; + + if (content->section_message_properties.parsed == 0) + break; + + nx_buffer_t *buffer = content->section_message_properties.buffer; + unsigned char *cursor = nx_buffer_base(buffer) + content->section_message_properties.offset; + + int count = start_list(&cursor, &buffer); + int result; + + if (count < 3) + break; + + result = traverse_field(&cursor, &buffer, 0); // message_id + if (!result) return 0; + result = traverse_field(&cursor, &buffer, 0); // user_id + if (!result) return 0; + result = traverse_field(&cursor, &buffer, &content->field_to); // to + if (!result) return 0; + } + break; + + case NX_FIELD_BODY: + while (1) { + if (content->body.parsed) + return &content->body; + + if (content->section_body.parsed == 0) + break; + + nx_buffer_t *buffer = content->section_body.buffer; + unsigned char *cursor = nx_buffer_base(buffer) + content->section_body.offset; + int result; + + result = traverse_field(&cursor, &buffer, &content->body); + if (!result) return 0; + } + break; + + default: + break; + } + + return 0; +} + + nx_message_t *nx_allocate_message() { nx_message_pvt_t *msg = (nx_message_pvt_t*) new_nx_message_t(); @@ -695,59 +751,69 @@ int nx_message_check(nx_message_t *in_msg, nx_message_depth_t depth) } -nx_field_iterator_t *nx_message_field(nx_message_t *msg, nx_message_field_t field) +nx_field_iterator_t *nx_message_field_iterator(nx_message_t *msg, nx_message_field_t field) { - nx_message_content_t *content = MSG_CONTENT(msg); - - switch (field) { - case NX_FIELD_TO: - while (1) { - if (content->field_to.parsed) - return nx_field_iterator_buffer(content->field_to.buffer, content->field_to.offset, content->field_to.length, ITER_VIEW_ALL); - - if (content->section_message_properties.parsed == 0) - break; - - nx_buffer_t *buffer = content->section_message_properties.buffer; - unsigned char *cursor = nx_buffer_base(buffer) + content->section_message_properties.offset; - - int count = start_list(&cursor, &buffer); - int result; + nx_field_location_t *loc = nx_message_field_location(msg, field); + if (!loc) + return 0; - if (count < 3) - break; + return nx_field_iterator_buffer(loc->buffer, loc->offset, loc->length, ITER_VIEW_ALL); +} - result = traverse_field(&cursor, &buffer, 0); // message_id - if (!result) return 0; - result = traverse_field(&cursor, &buffer, 0); // user_id - if (!result) return 0; - result = traverse_field(&cursor, &buffer, &content->field_to); // to - if (!result) return 0; - } - break; - case NX_FIELD_BODY: - while (1) { - if (content->body.parsed) - return nx_field_iterator_buffer(content->body.buffer, content->body.offset, content->body.length, ITER_VIEW_ALL); +nx_iovec_t *nx_message_field_iovec(nx_message_t *msg, nx_message_field_t field) +{ + nx_field_location_t *loc = nx_message_field_location(msg, field); + if (!loc) + return 0; - if (content->section_body.parsed == 0) - break; + // + // Count the number of buffers this field straddles + // + int bufcnt = 1; + nx_buffer_t *buf = loc->buffer; + size_t bufsize = nx_buffer_size(buf) - loc->offset; + ssize_t remaining = loc->length - bufsize; + + while (remaining > 0) { + bufcnt++; + buf = buf->next; + if (!buf) + return 0; + remaining -= nx_buffer_size(buf); + } - nx_buffer_t *buffer = content->section_body.buffer; - unsigned char *cursor = nx_buffer_base(buffer) + content->section_body.offset; - int result; + // + // Allocate an iovec object big enough to hold the number of buffers + // + nx_iovec_t *iov = nx_iovec(bufcnt); + if (!iov) + return 0; - result = traverse_field(&cursor, &buffer, &content->body); - if (!result) return 0; + // + // Build out the io vectors with pointers to the segments of the field in buffers + // + bufcnt = 0; + buf = loc->buffer; + bufsize = nx_buffer_size(buf) - loc->offset; + void *base = nx_buffer_base(buf) + loc->offset; + remaining = loc->length; + + while (remaining > 0) { + nx_iovec_array(iov)[bufcnt].iov_base = base; + nx_iovec_array(iov)[bufcnt].iov_len = bufsize; + bufcnt++; + remaining -= bufsize; + if (remaining > 0) { + buf = buf->next; + base = nx_buffer_base(buf); + bufsize = nx_buffer_size(buf); + if (bufsize > remaining) + bufsize = remaining; } - break; - - default: - break; } - return 0; + return iov; } diff --git a/qpid/extras/nexus/tests/alloc_test.c b/qpid/extras/nexus/tests/alloc_test.c index b5eac74aa0..c8ea07e1cc 100644 --- a/qpid/extras/nexus/tests/alloc_test.c +++ b/qpid/extras/nexus/tests/alloc_test.c @@ -30,7 +30,7 @@ typedef struct { nx_alloc_config_t config = {3, 7, 10}; ALLOC_DECLARE(object_t); -ALLOC_DEFINE_CONFIG(object_t, sizeof(object_t), &config); +ALLOC_DEFINE_CONFIG(object_t, sizeof(object_t), 0, &config); static char* check_stats(nx_alloc_stats_t *stats, uint64_t ah, uint64_t fh, uint64_t ht, uint64_t rt, uint64_t rg) diff --git a/qpid/extras/nexus/tests/message_test.c b/qpid/extras/nexus/tests/message_test.c index c00b0c055f..1d69d30bc0 100644 --- a/qpid/extras/nexus/tests/message_test.c +++ b/qpid/extras/nexus/tests/message_test.c @@ -66,7 +66,7 @@ static char* test_receive_from_messenger(void *context) int valid = nx_message_check(msg, NX_DEPTH_ALL); if (!valid) return "nx_message_check returns 'invalid'"; - nx_field_iterator_t *iter = nx_message_field(msg, NX_FIELD_TO); + nx_field_iterator_t *iter = nx_message_field_iterator(msg, NX_FIELD_TO); if (iter == 0) return "Expected an iterator for the 'to' field"; if (!nx_field_iterator_equal(iter, (unsigned char*) "test_addr_1")) @@ -97,7 +97,7 @@ static char* test_insufficient_check_depth(void *context) int valid = nx_message_check(msg, NX_DEPTH_DELIVERY_ANNOTATIONS); if (!valid) return "nx_message_check returns 'invalid'"; - nx_field_iterator_t *iter = nx_message_field(msg, NX_FIELD_TO); + nx_field_iterator_t *iter = nx_message_field_iterator(msg, NX_FIELD_TO); if (iter) return "Expected no iterator for the 'to' field"; nx_free_message(msg); |
