diff options
| author | Ted Ross <tross@apache.org> | 2013-05-09 19:06:32 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-05-09 19:06:32 +0000 |
| commit | 9b2bcff0a85ab36c547da2001616a2b0dd13146a (patch) | |
| tree | 67ac9f890598a340d751f974eda8000decc942c6 | |
| parent | 8fc7c8e492a0b8e594dcf08c8649976b7a6d4753 (diff) | |
| download | qpid-python-9b2bcff0a85ab36c547da2001616a2b0dd13146a.tar.gz | |
NO-JIRA - Dispatch work-in-progress.
- Added Map validation and access for in-buffer fields
Note that 'iterator' and 'message' require some refactoring and cleanup
- Added Agent handler for 'get' requests.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1480739 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/iterator.h | 10 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/agent.c | 66 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/iterator.c | 286 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/message.c | 55 |
4 files changed, 387 insertions, 30 deletions
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/iterator.h b/qpid/extras/dispatch/include/qpid/dispatch/iterator.h index f47bbf5bad..8241fc3de9 100644 --- a/qpid/extras/dispatch/include/qpid/dispatch/iterator.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/iterator.h @@ -148,4 +148,14 @@ int dx_field_iterator_prefix(dx_field_iterator_t *iter, const char *prefix); */ unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter); + + +typedef struct dx_field_map_t dx_field_map_t; + +dx_field_map_t *dx_field_map(dx_field_iterator_t *iter, int string_keys_only); +void dx_field_map_free(dx_field_map_t *map); +dx_field_iterator_t *dx_field_map_by_key(dx_field_map_t *map, const char *key); + +dx_field_iterator_t *dx_field_string(dx_field_iterator_t *iter); + #endif diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c index 08425cee24..ad226351b6 100644 --- a/qpid/extras/dispatch/src/agent.c +++ b/qpid/extras/dispatch/src/agent.c @@ -28,6 +28,7 @@ #include <qpid/dispatch/threading.h> #include <qpid/dispatch/timer.h> #include <qpid/dispatch/router.h> +#include <qpid/dispatch/log.h> #include <string.h> #include <stdio.h> @@ -58,12 +59,70 @@ typedef struct { ALLOC_DECLARE(dx_agent_request_t); ALLOC_DEFINE(dx_agent_request_t); +static char *log_module = "AGENT"; -static void dx_agent_process_request(dx_message_t *msg) + +static void dx_agent_process_get(dx_agent_t *agent, dx_field_map_t *map) { + dx_field_iterator_t *cls = dx_field_map_by_key(map, "class"); + if (cls == 0) + return; + + dx_field_iterator_t *cls_string = dx_field_string(cls); + const dx_agent_class_t *cls_record; + hash_retrieve_const(agent->class_hash, cls_string, (const void**) &cls_record); + + if (cls_record == 0) + return; + + dx_log(log_module, LOG_TRACE, "Received GET request for class: %s", cls_record->fqname); +} + + +static void dx_agent_process_request(dx_agent_t *agent, dx_message_t *msg) +{ + // + // Parse the message through the body and exit if the message is not well formed. + // if (!dx_message_check(msg, DX_DEPTH_BODY)) return; - printf("Processing Agent Request\n"); + + // + // Get an iterator for the message body. Exit if the message has no body. + // + dx_field_iterator_t *body = dx_message_field_iterator(msg, DX_FIELD_BODY); + if (body == 0) + return; + + // + // Try to get a map-view of the body. Exit if the body is not a map-value. + // + dx_field_map_t *map = dx_field_map(body, 1); + if (map == 0) { + dx_field_iterator_free(body); + return; + } + + // + // Get an iterator for the "opcode" field in the map. Exit if the key is not found. + // + dx_field_iterator_t *opcode = dx_field_map_by_key(map, "opcode"); + if (opcode == 0) { + dx_field_map_free(map); + dx_field_iterator_free(body); + return; + } + + // + // Dispatch the opcode to the appropriate handler + // + dx_field_iterator_t *opcode_string = dx_field_string(opcode); + if (dx_field_iterator_equal(opcode_string, (unsigned char*) "get")) + dx_agent_process_get(agent, map); + + dx_field_iterator_free(opcode_string); + dx_field_map_free(map); + dx_field_iterator_free(body); } @@ -80,7 +139,7 @@ static void dx_agent_timer_handler(void *context) sys_mutex_unlock(agent->lock); if (msg) { - dx_agent_process_request(msg); + dx_agent_process_request(agent, msg); dx_free_message(msg); } } while (msg); @@ -147,6 +206,7 @@ dx_agent_class_t *dx_agent_register_class(dx_dispatch_t *dx, if (result < 0) assert(false); + dx_log(log_module, LOG_TRACE, "%s class registered: %s", query_handler ? "Object" : "Event", fqname); return cls; } diff --git a/qpid/extras/dispatch/src/iterator.c b/qpid/extras/dispatch/src/iterator.c index 92a7a1f479..44cc152bd9 100644 --- a/qpid/extras/dispatch/src/iterator.c +++ b/qpid/extras/dispatch/src/iterator.c @@ -20,10 +20,13 @@ #include <qpid/dispatch/iterator.h> #include <qpid/dispatch/ctools.h> #include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/log.h> #include "message_private.h" #include <stdio.h> #include <string.h> +static const char *log_module = "FIELD"; + typedef enum { MODE_TO_END, MODE_TO_SLASH @@ -44,13 +47,35 @@ struct dx_field_iterator_t { unsigned char prefix; int at_prefix; int view_prefix; + unsigned char tag; }; - ALLOC_DECLARE(dx_field_iterator_t); ALLOC_DEFINE(dx_field_iterator_t); +typedef struct dx_field_pair_t { + DEQ_LINKS(struct dx_field_pair_t); + dx_field_iterator_t *key_iter; + dx_field_iterator_t *value_iter; +} dx_field_pair_t; + +DEQ_DECLARE(dx_field_pair_t, dx_field_pair_list_t); + +ALLOC_DECLARE(dx_field_pair_t); +ALLOC_DEFINE(dx_field_pair_t); + + +struct dx_field_map_t { + dx_field_iterator_t *outer; + int key_count; + dx_field_pair_list_t pairs; +}; + +ALLOC_DECLARE(dx_field_map_t); +ALLOC_DEFINE(dx_field_map_t); + + typedef enum { STATE_START, STATE_SLASH_LEFT, @@ -238,6 +263,7 @@ dx_field_iterator_t* dx_field_iterator_string(const char *text, dx_iterator_view if (!iter) return 0; + iter->tag = 0; iter->start_pointer.buffer = 0; iter->start_pointer.cursor = (unsigned char*) text; iter->start_pointer.length = strlen(text); @@ -254,6 +280,7 @@ dx_field_iterator_t *dx_field_iterator_buffer(dx_buffer_t *buffer, int offset, i if (!iter) return 0; + iter->tag = 0; iter->start_pointer.buffer = buffer; iter->start_pointer.cursor = dx_buffer_base(buffer) + offset; iter->start_pointer.length = length; @@ -360,6 +387,81 @@ int dx_field_iterator_prefix(dx_field_iterator_t *iter, const char *prefix) } +static dx_field_iterator_t *dx_field_parse_amqp_value(dx_field_iterator_t *iter, unsigned int *available) +{ + if (*available < 1) + return 0; + + unsigned int start = *available; + dx_field_iterator_t *value = new_dx_field_iterator_t(); + value->start_pointer = iter->pointer; + value->view = ITER_VIEW_ALL; + value->mode = MODE_TO_END; + value->at_prefix = 0; + value->view_prefix = 0; + + unsigned char tag = dx_field_iterator_octet(iter); + unsigned int length; + unsigned int length_size = 0; + + (*available)--; + + switch (tag & 0xF0) { + case 0x40: length = 0; break; + case 0x50: length = 1; break; + case 0x60: length = 2; break; + case 0x70: length = 4; break; + case 0x80: length = 8; break; + case 0x90: length = 16; break; + case 0xA0: + case 0xC0: + case 0xE0: length_size = 1; break; + case 0xB0: + case 0xD0: + case 0xF0: length_size = 4; break; + default: + free_dx_field_iterator_t(value); + return 0; + } + + if (*available < length_size) { + free_dx_field_iterator_t(value); + return 0; + } + + if (length_size == 1) { + length = (unsigned int) dx_field_iterator_octet(iter); + } else if (length_size == 4) { + length = ((unsigned int) dx_field_iterator_octet(iter)) << 24; + length += ((unsigned int) dx_field_iterator_octet(iter)) << 16; + length += ((unsigned int) dx_field_iterator_octet(iter)) << 8; + length += (unsigned int) dx_field_iterator_octet(iter); + } + + if (*available < length) { + free_dx_field_iterator_t(value); + return 0; + } + + for (unsigned int idx = 0; idx < length; idx++) + (void) dx_field_iterator_octet(iter); + (*available) -= (length + length_size); + + value->start_pointer.length = start - *available; + value->view_start_pointer = value->start_pointer; + value->pointer = value->start_pointer; + value->tag = tag; + + return value; +} + + +static int dx_tag_is_string(unsigned char tag) +{ + return (tag == 0xa1 || tag == 0xb1); +} + + unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter) { int length = 0; @@ -381,3 +483,185 @@ unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter) return copy; } + +dx_field_map_t *dx_field_map(dx_field_iterator_t *iter, int string_keys_only) +{ + dx_field_iterator_reset(iter); + unsigned char tag = dx_field_iterator_octet(iter); + + // + // If this field is not a map, return 0; + // + if (tag != 0xc1 && tag != 0xd1) { + dx_log(log_module, LOG_TRACE, "dx_field_map - Invalid Map, Unexpected tag: %02x", tag); + return 0; + } + + // + // Validate the map. Ensure the following: + // - There are an even number of fields in the compound structure + // - There are anough octets in the field to account for all of the contents + // - The field count matches the number of fields present + // - The keys are strings (if string_keys_only) + // + unsigned int length; + unsigned int count; + + if (tag == 0xc1) { + length = (unsigned int) dx_field_iterator_octet(iter); + count = (unsigned int) dx_field_iterator_octet(iter); + length -= 1; // Account for the 'count' octet + } else { + length = ((unsigned int) dx_field_iterator_octet(iter)) << 24; + length += ((unsigned int) dx_field_iterator_octet(iter)) << 16; + length += ((unsigned int) dx_field_iterator_octet(iter)) << 8; + length += (unsigned int) dx_field_iterator_octet(iter); + + count = ((unsigned int) dx_field_iterator_octet(iter)) << 24; + count += ((unsigned int) dx_field_iterator_octet(iter)) << 16; + count += ((unsigned int) dx_field_iterator_octet(iter)) << 8; + count += (unsigned int) dx_field_iterator_octet(iter); + + length -= 4; // Account for the 'count' octets + } + + // + // The map is not valid if count is not an even number. + // + if (count & 1) { + dx_log(log_module, LOG_TRACE, "dx_field_map - Invalid Map, odd number of fields: %d", count); + return 0; + } + + dx_field_map_t *map = new_dx_field_map_t(); + if (!map) + return 0; + + map->outer = iter; + map->key_count = count >> 1; + DEQ_INIT(map->pairs); + + unsigned int idx; + for (idx = 0; idx < map->key_count; idx++) { + dx_field_iterator_t *key = dx_field_parse_amqp_value(iter, &length); + dx_field_iterator_t *value = dx_field_parse_amqp_value(iter, &length); + + if (key == 0 || value == 0) { + dx_field_map_free(map); + return 0; + } + + if (string_keys_only && !dx_tag_is_string(key->tag)) { + dx_log(log_module, LOG_TRACE, "dx_field_map - Invalid Map, key tag is not a string: %02x", key->tag); + dx_field_map_free(map); + return 0; + } + + dx_field_pair_t *pair = new_dx_field_pair_t(); + if (!pair) { + dx_field_map_free(map); + return 0; + } + + DEQ_ITEM_INIT(pair); + pair->key_iter = key; + pair->value_iter = value; + DEQ_INSERT_TAIL(map->pairs, pair); + } + + return map; +} + + +void dx_field_map_free(dx_field_map_t *map) +{ + if (!map) + return; + + dx_field_pair_t *pair = DEQ_HEAD(map->pairs); + while (pair) { + DEQ_REMOVE_HEAD(map->pairs); + free_dx_field_iterator_t(pair->key_iter); + free_dx_field_iterator_t(pair->value_iter); + free_dx_field_pair_t(pair); + pair = DEQ_HEAD(map->pairs); + } + + free_dx_field_map_t(map); +} + + +dx_field_iterator_t *dx_field_map_by_key(dx_field_map_t *map, const char *key) +{ + dx_field_iterator_t *key_string; + dx_field_iterator_t *value = 0; + dx_field_pair_t *pair = DEQ_HEAD(map->pairs); + + while (pair && !value) { + key_string = dx_field_string(pair->key_iter); + if (dx_field_iterator_equal(key_string, (const unsigned char*) key)) + value = pair->value_iter; + free_dx_field_iterator_t(key_string); + pair = DEQ_NEXT(pair); + } + + return value; +} + + +static unsigned int dx_field_get_length(dx_field_iterator_t *iter, unsigned char tag) { + unsigned long length = 0; + + switch (tag & 0xF0) { + case 0x40: return 0; + case 0x50: return 1; + case 0x60: return 2; + case 0x70: return 4; + case 0x80: return 8; + case 0x90: return 16; + case 0xB0: + case 0xD0: + case 0xF0: + length += ((unsigned int) dx_field_iterator_octet(iter)) << 24; + length += ((unsigned int) dx_field_iterator_octet(iter)) << 16; + length += ((unsigned int) dx_field_iterator_octet(iter)) << 8; + // fall through to the next case + + case 0xA0: + case 0xC0: + case 0xE0: + length += (unsigned int) dx_field_iterator_octet(iter); + break; + + default: + return 0; + } + + return length; +} + + +dx_field_iterator_t *dx_field_string(dx_field_iterator_t *iter) +{ + dx_field_iterator_reset(iter); + unsigned char tag = dx_field_iterator_octet(iter); + if (!dx_tag_is_string(tag)) + return 0; + unsigned int length = dx_field_get_length(iter, tag); + + dx_field_iterator_t *result = new_dx_field_iterator_t(); + if (!result) + return 0; + result->start_pointer = iter->pointer; + result->start_pointer.length = length; + result->view_start_pointer = result->start_pointer; + result->pointer = result->start_pointer; + result->view = ITER_VIEW_ALL; + result->mode = MODE_TO_END; + result->at_prefix = 0; + result->view_prefix = 0; + result->tag = 0; + + return result; +} + diff --git a/qpid/extras/dispatch/src/message.c b/qpid/extras/dispatch/src/message.c index cf500fdadf..ea5092b13e 100644 --- a/qpid/extras/dispatch/src/message.c +++ b/qpid/extras/dispatch/src/message.c @@ -216,7 +216,8 @@ static int dx_check_and_advance(dx_buffer_t **buffer, // // Advance the pointers to consume the whole section. // - int consume = 0; + int pre_consume = 1; // Count the already extracted tag + int consume = 0; unsigned char tag = next_octet(&test_cursor, &test_buffer); if (!test_cursor) return 0; switch (tag) { @@ -226,6 +227,7 @@ static int dx_check_and_advance(dx_buffer_t **buffer, case 0xd0 : // list32 case 0xd1 : // map32 case 0xb0 : // vbin32 + pre_consume += 3; consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 24; if (!test_cursor) return 0; consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 16; @@ -237,11 +239,13 @@ static int dx_check_and_advance(dx_buffer_t **buffer, case 0xc0 : // list8 case 0xc1 : // map8 case 0xa0 : // vbin8 + pre_consume += 1; consume |= (int) next_octet(&test_cursor, &test_buffer); if (!test_cursor) return 0; break; } + location->length = pre_consume + consume; if (consume) advance(&test_cursor, &test_buffer, consume); @@ -411,20 +415,8 @@ static dx_field_location_t *dx_message_field_location(dx_message_t *msg, dx_mess break; case DX_FIELD_BODY: - while (1) { - if (content->body.parsed) - return &content->body; - - if (content->section_body.parsed == 0) - break; - - dx_buffer_t *buffer = content->section_body.buffer; - unsigned char *cursor = dx_buffer_base(buffer) + content->section_body.offset; - int result; - - result = traverse_field(&cursor, &buffer, &content->body); - if (!result) return 0; - } + if (content->section_body.parsed) + return &content->section_body; break; default: @@ -637,7 +629,8 @@ static int dx_check_field_LH(dx_message_content_t *content, const unsigned char *long_pattern, const unsigned char *short_pattern, const unsigned char *expected_tags, - dx_field_location_t *location) + dx_field_location_t *location, + int more) { #define LONG 10 #define SHORT 3 @@ -646,7 +639,8 @@ static int dx_check_field_LH(dx_message_content_t *content, return 0; if (0 == dx_check_and_advance(&content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location)) return 0; - content->parse_depth = depth; + if (!more) + content->parse_depth = depth; } return 1; } @@ -668,11 +662,14 @@ static int dx_message_check_LH(dx_message_content_t *content, dx_message_depth_t static const unsigned char * const BODY_DATA_SHORT = (unsigned char*) "\x00\x53\x75"; static const unsigned char * const BODY_SEQUENCE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76"; static const unsigned char * const BODY_SEQUENCE_SHORT = (unsigned char*) "\x00\x53\x76"; + static const unsigned char * const BODY_VALUE_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77"; + static const unsigned char * const BODY_VALUE_SHORT = (unsigned char*) "\x00\x53\x77"; static const unsigned char * const FOOTER_LONG = (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78"; static const unsigned char * const FOOTER_SHORT = (unsigned char*) "\x00\x53\x78"; static const unsigned char * const TAGS_LIST = (unsigned char*) "\x45\xc0\xd0"; static const unsigned char * const TAGS_MAP = (unsigned char*) "\xc1\xd1"; static const unsigned char * const TAGS_BINARY = (unsigned char*) "\xa0\xb0"; + static const unsigned char * const TAGS_ANY = (unsigned char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0"; dx_buffer_t *buffer = DEQ_HEAD(content->buffers); @@ -694,7 +691,7 @@ static int dx_message_check_LH(dx_message_content_t *content, dx_message_depth_t // MESSAGE HEADER // if (0 == dx_check_field_LH(content, DX_DEPTH_HEADER, - MSG_HDR_LONG, MSG_HDR_SHORT, TAGS_LIST, &content->section_message_header)) + MSG_HDR_LONG, MSG_HDR_SHORT, TAGS_LIST, &content->section_message_header, 0)) return 0; if (depth == DX_DEPTH_HEADER) return 1; @@ -703,7 +700,7 @@ static int dx_message_check_LH(dx_message_content_t *content, dx_message_depth_t // DELIVERY ANNOTATION // if (0 == dx_check_field_LH(content, DX_DEPTH_DELIVERY_ANNOTATIONS, - DELIVERY_ANNOTATION_LONG, DELIVERY_ANNOTATION_SHORT, TAGS_MAP, &content->section_delivery_annotation)) + DELIVERY_ANNOTATION_LONG, DELIVERY_ANNOTATION_SHORT, TAGS_MAP, &content->section_delivery_annotation, 0)) return 0; if (depth == DX_DEPTH_DELIVERY_ANNOTATIONS) return 1; @@ -712,7 +709,7 @@ static int dx_message_check_LH(dx_message_content_t *content, dx_message_depth_t // MESSAGE ANNOTATION // if (0 == dx_check_field_LH(content, DX_DEPTH_MESSAGE_ANNOTATIONS, - MESSAGE_ANNOTATION_LONG, MESSAGE_ANNOTATION_SHORT, TAGS_MAP, &content->section_message_annotation)) + MESSAGE_ANNOTATION_LONG, MESSAGE_ANNOTATION_SHORT, TAGS_MAP, &content->section_message_annotation, 0)) return 0; if (depth == DX_DEPTH_MESSAGE_ANNOTATIONS) return 1; @@ -721,7 +718,7 @@ static int dx_message_check_LH(dx_message_content_t *content, dx_message_depth_t // PROPERTIES // if (0 == dx_check_field_LH(content, DX_DEPTH_PROPERTIES, - PROPERTIES_LONG, PROPERTIES_SHORT, TAGS_LIST, &content->section_message_properties)) + PROPERTIES_LONG, PROPERTIES_SHORT, TAGS_LIST, &content->section_message_properties, 0)) return 0; if (depth == DX_DEPTH_PROPERTIES) return 1; @@ -730,19 +727,25 @@ static int dx_message_check_LH(dx_message_content_t *content, dx_message_depth_t // APPLICATION PROPERTIES // if (0 == dx_check_field_LH(content, DX_DEPTH_APPLICATION_PROPERTIES, - APPLICATION_PROPERTIES_LONG, APPLICATION_PROPERTIES_SHORT, TAGS_MAP, &content->section_application_properties)) + APPLICATION_PROPERTIES_LONG, APPLICATION_PROPERTIES_SHORT, TAGS_MAP, &content->section_application_properties, 0)) return 0; if (depth == DX_DEPTH_APPLICATION_PROPERTIES) return 1; // - // BODY (Note that this function expects a single data section or a single AMQP sequence) + // BODY + // Note that this function expects a limited set of types in a VALUE section. This is + // not a problem for messages passing through Dispatch because through-only messages won't + // be parsed to BODY-depth. // if (0 == dx_check_field_LH(content, DX_DEPTH_BODY, - BODY_DATA_LONG, BODY_DATA_SHORT, TAGS_BINARY, &content->section_body)) + BODY_DATA_LONG, BODY_DATA_SHORT, TAGS_BINARY, &content->section_body, 1)) + return 0; + if (0 == dx_check_field_LH(content, DX_DEPTH_BODY, + BODY_SEQUENCE_LONG, BODY_SEQUENCE_SHORT, TAGS_LIST, &content->section_body, 1)) return 0; if (0 == dx_check_field_LH(content, DX_DEPTH_BODY, - BODY_SEQUENCE_LONG, BODY_SEQUENCE_SHORT, TAGS_LIST, &content->section_body)) + BODY_VALUE_LONG, BODY_VALUE_SHORT, TAGS_ANY, &content->section_body, 0)) return 0; if (depth == DX_DEPTH_BODY) return 1; @@ -751,7 +754,7 @@ static int dx_message_check_LH(dx_message_content_t *content, dx_message_depth_t // FOOTER // if (0 == dx_check_field_LH(content, DX_DEPTH_ALL, - FOOTER_LONG, FOOTER_SHORT, TAGS_MAP, &content->section_footer)) + FOOTER_LONG, FOOTER_SHORT, TAGS_MAP, &content->section_footer, 0)) return 0; return 1; |
