summaryrefslogtreecommitdiff
path: root/qpid/extras/dispatch/src/router_node.c
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-05-02 20:58:23 +0000
committerTed Ross <tross@apache.org>2013-05-02 20:58:23 +0000
commitd83fef53fc84ec40fc02be778a05d738b0d5b0b9 (patch)
tree46ba7559c2ed36d39c23074061862563a6a73601 /qpid/extras/dispatch/src/router_node.c
parente92ff34198ad8a5e7ab1bab006dbe1db6aebc6e0 (diff)
downloadqpid-python-d83fef53fc84ec40fc02be778a05d738b0d5b0b9.tar.gz
NO-JIRA - Additional Development
- Added buffer of saved log messages for remote retrieval. - Added __FILE__ and __LINE__ annotations to logs. - Refactored dx_message_check() so it can be called multiple times with different depths. - Separated the buffer-size-specific tests into a separate unit test executable. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1478538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src/router_node.c')
-rw-r--r--qpid/extras/dispatch/src/router_node.c14
1 files changed, 10 insertions, 4 deletions
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index 65756be215..872b8363c9 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -30,7 +30,7 @@ static char *module = "ROUTER";
/**
* Address Types and Processing:
*
- * Address Hash Compare onReceive onEmit
+ * Address Hash Key onReceive onEmit
* =============================================================================
* _local/<local> L<local> handler forward
* _topo/<area>/<router>/<local> A<area> forward forward
@@ -174,6 +174,10 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del
// cases for forwarding.
//
// Forward to the in-process handler for this message if there is one.
+ // Note: If the handler is going to queue the message for deferred processing,
+ // it must copy the message. This function assumes that the handler
+ // will process the message synchronously and be finished with it upon
+ // completion.
//
if (addr->handler)
addr->handler(addr->handler_context, msg);
@@ -183,8 +187,9 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del
// TODO - Don't forward if this is a "_local" address.
//
if (addr->rlink) {
- pn_link_t* pn_outlink = dx_link_pn(addr->rlink->link);
- DEQ_INSERT_TAIL(addr->rlink->out_fifo, msg);
+ pn_link_t *pn_outlink = dx_link_pn(addr->rlink->link);
+ dx_message_t *copy = dx_message_copy(msg);
+ DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy);
pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo));
dx_link_activate(addr->rlink->link);
}
@@ -205,6 +210,7 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del
pn_delivery_settle(delivery);
}
+ dx_free_message(msg);
sys_mutex_unlock(router->lock);
}
} else {
@@ -286,7 +292,7 @@ static int router_incoming_link_handler(void* context, dx_link_t *link)
pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
- pn_link_flow(pn_link, 8);
+ pn_link_flow(pn_link, 32);
pn_link_open(pn_link);
} else {
pn_link_close(pn_link);