diff options
| author | Ted Ross <tross@apache.org> | 2013-06-26 17:36:32 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-06-26 17:36:32 +0000 |
| commit | ede7589da6bdded56f54c0a7802e779c1e900ea2 (patch) | |
| tree | c0ad1631b29417b9322643be4bdfbfe557cc3426 | |
| parent | 3a6acad9663e3b552ebed78645c77e855f9502ac (diff) | |
| download | qpid-python-ede7589da6bdded56f54c0a7802e779c1e900ea2.tar.gz | |
NO-JIRA - Major refactoring of the code for composing performatives in messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1497019 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/extras/dispatch/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/compose.h | 184 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/compose.c | 433 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/compose_private.h | 26 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/message.c | 454 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/message_private.h | 4 |
6 files changed, 678 insertions, 424 deletions
diff --git a/qpid/extras/dispatch/CMakeLists.txt b/qpid/extras/dispatch/CMakeLists.txt index 918bc1c406..fb62be2eeb 100644 --- a/qpid/extras/dispatch/CMakeLists.txt +++ b/qpid/extras/dispatch/CMakeLists.txt @@ -77,6 +77,7 @@ set(server_SOURCES src/agent.c src/alloc.c src/buffer.c + src/compose.c src/config.c src/container.c src/dispatch.c diff --git a/qpid/extras/dispatch/include/qpid/dispatch/compose.h b/qpid/extras/dispatch/include/qpid/dispatch/compose.h new file mode 100644 index 0000000000..753b3fd1b6 --- /dev/null +++ b/qpid/extras/dispatch/include/qpid/dispatch/compose.h @@ -0,0 +1,184 @@ +#ifndef __dispatch_compose_h__ +#define __dispatch_compose_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/buffer.h> + +typedef struct dx_composed_field_t dx_composed_field_t; + +#define DX_PERFORMATIVE_HEADER 0x70 +#define DX_PERFORMATIVE_DELIVERY_ANNOTATIONS 0x71 +#define DX_PERFORMATIVE_MESSAGE_ANNOTATIONS 0x72 +#define DX_PERFORMATIVE_PROPERTIES 0x73 +#define DX_PERFORMATIVE_APPLICATION_PROPERTIES 0x74 +#define DX_PERFORMATIVE_BODY_DATA 0x75 +#define DX_PERFORMATIVE_BODY_AMQP_SEQUENCE 0x76 +#define DX_PERFORMATIVE_BODY_AMQP_VALUE 0x77 +#define DX_PERFORMATIVE_FOOTER 0x78 + +/** + * Begin composing a new field for a message. The new field can be standalone or + * appended onto an existing field. + * + * @param performative The performative for the message section being composed. + * @param extend An existing field onto which to append the new field or NULL to + * create a standalone field. + * @return A pointer to the newly created field. + */ +dx_composed_field_t *dx_compose(uint8_t performative, dx_composed_field_t *extend); + +/** + * Free the resources associated with a composed field. + * + * @param A field pointer returned by dx_compose. + */ +void dx_compose_free(dx_composed_field_t *field); + +/** + * Begin to compose the elements of a list in the field. This is called before inserting + * the first list element. + * + * @param field A field created by dx_compose. + */ +void dx_compose_start_list(dx_composed_field_t *field); + +/** + * Complete the composition of a list in the field. This is called after the last + * list element has been inserted. + * + * @param field A field created by dx_compose. + */ +void dx_compose_end_list(dx_composed_field_t *field); + +/** + * Begin to compose the elements os a map in the field. This is called before + * inserting the first element-pair into the map. + * + * @param field A field created by dx_compose. + */ +void dx_compose_start_map(dx_composed_field_t *field); + +/** + * Complete the composition of a map in the field. This is called after the last + * element-pair has been inserted. + * + * @param field A field created by dx_compose. + */ +void dx_compose_end_map(dx_composed_field_t *field); + +/** + * Insert a null element into the field. + * + * @param field A field created by dx_compose. + */ +void dx_compose_insert_null(dx_composed_field_t *field); + +/** + * Insert a boolean value into the field. + * + * @param field A field created by dx_compose. + * @param value The boolean (zero or non-zero) value to insert. + */ +void dx_compose_insert_bool(dx_composed_field_t *field, int value); + +/** + * Insert an unsigned integer (up to 32 bits) into the field. + * + * @param field A field created by dx_compose. + * @param value The unsigned integer value to be inserted. + */ +void dx_compose_insert_uint(dx_composed_field_t *field, uint32_t value); + +/** + * Insert a long (64-bit) unsigned value into the field. + * + * @param field A field created by dx_compose. + * @param value The unsigned integer value to be inserted. + */ +void dx_compose_insert_ulong(dx_composed_field_t *field, uint64_t value); + +/** + * Insert a signed integer (up to 32 bits) into the field. + * + * @param field A field created by dx_compose. + * @param value The integer value to be inserted. + */ +void dx_compose_insert_int(dx_composed_field_t *field, int32_t value); + +/** + * Insert a long signed integer (64 bits) into the field. + * + * @param field A field created by dx_compose. + * @param value The integer value to be inserted. + */ +void dx_compose_insert_long(dx_composed_field_t *field, int64_t value); + +/** + * Insert a timestamp into the field. + * + * @param field A field created by dx_compose. + * @param value The timestamp value to be inserted. + */ +void dx_compose_insert_timestamp(dx_composed_field_t *field, uint64_t value); + +/** + * Insert a UUID into the field. + * + * @param field A field created by dx_compose. + * @param value The pointer to the first octet in the UUID to be inserted. + */ +void dx_compose_insert_uuid(dx_composed_field_t *field, const char *value); + +/** + * Insert a binary blob into the field. + * + * @param field A field created by dx_compose. + * @param value The pointer to the first octet to be inserted. + * @param len The length, in octets, of the binary blob. + */ +void dx_compose_insert_binary(dx_composed_field_t *field, const uint8_t *value, uint32_t len); + +/** + * Insert a binary blob from a list of buffers. + * + * @param field A field created by dx_compose. + * @param buffers A pointer to a list of buffers to be inserted as binary data. Note that + * the buffer list will be left empty by this function. + */ +void dx_compose_insert_binary_buffers(dx_composed_field_t *field, dx_buffer_list_t *buffers); + +/** + * Insert a null-terminated utf8-encoded string into the field. + * + * @param field A field created by dx_compose. + * @param value A pointer to a null-terminated string. + */ +void dx_compose_insert_string(dx_composed_field_t *field, const char *value); + +/** + * Insert a symbol into the field. + * + * @param field A field created by dx_compose. + * @param value A pointer to a null-terminated ASCII string. + */ +void dx_compose_insert_symbol(dx_composed_field_t *field, const char *value); + +#endif + diff --git a/qpid/extras/dispatch/src/compose.c b/qpid/extras/dispatch/src/compose.c new file mode 100644 index 0000000000..3c0d8974ba --- /dev/null +++ b/qpid/extras/dispatch/src/compose.c @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/buffer.h> +#include "message_private.h" +#include "compose_private.h" +#include <memory.h> + +typedef struct dx_composite_t { + DEQ_LINKS(struct dx_composite_t); + int isMap; + uint32_t count; + uint32_t length; + dx_field_location_t length_location; + dx_field_location_t count_location; +} dx_composite_t; + +ALLOC_DECLARE(dx_composite_t); +ALLOC_DEFINE(dx_composite_t); +DEQ_DECLARE(dx_composite_t, dx_field_stack_t); + + +struct dx_composed_field_t { + dx_buffer_list_t buffers; + dx_field_stack_t fieldStack; +}; + +ALLOC_DECLARE(dx_composed_field_t); +ALLOC_DEFINE(dx_composed_field_t); + + +static void bump_count(dx_composed_field_t *field) +{ + dx_composite_t *comp = DEQ_HEAD(field->fieldStack); + if (comp) + comp->count++; +} + + +static void dx_insert(dx_composed_field_t *field, const uint8_t *seq, size_t len) +{ + dx_buffer_t *buf = DEQ_TAIL(field->buffers); + dx_composite_t *comp = DEQ_HEAD(field->fieldStack); + + while (len > 0) { + if (buf == 0 || dx_buffer_capacity(buf) == 0) { + buf = dx_allocate_buffer(); + if (buf == 0) + return; + DEQ_INSERT_TAIL(field->buffers, buf); + } + + size_t to_copy = dx_buffer_capacity(buf); + if (to_copy > len) + to_copy = len; + memcpy(dx_buffer_cursor(buf), seq, to_copy); + dx_buffer_insert(buf, to_copy); + len -= to_copy; + seq += to_copy; + if (comp) + comp->length += to_copy; + } +} + + +static void dx_insert_8(dx_composed_field_t *field, uint8_t value) +{ + dx_insert(field, &value, 1); +} + + +static void dx_insert_32(dx_composed_field_t *field, uint32_t value) +{ + uint8_t buf[4]; + buf[0] = (uint8_t) ((value & 0xFF000000) >> 24); + buf[1] = (uint8_t) ((value & 0x00FF0000) >> 16); + buf[2] = (uint8_t) ((value & 0x0000FF00) >> 8); + buf[3] = (uint8_t) (value & 0x000000FF); + dx_insert(field, buf, 4); +} + + +static void dx_insert_64(dx_composed_field_t *field, uint64_t value) +{ + uint8_t buf[8]; + buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56); + buf[1] = (uint8_t) ((value & 0x00FF000000000000L) >> 48); + buf[2] = (uint8_t) ((value & 0x0000FF0000000000L) >> 40); + buf[3] = (uint8_t) ((value & 0x000000FF00000000L) >> 32); + buf[4] = (uint8_t) ((value & 0x00000000FF000000L) >> 24); + buf[5] = (uint8_t) ((value & 0x0000000000FF0000L) >> 16); + buf[6] = (uint8_t) ((value & 0x000000000000FF00L) >> 8); + buf[7] = (uint8_t) (value & 0x00000000000000FFL); + dx_insert(field, buf, 8); +} + + +static void dx_overwrite(dx_buffer_t **buf, size_t *cursor, uint8_t value) +{ + while (*buf) { + if (*cursor >= dx_buffer_size(*buf)) { + *buf = (*buf)->next; + *cursor = 0; + } else { + dx_buffer_base(*buf)[*cursor] = value; + (*cursor)++; + return; + } + } +} + + +static void dx_overwrite_32(dx_field_location_t *field, uint32_t value) +{ + dx_buffer_t *buf = field->buffer; + size_t cursor = field->offset; + + dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24)); + dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 24)); + dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 24)); + dx_overwrite(&buf, &cursor, (uint8_t) (value & 0x000000FF)); +} + + +static void dx_compose_start_composite(dx_composed_field_t *field, int isMap) +{ + if (isMap) + dx_insert_8(field, 0xd1); // map32 + else + dx_insert_8(field, 0xd0); // list32 + + // + // Push a composite descriptor on the field stack + // + dx_composite_t *comp = new_dx_composite_t(); + DEQ_ITEM_INIT(comp); + comp->isMap = isMap; + + // + // Mark the current location to later overwrite the length + // + comp->length_location.buffer = DEQ_TAIL(field->buffers); + comp->length_location.offset = dx_buffer_size(comp->length_location.buffer); + comp->length_location.length = 4; + comp->length_location.parsed = 1; + + dx_insert(field, (const uint8_t*) "\x00\x00\x00\x00", 4); + + // + // Mark the current location to later overwrite the count + // + comp->count_location.buffer = DEQ_TAIL(field->buffers); + comp->count_location.offset = dx_buffer_size(comp->count_location.buffer); + comp->count_location.length = 4; + comp->count_location.parsed = 1; + + dx_insert(field, (const uint8_t*) "\x00\x00\x00\x00", 4); + + comp->length = 4; // Include the length of the count field + comp->count = 0; + + DEQ_INSERT_HEAD(field->fieldStack, comp); +} + + +static void dx_compose_end_composite(dx_composed_field_t *field) +{ + dx_composite_t *comp = DEQ_HEAD(field->fieldStack); + assert(comp); + + dx_overwrite_32(&comp->length_location, comp->length); + dx_overwrite_32(&comp->count_location, comp->count); + + DEQ_REMOVE_HEAD(field->fieldStack); + free_dx_composite_t(comp); +} + + +dx_composed_field_t *dx_compose(uint8_t performative, dx_composed_field_t *extend) +{ + dx_composed_field_t *field = extend; + + if (field) { + assert(DEQ_SIZE(field->fieldStack) == 0); + } else { + field = new_dx_composed_field_t(); + if (!field) + return 0; + + DEQ_INIT(field->buffers); + DEQ_INIT(field->fieldStack); + } + + dx_insert(field, (const uint8_t*) "\x00\x53", 2); + dx_insert_8(field, performative); + + return field; +} + + +void dx_compose_free(dx_composed_field_t *field) +{ + dx_buffer_t *buf = DEQ_HEAD(field->buffers); + while (buf) { + DEQ_REMOVE_HEAD(field->buffers); + dx_free_buffer(buf); + } + + dx_composite_t *comp = DEQ_HEAD(field->fieldStack); + while (comp) { + DEQ_REMOVE_HEAD(field->fieldStack); + free_dx_composite_t(comp); + } + + free_dx_composed_field_t(field); +} + + +void dx_compose_start_list(dx_composed_field_t *field) +{ + dx_compose_start_composite(field, 0); +} + + +void dx_compose_end_list(dx_composed_field_t *field) +{ + dx_compose_end_composite(field); +} + + +void dx_compose_start_map(dx_composed_field_t *field) +{ + dx_compose_start_composite(field, 1); +} + + +void dx_compose_end_map(dx_composed_field_t *field) +{ + dx_compose_end_composite(field); +} + + +void dx_compose_insert_null(dx_composed_field_t *field) +{ + dx_insert_8(field, 0x40); + bump_count(field); +} + + +void dx_compose_insert_bool(dx_composed_field_t *field, int value) +{ + dx_insert_8(field, value ? 0x41 : 0x42); + bump_count(field); +} + + +void dx_compose_insert_uint(dx_composed_field_t *field, uint32_t value) +{ + if (value == 0) { + dx_insert_8(field, 0x43); // uint0 + } else if (value < 256) { + dx_insert_8(field, 0x52); // smalluint + dx_insert_8(field, (uint8_t) value); + } else { + dx_insert_8(field, 0x70); // uint + dx_insert_32(field, value); + } + bump_count(field); +} + + +void dx_compose_insert_ulong(dx_composed_field_t *field, uint64_t value) +{ + if (value == 0) { + dx_insert_8(field, 0x44); // ulong0 + } else if (value < 256) { + dx_insert_8(field, 0x53); // smallulong + dx_insert_8(field, (uint8_t) value); + } else { + dx_insert_8(field, 0x80); // ulong + dx_insert_64(field, value); + } + bump_count(field); +} + + +void dx_compose_insert_int(dx_composed_field_t *field, int32_t value) +{ + if (value >= -128 && value <= 127) { + dx_insert_8(field, 0x54); // smallint + dx_insert_8(field, (uint8_t) value); + } else { + dx_insert_8(field, 0x71); // int + dx_insert_32(field, (uint32_t) value); + } + bump_count(field); +} + + +void dx_compose_insert_long(dx_composed_field_t *field, int64_t value) +{ + if (value >= -128 && value <= 127) { + dx_insert_8(field, 0x55); // smalllong + dx_insert_8(field, (uint8_t) value); + } else { + dx_insert_8(field, 0x81); // long + dx_insert_64(field, (uint64_t) value); + } + bump_count(field); +} + + +void dx_compose_insert_timestamp(dx_composed_field_t *field, uint64_t value) +{ + dx_insert_8(field, 0x83); // timestamp + dx_insert_64(field, value); + bump_count(field); +} + + +void dx_compose_insert_uuid(dx_composed_field_t *field, const char *value) +{ + dx_insert_8(field, 0x98); // uuid + dx_insert(field, (const uint8_t*) value, 16); + bump_count(field); +} + + +void dx_compose_insert_binary(dx_composed_field_t *field, const uint8_t *value, uint32_t len) +{ + if (len < 256) { + dx_insert_8(field, 0xa0); // vbin8 + dx_insert_8(field, (uint8_t) len); + } else { + dx_insert_8(field, 0xb0); // vbin32 + dx_insert_32(field, len); + } + dx_insert(field, value, len); + bump_count(field); +} + + +void dx_compose_insert_binary_buffers(dx_composed_field_t *field, dx_buffer_list_t *buffers) +{ + dx_buffer_t *buf = DEQ_HEAD(*buffers); + uint32_t len = 0; + + // + // Calculate the size of the binary field to be appended. + // + while (buf) { + len += dx_buffer_size(buf); + buf = DEQ_NEXT(buf); + } + + // + // Supply the appropriate binary tag for the length. + // + if (len < 256) { + dx_insert_8(field, 0xa0); // vbin8 + dx_insert_8(field, (uint8_t) len); + } else { + dx_insert_8(field, 0xb0); // vbin32 + dx_insert_32(field, len); + } + + // + // Move the supplied buffers to the tail of the field's buffer list. + // + buf = DEQ_HEAD(*buffers); + while (buf) { + DEQ_REMOVE_HEAD(*buffers); + DEQ_INSERT_TAIL(field->buffers, buf); + buf = DEQ_HEAD(*buffers); + } +} + + +void dx_compose_insert_string(dx_composed_field_t *field, const char *value) +{ + uint32_t len = strlen(value); + + if (len < 256) { + dx_insert_8(field, 0xa1); // str8-utf8 + dx_insert_8(field, (uint8_t) len); + } else { + dx_insert_8(field, 0xb1); // str32-utf8 + dx_insert_32(field, len); + } + dx_insert(field, (const uint8_t*) value, len); + bump_count(field); +} + + +void dx_compose_insert_symbol(dx_composed_field_t *field, const char *value) +{ + uint32_t len = strlen(value); + + if (len < 256) { + dx_insert_8(field, 0xa3); // sym8 + dx_insert_8(field, (uint8_t) len); + } else { + dx_insert_8(field, 0xb3); // sym32 + dx_insert_32(field, len); + } + dx_insert(field, (const uint8_t*) value, len); + bump_count(field); +} + + +dx_buffer_list_t *dx_compose_buffers(dx_composed_field_t *field) +{ + return &field->buffers; +} + diff --git a/qpid/extras/dispatch/src/compose_private.h b/qpid/extras/dispatch/src/compose_private.h new file mode 100644 index 0000000000..58eec6097b --- /dev/null +++ b/qpid/extras/dispatch/src/compose_private.h @@ -0,0 +1,26 @@ +#ifndef __compose_private_h__ +#define __compose_private_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/compose.h> + +dx_buffer_list_t *dx_compose_buffers(dx_composed_field_t *field); + +#endif diff --git a/qpid/extras/dispatch/src/message.c b/qpid/extras/dispatch/src/message.c index 041205879e..495c13ed7c 100644 --- a/qpid/extras/dispatch/src/message.c +++ b/qpid/extras/dispatch/src/message.c @@ -20,6 +20,7 @@ #include <qpid/dispatch/ctools.h> #include <qpid/dispatch/threading.h> #include "message_private.h" +#include "compose_private.h" #include <string.h> #include <stdio.h> @@ -255,134 +256,6 @@ static int dx_check_and_advance(dx_buffer_t **buffer, } -static void dx_insert(dx_message_content_t *msg, const uint8_t *seq, size_t len) -{ - dx_buffer_t *buf = DEQ_TAIL(msg->buffers); - - while (len > 0) { - if (buf == 0 || dx_buffer_capacity(buf) == 0) { - buf = dx_allocate_buffer(); - if (buf == 0) - return; - DEQ_INSERT_TAIL(msg->buffers, buf); - } - - size_t to_copy = dx_buffer_capacity(buf); - if (to_copy > len) - to_copy = len; - memcpy(dx_buffer_cursor(buf), seq, to_copy); - dx_buffer_insert(buf, to_copy); - len -= to_copy; - seq += to_copy; - msg->length += to_copy; - } -} - - -static void dx_insert_8(dx_message_content_t *msg, uint8_t value) -{ - dx_insert(msg, &value, 1); -} - - -static void dx_insert_32(dx_message_content_t *msg, uint32_t value) -{ - uint8_t buf[4]; - buf[0] = (uint8_t) ((value & 0xFF000000) >> 24); - buf[1] = (uint8_t) ((value & 0x00FF0000) >> 16); - buf[2] = (uint8_t) ((value & 0x0000FF00) >> 8); - buf[3] = (uint8_t) (value & 0x000000FF); - dx_insert(msg, buf, 4); -} - - -static void dx_insert_64(dx_message_content_t *msg, uint64_t value) -{ - uint8_t buf[8]; - buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56); - buf[1] = (uint8_t) ((value & 0x00FF000000000000L) >> 48); - buf[2] = (uint8_t) ((value & 0x0000FF0000000000L) >> 40); - buf[3] = (uint8_t) ((value & 0x000000FF00000000L) >> 32); - buf[4] = (uint8_t) ((value & 0x00000000FF000000L) >> 24); - buf[5] = (uint8_t) ((value & 0x0000000000FF0000L) >> 16); - buf[6] = (uint8_t) ((value & 0x000000000000FF00L) >> 8); - buf[7] = (uint8_t) (value & 0x00000000000000FFL); - dx_insert(msg, buf, 8); -} - - -static void dx_overwrite(dx_buffer_t **buf, size_t *cursor, uint8_t value) -{ - while (*buf) { - if (*cursor >= dx_buffer_size(*buf)) { - *buf = (*buf)->next; - *cursor = 0; - } else { - dx_buffer_base(*buf)[*cursor] = value; - (*cursor)++; - return; - } - } -} - - -static void dx_overwrite_32(dx_field_location_t *field, uint32_t value) -{ - dx_buffer_t *buf = field->buffer; - size_t cursor = field->offset; - - dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24)); - dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 24)); - dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 24)); - dx_overwrite(&buf, &cursor, (uint8_t) (value & 0x000000FF)); -} - - -static void dx_start_list_performative(dx_message_content_t *msg, uint8_t code) -{ - // - // Insert the short-form performative tag - // - dx_insert(msg, (const uint8_t*) "\x00\x53", 2); - dx_insert_8(msg, code); - - // - // Open the list with a list32 tag - // - dx_insert_8(msg, 0xd0); - - // - // Mark the current location to later overwrite the length - // - msg->compose_length.buffer = DEQ_TAIL(msg->buffers); - msg->compose_length.offset = dx_buffer_size(msg->compose_length.buffer); - msg->compose_length.length = 4; - msg->compose_length.parsed = 1; - - dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4); - - // - // Mark the current location to later overwrite the count - // - msg->compose_count.buffer = DEQ_TAIL(msg->buffers); - msg->compose_count.offset = dx_buffer_size(msg->compose_count.buffer); - msg->compose_count.length = 4; - msg->compose_count.parsed = 1; - - dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4); - - msg->length = 4; // Include the length of the count field - msg->count = 0; -} - - -static void dx_end_list(dx_message_content_t *msg) -{ - dx_overwrite_32(&msg->compose_length, msg->length); - dx_overwrite_32(&msg->compose_count, msg->count); -} - - static dx_field_location_t *dx_message_field_location(dx_message_t *msg, dx_message_field_t field) { dx_message_content_t *content = MSG_CONTENT(msg); @@ -825,302 +698,43 @@ ssize_t dx_message_field_copy(dx_message_t *msg, dx_message_field_t field, void void dx_message_compose_1(dx_message_t *msg, const char *to, dx_buffer_list_t *buffers) { - dx_message_begin_header(msg); - dx_message_insert_boolean(msg, 0); // durable - //dx_message_insert_null(msg); // priority - //dx_message_insert_null(msg); // ttl - //dx_message_insert_boolean(msg, 0); // first-acquirer - //dx_message_insert_uint(msg, 0); // delivery-count - dx_message_end_header(msg); - - dx_message_begin_message_properties(msg); - dx_message_insert_null(msg); // message-id - dx_message_insert_null(msg); // user-id - dx_message_insert_string(msg, to); // to - //dx_message_insert_null(msg); // subject - //dx_message_insert_null(msg); // reply-to - //dx_message_insert_null(msg); // correlation-id - //dx_message_insert_null(msg); // content-type - //dx_message_insert_null(msg); // content-encoding - //dx_message_insert_timestamp(msg, 0); // absolute-expiry-time - //dx_message_insert_timestamp(msg, 0); // creation-time - //dx_message_insert_null(msg); // group-id - //dx_message_insert_uint(msg, 0); // group-sequence - //dx_message_insert_null(msg); // reply-to-group-id - dx_message_end_message_properties(msg); - - if (buffers) - dx_message_append_body_data(msg, buffers); -} - - -void dx_message_begin_header(dx_message_t *msg) -{ - dx_start_list_performative(MSG_CONTENT(msg), 0x70); -} - - -void dx_message_end_header(dx_message_t *msg) -{ - dx_end_list(MSG_CONTENT(msg)); -} - - -void dx_message_begin_delivery_annotations(dx_message_t *msg) -{ - assert(0); // Not Implemented -} - - -void dx_message_end_delivery_annotations(dx_message_t *msg) -{ - assert(0); // Not Implemented -} - - -void dx_message_begin_message_annotations(dx_message_t *msg) -{ - assert(0); // Not Implemented -} - - -void dx_message_end_message_annotations(dx_message_t *msg) -{ - assert(0); // Not Implemented -} - - -void dx_message_begin_message_properties(dx_message_t *msg) -{ - dx_start_list_performative(MSG_CONTENT(msg), 0x73); -} - - -void dx_message_end_message_properties(dx_message_t *msg) -{ - dx_end_list(MSG_CONTENT(msg)); -} - - -void dx_message_begin_application_properties(dx_message_t *msg) -{ - assert(0); // Not Implemented -} - - -void dx_message_end_application_properties(dx_message_t *msg) -{ - assert(0); // Not Implemented -} - - -void dx_message_append_body_data(dx_message_t *msg, dx_buffer_list_t *buffers) -{ + dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_HEADER, 0); dx_message_content_t *content = MSG_CONTENT(msg); - dx_buffer_t *buf = DEQ_HEAD(*buffers); - uint32_t len = 0; - // - // Calculate the size of the body to be appended. - // - while (buf) { - len += dx_buffer_size(buf); - buf = DEQ_NEXT(buf); - } - - // - // Insert a DATA section performative header. - // - dx_insert(content, (const uint8_t*) "\x00\x53\x75", 3); - if (len < 256) { - dx_insert_8(content, 0xa0); // vbin8 - dx_insert_8(content, (uint8_t) len); - } else { - dx_insert_8(content, 0xb0); // vbin32 - dx_insert_32(content, len); + dx_compose_start_list(field); + dx_compose_insert_bool(field, 0); // durable + //dx_compose_insert_null(field); // priority + //dx_compose_insert_null(field); // ttl + //dx_compose_insert_boolean(field, 0); // first-acquirer + //dx_compose_insert_uint(field, 0); // delivery-count + dx_compose_end_list(field); + + field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field); + dx_compose_start_list(field); + dx_compose_insert_null(field); // compose-id + dx_compose_insert_null(field); // user-id + dx_compose_insert_string(field, to); // to + //dx_compose_insert_null(field); // subject + //dx_compose_insert_null(field); // reply-to + //dx_compose_insert_null(field); // correlation-id + //dx_compose_insert_null(field); // content-type + //dx_compose_insert_null(field); // content-encoding + //dx_compose_insert_timestamp(field, 0); // absolute-expiry-time + //dx_compose_insert_timestamp(field, 0); // creation-time + //dx_compose_insert_null(field); // group-id + //dx_compose_insert_uint(field, 0); // group-sequence + //dx_compose_insert_null(field); // reply-to-group-id + dx_compose_end_list(field); + + if (buffers) { + field = dx_compose(DX_PERFORMATIVE_BODY_DATA, field); + dx_compose_insert_binary_buffers(field, buffers); } - // - // Move the supplied buffers to the tail of the message's buffer list. - // - buf = DEQ_HEAD(*buffers); - while (buf) { - DEQ_REMOVE_HEAD(*buffers); - DEQ_INSERT_TAIL(content->buffers, buf); - buf = DEQ_HEAD(*buffers); - } -} - - -void dx_message_begin_body_sequence(dx_message_t *msg) -{ -} - - -void dx_message_end_body_sequence(dx_message_t *msg) -{ -} - - -void dx_message_begin_footer(dx_message_t *msg) -{ - assert(0); // Not Implemented -} - - -void dx_message_end_footer(dx_message_t *msg) -{ - assert(0); // Not Implemented -} - - -void dx_message_insert_null(dx_message_t *msg) -{ - dx_message_content_t *content = MSG_CONTENT(msg); - dx_insert_8(content, 0x40); - content->count++; -} - - -void dx_message_insert_boolean(dx_message_t *msg, int value) -{ - dx_message_content_t *content = MSG_CONTENT(msg); - if (value) - dx_insert(content, (const uint8_t*) "\x56\x01", 2); - else - dx_insert(content, (const uint8_t*) "\x56\x00", 2); - content->count++; -} - - -void dx_message_insert_ubyte(dx_message_t *msg, uint8_t value) -{ - dx_message_content_t *content = MSG_CONTENT(msg); - dx_insert_8(content, 0x50); - dx_insert_8(content, value); - content->count++; -} - - -void dx_message_insert_uint(dx_message_t *msg, uint32_t value) -{ - dx_message_content_t *content = MSG_CONTENT(msg); - if (value == 0) { - dx_insert_8(content, 0x43); // uint0 - } else if (value < 256) { - dx_insert_8(content, 0x52); // smalluint - dx_insert_8(content, (uint8_t) value); - } else { - dx_insert_8(content, 0x70); // uint - dx_insert_32(content, value); - } - content->count++; -} + dx_buffer_list_t *field_buffers = dx_compose_buffers(field); + content->buffers = *field_buffers; + DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers. - -void dx_message_insert_ulong(dx_message_t *msg, uint64_t value) -{ - dx_message_content_t *content = MSG_CONTENT(msg); - if (value == 0) { - dx_insert_8(content, 0x44); // ulong0 - } else if (value < 256) { - dx_insert_8(content, 0x53); // smallulong - dx_insert_8(content, (uint8_t) value); - } else { - dx_insert_8(content, 0x80); // ulong - dx_insert_64(content, value); - } - content->count++; -} - - -void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t len) -{ - dx_message_content_t *content = MSG_CONTENT(msg); - if (len < 256) { - dx_insert_8(content, 0xa0); // vbin8 - dx_insert_8(content, (uint8_t) len); - } else { - dx_insert_8(content, 0xb0); // vbin32 - dx_insert_32(content, len); - } - dx_insert(content, start, len); - content->count++; -} - - -void dx_message_insert_string(dx_message_t *msg, const char *str) -{ - dx_message_content_t *content = MSG_CONTENT(msg); - uint32_t len = strlen(str); - - if (len < 256) { - dx_insert_8(content, 0xa1); // str8-utf8 - dx_insert_8(content, (uint8_t) len); - dx_insert(content, (const uint8_t*) str, len); - } else { - dx_insert_8(content, 0xb1); // str32-utf8 - dx_insert_32(content, len); - dx_insert(content, (const uint8_t*) str, len); - } - content->count++; -} - - -void dx_message_insert_uuid(dx_message_t *msg, const uint8_t *value) -{ - dx_message_content_t *content = MSG_CONTENT(msg); - dx_insert_8(content, 0x98); // uuid - dx_insert(content, value, 16); - content->count++; -} - - -void dx_message_insert_symbol(dx_message_t *msg, const char *start, size_t len) -{ - dx_message_content_t *content = MSG_CONTENT(msg); - if (len < 256) { - dx_insert_8(content, 0xa3); // sym8 - dx_insert_8(content, (uint8_t) len); - dx_insert(content, (const uint8_t*) start, len); - } else { - dx_insert_8(content, 0xb3); // sym32 - dx_insert_32(content, len); - dx_insert(content, (const uint8_t*) start, len); - } - content->count++; -} - - -void dx_message_insert_timestamp(dx_message_t *msg, uint64_t value) -{ - dx_message_content_t *content = MSG_CONTENT(msg); - dx_insert_8(content, 0x83); // timestamp - dx_insert_64(content, value); - content->count++; -} - - -void dx_message_begin_list(dx_message_t* msg) -{ - assert(0); // Not Implemented -} - - -void dx_message_end_list(dx_message_t* msg) -{ - assert(0); // Not Implemented -} - - -void dx_message_begin_map(dx_message_t* msg) -{ - assert(0); // Not Implemented -} - - -void dx_message_end_map(dx_message_t* msg) -{ - assert(0); // Not Implemented + dx_compose_free(field); } diff --git a/qpid/extras/dispatch/src/message_private.h b/qpid/extras/dispatch/src/message_private.h index 891ceccee3..4280a18e02 100644 --- a/qpid/extras/dispatch/src/message_private.h +++ b/qpid/extras/dispatch/src/message_private.h @@ -74,10 +74,6 @@ typedef struct { dx_field_location_t field_user_id; // The string value of the user-id dx_field_location_t field_to; // The string value of the to field dx_field_location_t body; // The body of the message - dx_field_location_t compose_length; - dx_field_location_t compose_count; - uint32_t length; - uint32_t count; dx_buffer_t *parse_buffer; unsigned char *parse_cursor; dx_message_depth_t parse_depth; |
