diff options
| author | Ted Ross <tross@apache.org> | 2013-10-07 18:11:57 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-10-07 18:11:57 +0000 |
| commit | ee5511f752a7316760ed74f6493a7433ffed4923 (patch) | |
| tree | 2169af11aa47d9bf25dfe192b6a888ca5572778f | |
| parent | 659d2e50da4634bb33bd3a39f5024a3fc6e2d57d (diff) | |
| download | qpid-python-ee5511f752a7316760ed74f6493a7433ffed4923.tar.gz | |
QPID-5212 - Added management-agent access to router state.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1530019 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/extras/dispatch/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | qpid/extras/dispatch/TODO | 4 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/dispatch.c | 4 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_agent.c | 185 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_node.c | 19 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_private.h | 10 |
6 files changed, 212 insertions, 11 deletions
diff --git a/qpid/extras/dispatch/CMakeLists.txt b/qpid/extras/dispatch/CMakeLists.txt index 72adcffd6d..a55de067f9 100644 --- a/qpid/extras/dispatch/CMakeLists.txt +++ b/qpid/extras/dispatch/CMakeLists.txt @@ -101,6 +101,7 @@ set(server_SOURCES src/parse.c src/posix/threading.c src/python_embedded.c + src/router_agent.c src/router_node.c src/router_pynode.c src/server.c diff --git a/qpid/extras/dispatch/TODO b/qpid/extras/dispatch/TODO index 0ae3988d29..5bb334a6ca 100644 --- a/qpid/extras/dispatch/TODO +++ b/qpid/extras/dispatch/TODO @@ -39,9 +39,9 @@ enhancements to be fixed by going to the Apache Qpid JIRA instance: detection of topology change. o All PyRouter stimulus through a work queue. o Router Code Updates - . Remove all vestiges of "binding" - . Calculate the valid-origin mask for each path . Report address mappings to routers + . Generate RA immediately after updating routing tables + . Generate unsolicited updates for mobile addresses? o Expose idle-timeout/keepalive on connectors and listeners - Major Roadmap Features diff --git a/qpid/extras/dispatch/src/dispatch.c b/qpid/extras/dispatch/src/dispatch.c index a1a659fd74..2e37f4c6e8 100644 --- a/qpid/extras/dispatch/src/dispatch.c +++ b/qpid/extras/dispatch/src/dispatch.c @@ -35,7 +35,7 @@ dx_container_t *dx_container(dx_dispatch_t *dx); void dx_container_setup_agent(dx_dispatch_t *dx); void dx_container_free(dx_container_t *container); dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id); -void dx_router_setup_agent(dx_dispatch_t *dx); +void dx_router_setup_late(dx_dispatch_t *dx); void dx_router_free(dx_router_t *router); dx_agent_t *dx_agent(dx_dispatch_t *dx); void dx_agent_free(dx_agent_t *agent); @@ -103,7 +103,7 @@ dx_dispatch_t *dx_dispatch(const char *config_path) dx_alloc_setup_agent(dx); dx_server_setup_agent(dx); dx_container_setup_agent(dx); - dx_router_setup_agent(dx); + dx_router_setup_late(dx); return dx; } diff --git a/qpid/extras/dispatch/src/router_agent.c b/qpid/extras/dispatch/src/router_agent.c new file mode 100644 index 0000000000..c35f5fa768 --- /dev/null +++ b/qpid/extras/dispatch/src/router_agent.c @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/python_embedded.h> +#include <stdio.h> +#include <string.h> +#include <stdbool.h> +#include <stdlib.h> +#include <qpid/dispatch.h> +#include <qpid/dispatch/agent.h> +#include "dispatch_private.h" +#include "router_private.h" + +//static char *module = "router.agent"; + +#define DX_ROUTER_CLASS_ROUTER 1 +#define DX_ROUTER_CLASS_LINK 2 +#define DX_ROUTER_CLASS_NODE 3 +#define DX_ROUTER_CLASS_ADDRESS 4 + +typedef struct dx_router_class_t { + dx_router_t *router; + int class_id; +} dx_router_class_t; + + +static void dx_router_schema_handler(void *context, void *correlator) +{ +} + + +static const char *dx_router_addr_text(dx_address_t *addr) +{ + if (addr) { + const unsigned char *text = hash_key_by_handle(addr->hash_handle); + if (text) + return (const char*) text; + } + return 0; +} + + +static void dx_router_query_router(dx_router_t *router, void *cor) +{ + dx_agent_value_string(cor, "area", router->router_area); + dx_agent_value_string(cor, "router_id", router->router_id); + + sys_mutex_lock(router->lock); + dx_agent_value_uint(cor, "addr_count", DEQ_SIZE(router->addrs)); + dx_agent_value_uint(cor, "link_count", DEQ_SIZE(router->links)); + dx_agent_value_uint(cor, "node_count", DEQ_SIZE(router->routers)); + sys_mutex_unlock(router->lock); + + dx_agent_value_complete(cor, 0); +} + + +static void dx_router_query_link(dx_router_t *router, void *cor) +{ + sys_mutex_lock(router->lock); + dx_router_link_t *link = DEQ_HEAD(router->links); + const char *link_type = "?"; + const char *link_dir; + + while (link) { + dx_agent_value_uint(cor, "index", link->mask_bit); + switch (link->link_type) { + case DX_LINK_ENDPOINT: link_type = "endpoint"; break; + case DX_LINK_ROUTER: link_type = "inter-router"; break; + case DX_LINK_AREA: link_type = "inter-area"; break; + } + dx_agent_value_string(cor, "link-type", link_type); + + if (link->link_direction == DX_INCOMING) + link_dir = "in"; + else + link_dir = "out"; + dx_agent_value_string(cor, "link-dir", link_dir); + + const char *text = dx_router_addr_text(link->owning_addr); + if (text) + dx_agent_value_string(cor, "owning-addr", text); + else + dx_agent_value_null(cor, "owning-addr"); + + link = DEQ_NEXT(link); + dx_agent_value_complete(cor, link != 0); + } + sys_mutex_unlock(router->lock); +} + + +static void dx_router_query_node(dx_router_t *router, void *cor) +{ + sys_mutex_lock(router->lock); + dx_router_node_t *node = DEQ_HEAD(router->routers); + while (node) { + dx_agent_value_uint(cor, "index", node->mask_bit); + dx_agent_value_string(cor, "addr", dx_router_addr_text(node->owning_addr)); + if (node->next_hop) + dx_agent_value_uint(cor, "next-hop", node->next_hop->mask_bit); + else + dx_agent_value_null(cor, "next-hop"); + if (node->peer_link) + dx_agent_value_uint(cor, "router-link", node->peer_link->mask_bit); + else + dx_agent_value_null(cor, "router-link"); + node = DEQ_NEXT(node); + dx_agent_value_complete(cor, node != 0); + } + sys_mutex_unlock(router->lock); +} + + +static void dx_router_query_address(dx_router_t *router, void *cor) +{ + sys_mutex_lock(router->lock); + dx_address_t *addr = DEQ_HEAD(router->addrs); + while (addr) { + dx_agent_value_string(cor, "addr", dx_router_addr_text(addr)); + dx_agent_value_boolean(cor, "in-process", addr->handler != 0); + dx_agent_value_uint(cor, "subscriber-count", DEQ_SIZE(addr->rlinks)); + dx_agent_value_uint(cor, "remote-count", DEQ_SIZE(addr->rnodes)); + dx_agent_value_uint(cor, "deliveries-ingress", addr->deliveries_ingress); + dx_agent_value_uint(cor, "deliveries-egress", addr->deliveries_egress); + dx_agent_value_uint(cor, "deliveries-transit", addr->deliveries_transit); + addr = DEQ_NEXT(addr); + dx_agent_value_complete(cor, addr != 0); + } + sys_mutex_unlock(router->lock); +} + + +static void dx_router_query_handler(void* context, const char *id, void *correlator) +{ + dx_router_class_t *cls = (dx_router_class_t*) context; + switch (cls->class_id) { + case DX_ROUTER_CLASS_ROUTER: dx_router_query_router(cls->router, correlator); break; + case DX_ROUTER_CLASS_LINK: dx_router_query_link(cls->router, correlator); break; + case DX_ROUTER_CLASS_NODE: dx_router_query_node(cls->router, correlator); break; + case DX_ROUTER_CLASS_ADDRESS: dx_router_query_address(cls->router, correlator); break; + } +} + + +static dx_agent_class_t *dx_router_setup_class(dx_router_t *router, const char *fqname, int id) +{ + dx_router_class_t *cls = NEW(dx_router_class_t); + cls->router = router; + cls->class_id = id; + + return dx_agent_register_class(router->dx, fqname, cls, + dx_router_schema_handler, + dx_router_query_handler); +} + + +void dx_router_agent_setup(dx_router_t *router) +{ + router->class_router = + dx_router_setup_class(router, "org.apache.qpid.dispatch.router", DX_ROUTER_CLASS_ROUTER); + router->class_link = + dx_router_setup_class(router, "org.apache.qpid.dispatch.router.link", DX_ROUTER_CLASS_LINK); + router->class_node = + dx_router_setup_class(router, "org.apache.qpid.dispatch.router.node", DX_ROUTER_CLASS_NODE); + router->class_address = + dx_router_setup_class(router, "org.apache.qpid.dispatch.router.address", DX_ROUTER_CLASS_ADDRESS); +} + diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index 40f827f4cd..90e2e0c9dc 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -448,6 +448,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del in_process_copy = dx_message_copy(msg); handler = addr->handler; handler_context = addr->handler_context; + addr->deliveries_egress++; } // @@ -472,6 +473,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del if (fanout == 1 && !dx_delivery_settled(delivery)) re->delivery = delivery; + addr->deliveries_egress++; dx_link_activate(dest_link_ref->link->link); dest_link_ref = DEQ_NEXT(dest_link_ref); } @@ -542,6 +544,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del if (fanout == 1) re->delivery = delivery; + addr->deliveries_transit++; dx_link_activate(dest_link->link); } } @@ -739,9 +742,8 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) hash_retrieve(router->addr_hash, iter, (void**) &addr); if (!addr) { addr = new_dx_address_t(); + memset(addr, 0, sizeof(dx_address_t)); DEQ_ITEM_INIT(addr); - addr->handler = 0; - addr->handler_context = 0; DEQ_INIT(addr->rlinks); DEQ_INIT(addr->rnodes); hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); @@ -1004,12 +1006,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) } -void dx_router_setup_agent(dx_dispatch_t *dx) +void dx_router_setup_late(dx_dispatch_t *dx) { + dx_router_agent_setup(dx->router); dx_router_python_setup(dx->router); dx_timer_schedule(dx->router->timer, 1000); - - // TODO } @@ -1046,9 +1047,8 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx, hash_retrieve(router->addr_hash, iter, (void**) &addr); if (!addr) { addr = new_dx_address_t(); + memset(addr, 0, sizeof(dx_address_t)); DEQ_ITEM_INIT(addr); - addr->handler = 0; - addr->handler_context = 0; DEQ_INIT(addr->rlinks); DEQ_INIT(addr->rnodes); hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); @@ -1088,6 +1088,7 @@ void dx_router_send(dx_dispatch_t *dx, // // Forward to all of the local links receiving this address. // + addr->deliveries_ingress++; dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks); while (dest_link_ref) { dx_routed_event_t *re = new_dx_routed_event_t(); @@ -1099,11 +1100,14 @@ void dx_router_send(dx_dispatch_t *dx, DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re); dx_link_activate(dest_link_ref->link->link); + addr->deliveries_egress++; + dest_link_ref = DEQ_NEXT(dest_link_ref); } // // Forward to the next-hops for remote destinations. + // FIXME - use link-mask to avoid dups. // dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); dx_router_link_t *dest_link; @@ -1121,6 +1125,7 @@ void dx_router_send(dx_dispatch_t *dx, re->disposition = 0; DEQ_INSERT_TAIL(dest_link->msg_fifo, re); dx_link_activate(dest_link->link); + addr->deliveries_transit++; } dest_node_ref = DEQ_NEXT(dest_node_ref); } diff --git a/qpid/extras/dispatch/src/router_private.h b/qpid/extras/dispatch/src/router_private.h index a21e4f1d5c..ac98fc155a 100644 --- a/qpid/extras/dispatch/src/router_private.h +++ b/qpid/extras/dispatch/src/router_private.h @@ -27,6 +27,7 @@ typedef struct dx_router_conn_t dx_router_conn_t; void dx_router_python_setup(dx_router_t *router); void dx_pyrouter_tick(dx_router_t *router); +void dx_router_agent_setup(dx_router_t *router); typedef enum { DX_LINK_ENDPOINT, // A link to a connected endpoint @@ -109,6 +110,10 @@ struct dx_address_t { dx_router_link_ref_list_t rlinks; // Locally-Connected Consumers dx_router_ref_list_t rnodes; // Remotely-Connected Consumers hash_handle_t *hash_handle; // Linkage back to the hash table entry + + uint64_t deliveries_ingress; + uint64_t deliveries_egress; + uint64_t deliveries_transit; }; ALLOC_DECLARE(dx_address_t); @@ -138,6 +143,11 @@ struct dx_router_t { PyObject *pyRouter; PyObject *pyTick; + + dx_agent_class_t *class_router; + dx_agent_class_t *class_link; + dx_agent_class_t *class_node; + dx_agent_class_t *class_address; }; |
