summaryrefslogtreecommitdiff
path: root/qpid/python/tests_0-10/message.py
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-07 16:19:30 +0000
committerGordon Sim <gsim@apache.org>2008-03-07 16:19:30 +0000
commit791cd5a65e193ad4a1645065256a244ee238e9e4 (patch)
tree16e0d4af9de12bfe397d9913a34681cb5e537ead /qpid/python/tests_0-10/message.py
parentd20ff23e2a578969b1cad084504cd72bcbb38581 (diff)
downloadqpid-python-791cd5a65e193ad4a1645065256a244ee238e9e4.tar.gz
Converted some more tests to use new client
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@634729 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/tests_0-10/message.py')
-rw-r--r--qpid/python/tests_0-10/message.py58
1 files changed, 46 insertions, 12 deletions
diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py
index 64e2bc44c4..aaefb52392 100644
--- a/qpid/python/tests_0-10/message.py
+++ b/qpid/python/tests_0-10/message.py
@@ -619,8 +619,10 @@ class MessageTests(TestBase010):
for q in [qA, qB]:
msg = q.get(timeout = 1)
self.assertEquals("Message %s" % i, msg.body)
+ #TODO: tidy up completion
session.receiver._completed.add(msg.id)
+ #TODO: tidy up completion
session.channel.session_completed(session.receiver._completed)
#messages should still be on the queue:
self.assertEquals(10, session.queue_query(queue = "q").message_count)
@@ -708,8 +710,9 @@ class MessageTests(TestBase010):
"""
Test acking of messages ranges
"""
- session = self.session
- session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ session = self.conn.session("alternate-session", timeout=10)
+
+ session.queue_declare(queue = "q", auto_delete=True)
delivery_properties = session.delivery_properties(routing_key="q")
for i in range (1, 11):
session.message_transfer(message=Message(delivery_properties, "message %s" % (i)))
@@ -718,26 +721,46 @@ class MessageTests(TestBase010):
session.message_flow(unit = 0, value = 10, destination = "a")
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
queue = session.incoming("a")
+ ids = []
for i in range (1, 11):
- self.assertEquals("message %s" % (i), queue.get(timeout = 1).body)
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message %s" % (i), msg.body)
+ ids.append(msg.id)
+
self.assertEmpty(queue)
- #ack all but the third message (command id 2)
- session.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=[0,1,3,6,7,7,8,9])
- session.message_recover()
- self.assertEquals("message 3", queue.get(timeout = 1).content.body)
+ #ack all but the fourth message (command id 2)
+ accepted = RangedSet()
+ accepted.add(ids[0], ids[2])
+ accepted.add(ids[4], ids[9])
+ session.message_accept(accepted)
+
+ #subscribe from second session here to ensure queue is not
+ #auto-deleted when alternate session closes (no need to ack on these):
+ self.session.message_subscribe(queue = "q", destination = "checker")
+
+ #now close the session, and see that the unacked messages are
+ #then redelivered to another subscriber:
+ session.close(timeout=10)
+
+ session = self.session
+ session.message_flow(destination="checker", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="checker", unit=1, value=0xFFFFFFFF)
+ queue = session.incoming("checker")
+
+ self.assertEquals("message 4", queue.get(timeout = 1).body)
self.assertEmpty(queue)
def test_subscribe_not_acquired_2(self):
session = self.session
#publish some messages
- self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 11):
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
#consume some of them
- session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
+ session.message_subscribe(queue = "q", destination = "a")
session.message_set_flow_mode(flow_mode = 0, destination = "a")
session.message_flow(unit = 0, value = 5, destination = "a")
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
@@ -746,11 +769,15 @@ class MessageTests(TestBase010):
for i in range(1, 6):
msg = queue.get(timeout = 1)
self.assertEquals("message-%d" % (i), msg.body)
- msg.complete()
+ #complete and accept
+ session.message_accept(RangedSet(msg.id))
+ #TODO: tidy up completion
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
self.assertEmpty(queue)
#now create a not-acquired subscriber
- session.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+ session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
#check it gets those not consumed
@@ -759,7 +786,10 @@ class MessageTests(TestBase010):
for i in range(6, 11):
msg = queue.get(timeout = 1)
self.assertEquals("message-%d" % (i), msg.body)
- msg.complete()
+ session.message_release(RangedSet(msg.id))
+ #TODO: tidy up completion
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
session.message_flow(unit = 0, value = 1, destination = "b")
self.assertEmpty(queue)
@@ -790,6 +820,10 @@ class MessageTests(TestBase010):
#check that acquire succeeds
response = session.control_queue.get(timeout=1)
self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
+ session.message_release(RangedSet(msg.id))
+ session.channel._completed.add(msg.id)
+ session.channel.session_completed(session.channel._completed)
+
msg.complete()
self.assertEmpty(queue)