diff options
| author | Ted Ross <tross@apache.org> | 2013-06-26 20:12:07 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-06-26 20:12:07 +0000 |
| commit | 27e19a50dda487c1ba3e69e539e57ae499b29b9c (patch) | |
| tree | 8e1c609c012370055b5ff06ad08af4fa5a00b8fa /qpid/extras/dispatch/src | |
| parent | 2a9141ac23a2c8459a0ac684f6522898d9abb6e7 (diff) | |
| download | qpid-python-27e19a50dda487c1ba3e69e539e57ae499b29b9c.tar.gz | |
NO-JIRA - Management agent in the container now responds to "get" requests.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1497070 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src')
| -rw-r--r-- | qpid/extras/dispatch/src/agent.c | 116 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/compose.c | 42 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/message.c | 42 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/message_private.h | 1 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_node.c | 22 |
5 files changed, 207 insertions, 16 deletions
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c index 2486d156c5..3739842e36 100644 --- a/qpid/extras/dispatch/src/agent.c +++ b/qpid/extras/dispatch/src/agent.c @@ -29,11 +29,12 @@ #include <qpid/dispatch/timer.h> #include <qpid/dispatch/router.h> #include <qpid/dispatch/log.h> +#include <qpid/dispatch/compose.h> #include <string.h> #include <stdio.h> struct dx_agent_t { - dx_server_t *server; + dx_dispatch_t *dx; hash_t *class_hash; dx_message_list_t in_fifo; dx_message_list_t out_fifo; @@ -52,15 +53,15 @@ struct dx_agent_class_t { typedef struct { - dx_agent_t *agent; - dx_message_t *response_msg; + dx_agent_t *agent; + dx_composed_field_t *response; } dx_agent_request_t; static char *log_module = "AGENT"; -static void dx_agent_process_get(dx_agent_t *agent, dx_field_map_t *map) +static void dx_agent_process_get(dx_agent_t *agent, dx_field_map_t *map, dx_field_iterator_t *reply_to) { dx_field_iterator_t *cls = dx_field_map_by_key(map, "class"); if (cls == 0) @@ -74,11 +75,73 @@ static void dx_agent_process_get(dx_agent_t *agent, dx_field_map_t *map) dx_log(log_module, LOG_TRACE, "Received GET request for class: %s", cls_record->fqname); + // + // Compose the header + // + dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_HEADER, 0); + dx_compose_start_list(field); + dx_compose_insert_bool(field, 0); // durable + dx_compose_end_list(field); + + // + // Compose the Properties + // + field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field); + dx_compose_start_list(field); + dx_compose_insert_null(field); // message-id + dx_compose_insert_null(field); // user-id + dx_compose_insert_string_iterator(field, reply_to); // to + dx_compose_insert_null(field); // subject + dx_compose_insert_null(field); // reply-to + dx_compose_insert_string(field, "1"); // correlation-id // TODO - fix + dx_compose_end_list(field); + + // + // Compose the Application Properties + // + field = dx_compose(DX_PERFORMATIVE_APPLICATION_PROPERTIES, field); + dx_compose_start_map(field); + dx_compose_insert_string(field, "operation"); + dx_compose_insert_string(field, "get"); + + dx_compose_insert_string(field, "status-code"); + dx_compose_insert_uint(field, 200); + + dx_compose_insert_string(field, "status-descriptor"); + dx_compose_insert_string(field, "OK"); + dx_compose_end_map(field); + + // + // Open the Body (AMQP Value) to be filled in by the handler. + // + field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, field); + dx_compose_start_list(field); + dx_compose_start_map(field); + + // + // The request record is allocated locally because the entire processing of the request + // will be done synchronously. + // dx_agent_request_t request; - request.agent = agent; - request.response_msg = 0; + request.agent = agent; + request.response = field; cls_record->query_handler(cls_record->context, 0, &request); + + // + // The response is complete, close the list. + // + dx_compose_end_list(field); + + // + // Create a message and send it. + // + dx_message_t *msg = dx_allocate_message(); + dx_message_compose_2(msg, field); + dx_router_send(agent->dx, reply_to, msg); + + dx_free_message(msg); + dx_compose_free(field); } @@ -98,6 +161,13 @@ static void dx_agent_process_request(dx_agent_t *agent, dx_message_t *msg) return; // + // Get an iterator for the reply-to. Exit if not found. + // + dx_field_iterator_t *reply_to = dx_message_field_iterator(msg, DX_FIELD_REPLY_TO); + if (reply_to == 0) + return; + + // // Try to get a map-view of the body. Exit if the body is not a map-value. // dx_field_map_t *map = dx_field_map(body, 1); @@ -121,15 +191,16 @@ static void dx_agent_process_request(dx_agent_t *agent, dx_message_t *msg) // dx_field_iterator_t *opcode_string = dx_field_raw(opcode); if (dx_field_iterator_equal(opcode_string, (unsigned char*) "get")) - dx_agent_process_get(agent, map); + dx_agent_process_get(agent, map, reply_to); dx_field_iterator_free(opcode_string); dx_field_map_free(map); dx_field_iterator_free(body); + dx_field_iterator_free(reply_to); } -static void dx_agent_timer_handler(void *context) +static void dx_agent_deferred_handler(void *context) { dx_agent_t *agent = (dx_agent_t*) context; dx_message_t *msg; @@ -165,12 +236,12 @@ static void dx_agent_rx_handler(void *context, dx_message_t *msg) dx_agent_t *dx_agent(dx_dispatch_t *dx) { dx_agent_t *agent = NEW(dx_agent_t); - agent->server = dx->server; + agent->dx = dx; agent->class_hash = hash(6, 10, 1); DEQ_INIT(agent->in_fifo); DEQ_INIT(agent->out_fifo); agent->lock = sys_mutex(); - agent->timer = dx_timer(dx, dx_agent_timer_handler, agent); + agent->timer = dx_timer(dx, dx_agent_deferred_handler, agent); agent->address = dx_router_register_address(dx, true, "agent", dx_agent_rx_handler, agent); return agent; @@ -225,41 +296,66 @@ dx_agent_class_t *dx_agent_register_event(dx_dispatch_t *dx, void dx_agent_value_string(void *correlator, const char *key, const char *value) { + dx_agent_request_t *request = (dx_agent_request_t*) correlator; + dx_compose_insert_string(request->response, key); + dx_compose_insert_string(request->response, value); } void dx_agent_value_uint(void *correlator, const char *key, uint64_t value) { + dx_agent_request_t *request = (dx_agent_request_t*) correlator; + dx_compose_insert_string(request->response, key); + dx_compose_insert_uint(request->response, value); } void dx_agent_value_null(void *correlator, const char *key) { + dx_agent_request_t *request = (dx_agent_request_t*) correlator; + dx_compose_insert_string(request->response, key); + dx_compose_insert_null(request->response); } void dx_agent_value_boolean(void *correlator, const char *key, bool value) { + dx_agent_request_t *request = (dx_agent_request_t*) correlator; + dx_compose_insert_string(request->response, key); + dx_compose_insert_bool(request->response, value); } void dx_agent_value_binary(void *correlator, const char *key, const uint8_t *value, size_t len) { + dx_agent_request_t *request = (dx_agent_request_t*) correlator; + dx_compose_insert_string(request->response, key); + dx_compose_insert_binary(request->response, value, len); } void dx_agent_value_uuid(void *correlator, const char *key, const uint8_t *value) { + dx_agent_request_t *request = (dx_agent_request_t*) correlator; + dx_compose_insert_string(request->response, key); + dx_compose_insert_uuid(request->response, value); } void dx_agent_value_timestamp(void *correlator, const char *key, uint64_t value) { + dx_agent_request_t *request = (dx_agent_request_t*) correlator; + dx_compose_insert_string(request->response, key); + dx_compose_insert_timestamp(request->response, value); } void dx_agent_value_complete(void *correlator, bool more) { + dx_agent_request_t *request = (dx_agent_request_t*) correlator; + dx_compose_end_map(request->response); + if (more) + dx_compose_start_map(request->response); } diff --git a/qpid/extras/dispatch/src/compose.c b/qpid/extras/dispatch/src/compose.c index 3c0d8974ba..f20fd53251 100644 --- a/qpid/extras/dispatch/src/compose.c +++ b/qpid/extras/dispatch/src/compose.c @@ -190,6 +190,16 @@ static void dx_compose_end_composite(dx_composed_field_t *field) dx_overwrite_32(&comp->count_location, comp->count); DEQ_REMOVE_HEAD(field->fieldStack); + + // + // If there is an enclosing composite, update its length and count + // + dx_composite_t *enclosing = DEQ_HEAD(field->fieldStack); + if (enclosing) { + enclosing->length += 4 + comp->length; + enclosing->count++; + } + free_dx_composite_t(comp); } @@ -336,10 +346,10 @@ void dx_compose_insert_timestamp(dx_composed_field_t *field, uint64_t value) } -void dx_compose_insert_uuid(dx_composed_field_t *field, const char *value) +void dx_compose_insert_uuid(dx_composed_field_t *field, const uint8_t *value) { dx_insert_8(field, 0x98); // uuid - dx_insert(field, (const uint8_t*) value, 16); + dx_insert(field, value, 16); bump_count(field); } @@ -410,6 +420,34 @@ void dx_compose_insert_string(dx_composed_field_t *field, const char *value) } +void dx_compose_insert_string_iterator(dx_composed_field_t *field, dx_field_iterator_t *iter) +{ + uint32_t len = 0; + + while (!dx_field_iterator_end(iter)) { + dx_field_iterator_octet(iter); + len++; + } + + dx_field_iterator_reset(iter); + + if (len < 256) { + dx_insert_8(field, 0xa1); // str8-utf8 + dx_insert_8(field, (uint8_t) len); + } else { + dx_insert_8(field, 0xb1); // str32-utf8 + dx_insert_32(field, len); + } + + while (!dx_field_iterator_end(iter)) { + uint8_t octet = dx_field_iterator_octet(iter); + dx_insert_8(field, octet); + } + + bump_count(field); +} + + void dx_compose_insert_symbol(dx_composed_field_t *field, const char *value) { uint32_t len = strlen(value); diff --git a/qpid/extras/dispatch/src/message.c b/qpid/extras/dispatch/src/message.c index 495c13ed7c..75910f4b24 100644 --- a/qpid/extras/dispatch/src/message.c +++ b/qpid/extras/dispatch/src/message.c @@ -287,6 +287,36 @@ static dx_field_location_t *dx_message_field_location(dx_message_t *msg, dx_mess } break; + case DX_FIELD_REPLY_TO: + while (1) { + if (content->field_reply_to.parsed) + return &content->field_reply_to; + + if (content->section_message_properties.parsed == 0) + break; + + dx_buffer_t *buffer = content->section_message_properties.buffer; + unsigned char *cursor = dx_buffer_base(buffer) + content->section_message_properties.offset; + + int count = start_list(&cursor, &buffer); + int result; + + if (count < 3) + break; + + result = traverse_field(&cursor, &buffer, 0); // message_id + if (!result) return 0; + result = traverse_field(&cursor, &buffer, 0); // user_id + if (!result) return 0; + result = traverse_field(&cursor, &buffer, 0); // to + if (!result) return 0; + result = traverse_field(&cursor, &buffer, 0); // subject + if (!result) return 0; + result = traverse_field(&cursor, &buffer, &content->field_reply_to); // reply_to + if (!result) return 0; + } + break; + case DX_FIELD_BODY: if (content->section_body.parsed) return &content->section_body; @@ -711,7 +741,7 @@ void dx_message_compose_1(dx_message_t *msg, const char *to, dx_buffer_list_t *b field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field); dx_compose_start_list(field); - dx_compose_insert_null(field); // compose-id + dx_compose_insert_null(field); // message-id dx_compose_insert_null(field); // user-id dx_compose_insert_string(field, to); // to //dx_compose_insert_null(field); // subject @@ -738,3 +768,13 @@ void dx_message_compose_1(dx_message_t *msg, const char *to, dx_buffer_list_t *b dx_compose_free(field); } + +void dx_message_compose_2(dx_message_t *msg, dx_composed_field_t *field) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + dx_buffer_list_t *field_buffers = dx_compose_buffers(field); + + content->buffers = *field_buffers; + DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers. +} + diff --git a/qpid/extras/dispatch/src/message_private.h b/qpid/extras/dispatch/src/message_private.h index 4280a18e02..ca39a84660 100644 --- a/qpid/extras/dispatch/src/message_private.h +++ b/qpid/extras/dispatch/src/message_private.h @@ -73,6 +73,7 @@ typedef struct { dx_field_location_t section_footer; // The footer dx_field_location_t field_user_id; // The string value of the user-id dx_field_location_t field_to; // The string value of the to field + dx_field_location_t field_reply_to; // The string value of the reply_to field dx_field_location_t body; // The body of the message dx_buffer_t *parse_buffer; unsigned char *parse_cursor; diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index c6105e35ad..010b008f93 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -573,9 +573,25 @@ void dx_router_unregister_address(dx_address_t *ad) } -void dx_router_send(dx_dispatch_t *dx, - const char *address, - dx_message_t *msg) +void dx_router_send(dx_dispatch_t *dx, + dx_field_iterator_t *address, + dx_message_t *msg) { + dx_router_t *router = dx->router; + dx_address_t *addr; + + dx_field_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH); + sys_mutex_lock(router->lock); + hash_retrieve(router->out_hash, address, (void*) &addr); + if (addr) { + if (addr->rlink) { + pn_link_t *pn_outlink = dx_link_pn(addr->rlink->link); + dx_message_t *copy = dx_message_copy(msg); + DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy); + pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo)); + dx_link_activate(addr->rlink->link); + } + } + sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher? } |
