summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-10-08 20:33:27 +0000
committerTed Ross <tross@apache.org>2013-10-08 20:33:27 +0000
commit3731f119921b4e5527fd15e5b3575b63edf8ccb2 (patch)
tree77345460fbcd578e4ed1569b7ca20677511509b1 /qpid
parent2cc9ee2ded5e5987cbf6897b5dea5e4a49d793ec (diff)
downloadqpid-python-3731f119921b4e5527fd15e5b3575b63edf8ccb2.tar.gz
QPID-5218 - Fixed crash caused by fanned-out non-presettled messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1530415 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/extras/dispatch/src/router_node.c5
-rw-r--r--qpid/extras/dispatch/tests/system_tests_one_router.py59
2 files changed, 61 insertions, 3 deletions
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index de69e21605..8efbf6a2af 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -547,7 +547,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
fanout++;
- if (fanout == 1)
+ if (fanout == 1 && !dx_delivery_settled(delivery))
re->delivery = delivery;
addr->deliveries_transit++;
@@ -569,8 +569,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
dx_delivery_free(delivery, PN_ACCEPTED);
} else if (fanout == 0) {
dx_delivery_free(delivery, PN_RELEASED);
- } else if (fanout > 1)
- dx_delivery_free(delivery, PN_ACCEPTED);
+ }
}
} else {
//
diff --git a/qpid/extras/dispatch/tests/system_tests_one_router.py b/qpid/extras/dispatch/tests/system_tests_one_router.py
index 8d486e2b85..28ad9a98d7 100644
--- a/qpid/extras/dispatch/tests/system_tests_one_router.py
+++ b/qpid/extras/dispatch/tests/system_tests_one_router.py
@@ -138,6 +138,65 @@ class RouterTest(unittest.TestCase):
M4.stop()
+ def test_2a_multicast_unsettled(self):
+ addr = "amqp://0.0.0.0:20000/pre_settled/multicast/1"
+ M1 = Messenger()
+ M2 = Messenger()
+ M3 = Messenger()
+ M4 = Messenger()
+
+ M1.timeout = 1.0
+ M2.timeout = 1.0
+ M3.timeout = 1.0
+ M4.timeout = 1.0
+
+ M1.outgoing_window = 5
+ M2.incoming_window = 5
+ M3.incoming_window = 5
+ M4.incoming_window = 5
+
+ M1.start()
+ M2.start()
+ M3.start()
+ M4.start()
+ self.subscribe(M2, addr)
+ self.subscribe(M3, addr)
+ self.subscribe(M4, addr)
+
+ tm = Message()
+ rm = Message()
+
+ tm.address = addr
+ for i in range(2):
+ tm.body = {'number': i}
+ M1.put(tm)
+ M1.send(0)
+
+ for i in range(2):
+ M2.recv(1)
+ trk = M2.get(rm)
+ M2.accept(trk)
+ M2.settle(trk)
+ self.assertEqual(i, rm.body['number'])
+
+ M3.recv(1)
+ trk = M3.get(rm)
+ M3.accept(trk)
+ M3.settle(trk)
+ self.assertEqual(i, rm.body['number'])
+
+ M4.recv(1)
+ trk = M4.get(rm)
+ M4.accept(trk)
+ M4.settle(trk)
+ self.assertEqual(i, rm.body['number'])
+
+ M1.stop()
+ M2.stop()
+ M3.stop()
+ M4.stop()
+
+
def test_3_propagated_disposition(self):
addr = "amqp://0.0.0.0:20000/unsettled/1"
M1 = Messenger()