summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-08-08 14:15:47 +0000
committerTed Ross <tross@apache.org>2013-08-08 14:15:47 +0000
commite33c4e5c33768233282b69c9cb14dd515d191bb7 (patch)
tree2b7d5643a2e2a7e74c7e7e63a91a8c0298a583bc
parentee187c3e89ddaea3ced38b6d9928cb29fac5b268 (diff)
downloadqpid-python-e33c4e5c33768233282b69c9cb14dd515d191bb7.tar.gz
QPID-5045 - Added tests for message REJECT and for the three-ack delivery pattern.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1511797 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/extras/dispatch/src/container.c6
-rw-r--r--qpid/extras/dispatch/tests/system_tests_one_router.py94
2 files changed, 87 insertions, 13 deletions
diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c
index d4766f0432..38262cd094 100644
--- a/qpid/extras/dispatch/src/container.c
+++ b/qpid/extras/dispatch/src/container.c
@@ -325,8 +325,10 @@ static int process_handler(dx_container_t *container, void* unused, pn_connectio
}
//
- // Step 2.5: Traverse all of the links on the connection looking for
- // links. Call the attached node's writable handler for such links.
+ // Step 2.5: Call the attached node's writable handler for all active links
+ // on the connection. Note that in Dispatch, links are considered
+ // bidirectional. Incoming and outgoing only pertains to deliveries and
+ // deliveries are a subset of the traffic that flows both directions on links.
//
pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
while (pn_link) {
diff --git a/qpid/extras/dispatch/tests/system_tests_one_router.py b/qpid/extras/dispatch/tests/system_tests_one_router.py
index bb9f757cfa..611f40de8a 100644
--- a/qpid/extras/dispatch/tests/system_tests_one_router.py
+++ b/qpid/extras/dispatch/tests/system_tests_one_router.py
@@ -37,11 +37,14 @@ class RouterTest(unittest.TestCase):
self.router.terminate()
self.router.wait()
- def subscribe(self, messenger, address):
- messenger.subscribe(address)
+ def flush(self, messenger):
while messenger.work(100):
pass
+ def subscribe(self, messenger, address):
+ messenger.subscribe(address)
+ self.flush(messenger)
+
def test_0_discard(self):
addr = "amqp://0.0.0.0:20000/discard/1"
M1 = Messenger()
@@ -153,9 +156,12 @@ class RouterTest(unittest.TestCase):
tm.address = addr
tm.body = {'number': 0}
+
+ ##
+ ## Test ACCEPT
+ ##
tx_tracker = M1.put(tm)
M1.send(0)
-
M2.recv(1)
rx_tracker = M2.get(rm)
self.assertEqual(0, rm.body['number'])
@@ -164,13 +170,29 @@ class RouterTest(unittest.TestCase):
M2.accept(rx_tracker)
M2.settle(rx_tracker)
- while M2.work(100):
- pass
- while M1.work(100):
- pass
+ self.flush(M2)
+ self.flush(M1)
self.assertEqual(ACCEPTED, M1.status(tx_tracker))
+ ##
+ ## Test REJECT
+ ##
+ tx_tracker = M1.put(tm)
+ M1.send(0)
+ M2.recv(1)
+ rx_tracker = M2.get(rm)
+ self.assertEqual(0, rm.body['number'])
+ self.assertEqual(PENDING, M1.status(tx_tracker))
+
+ M2.reject(rx_tracker)
+ M2.settle(rx_tracker)
+
+ self.flush(M2)
+ self.flush(M1)
+
+ self.assertEqual(REJECTED, M1.status(tx_tracker))
+
M1.stop()
M2.stop()
@@ -178,13 +200,63 @@ class RouterTest(unittest.TestCase):
# def test_4_unsettled_undeliverable(self):
# pass
-# def test_4_three_ack(self):
-# pass
+ def test_5_three_ack(self):
+ addr = "amqp://0.0.0.0:20000/three_ack/1"
+ M1 = Messenger()
+ M2 = Messenger()
+
+ M1.timeout = 1000
+ M2.timeout = 1000
+ M1.outgoing_window = 5
+ M2.incoming_window = 5
+
+ M1.start()
+ M2.start()
+ self.subscribe(M2, addr)
+
+ tm = Message()
+ rm = Message()
+
+ tm.address = addr
+ tm.body = {'number': 200}
+
+ tx_tracker = M1.put(tm)
+ M1.send(0)
+ M2.recv(1)
+ rx_tracker = M2.get(rm)
+ self.assertEqual(200, rm.body['number'])
+ self.assertEqual(PENDING, M1.status(tx_tracker))
+
+ M2.accept(rx_tracker)
+
+ self.flush(M2)
+ self.flush(M1)
+
+ self.assertEqual(ACCEPTED, M1.status(tx_tracker))
+
+ M1.settle(tx_tracker)
+
+ self.flush(M1)
+ self.flush(M2)
+
+ ##
+ ## We need a way to verify on M2 (receiver) that the tracker has been
+ ## settled on the M1 (sender). [ See PROTON-395 ]
+ ##
+
+ M2.settle(rx_tracker)
+
+ self.flush(M2)
+ self.flush(M1)
+
+ M1.stop()
+ M2.stop()
+
-# def test_5_link_route_sender(self):
+# def test_6_link_route_sender(self):
# pass
-# def test_6_link_route_receiver(self):
+# def test_7_link_route_receiver(self):
# pass
if __name__ == '__main__':