summaryrefslogtreecommitdiff
path: root/qpid/extras/dispatch/src/router_node.c
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-10-04 13:40:59 +0000
committerTed Ross <tross@apache.org>2013-10-04 13:40:59 +0000
commit28302f5604920d7cf7941481ff376f92cdd0e535 (patch)
tree3f7e2686d87cb713f7be84f46c5b4d7eb027c116 /qpid/extras/dispatch/src/router_node.c
parent20fbc65eba57a9526a39652cb1473a8551d3d97b (diff)
downloadqpid-python-28302f5604920d7cf7941481ff376f92cdd0e535.tar.gz
QPID-4967 - Work in progress on multi-router networks
- Added a feature to the hash table to allow referenced objects to hold a direct linkage back to the hash structure for fast deletion and access to the key. This allows the key to be stored in only one place and allows items to be removed without requiring a hash lookup on the key. - Completed the integration of the Python router and the C data structures that track remote routers (neighbor and multi-hop). - Allow multiple addresses in the ioAdapter from Python. - Added a separate address for the hello messages because the messaging pattern is different for these messages. - Added some content to the TODO file. - Added test configurations for a two-router network. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1529163 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src/router_node.c')
-rw-r--r--qpid/extras/dispatch/src/router_node.c163
1 files changed, 114 insertions, 49 deletions
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index fea3695620..35fb834029 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -28,8 +28,9 @@
static char *module = "ROUTER";
-static char *local_prefix = "_local/";
-static char *topo_prefix = "_topo/";
+static char *local_prefix = "_local/";
+static char *topo_prefix = "_topo/";
+static char *direct_prefix;
/**
* Address Types and Processing:
@@ -55,7 +56,7 @@ ALLOC_DEFINE(dx_address_t);
ALLOC_DEFINE(dx_router_conn_t);
-static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
+void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
{
dx_router_link_ref_t *ref = new_dx_router_link_ref_t();
DEQ_ITEM_INIT(ref);
@@ -65,7 +66,7 @@ static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_ro
}
-static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
+void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
{
if (link->ref) {
DEQ_REMOVE(*ref_list, link->ref);
@@ -75,6 +76,31 @@ static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_ro
}
+void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode)
+{
+ dx_router_ref_t *ref = new_dx_router_ref_t();
+ DEQ_ITEM_INIT(ref);
+ ref->router = rnode;
+ rnode->ref_count++;
+ DEQ_INSERT_TAIL(*ref_list, ref);
+}
+
+
+void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode)
+{
+ dx_router_ref_t *ref = DEQ_HEAD(*ref_list);
+ while (ref) {
+ if (ref->router == rnode) {
+ DEQ_REMOVE(*ref_list, ref);
+ free_dx_router_ref_t(ref);
+ rnode->ref_count--;
+ break;
+ }
+ ref = DEQ_NEXT(ref);
+ }
+}
+
+
/**
* Check an address to see if it no longer has any associated destinations.
* Depending on its policy, the address may be eligible for being closed out
@@ -262,10 +288,11 @@ static int router_writable_link_handler(void* context, dx_link_t *link)
}
-static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
+static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg)
{
- dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg);
- dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+ dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg);
+ dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+ dx_field_iterator_t *ingress_iter = 0;
dx_parsed_field_t *trace = 0;
dx_parsed_field_t *ingress = 0;
@@ -293,7 +320,7 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
trace_item = dx_parse_sub_value(trace, idx);
}
- dx_compose_insert_string(out_da, router->router_id);
+ dx_compose_insert_string(out_da, direct_prefix);
dx_compose_end_list(out_da);
}
@@ -303,15 +330,17 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
//
dx_compose_insert_string(out_da, DX_DA_INGRESS);
if (ingress && dx_parse_is_scalar(ingress)) {
- dx_field_iterator_t *iter = dx_parse_raw(ingress);
- dx_compose_insert_string_iterator(out_da, iter);
+ ingress_iter = dx_parse_raw(ingress);
+ dx_compose_insert_string_iterator(out_da, ingress_iter);
} else
- dx_compose_insert_string(out_da, router->router_id);
+ dx_compose_insert_string(out_da, direct_prefix);
dx_compose_end_map(out_da);
dx_message_set_delivery_annotations(msg, out_da);
dx_compose_free(out_da);
+
+ return ingress_iter;
}
@@ -394,7 +423,8 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
hash_retrieve(router->addr_hash, iter, (void*) &addr);
dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
- int is_local = dx_field_iterator_prefix(iter, local_prefix);
+ int is_local = dx_field_iterator_prefix(iter, local_prefix);
+ int is_direct = dx_field_iterator_prefix(iter, direct_prefix);
dx_field_iterator_free(iter);
if (addr) {
@@ -404,9 +434,10 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
//
//
- // Interpret and update the delivery annotations of the message
+ // Interpret and update the delivery annotations of the message. As a convenience,
+ // this function returns the iterator to the ingress field (if it exists).
//
- router_annotate_message(router, msg);
+ dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg);
//
// Forward to the in-process handler for this message if there is one. The
@@ -446,31 +477,54 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
}
//
- // Forward to the next-hops for remote destinations.
+ // If the address form is direct to this router node, don't relay it on
+ // to any other part of the network.
//
- dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
- dx_router_link_t *dest_link;
- while (dest_node_ref) {
- if (dest_node_ref->router->next_hop)
- dest_link = dest_node_ref->router->next_hop->peer_link;
- else
- dest_link = dest_node_ref->router->peer_link;
- if (dest_link) {
- dx_routed_event_t *re = new_dx_routed_event_t();
- DEQ_ITEM_INIT(re);
- re->delivery = 0;
- re->message = dx_message_copy(msg);
- re->settle = 0;
- re->disposition = 0;
- DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
-
- fanout++;
- if (fanout == 1)
- re->delivery = delivery;
-
- dx_link_activate(dest_link->link);
+ if (!is_direct) {
+ //
+ // Get the mask bit associated with the ingress router for the message.
+ // This will be compared against the "valid_origin" masks for each
+ // candidate destination router.
+ //
+ int origin = -1;
+ if (ingress_iter) {
+ dx_address_t *origin_addr;
+ hash_retrieve(router->addr_hash, ingress_iter, (void*) &origin_addr);
+ if (origin_addr && DEQ_SIZE(origin_addr->rnodes) == 1) {
+ dx_router_ref_t *rref = DEQ_HEAD(origin_addr->rnodes);
+ origin = rref->router->mask_bit;
+ }
+ }
+
+ //
+ // Forward to the next-hops for remote destinations.
+ //
+ if (origin >= 0) {
+ dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
+ dx_router_link_t *dest_link;
+ while (dest_node_ref) {
+ if (dest_node_ref->router->next_hop)
+ dest_link = dest_node_ref->router->next_hop->peer_link;
+ else
+ dest_link = dest_node_ref->router->peer_link;
+ if (dest_link && dx_bitmask_value(dest_node_ref->router->valid_origins, origin)) {
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+ fanout++;
+ if (fanout == 1)
+ re->delivery = delivery;
+
+ dx_link_activate(dest_link->link);
+ }
+ dest_node_ref = DEQ_NEXT(dest_node_ref);
+ }
}
- dest_node_ref = DEQ_NEXT(dest_node_ref);
}
}
}
@@ -633,10 +687,10 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
if (is_router) {
//
- // If this is a router link, put it in the router_address link-list.
+ // If this is a router link, put it in the hello_address link-list.
//
- dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink);
- rlink->owning_addr = router->router_addr;
+ dx_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink);
+ rlink->owning_addr = router->hello_addr;
router->out_links_by_mask_bit[rlink->mask_bit] = rlink;
} else {
@@ -667,7 +721,7 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
addr->handler_context = 0;
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
- hash_insert(router->addr_hash, iter, addr);
+ hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(router->addrs, addr);
}
dx_field_iterator_free(iter);
@@ -806,7 +860,7 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co
rlink->mask_bit = mask_bit;
rlink->link_type = DX_LINK_ROUTER;
rlink->link_direction = DX_OUTGOING;
- rlink->owning_addr = router->router_addr;
+ rlink->owning_addr = router->hello_addr;
rlink->link = sender;
rlink->connected_link = 0;
rlink->peer_link = 0;
@@ -814,9 +868,9 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co
DEQ_INIT(rlink->msg_fifo);
//
- // Add the new outgoing link to the router_address's list of links.
+ // Add the new outgoing link to the hello_address's list of links.
//
- dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink);
+ dx_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink);
//
// Index this link from the by-maskbit index so we can later find it quickly
@@ -867,6 +921,14 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
dx_container_register_node_type(dx, &router_node);
}
+ size_t dplen = 9 + strlen(area) + strlen(id);
+ direct_prefix = (char*) malloc(dplen);
+ strcpy(direct_prefix, "_topo/");
+ strcat(direct_prefix, area);
+ strcat(direct_prefix, "/");
+ strcat(direct_prefix, id);
+ strcat(direct_prefix, "/");
+
dx_router_t *router = NEW(dx_router_t);
router_node.type_context = router;
@@ -883,8 +945,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
DEQ_INIT(router->routers);
router->out_links_by_mask_bit = NEW_PTR_ARRAY(dx_router_link_t, dx_bitmask_width());
- for (int idx = 0; idx < dx_bitmask_width(); idx++)
+ router->routers_by_mask_bit = NEW_PTR_ARRAY(dx_router_node_t, dx_bitmask_width());
+ for (int idx = 0; idx < dx_bitmask_width(); idx++) {
router->out_links_by_mask_bit[idx] = 0;
+ router->routers_by_mask_bit[idx] = 0;
+ }
router->neighbor_free_mask = dx_bitmask(1);
router->lock = sys_mutex();
@@ -894,10 +959,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
router->pyTick = 0;
//
- // Create an address for all of the routers in the topology. It will be registered
+ // Create addresses for all of the routers in the topology. It will be registered
// locally later in the initialization sequence.
//
router->router_addr = dx_router_register_address(dx, "qdxrouter", 0, 0);
+ router->hello_addr = dx_router_register_address(dx, "qdxhello", 0, 0);
//
// Inform the field iterator module of this router's id and area. The field iterator
@@ -935,8 +1001,7 @@ void dx_router_free(dx_router_t *router)
const char *dx_router_id(const dx_dispatch_t *dx)
{
- dx_router_t *router = dx->router;
- return router->router_id;
+ return direct_prefix;
}
@@ -963,7 +1028,7 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
addr->handler_context = 0;
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
- hash_insert(router->addr_hash, iter, addr);
+ hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_ITEM_INIT(addr);
DEQ_INSERT_TAIL(router->addrs, addr);
}