diff options
| author | Ted Ross <tross@apache.org> | 2013-08-08 14:15:47 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-08-08 14:15:47 +0000 |
| commit | e33c4e5c33768233282b69c9cb14dd515d191bb7 (patch) | |
| tree | 2b7d5643a2e2a7e74c7e7e63a91a8c0298a583bc | |
| parent | ee187c3e89ddaea3ced38b6d9928cb29fac5b268 (diff) | |
| download | qpid-python-e33c4e5c33768233282b69c9cb14dd515d191bb7.tar.gz | |
QPID-5045 - Added tests for message REJECT and for the three-ack delivery pattern.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1511797 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/extras/dispatch/src/container.c | 6 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tests/system_tests_one_router.py | 94 |
2 files changed, 87 insertions, 13 deletions
diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c index d4766f0432..38262cd094 100644 --- a/qpid/extras/dispatch/src/container.c +++ b/qpid/extras/dispatch/src/container.c @@ -325,8 +325,10 @@ static int process_handler(dx_container_t *container, void* unused, pn_connectio } // - // Step 2.5: Traverse all of the links on the connection looking for - // links. Call the attached node's writable handler for such links. + // Step 2.5: Call the attached node's writable handler for all active links + // on the connection. Note that in Dispatch, links are considered + // bidirectional. Incoming and outgoing only pertains to deliveries and + // deliveries are a subset of the traffic that flows both directions on links. // pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); while (pn_link) { diff --git a/qpid/extras/dispatch/tests/system_tests_one_router.py b/qpid/extras/dispatch/tests/system_tests_one_router.py index bb9f757cfa..611f40de8a 100644 --- a/qpid/extras/dispatch/tests/system_tests_one_router.py +++ b/qpid/extras/dispatch/tests/system_tests_one_router.py @@ -37,11 +37,14 @@ class RouterTest(unittest.TestCase): self.router.terminate() self.router.wait() - def subscribe(self, messenger, address): - messenger.subscribe(address) + def flush(self, messenger): while messenger.work(100): pass + def subscribe(self, messenger, address): + messenger.subscribe(address) + self.flush(messenger) + def test_0_discard(self): addr = "amqp://0.0.0.0:20000/discard/1" M1 = Messenger() @@ -153,9 +156,12 @@ class RouterTest(unittest.TestCase): tm.address = addr tm.body = {'number': 0} + + ## + ## Test ACCEPT + ## tx_tracker = M1.put(tm) M1.send(0) - M2.recv(1) rx_tracker = M2.get(rm) self.assertEqual(0, rm.body['number']) @@ -164,13 +170,29 @@ class RouterTest(unittest.TestCase): M2.accept(rx_tracker) M2.settle(rx_tracker) - while M2.work(100): - pass - while M1.work(100): - pass + self.flush(M2) + self.flush(M1) self.assertEqual(ACCEPTED, M1.status(tx_tracker)) + ## + ## Test REJECT + ## + tx_tracker = M1.put(tm) + M1.send(0) + M2.recv(1) + rx_tracker = M2.get(rm) + self.assertEqual(0, rm.body['number']) + self.assertEqual(PENDING, M1.status(tx_tracker)) + + M2.reject(rx_tracker) + M2.settle(rx_tracker) + + self.flush(M2) + self.flush(M1) + + self.assertEqual(REJECTED, M1.status(tx_tracker)) + M1.stop() M2.stop() @@ -178,13 +200,63 @@ class RouterTest(unittest.TestCase): # def test_4_unsettled_undeliverable(self): # pass -# def test_4_three_ack(self): -# pass + def test_5_three_ack(self): + addr = "amqp://0.0.0.0:20000/three_ack/1" + M1 = Messenger() + M2 = Messenger() + + M1.timeout = 1000 + M2.timeout = 1000 + M1.outgoing_window = 5 + M2.incoming_window = 5 + + M1.start() + M2.start() + self.subscribe(M2, addr) + + tm = Message() + rm = Message() + + tm.address = addr + tm.body = {'number': 200} + + tx_tracker = M1.put(tm) + M1.send(0) + M2.recv(1) + rx_tracker = M2.get(rm) + self.assertEqual(200, rm.body['number']) + self.assertEqual(PENDING, M1.status(tx_tracker)) + + M2.accept(rx_tracker) + + self.flush(M2) + self.flush(M1) + + self.assertEqual(ACCEPTED, M1.status(tx_tracker)) + + M1.settle(tx_tracker) + + self.flush(M1) + self.flush(M2) + + ## + ## We need a way to verify on M2 (receiver) that the tracker has been + ## settled on the M1 (sender). [ See PROTON-395 ] + ## + + M2.settle(rx_tracker) + + self.flush(M2) + self.flush(M1) + + M1.stop() + M2.stop() + -# def test_5_link_route_sender(self): +# def test_6_link_route_sender(self): # pass -# def test_6_link_route_receiver(self): +# def test_7_link_route_receiver(self): # pass if __name__ == '__main__': |
