diff options
| author | Ted Ross <tross@apache.org> | 2013-10-08 20:33:27 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-10-08 20:33:27 +0000 |
| commit | 3731f119921b4e5527fd15e5b3575b63edf8ccb2 (patch) | |
| tree | 77345460fbcd578e4ed1569b7ca20677511509b1 /qpid | |
| parent | 2cc9ee2ded5e5987cbf6897b5dea5e4a49d793ec (diff) | |
| download | qpid-python-3731f119921b4e5527fd15e5b3575b63edf8ccb2.tar.gz | |
QPID-5218 - Fixed crash caused by fanned-out non-presettled messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1530415 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
| -rw-r--r-- | qpid/extras/dispatch/src/router_node.c | 5 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tests/system_tests_one_router.py | 59 |
2 files changed, 61 insertions, 3 deletions
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index de69e21605..8efbf6a2af 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -547,7 +547,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del DEQ_INSERT_TAIL(dest_link->msg_fifo, re); fanout++; - if (fanout == 1) + if (fanout == 1 && !dx_delivery_settled(delivery)) re->delivery = delivery; addr->deliveries_transit++; @@ -569,8 +569,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del dx_delivery_free(delivery, PN_ACCEPTED); } else if (fanout == 0) { dx_delivery_free(delivery, PN_RELEASED); - } else if (fanout > 1) - dx_delivery_free(delivery, PN_ACCEPTED); + } } } else { // diff --git a/qpid/extras/dispatch/tests/system_tests_one_router.py b/qpid/extras/dispatch/tests/system_tests_one_router.py index 8d486e2b85..28ad9a98d7 100644 --- a/qpid/extras/dispatch/tests/system_tests_one_router.py +++ b/qpid/extras/dispatch/tests/system_tests_one_router.py @@ -138,6 +138,65 @@ class RouterTest(unittest.TestCase): M4.stop() + def test_2a_multicast_unsettled(self): + addr = "amqp://0.0.0.0:20000/pre_settled/multicast/1" + M1 = Messenger() + M2 = Messenger() + M3 = Messenger() + M4 = Messenger() + + M1.timeout = 1.0 + M2.timeout = 1.0 + M3.timeout = 1.0 + M4.timeout = 1.0 + + M1.outgoing_window = 5 + M2.incoming_window = 5 + M3.incoming_window = 5 + M4.incoming_window = 5 + + M1.start() + M2.start() + M3.start() + M4.start() + self.subscribe(M2, addr) + self.subscribe(M3, addr) + self.subscribe(M4, addr) + + tm = Message() + rm = Message() + + tm.address = addr + for i in range(2): + tm.body = {'number': i} + M1.put(tm) + M1.send(0) + + for i in range(2): + M2.recv(1) + trk = M2.get(rm) + M2.accept(trk) + M2.settle(trk) + self.assertEqual(i, rm.body['number']) + + M3.recv(1) + trk = M3.get(rm) + M3.accept(trk) + M3.settle(trk) + self.assertEqual(i, rm.body['number']) + + M4.recv(1) + trk = M4.get(rm) + M4.accept(trk) + M4.settle(trk) + self.assertEqual(i, rm.body['number']) + + M1.stop() + M2.stop() + M3.stop() + M4.stop() + + def test_3_propagated_disposition(self): addr = "amqp://0.0.0.0:20000/unsettled/1" M1 = Messenger() |
