diff options
| author | Gordon Sim <gsim@apache.org> | 2008-03-06 17:52:59 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-03-06 17:52:59 +0000 |
| commit | 9fd4909832e16734c47c13eebbe4aca66640b1b0 (patch) | |
| tree | 6882efaaa8244086c0765f95ee1c1e9836bb088c /python/tests_0-10 | |
| parent | 19ad6741d3579b1ffce70a43271e4e4f804ad643 (diff) | |
| download | qpid-python-9fd4909832e16734c47c13eebbe4aca66640b1b0.tar.gz | |
Fixes to c++ broker:
- use final version of delivery properties where appropriate
- start sequence numbering of outgoing messages at 0
- explicit accept-mode is now 0 (no more confirm-mode)
- add default initialisers for numeric fields in methods
Converted some more python tests to use final 0-10 client.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@634368 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/tests_0-10')
| -rw-r--r-- | python/tests_0-10/broker.py | 52 | ||||
| -rw-r--r-- | python/tests_0-10/exchange.py | 38 | ||||
| -rw-r--r-- | python/tests_0-10/message.py | 572 | ||||
| -rw-r--r-- | python/tests_0-10/queue.py | 174 |
4 files changed, 426 insertions, 410 deletions
diff --git a/python/tests_0-10/broker.py b/python/tests_0-10/broker.py index 100288bbbb..25cf1241ec 100644 --- a/python/tests_0-10/broker.py +++ b/python/tests_0-10/broker.py @@ -18,9 +18,8 @@ # from qpid.client import Closed from qpid.queue import Empty -from qpid.content import Content from qpid.testlib import TestBase010 -from qpid.datatypes import Message +from qpid.datatypes import Message, RangedSet class BrokerTests(TestBase010): """Tests for basic Broker functionality""" @@ -31,28 +30,30 @@ class BrokerTests(TestBase010): consumer. Second, this test tries to explicitly receive and acknowledge a message with an acknowledging consumer. """ - ch = self.channel - self.queue_declare(ch, queue = "myqueue") + session = self.session + session.queue_declare(queue = "myqueue", exclusive=True, auto_delete=True) # No ack consumer ctag = "tag1" - self.subscribe(ch, queue = "myqueue", destination = ctag) + session.message_subscribe(queue = "myqueue", destination = ctag) + session.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF) + session.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF) body = "test no-ack" - ch.message_transfer(content = Content(body, properties = {"routing_key" : "myqueue"})) - msg = self.client.queue(ctag).get(timeout = 5) - self.assert_(msg.content.body == body) + session.message_transfer(message=Message(session.delivery_properties(routing_key="myqueue"), body)) + msg = session.incoming(ctag).get(timeout = 5) + self.assert_(msg.body == body) # Acknowledging consumer - self.queue_declare(ch, queue = "otherqueue") + session.queue_declare(queue = "otherqueue", exclusive=True, auto_delete=True) ctag = "tag2" - self.subscribe(ch, queue = "otherqueue", destination = ctag, confirm_mode = 1) - ch.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF) - ch.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF) + session.message_subscribe(queue = "otherqueue", destination = ctag, accept_mode = 1) + session.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF) + session.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF) body = "test ack" - ch.message_transfer(content = Content(body, properties = {"routing_key" : "otherqueue"})) - msg = self.client.queue(ctag).get(timeout = 5) - msg.complete() - self.assert_(msg.content.body == body) + session.message_transfer(message=Message(session.delivery_properties(routing_key="otherqueue"), body)) + msg = session.incoming(ctag).get(timeout = 5) + session.message_accept(RangedSet(msg.id)) + self.assert_(msg.body == body) def test_simple_delivery_immediate(self): """ @@ -90,22 +91,3 @@ class BrokerTests(TestBase010): queue = session.incoming(consumer_tag) msg = queue.get(timeout=5) self.assert_(msg.body == body) - - def test_invalid_channel(self): - channel = self.client.channel(200) - try: - channel.queue_declare(exclusive=True) - self.fail("Expected error on queue_declare for invalid channel") - except Closed, e: - self.assertConnectionException(504, e.args[0]) - - def test_closed_channel(self): - channel = self.client.channel(200) - channel.session_open() - channel.session_close() - try: - channel.queue_declare(exclusive=True) - self.fail("Expected error on queue_declare for closed channel") - except Closed, e: - if isinstance(e.args[0], str): self.fail(e) - self.assertConnectionException(504, e.args[0]) diff --git a/python/tests_0-10/exchange.py b/python/tests_0-10/exchange.py index 86c39b7736..afa7f05a49 100644 --- a/python/tests_0-10/exchange.py +++ b/python/tests_0-10/exchange.py @@ -37,7 +37,7 @@ class StandardExchangeVerifier: def verifyDirectExchange(self, ex): """Verify that ex behaves like a direct exchange.""" self.queue_declare(queue="q") - self.channel.queue_bind(queue="q", exchange=ex, routing_key="k") + self.session.exchange_bind(queue="q", exchange=ex, routing_key="k") self.assertPublishConsume(exchange=ex, queue="q", routing_key="k") try: self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk") @@ -47,34 +47,34 @@ class StandardExchangeVerifier: def verifyFanOutExchange(self, ex): """Verify that ex behaves like a fanout exchange.""" self.queue_declare(queue="q") - self.channel.queue_bind(queue="q", exchange=ex) + self.session.exchange_bind(queue="q", exchange=ex) self.queue_declare(queue="p") - self.channel.queue_bind(queue="p", exchange=ex) + self.session.exchange_bind(queue="p", exchange=ex) for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex) def verifyTopicExchange(self, ex): """Verify that ex behaves like a topic exchange""" self.queue_declare(queue="a") - self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*") + self.session.exchange_bind(queue="a", exchange=ex, routing_key="a.#.b.*") q = self.consume("a") self.assertPublishGet(q, ex, "a.b.x") self.assertPublishGet(q, ex, "a.x.b.x") self.assertPublishGet(q, ex, "a.x.x.b.x") # Shouldn't match - self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"})) - self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b.x.y"})) - self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"x.a.b.x"})) - self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"})) + self.session.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"})) + self.session.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b.x.y"})) + self.session.message_transfer(destination=ex, content=Content(properties={'routing_key':"x.a.b.x"})) + self.session.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"})) self.assert_(q.empty()) def verifyHeadersExchange(self, ex): """Verify that ex is a headers exchange""" self.queue_declare(queue="q") - self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) + self.session.exchange_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} ) q = self.consume("q") headers = {"name":"fred", "age":3} self.assertPublishGet(q, exchange=ex, properties=headers) - self.channel.message_transfer(destination=ex) # No headers, won't deliver + self.session.message_transfer(destination=ex) # No headers, won't deliver self.assertEmpty(q); @@ -215,7 +215,7 @@ class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase): """ def test(self): try: - self.channel.exchange_declare(exchange="humpty_dumpty", passive=True) + self.session.exchange_declare(exchange="humpty_dumpty", passive=True) self.fail("Expected 404 for passive declaration of unknown exchange.") except Closed, e: self.assertChannelException(404, e.args[0]) @@ -275,10 +275,10 @@ class HeadersExchangeTests(TestBase): self.assertPublishGet(self.q, exchange="amq.match", properties=headers) def myBasicPublish(self, headers): - self.channel.message_transfer(destination="amq.match", content=Content("foobar", properties={'application_headers':headers})) + self.session.message_transfer(destination="amq.match", content=Content("foobar", properties={'application_headers':headers})) def testMatchAll(self): - self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) + self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3}) self.myAssertPublishGet({"name":"fred", "age":3}) self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"}) @@ -290,7 +290,7 @@ class HeadersExchangeTests(TestBase): self.assertEmpty(self.q) def testMatchAny(self): - self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3}) + self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3}) self.myAssertPublishGet({"name":"fred"}) self.myAssertPublishGet({"name":"fred", "ignoreme":10}) self.myAssertPublishGet({"ignoreme":10, "age":3}) @@ -307,15 +307,15 @@ class MiscellaneousErrorsTests(TestBase): """ def testTypeNotKnown(self): try: - self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type") + self.session.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type") self.fail("Expected 503 for declaration of unknown exchange type.") except Closed, e: self.assertConnectionException(503, e.args[0]) def testDifferentDeclaredType(self): - self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct") + self.session.exchange_declare(exchange="test_different_declared_type_exchange", type="direct") try: - self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic") + self.session.exchange_declare(exchange="test_different_declared_type_exchange", type="topic") self.fail("Expected 530 for redeclaration of exchange with different type.") except Closed, e: self.assertConnectionException(530, e.args[0]) @@ -327,9 +327,9 @@ class MiscellaneousErrorsTests(TestBase): class ExchangeTests(TestBase): def testHeadersBindNoMatchArg(self): - self.channel.queue_declare(queue="q", exclusive=True, auto_delete=True) + self.session.queue_declare(queue="q", exclusive=True, auto_delete=True) try: - self.channel.queue_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} ) + self.session.exchange_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} ) self.fail("Expected failure for missing x-match arg.") except Closed, e: self.assertConnectionException(541, e.args[0]) diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index a3d32bdb2d..5f97d6c705 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -19,33 +19,34 @@ from qpid.client import Client, Closed from qpid.queue import Empty from qpid.content import Content -from qpid.testlib import testrunner, TestBase -from qpid.reference import Reference, ReferenceId -class MessageTests(TestBase): +from qpid.testlib import TestBase010 +from qpid.datatypes import Message, RangedSet + +class MessageTests(TestBase010): """Tests for 'methods' on the amqp message 'class'""" def test_consume_no_local(self): """ Test that the no_local flag is honoured in the consume method """ - channel = self.channel + session = self.session #setup, declare two queues: - channel.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) - channel.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True) + session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True) + session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True) #establish two consumers one of which excludes delivery of locally sent messages self.subscribe(destination="local_included", queue="test-queue-1a") self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True) #send a message - channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local")) - channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local")) + session.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local")) + session.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local")) #check the queues of the two consumers - excluded = self.client.queue("local_excluded") - included = self.client.queue("local_included") + excluded = session.incoming("local_excluded") + included = session.incoming("local_included") msg = included.get(timeout=1) - self.assertEqual("consume_no_local", msg.content.body) + self.assertEqual("consume_no_local", msg.body) try: excluded.get(timeout=1) self.fail("Received locally published message though no_local=true") @@ -62,42 +63,42 @@ class MessageTests(TestBase): deletes such messages. """ - channel = self.channel + session = self.session #setup: - channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) #establish consumer which excludes delivery of locally sent messages self.subscribe(destination="local_excluded", queue="test-queue", no_local=True) #send a 'local' message - channel.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="local")) + session.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="local")) #send a non local message other = self.connect() - channel2 = other.channel(1) - channel2.session_open() - channel2.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="foreign")) - channel2.session_close() + session2 = other.session(1) + session2.session_open() + session2.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="foreign")) + session2.session_close() other.close() #check that the second message only is delivered - excluded = self.client.queue("local_excluded") + excluded = session.incoming("local_excluded") msg = excluded.get(timeout=1) - self.assertEqual("foreign", msg.content.body) + self.assertEqual("foreign", msg.body) try: excluded.get(timeout=1) self.fail("Received extra message") except Empty: None #check queue is empty - self.assertEqual(0, channel.queue_query(queue="test-queue").message_count) + self.assertEqual(0, session.queue_query(queue="test-queue").message_count) def test_consume_exclusive(self): """ Test that the exclusive flag is honoured in the consume method """ - channel = self.channel + session = self.session #setup, declare a queue: - channel.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) + session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True) #check that an exclusive consumer prevents other consumer being created: self.subscribe(destination="first", queue="test-queue-2", exclusive=True) @@ -107,12 +108,12 @@ class MessageTests(TestBase): except Closed, e: self.assertChannelException(403, e.args[0]) - #open new channel and cleanup last consumer: - channel = self.client.channel(2) - channel.session_open() + #open new session and cleanup last consumer: + session = self.client.session(2) + session.session_open() #check that an exclusive consumer cannot be created if a consumer already exists: - self.subscribe(channel, destination="first", queue="test-queue-2") + self.subscribe(session, destination="first", queue="test-queue-2") try: self.subscribe(destination="second", queue="test-queue-2", exclusive=True) self.fail("Expected exclusive consume request to fail due to previous consumer") @@ -123,7 +124,7 @@ class MessageTests(TestBase): """ Test error conditions associated with the queue field of the consume method: """ - channel = self.channel + session = self.session try: #queue specified but doesn't exist: self.subscribe(queue="invalid-queue", destination="") @@ -131,11 +132,11 @@ class MessageTests(TestBase): except Closed, e: self.assertChannelException(404, e.args[0]) - channel = self.client.channel(2) - channel.session_open() + session = self.client.session(2) + session.session_open() try: #queue not specified and none previously declared for channel: - self.subscribe(channel, queue="", destination="") + self.subscribe(session, queue="", destination="") self.fail("Expected failure when consuming from unspecified queue") except Closed, e: self.assertConnectionException(530, e.args[0]) @@ -144,9 +145,9 @@ class MessageTests(TestBase): """ Ensure unique consumer tags are enforced """ - channel = self.channel + session = self.session #setup, declare a queue: - channel.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True) + session.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True) #check that attempts to use duplicate tags are detected and prevented: self.subscribe(destination="first", queue="test-queue-3") @@ -160,43 +161,48 @@ class MessageTests(TestBase): """ Test compliance of the basic.cancel method """ - channel = self.channel + session = self.session #setup, declare a queue: - channel.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True) - self.subscribe(destination="my-consumer", queue="test-queue-4") - channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One")) + session.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "One")) + + session.message_subscribe(destination="my-consumer", queue="test-queue-4") + session.message_flow(destination="my-consumer", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="my-consumer", unit=1, value=0xFFFFFFFF) + + #should flush here #cancel should stop messages being delivered - channel.message_cancel(destination="my-consumer") - channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="Two")) - myqueue = self.client.queue("my-consumer") + session.message_cancel(destination="my-consumer") + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "Two")) + myqueue = session.incoming("my-consumer") msg = myqueue.get(timeout=1) - self.assertEqual("One", msg.content.body) + self.assertEqual("One", msg.body) try: msg = myqueue.get(timeout=1) self.fail("Got message after cancellation: " + msg) except Empty: None #cancellation of non-existant consumers should be handled without error - channel.message_cancel(destination="my-consumer") - channel.message_cancel(destination="this-never-existed") + session.message_cancel(destination="my-consumer") + session.message_cancel(destination="this-never-existed") def test_ack(self): """ Test basic ack/recover behaviour """ - channel = self.channel - channel.queue_declare(queue="test-ack-queue", exclusive=True, auto_delete=True) + session = self.conn.session("alternate-session", timeout=10) + session.queue_declare(queue="test-ack-queue", auto_delete=True) - self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1) - queue = self.client.queue("consumer_tag") + session.message_subscribe(queue = "test-ack-queue", destination = "consumer") + session.message_flow(destination="consumer", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="consumer", unit=1, value=0xFFFFFFFF) + queue = session.incoming("consumer") - channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One")) - channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Two")) - channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Three")) - channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Four")) - channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Five")) + delivery_properties = session.delivery_properties(routing_key="test-ack-queue") + for i in ["One", "Two", "Three", "Four", "Five"]: + session.message_transfer(message=Message(delivery_properties, i)) msg1 = queue.get(timeout=1) msg2 = queue.get(timeout=1) @@ -204,26 +210,36 @@ class MessageTests(TestBase): msg4 = queue.get(timeout=1) msg5 = queue.get(timeout=1) - self.assertEqual("One", msg1.content.body) - self.assertEqual("Two", msg2.content.body) - self.assertEqual("Three", msg3.content.body) - self.assertEqual("Four", msg4.content.body) - self.assertEqual("Five", msg5.content.body) + self.assertEqual("One", msg1.body) + self.assertEqual("Two", msg2.body) + self.assertEqual("Three", msg3.body) + self.assertEqual("Four", msg4.body) + self.assertEqual("Five", msg5.body) + + session.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four + + #subscribe from second session here to ensure queue is not + #auto-deleted when alternate session closes (no need to ack on these): + self.session.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1) - msg2.complete(cumulative=True)#One and Two - msg4.complete(cumulative=False) + #now close the session, and see that the unacked messages are + #then redelivered to another subscriber: + session.close(timeout=10) - channel.message_recover(requeue=False) + session = self.session + session.message_flow(destination="checker", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="checker", unit=1, value=0xFFFFFFFF) + queue = session.incoming("checker") msg3b = queue.get(timeout=1) msg5b = queue.get(timeout=1) - self.assertEqual("Three", msg3b.content.body) - self.assertEqual("Five", msg5b.content.body) + self.assertEqual("Three", msg3b.body) + self.assertEqual("Five", msg5b.body) try: extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None @@ -231,27 +247,27 @@ class MessageTests(TestBase): """ Test recover behaviour """ - channel = self.channel - channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True) - channel.queue_bind(exchange="amq.fanout", queue="queue-a") - channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True) - channel.queue_bind(exchange="amq.fanout", queue="queue-b") + session = self.session + session.queue_declare(queue="queue-a", exclusive=True, auto_delete=True) + session.queue_bind(exchange="amq.fanout", queue="queue-a") + session.queue_declare(queue="queue-b", exclusive=True, auto_delete=True) + session.queue_bind(exchange="amq.fanout", queue="queue-b") self.subscribe(queue="queue-a", destination="unconfirmed", confirm_mode=1) self.subscribe(queue="queue-b", destination="confirmed", confirm_mode=0) - confirmed = self.client.queue("confirmed") - unconfirmed = self.client.queue("unconfirmed") + confirmed = session.incoming("confirmed") + unconfirmed = session.incoming("unconfirmed") data = ["One", "Two", "Three", "Four", "Five"] for d in data: - channel.message_transfer(destination="amq.fanout", content=Content(body=d)) + session.message_transfer(destination="amq.fanout", content=Content(body=d)) for q in [confirmed, unconfirmed]: for d in data: self.assertEqual(d, q.get(timeout=1).content.body) self.assertEmpty(q) - channel.message_recover(requeue=False) + session.message_recover(requeue=False) self.assertEmpty(confirmed) @@ -259,29 +275,29 @@ class MessageTests(TestBase): msg = None for d in data: msg = unconfirmed.get(timeout=1) - self.assertEqual(d, msg.content.body) + self.assertEqual(d, msg.body) self.assertEqual(True, msg.content['redelivered']) self.assertEmpty(unconfirmed) - data.remove(msg.content.body) + data.remove(msg.body) msg.complete(cumulative=False) - channel.message_recover(requeue=False) + session.message_recover(requeue=False) def test_recover_requeue(self): """ Test requeing on recovery """ - channel = self.channel - channel.queue_declare(queue="test-requeue", exclusive=True, auto_delete=True) + session = self.session + session.queue_declare(queue="test-requeue", exclusive=True, auto_delete=True) self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1) - queue = self.client.queue("consumer_tag") + queue = session.incoming("consumer_tag") - channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One")) - channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Two")) - channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three")) - channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four")) - channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five")) + session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One")) + session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Two")) + session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three")) + session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four")) + session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five")) msg1 = queue.get(timeout=1) msg2 = queue.get(timeout=1) @@ -298,15 +314,15 @@ class MessageTests(TestBase): msg2.complete(cumulative=True) #One and Two msg4.complete(cumulative=False) #Four - channel.message_cancel(destination="consumer_tag") + session.message_cancel(destination="consumer_tag") #publish a new message - channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Six")) + session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Six")) #requeue unacked messages (Three and Five) - channel.message_recover(requeue=True) + session.message_recover(requeue=True) self.subscribe(queue="test-requeue", destination="consumer_tag") - queue2 = self.client.queue("consumer_tag") + queue2 = session.incoming("consumer_tag") msg3b = queue2.get(timeout=1) msg5b = queue2.get(timeout=1) @@ -334,22 +350,22 @@ class MessageTests(TestBase): Test that the prefetch count specified is honoured """ #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-count", exclusive=True, auto_delete=True) + session = self.session + session.queue_declare(queue="test-prefetch-count", exclusive=True, auto_delete=True) subscription = self.subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1) - queue = self.client.queue("consumer_tag") + queue = session.incoming("consumer_tag") #set prefetch to 5: - channel.message_qos(prefetch_count=5) + session.message_qos(prefetch_count=5) #publish 10 messages: for i in range(1, 11): - channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-count"}, body="Message %d" % i)) + session.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-count"}, body="Message %d" % i)) #only 5 messages should have been delivered: for i in range(1, 6): msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) + self.assertEqual("Message %d" % i, msg.body) try: extra = queue.get(timeout=1) self.fail("Got unexpected 6th message in original queue: " + extra.content.body) @@ -360,7 +376,7 @@ class MessageTests(TestBase): for i in range(6, 11): msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) + self.assertEqual("Message %d" % i, msg.body) msg.complete() @@ -376,22 +392,22 @@ class MessageTests(TestBase): Test that the prefetch size specified is honoured """ #setup: declare queue and subscribe - channel = self.channel - channel.queue_declare(queue="test-prefetch-size", exclusive=True, auto_delete=True) + session = self.session + session.queue_declare(queue="test-prefetch-size", exclusive=True, auto_delete=True) subscription = self.subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1) - queue = self.client.queue("consumer_tag") + queue = session.incoming("consumer_tag") #set prefetch to 50 bytes (each message is 9 or 10 bytes): - channel.message_qos(prefetch_size=50) + session.message_qos(prefetch_size=50) #publish 10 messages: for i in range(1, 11): - channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body="Message %d" % i)) + session.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body="Message %d" % i)) #only 5 messages should have been delivered (i.e. 45 bytes worth): for i in range(1, 6): msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) + self.assertEqual("Message %d" % i, msg.body) try: extra = queue.get(timeout=1) @@ -403,7 +419,7 @@ class MessageTests(TestBase): for i in range(6, 11): msg = queue.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) + self.assertEqual("Message %d" % i, msg.body) msg.complete() @@ -415,54 +431,58 @@ class MessageTests(TestBase): #make sure that a single oversized message still gets delivered large = "abcdefghijklmnopqrstuvwxyz" large = large + "-" + large; - channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body=large)) + session.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body=large)) msg = queue.get(timeout=1) - self.assertEqual(large, msg.content.body) + self.assertEqual(large, msg.body) def test_reject(self): - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout") - channel.queue_declare(queue = "r", exclusive=True, auto_delete=True) - channel.queue_bind(queue = "r", exchange = "amq.fanout") - - self.subscribe(queue = "q", destination = "consumer", confirm_mode = 1) - channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah")) - msg = self.client.queue("consumer").get(timeout = 1) - self.assertEquals(msg.content.body, "blah, blah") - channel.message_reject([msg.command_id, msg.command_id]) - - self.subscribe(queue = "r", destination = "checker") - msg = self.client.queue("checker").get(timeout = 1) - self.assertEquals(msg.content.body, "blah, blah") + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout") + session.queue_declare(queue = "r", exclusive=True, auto_delete=True) + session.exchange_bind(queue = "r", exchange = "amq.fanout") + + session.message_subscribe(queue = "q", destination = "consumer") + session.message_flow(destination="consumer", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="consumer", unit=1, value=0xFFFFFFFF) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "blah, blah")) + msg = session.incoming("consumer").get(timeout = 1) + self.assertEquals(msg.body, "blah, blah") + session.message_reject(RangedSet(msg.id)) + + session.message_subscribe(queue = "r", destination = "checker") + session.message_flow(destination="checker", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="checker", unit=1, value=0xFFFFFFFF) + msg = session.incoming("checker").get(timeout = 1) + self.assertEquals(msg.body, "blah, blah") def test_credit_flow_messages(self): """ Test basic credit based flow control with unit = message """ #declare an exclusive queue - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #create consumer (for now that defaults to infinite credit) - channel.message_subscribe(queue = "q", destination = "c") - channel.message_flow_mode(mode = 0, destination = "c") + session.message_subscribe(queue = "q", destination = "c") + session.message_flow_mode(mode = 0, destination = "c") #send batch of messages to queue for i in range(1, 11): - channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) + session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) #set message credit to finite amount (less than enough for all messages) - channel.message_flow(unit = 0, value = 5, destination = "c") + session.message_flow(unit = 0, value = 5, destination = "c") #set infinite byte credit - channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") #check that expected number were received - q = self.client.queue("c") + q = session.incoming("c") for i in range(1, 6): - self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) + self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) self.assertEmpty(q) #increase credit again and check more are received for i in range(6, 11): - channel.message_flow(unit = 0, value = 1, destination = "c") - self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) + session.message_flow(unit = 0, value = 1, destination = "c") + self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) self.assertEmpty(q) def test_credit_flow_bytes(self): @@ -470,32 +490,32 @@ class MessageTests(TestBase): Test basic credit based flow control with unit = bytes """ #declare an exclusive queue - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #create consumer (for now that defaults to infinite credit) - channel.message_subscribe(queue = "q", destination = "c") - channel.message_flow_mode(mode = 0, destination = "c") + session.message_subscribe(queue = "q", destination = "c") + session.message_flow_mode(mode = 0, destination = "c") #send batch of messages to queue for i in range(10): - channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) + session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) #each message is currently interpreted as requiring msg_size bytes of credit msg_size = 35 #set byte credit to finite amount (less than enough for all messages) - channel.message_flow(unit = 1, value = msg_size*5, destination = "c") + session.message_flow(unit = 1, value = msg_size*5, destination = "c") #set infinite message credit - channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") + session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") #check that expected number were received - q = self.client.queue("c") + q = session.incoming("c") for i in range(5): - self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") + self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) #increase credit again and check more are received for i in range(5): - channel.message_flow(unit = 1, value = msg_size, destination = "c") - self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") + session.message_flow(unit = 1, value = msg_size, destination = "c") + self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) @@ -504,30 +524,30 @@ class MessageTests(TestBase): Test basic window based flow control with unit = message """ #declare an exclusive queue - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #create consumer (for now that defaults to infinite credit) - channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) - channel.message_flow_mode(mode = 1, destination = "c") + session.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) + session.message_flow_mode(mode = 1, destination = "c") #send batch of messages to queue for i in range(1, 11): - channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) + session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i)) #set message credit to finite amount (less than enough for all messages) - channel.message_flow(unit = 0, value = 5, destination = "c") + session.message_flow(unit = 0, value = 5, destination = "c") #set infinite byte credit - channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") #check that expected number were received - q = self.client.queue("c") + q = session.incoming("c") for i in range(1, 6): msg = q.get(timeout = 1) - self.assertDataEquals(channel, msg, "Message %d" % i) + self.assertDataEquals(session, msg, "Message %d" % i) self.assertEmpty(q) #acknowledge messages and check more are received msg.complete(cumulative=True) for i in range(6, 11): - self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) + self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i) self.assertEmpty(q) @@ -536,291 +556,307 @@ class MessageTests(TestBase): Test basic window based flow control with unit = bytes """ #declare an exclusive queue - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #create consumer (for now that defaults to infinite credit) - channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) - channel.message_flow_mode(mode = 1, destination = "c") + session.message_subscribe(queue = "q", destination = "c", confirm_mode = 1) + session.message_flow_mode(mode = 1, destination = "c") #send batch of messages to queue for i in range(10): - channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) + session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh")) #each message is currently interpreted as requiring msg_size bytes of credit msg_size = 40 #set byte credit to finite amount (less than enough for all messages) - channel.message_flow(unit = 1, value = msg_size*5, destination = "c") + session.message_flow(unit = 1, value = msg_size*5, destination = "c") #set infinite message credit - channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") + session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") #check that expected number were received - q = self.client.queue("c") + q = session.incoming("c") msgs = [] for i in range(5): msg = q.get(timeout = 1) msgs.append(msg) - self.assertDataEquals(channel, msg, "abcdefgh") + self.assertDataEquals(session, msg, "abcdefgh") self.assertEmpty(q) #ack each message individually and check more are received for i in range(5): msg = msgs.pop() msg.complete(cumulative=False) - self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") + self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh") self.assertEmpty(q) def test_subscribe_not_acquired(self): """ Test the not-acquired modes works as expected for a simple case """ - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 6): - channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) + session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) self.subscribe(queue = "q", destination = "a", acquire_mode = 1) self.subscribe(queue = "q", destination = "b", acquire_mode = 1) for i in range(6, 11): - channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) + session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i)) #both subscribers should see all messages - qA = self.client.queue("a") - qB = self.client.queue("b") + qA = session.incoming("a") + qB = session.incoming("b") for i in range(1, 11): for q in [qA, qB]: msg = q.get(timeout = 1) - self.assertEquals("Message %s" % i, msg.content.body) + self.assertEquals("Message %s" % i, msg.body) msg.complete() #messages should still be on the queue: - self.assertEquals(10, channel.queue_query(queue = "q").message_count) + self.assertEquals(10, session.queue_query(queue = "q").message_count) def test_acquire(self): """ Test explicit acquire function """ - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) - channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me")) + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + + #use fanout for now: + session.exchange_bind(exchange="amq.fanout", queue="q") + session.message_transfer(destination="amq.fanout", message=Message("acquire me")) + #session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me")) - self.subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1) - msg = self.client.queue("a").get(timeout = 1) + session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1) + session.message_flow(destination="a", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="a", unit=1, value=0xFFFFFFFF) + msg = session.incoming("a").get(timeout = 1) + self.assertEquals("acquire me", msg.body) #message should still be on the queue: - self.assertEquals(1, channel.queue_query(queue = "q").message_count) + self.assertEquals(1, session.queue_query(queue = "q").message_count) - channel.message_acquire([msg.command_id, msg.command_id]) + response = session.message_acquire(RangedSet(msg.id)) #check that we get notification (i.e. message_acquired) - response = channel.control_queue.get(timeout=1) self.assertEquals(response.transfers, [msg.command_id, msg.command_id]) #message should have been removed from the queue: - self.assertEquals(0, channel.queue_query(queue = "q").message_count) - msg.complete() - - + self.assertEquals(0, session.queue_query(queue = "q").message_count) + session.message_accept(RangedSet(msg.id)) def test_release(self): """ Test explicit release function """ - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) - channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me")) + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) - self.subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1) - msg = self.client.queue("a").get(timeout = 1) - channel.message_cancel(destination = "a") - channel.message_release([msg.command_id, msg.command_id]) - msg.complete() + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "release me")) + + session.message_subscribe(queue = "q", destination = "a") + session.message_flow(destination="a", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="a", unit=1, value=0xFFFFFFFF) + msg = session.incoming("a").get(timeout = 1) + self.assertEquals("release me", msg.body) + session.message_cancel(destination = "a") + session.message_release(RangedSet(msg.id)) #message should not have been removed from the queue: - self.assertEquals(1, channel.queue_query(queue = "q").message_count) + self.assertEquals(1, session.queue_query(queue = "q").message_count) def test_release_ordering(self): """ Test order of released messages is as expected """ - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range (1, 11): - channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "released message %s" % (i))) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "released message %s" % (i))) - channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) - channel.message_flow(unit = 0, value = 10, destination = "a") - channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") - queue = self.client.queue("a") + session.message_subscribe(queue = "q", destination = "a") + session.message_flow(unit = 0, value = 10, destination = "a") + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + queue = session.incoming("a") first = queue.get(timeout = 1) - for i in range (2, 10): - self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body) + for i in range(2, 10): + msg = queue.get(timeout = 1) + self.assertEquals("released message %s" % (i), msg.body) + last = queue.get(timeout = 1) self.assertEmpty(queue) - channel.message_release([first.command_id, last.command_id]) - last.complete()#will re-allocate credit, as in window mode - for i in range (1, 11): - self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body) + released = RangedSet() + released.add(first.id, last.id) + session.message_release(released) + + #TODO: may want to clean this up... + session.receiver._completed.add(first.id, last.id) + session.channel.session_completed(session.receiver._completed) + + for i in range(1, 11): + self.assertEquals("released message %s" % (i), queue.get(timeout = 1).body) def test_ranged_ack(self): """ Test acking of messages ranges """ - channel = self.channel - channel.queue_declare(queue = "q", exclusive=True, auto_delete=True) + session = self.session + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) + delivery_properties = session.delivery_properties(routing_key="q") for i in range (1, 11): - channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message %s" % (i))) + session.message_transfer(message=Message(delivery_properties, "message %s" % (i))) - channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) - channel.message_flow(unit = 0, value = 10, destination = "a") - channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") - queue = self.client.queue("a") + session.message_subscribe(queue = "q", destination = "a") + session.message_flow(unit = 0, value = 10, destination = "a") + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + queue = session.incoming("a") for i in range (1, 11): - self.assertEquals("message %s" % (i), queue.get(timeout = 1).content.body) + self.assertEquals("message %s" % (i), queue.get(timeout = 1).body) self.assertEmpty(queue) #ack all but the third message (command id 2) - channel.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=[0,1,3,6,7,7,8,9]) - channel.message_recover() + session.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=[0,1,3,6,7,7,8,9]) + session.message_recover() self.assertEquals("message 3", queue.get(timeout = 1).content.body) self.assertEmpty(queue) def test_subscribe_not_acquired_2(self): - channel = self.channel + session = self.session #publish some messages self.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 11): - channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i))) + session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i))) #consume some of them - channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) - channel.message_flow_mode(mode = 0, destination = "a") - channel.message_flow(unit = 0, value = 5, destination = "a") - channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1) + session.message_flow_mode(mode = 0, destination = "a") + session.message_flow(unit = 0, value = 5, destination = "a") + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") - queue = self.client.queue("a") + queue = session.incoming("a") for i in range(1, 6): msg = queue.get(timeout = 1) - self.assertEquals("message-%d" % (i), msg.content.body) + self.assertEquals("message-%d" % (i), msg.body) msg.complete() self.assertEmpty(queue) #now create a not-acquired subscriber - channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) - channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") + session.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") #check it gets those not consumed - queue = self.client.queue("b") - channel.message_flow(unit = 0, value = 1, destination = "b") + queue = session.incoming("b") + session.message_flow(unit = 0, value = 1, destination = "b") for i in range(6, 11): msg = queue.get(timeout = 1) - self.assertEquals("message-%d" % (i), msg.content.body) + self.assertEquals("message-%d" % (i), msg.body) msg.complete() - channel.message_flow(unit = 0, value = 1, destination = "b") + session.message_flow(unit = 0, value = 1, destination = "b") self.assertEmpty(queue) #check all 'browsed' messages are still on the queue - self.assertEqual(5, channel.queue_query(queue="q").message_count) + self.assertEqual(5, session.queue_query(queue="q").message_count) def test_subscribe_not_acquired_3(self): - channel = self.channel + session = self.session #publish some messages self.queue_declare(queue = "q", exclusive=True, auto_delete=True) for i in range(1, 11): - channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i))) + session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i))) #create a not-acquired subscriber - channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1) - channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") - channel.message_flow(unit = 0, value = 10, destination = "a") + session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1) + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + session.message_flow(unit = 0, value = 10, destination = "a") #browse through messages - queue = self.client.queue("a") + queue = session.incoming("a") for i in range(1, 11): msg = queue.get(timeout = 1) - self.assertEquals("message-%d" % (i), msg.content.body) + self.assertEquals("message-%d" % (i), msg.body) if (i % 2): #try to acquire every second message - channel.message_acquire([msg.command_id, msg.command_id]) + session.message_acquire([msg.command_id, msg.command_id]) #check that acquire succeeds - response = channel.control_queue.get(timeout=1) + response = session.control_queue.get(timeout=1) self.assertEquals(response.transfers, [msg.command_id, msg.command_id]) msg.complete() self.assertEmpty(queue) #create a second not-acquired subscriber - channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) - channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") - channel.message_flow(unit = 0, value = 1, destination = "b") + session.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") + session.message_flow(unit = 0, value = 1, destination = "b") #check it gets those not consumed - queue = self.client.queue("b") + queue = session.incoming("b") for i in [2,4,6,8,10]: msg = queue.get(timeout = 1) - self.assertEquals("message-%d" % (i), msg.content.body) + self.assertEquals("message-%d" % (i), msg.body) msg.complete() - channel.message_flow(unit = 0, value = 1, destination = "b") + session.message_flow(unit = 0, value = 1, destination = "b") self.assertEmpty(queue) #check all 'browsed' messages are still on the queue - self.assertEqual(5, channel.queue_query(queue="q").message_count) + self.assertEqual(5, session.queue_query(queue="q").message_count) def test_release_unacquired(self): - channel = self.channel + session = self.session #create queue - self.queue_declare(queue = "q", exclusive=True, auto_delete=True, durable=True) + session.queue_declare(queue = "q", exclusive=True, auto_delete=True) #send message - channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "my-message")) #create two 'browsers' - channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1) - channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") - channel.message_flow(unit = 0, value = 10, destination = "a") - queueA = self.client.queue("a") - - channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1) - channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") - channel.message_flow(unit = 0, value = 10, destination = "b") - queueB = self.client.queue("b") + session.message_subscribe(queue = "q", destination = "a", acquire_mode=1) + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a") + session.message_flow(unit = 0, value = 10, destination = "a") + queueA = session.incoming("a") + + session.message_subscribe(queue = "q", destination = "b", acquire_mode=1) + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b") + session.message_flow(unit = 0, value = 10, destination = "b") + queueB = session.incoming("b") #have each browser release the message msgA = queueA.get(timeout = 1) - channel.message_release([msgA.command_id, msgA.command_id]) + session.message_release(RangedSet(msgA.id)) msgB = queueB.get(timeout = 1) - channel.message_release([msgB.command_id, msgB.command_id]) + session.message_release(RangedSet(msgB.id)) #cancel browsers - channel.message_cancel(destination = "a") - channel.message_cancel(destination = "b") + session.message_cancel(destination = "a") + session.message_cancel(destination = "b") #create consumer - channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1, acquire_mode=0) - channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") - channel.message_flow(unit = 0, value = 10, destination = "c") - queueC = self.client.queue("c") + session.message_subscribe(queue = "q", destination = "c") + session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") + session.message_flow(unit = 0, value = 10, destination = "c") + queueC = session.incoming("c") #consume the message then ack it msgC = queueC.get(timeout = 1) - msgC.complete() + session.message_accept(RangedSet(msgC.id)) #ensure there are no other messages self.assertEmpty(queueC) def test_no_size(self): self.queue_declare(queue = "q", exclusive=True, auto_delete=True) - ch = self.channel + ch = self.session ch.message_transfer(content=SizelessContent(properties={'routing_key' : "q"}, body="message-body")) ch.message_subscribe(queue = "q", destination="d", confirm_mode = 0) ch.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "d") ch.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "d") - queue = self.client.queue("d") + queue = session.incoming("d") msg = queue.get(timeout = 3) - self.assertEquals("message-body", msg.content.body) + self.assertEquals("message-body", msg.body) - def assertDataEquals(self, channel, msg, expected): - self.assertEquals(expected, msg.content.body) + def assertDataEquals(self, session, msg, expected): + self.assertEquals(expected, msg.body) def assertEmpty(self, queue): try: diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py index 7b3590d11b..f192a2af90 100644 --- a/python/tests_0-10/queue.py +++ b/python/tests_0-10/queue.py @@ -28,66 +28,64 @@ class QueueTests(TestBase): """ Test that the purge method removes messages from the queue """ - channel = self.channel + session = self.session #setup, declare a queue and add some messages to it: - channel.exchange_declare(exchange="test-exchange", type="direct") - channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - channel.message_transfer(destination="test-exchange", content=Content("one", properties={'routing_key':"key"})) - channel.message_transfer(destination="test-exchange", content=Content("two", properties={'routing_key':"key"})) - channel.message_transfer(destination="test-exchange", content=Content("three", properties={'routing_key':"key"})) + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) + session.message_transfer(content=Content("one", properties={'routing_key':"test-queue"})) + session.message_transfer(content=Content("two", properties={'routing_key':"test-queue"})) + session.message_transfer(content=Content("three", properties={'routing_key':"test-queue"})) #check that the queue now reports 3 messages: - channel.queue_declare(queue="test-queue") - reply = channel.queue_query(queue="test-queue") + session.queue_declare(queue="test-queue") + reply = session.queue_query(queue="test-queue") self.assertEqual(3, reply.message_count) #now do the purge, then test that three messages are purged and the count drops to 0 - channel.queue_purge(queue="test-queue"); - reply = channel.queue_query(queue="test-queue") + session.queue_purge(queue="test-queue"); + reply = session.queue_query(queue="test-queue") self.assertEqual(0, reply.message_count) #send a further message and consume it, ensuring that the other messages are really gone - channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"})) + session.message_transfer(content=Content("four", properties={'routing_key':"test-queue"})) self.subscribe(queue="test-queue", destination="tag") queue = self.client.queue("tag") msg = queue.get(timeout=1) self.assertEqual("four", msg.content.body) - #check error conditions (use new channels): - channel = self.client.channel(2) - channel.session_open() + #check error conditions (use new sessions): + session = self.client.session(2) + session.session_open() try: #queue specified but doesn't exist: - channel.queue_purge(queue="invalid-queue") + session.queue_purge(queue="invalid-queue") self.fail("Expected failure when purging non-existent queue") except Closed, e: self.assertChannelException(404, e.args[0]) - channel = self.client.channel(3) - channel.session_open() + session = self.client.session(3) + session.session_open() try: #queue not specified and none previously declared for channel: - channel.queue_purge() + session.queue_purge() self.fail("Expected failure when purging unspecified queue") except Closed, e: self.assertConnectionException(530, e.args[0]) #cleanup other = self.connect() - channel = other.channel(1) - channel.session_open() - channel.exchange_delete(exchange="test-exchange") + session = other.session(1) + session.session_open() + session.exchange_delete(exchange="test-exchange") def test_declare_exclusive(self): """ Test that the exclusive field is honoured in queue.declare """ - # TestBase.setUp has already opened channel(1) - c1 = self.channel + # TestBase.setUp has already opened session(1) + c1 = self.session # Here we open a second separate connection: other = self.connect() - c2 = other.channel(1) + c2 = other.session(1) c2.session_open() #declare an exclusive queue: @@ -104,13 +102,13 @@ class QueueTests(TestBase): """ Test that the passive field is honoured in queue.declare """ - channel = self.channel + session = self.session #declare an exclusive queue: - channel.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True) - channel.queue_declare(queue="passive-queue-1", passive=True) + session.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True) + session.queue_declare(queue="passive-queue-1", passive=True) try: #other connection should not be allowed to declare this: - channel.queue_declare(queue="passive-queue-2", passive=True) + session.queue_declare(queue="passive-queue-2", passive=True) self.fail("Expected passive declaration of non-existant queue to raise a channel exception") except Closed, e: self.assertChannelException(404, e.args[0]) @@ -120,29 +118,29 @@ class QueueTests(TestBase): """ Test various permutations of the queue.bind method """ - channel = self.channel - channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) + session = self.session + session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) #straightforward case, both exchange & queue exist so no errors expected: - channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1") + session.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1") #use the queue name where the routing key is not specified: - channel.queue_bind(queue="queue-1", exchange="amq.direct") + session.queue_bind(queue="queue-1", exchange="amq.direct") #try and bind to non-existant exchange try: - channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") + session.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") self.fail("Expected bind to non-existant exchange to fail") except Closed, e: self.assertChannelException(404, e.args[0]) - #need to reopen a channel: - channel = self.client.channel(2) - channel.session_open() + #need to reopen a session: + session = self.client.session(2) + session.session_open() #try and bind non-existant queue: try: - channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") + session.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") self.fail("Expected bind of non-existant queue to fail") except Closed, e: self.assertChannelException(404, e.args[0]) @@ -161,10 +159,10 @@ class QueueTests(TestBase): def unbind_test(self, exchange, routing_key="", args=None, headers={}): #bind two queues and consume from them - channel = self.channel + session = self.session - channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) - channel.queue_declare(queue="queue-2", exclusive=True, auto_delete=True) + session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) + session.queue_declare(queue="queue-2", exclusive=True, auto_delete=True) self.subscribe(queue="queue-1", destination="queue-1") self.subscribe(queue="queue-2", destination="queue-2") @@ -172,18 +170,18 @@ class QueueTests(TestBase): queue1 = self.client.queue("queue-1") queue2 = self.client.queue("queue-2") - channel.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) - channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) + session.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) + session.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) #send a message that will match both bindings - channel.message_transfer(destination=exchange, + session.message_transfer(destination=exchange, content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers})) #unbind first queue - channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) + session.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) #send another message - channel.message_transfer(destination=exchange, + session.message_transfer(destination=exchange, content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers})) #check one queue has both messages and the other has only one @@ -205,26 +203,26 @@ class QueueTests(TestBase): """ Test core queue deletion behaviour """ - channel = self.channel + session = self.session #straight-forward case: - channel.queue_declare(queue="delete-me") - channel.message_transfer(content=Content("a", properties={'routing_key':"delete-me"})) - channel.message_transfer(content=Content("b", properties={'routing_key':"delete-me"})) - channel.message_transfer(content=Content("c", properties={'routing_key':"delete-me"})) - channel.queue_delete(queue="delete-me") + session.queue_declare(queue="delete-me") + session.message_transfer(content=Content("a", properties={'routing_key':"delete-me"})) + session.message_transfer(content=Content("b", properties={'routing_key':"delete-me"})) + session.message_transfer(content=Content("c", properties={'routing_key':"delete-me"})) + session.queue_delete(queue="delete-me") #check that it has gone be declaring passively try: - channel.queue_declare(queue="delete-me", passive=True) + session.queue_declare(queue="delete-me", passive=True) self.fail("Queue has not been deleted") except Closed, e: self.assertChannelException(404, e.args[0]) #check attempted deletion of non-existant queue is handled correctly: - channel = self.client.channel(2) - channel.session_open() + session = self.client.session(2) + session.session_open() try: - channel.queue_delete(queue="i-dont-exist", if_empty=True) + session.queue_delete(queue="i-dont-exist", if_empty=True) self.fail("Expected delete of non-existant queue to fail") except Closed, e: self.assertChannelException(404, e.args[0]) @@ -235,37 +233,37 @@ class QueueTests(TestBase): """ Test that if_empty field of queue_delete is honoured """ - channel = self.channel + session = self.session #create a queue and add a message to it (use default binding): - channel.queue_declare(queue="delete-me-2") - channel.queue_declare(queue="delete-me-2", passive=True) - channel.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"})) + session.queue_declare(queue="delete-me-2") + session.queue_declare(queue="delete-me-2", passive=True) + session.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"})) #try to delete, but only if empty: try: - channel.queue_delete(queue="delete-me-2", if_empty=True) + session.queue_delete(queue="delete-me-2", if_empty=True) self.fail("Expected delete if_empty to fail for non-empty queue") except Closed, e: self.assertChannelException(406, e.args[0]) #need new channel now: - channel = self.client.channel(2) - channel.session_open() + session = self.client.session(2) + session.session_open() #empty queue: - self.subscribe(channel, destination="consumer_tag", queue="delete-me-2") + self.subscribe(session, destination="consumer_tag", queue="delete-me-2") queue = self.client.queue("consumer_tag") msg = queue.get(timeout=1) self.assertEqual("message", msg.content.body) - channel.message_cancel(destination="consumer_tag") + session.message_cancel(destination="consumer_tag") #retry deletion on empty queue: - channel.queue_delete(queue="delete-me-2", if_empty=True) + session.queue_delete(queue="delete-me-2", if_empty=True) #check that it has gone by declaring passively: try: - channel.queue_declare(queue="delete-me-2", passive=True) + session.queue_declare(queue="delete-me-2", passive=True) self.fail("Queue has not been deleted") except Closed, e: self.assertChannelException(404, e.args[0]) @@ -274,29 +272,29 @@ class QueueTests(TestBase): """ Test that if_unused field of queue_delete is honoured """ - channel = self.channel + session = self.channel #create a queue and register a consumer: - channel.queue_declare(queue="delete-me-3") - channel.queue_declare(queue="delete-me-3", passive=True) + session.queue_declare(queue="delete-me-3") + session.queue_declare(queue="delete-me-3", passive=True) self.subscribe(destination="consumer_tag", queue="delete-me-3") - #need new channel now: - channel2 = self.client.channel(2) - channel2.session_open() + #need new session now: + session2 = self.client.session(2) + session2.session_open() #try to delete, but only if empty: try: - channel2.queue_delete(queue="delete-me-3", if_unused=True) + session2.queue_delete(queue="delete-me-3", if_unused=True) self.fail("Expected delete if_unused to fail for queue with existing consumer") except Closed, e: self.assertChannelException(406, e.args[0]) - channel.message_cancel(destination="consumer_tag") - channel.queue_delete(queue="delete-me-3", if_unused=True) + session.message_cancel(destination="consumer_tag") + session.queue_delete(queue="delete-me-3", if_unused=True) #check that it has gone by declaring passively: try: - channel.queue_declare(queue="delete-me-3", passive=True) + session.queue_declare(queue="delete-me-3", passive=True) self.fail("Queue has not been deleted") except Closed, e: self.assertChannelException(404, e.args[0]) @@ -306,31 +304,31 @@ class QueueTests(TestBase): """ Test auto-deletion (of non-exclusive queues) """ - channel = self.channel + session = self.session other = self.connect() - channel2 = other.channel(1) - channel2.session_open() + session2 = other.session(1) + session2.session_open() - channel.queue_declare(queue="auto-delete-me", auto_delete=True) + session.queue_declare(queue="auto-delete-me", auto_delete=True) - #consume from both channels - reply = channel.basic_consume(queue="auto-delete-me") - channel2.basic_consume(queue="auto-delete-me") + #consume from both sessions + reply = session.basic_consume(queue="auto-delete-me") + session2.basic_consume(queue="auto-delete-me") #implicit cancel - channel2.session_close() + session2.session_close() #check it is still there - channel.queue_declare(queue="auto-delete-me", passive=True) + session.queue_declare(queue="auto-delete-me", passive=True) #explicit cancel => queue is now unused again: - channel.basic_cancel(consumer_tag=reply.consumer_tag) + session.basic_cancel(consumer_tag=reply.consumer_tag) #NOTE: this assumes there is no timeout in use #check that it has gone be declaring passively try: - channel.queue_declare(queue="auto-delete-me", passive=True) + session.queue_declare(queue="auto-delete-me", passive=True) self.fail("Expected queue to have been deleted") except Closed, e: self.assertChannelException(404, e.args[0]) |
