diff options
| author | Gordon Sim <gsim@apache.org> | 2008-03-07 16:19:30 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-03-07 16:19:30 +0000 |
| commit | 791cd5a65e193ad4a1645065256a244ee238e9e4 (patch) | |
| tree | 16e0d4af9de12bfe397d9913a34681cb5e537ead /qpid/python/tests_0-10/message.py | |
| parent | d20ff23e2a578969b1cad084504cd72bcbb38581 (diff) | |
| download | qpid-python-791cd5a65e193ad4a1645065256a244ee238e9e4.tar.gz | |
Converted some more tests to use new client
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@634729 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/tests_0-10/message.py')
| -rw-r--r-- | qpid/python/tests_0-10/message.py | 58 |
1 files changed, 46 insertions, 12 deletions
diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py index 64e2bc44c4..aaefb52392 100644 --- a/qpid/python/tests_0-10/message.py +++ b/qpid/python/tests_0-10/message.py @@ -619,8 +619,10 @@ class MessageTests(TestBase010): for q in [qA, qB]: msg = q.get(timeout = 1) self.assertEquals("Message %s" % i, msg.body) + #TODO: tidy up completion session.receiver._completed.add(msg.id) + #TODO: tidy up completion session.channel.session_completed(session.receiver._completed) #messages should still be on the queue: self.assertEquals(10, session.queue_query(queue = "q").message_count) @@ -708,8 +710,9 @@ class MessageTests(TestBase010): """ Test acking of messages ranges """ - session = self.session - session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + session = self.conn.session("alternate-session", timeout=10) + + session.queue_declare(queue = "q", auto_delete=True) delivery_properties = session.delivery_properties(routing_key="q") for i in range (1, 11): session.message_transfer(message=Message(delivery_properties, "message %s" % (i))) @@ -718,26 +721,46 @@ class MessageTests(TestBase010): session.message_flow(unit = 0, value = 10, destination = "a") session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") queue = session.incoming("a") + ids = [] for i in range (1, 11): - self.assertEquals("message %s" % (i), queue.get(timeout = 1).body) + msg = queue.get(timeout = 1) + self.assertEquals("message %s" % (i), msg.body) + ids.append(msg.id) + self.assertEmpty(queue) - #ack all but the third message (command id 2) - session.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=[0,1,3,6,7,7,8,9]) - session.message_recover() - self.assertEquals("message 3", queue.get(timeout = 1).content.body) + #ack all but the fourth message (command id 2) + accepted = RangedSet() + accepted.add(ids[0], ids[2]) + accepted.add(ids[4], ids[9]) + session.message_accept(accepted) + + #subscribe from second session here to ensure queue is not + #auto-deleted when alternate session closes (no need to ack on these): + self.session.message_subscribe(queue = "q", destination = "checker") + + #now close the session, and see that the unacked messages are + #then redelivered to another subscriber: + session.close(timeout=10) + + session = self.session + session.message_flow(destination="checker", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="checker", unit=1, value=0xFFFFFFFF) + queue = session.incoming("checker") + + self.assertEquals("message 4", queue.get(timeout = 1).body) self.assertEmpty(queue) def test_subscribe_not_acquired_2(self): session = self.session #publish some messages - self.queue_declare(queue = "q", exclusive=True, auto_delete=True) + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 11): session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i))) #consume some of them - session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) + session.message_subscribe(queue = "q", destination = "a") session.message_set_flow_mode(flow_mode = 0, destination = "a") session.message_flow(unit = 0, value = 5, destination = "a") session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") @@ -746,11 +769,15 @@ class MessageTests(TestBase010): for i in range(1, 6): msg = queue.get(timeout = 1) self.assertEquals("message-%d" % (i), msg.body) - msg.complete() + #complete and accept + session.message_accept(RangedSet(msg.id)) + #TODO: tidy up completion + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) self.assertEmpty(queue) #now create a not-acquired subscriber - session.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) + session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") #check it gets those not consumed @@ -759,7 +786,10 @@ class MessageTests(TestBase010): for i in range(6, 11): msg = queue.get(timeout = 1) self.assertEquals("message-%d" % (i), msg.body) - msg.complete() + session.message_release(RangedSet(msg.id)) + #TODO: tidy up completion + session.receiver._completed.add(msg.id) + session.channel.session_completed(session.receiver._completed) session.message_flow(unit = 0, value = 1, destination = "b") self.assertEmpty(queue) @@ -790,6 +820,10 @@ class MessageTests(TestBase010): #check that acquire succeeds response = session.control_queue.get(timeout=1) self.assertEquals(response.transfers, [msg.command_id, msg.command_id]) + session.message_release(RangedSet(msg.id)) + session.channel._completed.add(msg.id) + session.channel.session_completed(session.channel._completed) + msg.complete() self.assertEmpty(queue) |
