diff options
| author | Ted Ross <tross@apache.org> | 2013-05-02 20:58:23 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-05-02 20:58:23 +0000 |
| commit | d83fef53fc84ec40fc02be778a05d738b0d5b0b9 (patch) | |
| tree | 46ba7559c2ed36d39c23074061862563a6a73601 /qpid/extras/dispatch/src/router_node.c | |
| parent | e92ff34198ad8a5e7ab1bab006dbe1db6aebc6e0 (diff) | |
| download | qpid-python-d83fef53fc84ec40fc02be778a05d738b0d5b0b9.tar.gz | |
NO-JIRA - Additional Development
- Added buffer of saved log messages for remote retrieval.
- Added __FILE__ and __LINE__ annotations to logs.
- Refactored dx_message_check() so it can be called multiple times with different depths.
- Separated the buffer-size-specific tests into a separate unit test executable.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1478538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src/router_node.c')
| -rw-r--r-- | qpid/extras/dispatch/src/router_node.c | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index 65756be215..872b8363c9 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -30,7 +30,7 @@ static char *module = "ROUTER"; /** * Address Types and Processing: * - * Address Hash Compare onReceive onEmit + * Address Hash Key onReceive onEmit * ============================================================================= * _local/<local> L<local> handler forward * _topo/<area>/<router>/<local> A<area> forward forward @@ -174,6 +174,10 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del // cases for forwarding. // // Forward to the in-process handler for this message if there is one. + // Note: If the handler is going to queue the message for deferred processing, + // it must copy the message. This function assumes that the handler + // will process the message synchronously and be finished with it upon + // completion. // if (addr->handler) addr->handler(addr->handler_context, msg); @@ -183,8 +187,9 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del // TODO - Don't forward if this is a "_local" address. // if (addr->rlink) { - pn_link_t* pn_outlink = dx_link_pn(addr->rlink->link); - DEQ_INSERT_TAIL(addr->rlink->out_fifo, msg); + pn_link_t *pn_outlink = dx_link_pn(addr->rlink->link); + dx_message_t *copy = dx_message_copy(msg); + DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy); pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo)); dx_link_activate(addr->rlink->link); } @@ -205,6 +210,7 @@ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *del pn_delivery_settle(delivery); } + dx_free_message(msg); sys_mutex_unlock(router->lock); } } else { @@ -286,7 +292,7 @@ static int router_incoming_link_handler(void* context, dx_link_t *link) pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); - pn_link_flow(pn_link, 8); + pn_link_flow(pn_link, 32); pn_link_open(pn_link); } else { pn_link_close(pn_link); |
