summaryrefslogtreecommitdiff
path: root/python/tests_0-10
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-06 17:52:59 +0000
committerGordon Sim <gsim@apache.org>2008-03-06 17:52:59 +0000
commit9fd4909832e16734c47c13eebbe4aca66640b1b0 (patch)
tree6882efaaa8244086c0765f95ee1c1e9836bb088c /python/tests_0-10
parent19ad6741d3579b1ffce70a43271e4e4f804ad643 (diff)
downloadqpid-python-9fd4909832e16734c47c13eebbe4aca66640b1b0.tar.gz
Fixes to c++ broker:
- use final version of delivery properties where appropriate - start sequence numbering of outgoing messages at 0 - explicit accept-mode is now 0 (no more confirm-mode) - add default initialisers for numeric fields in methods Converted some more python tests to use final 0-10 client. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@634368 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/tests_0-10')
-rw-r--r--python/tests_0-10/broker.py52
-rw-r--r--python/tests_0-10/exchange.py38
-rw-r--r--python/tests_0-10/message.py572
-rw-r--r--python/tests_0-10/queue.py174
4 files changed, 426 insertions, 410 deletions
diff --git a/python/tests_0-10/broker.py b/python/tests_0-10/broker.py
index 100288bbbb..25cf1241ec 100644
--- a/python/tests_0-10/broker.py
+++ b/python/tests_0-10/broker.py
@@ -18,9 +18,8 @@
#
from qpid.client import Closed
from qpid.queue import Empty
-from qpid.content import Content
from qpid.testlib import TestBase010
-from qpid.datatypes import Message
+from qpid.datatypes import Message, RangedSet
class BrokerTests(TestBase010):
"""Tests for basic Broker functionality"""
@@ -31,28 +30,30 @@ class BrokerTests(TestBase010):
consumer. Second, this test tries to explicitly receive and
acknowledge a message with an acknowledging consumer.
"""
- ch = self.channel
- self.queue_declare(ch, queue = "myqueue")
+ session = self.session
+ session.queue_declare(queue = "myqueue", exclusive=True, auto_delete=True)
# No ack consumer
ctag = "tag1"
- self.subscribe(ch, queue = "myqueue", destination = ctag)
+ session.message_subscribe(queue = "myqueue", destination = ctag)
+ session.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF)
body = "test no-ack"
- ch.message_transfer(content = Content(body, properties = {"routing_key" : "myqueue"}))
- msg = self.client.queue(ctag).get(timeout = 5)
- self.assert_(msg.content.body == body)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="myqueue"), body))
+ msg = session.incoming(ctag).get(timeout = 5)
+ self.assert_(msg.body == body)
# Acknowledging consumer
- self.queue_declare(ch, queue = "otherqueue")
+ session.queue_declare(queue = "otherqueue", exclusive=True, auto_delete=True)
ctag = "tag2"
- self.subscribe(ch, queue = "otherqueue", destination = ctag, confirm_mode = 1)
- ch.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF)
- ch.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF)
+ session.message_subscribe(queue = "otherqueue", destination = ctag, accept_mode = 1)
+ session.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF)
body = "test ack"
- ch.message_transfer(content = Content(body, properties = {"routing_key" : "otherqueue"}))
- msg = self.client.queue(ctag).get(timeout = 5)
- msg.complete()
- self.assert_(msg.content.body == body)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="otherqueue"), body))
+ msg = session.incoming(ctag).get(timeout = 5)
+ session.message_accept(RangedSet(msg.id))
+ self.assert_(msg.body == body)
def test_simple_delivery_immediate(self):
"""
@@ -90,22 +91,3 @@ class BrokerTests(TestBase010):
queue = session.incoming(consumer_tag)
msg = queue.get(timeout=5)
self.assert_(msg.body == body)
-
- def test_invalid_channel(self):
- channel = self.client.channel(200)
- try:
- channel.queue_declare(exclusive=True)
- self.fail("Expected error on queue_declare for invalid channel")
- except Closed, e:
- self.assertConnectionException(504, e.args[0])
-
- def test_closed_channel(self):
- channel = self.client.channel(200)
- channel.session_open()
- channel.session_close()
- try:
- channel.queue_declare(exclusive=True)
- self.fail("Expected error on queue_declare for closed channel")
- except Closed, e:
- if isinstance(e.args[0], str): self.fail(e)
- self.assertConnectionException(504, e.args[0])
diff --git a/python/tests_0-10/exchange.py b/python/tests_0-10/exchange.py
index 86c39b7736..afa7f05a49 100644
--- a/python/tests_0-10/exchange.py
+++ b/python/tests_0-10/exchange.py
@@ -37,7 +37,7 @@ class StandardExchangeVerifier:
def verifyDirectExchange(self, ex):
"""Verify that ex behaves like a direct exchange."""
self.queue_declare(queue="q")
- self.channel.queue_bind(queue="q", exchange=ex, routing_key="k")
+ self.session.exchange_bind(queue="q", exchange=ex, routing_key="k")
self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
try:
self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
@@ -47,34 +47,34 @@ class StandardExchangeVerifier:
def verifyFanOutExchange(self, ex):
"""Verify that ex behaves like a fanout exchange."""
self.queue_declare(queue="q")
- self.channel.queue_bind(queue="q", exchange=ex)
+ self.session.exchange_bind(queue="q", exchange=ex)
self.queue_declare(queue="p")
- self.channel.queue_bind(queue="p", exchange=ex)
+ self.session.exchange_bind(queue="p", exchange=ex)
for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
def verifyTopicExchange(self, ex):
"""Verify that ex behaves like a topic exchange"""
self.queue_declare(queue="a")
- self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*")
+ self.session.exchange_bind(queue="a", exchange=ex, routing_key="a.#.b.*")
q = self.consume("a")
self.assertPublishGet(q, ex, "a.b.x")
self.assertPublishGet(q, ex, "a.x.b.x")
self.assertPublishGet(q, ex, "a.x.x.b.x")
# Shouldn't match
- self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"}))
- self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b.x.y"}))
- self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"x.a.b.x"}))
- self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"}))
+ self.session.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"}))
+ self.session.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b.x.y"}))
+ self.session.message_transfer(destination=ex, content=Content(properties={'routing_key':"x.a.b.x"}))
+ self.session.message_transfer(destination=ex, content=Content(properties={'routing_key':"a.b"}))
self.assert_(q.empty())
def verifyHeadersExchange(self, ex):
"""Verify that ex is a headers exchange"""
self.queue_declare(queue="q")
- self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
+ self.session.exchange_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
q = self.consume("q")
headers = {"name":"fred", "age":3}
self.assertPublishGet(q, exchange=ex, properties=headers)
- self.channel.message_transfer(destination=ex) # No headers, won't deliver
+ self.session.message_transfer(destination=ex) # No headers, won't deliver
self.assertEmpty(q);
@@ -215,7 +215,7 @@ class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase):
"""
def test(self):
try:
- self.channel.exchange_declare(exchange="humpty_dumpty", passive=True)
+ self.session.exchange_declare(exchange="humpty_dumpty", passive=True)
self.fail("Expected 404 for passive declaration of unknown exchange.")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -275,10 +275,10 @@ class HeadersExchangeTests(TestBase):
self.assertPublishGet(self.q, exchange="amq.match", properties=headers)
def myBasicPublish(self, headers):
- self.channel.message_transfer(destination="amq.match", content=Content("foobar", properties={'application_headers':headers}))
+ self.session.message_transfer(destination="amq.match", content=Content("foobar", properties={'application_headers':headers}))
def testMatchAll(self):
- self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
+ self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
self.myAssertPublishGet({"name":"fred", "age":3})
self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"})
@@ -290,7 +290,7 @@ class HeadersExchangeTests(TestBase):
self.assertEmpty(self.q)
def testMatchAny(self):
- self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
+ self.session.exchange_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
self.myAssertPublishGet({"name":"fred"})
self.myAssertPublishGet({"name":"fred", "ignoreme":10})
self.myAssertPublishGet({"ignoreme":10, "age":3})
@@ -307,15 +307,15 @@ class MiscellaneousErrorsTests(TestBase):
"""
def testTypeNotKnown(self):
try:
- self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
+ self.session.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
self.fail("Expected 503 for declaration of unknown exchange type.")
except Closed, e:
self.assertConnectionException(503, e.args[0])
def testDifferentDeclaredType(self):
- self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
+ self.session.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
try:
- self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
+ self.session.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
self.fail("Expected 530 for redeclaration of exchange with different type.")
except Closed, e:
self.assertConnectionException(530, e.args[0])
@@ -327,9 +327,9 @@ class MiscellaneousErrorsTests(TestBase):
class ExchangeTests(TestBase):
def testHeadersBindNoMatchArg(self):
- self.channel.queue_declare(queue="q", exclusive=True, auto_delete=True)
+ self.session.queue_declare(queue="q", exclusive=True, auto_delete=True)
try:
- self.channel.queue_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} )
+ self.session.exchange_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} )
self.fail("Expected failure for missing x-match arg.")
except Closed, e:
self.assertConnectionException(541, e.args[0])
diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py
index a3d32bdb2d..5f97d6c705 100644
--- a/python/tests_0-10/message.py
+++ b/python/tests_0-10/message.py
@@ -19,33 +19,34 @@
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
-from qpid.reference import Reference, ReferenceId
-class MessageTests(TestBase):
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message, RangedSet
+
+class MessageTests(TestBase010):
"""Tests for 'methods' on the amqp message 'class'"""
def test_consume_no_local(self):
"""
Test that the no_local flag is honoured in the consume method
"""
- channel = self.channel
+ session = self.session
#setup, declare two queues:
- channel.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True)
#establish two consumers one of which excludes delivery of locally sent messages
self.subscribe(destination="local_included", queue="test-queue-1a")
self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
#send a message
- channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local"))
+ session.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local"))
+ session.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local"))
#check the queues of the two consumers
- excluded = self.client.queue("local_excluded")
- included = self.client.queue("local_included")
+ excluded = session.incoming("local_excluded")
+ included = session.incoming("local_included")
msg = included.get(timeout=1)
- self.assertEqual("consume_no_local", msg.content.body)
+ self.assertEqual("consume_no_local", msg.body)
try:
excluded.get(timeout=1)
self.fail("Received locally published message though no_local=true")
@@ -62,42 +63,42 @@ class MessageTests(TestBase):
deletes such messages.
"""
- channel = self.channel
+ session = self.session
#setup:
- channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
#establish consumer which excludes delivery of locally sent messages
self.subscribe(destination="local_excluded", queue="test-queue", no_local=True)
#send a 'local' message
- channel.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="local"))
+ session.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="local"))
#send a non local message
other = self.connect()
- channel2 = other.channel(1)
- channel2.session_open()
- channel2.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="foreign"))
- channel2.session_close()
+ session2 = other.session(1)
+ session2.session_open()
+ session2.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="foreign"))
+ session2.session_close()
other.close()
#check that the second message only is delivered
- excluded = self.client.queue("local_excluded")
+ excluded = session.incoming("local_excluded")
msg = excluded.get(timeout=1)
- self.assertEqual("foreign", msg.content.body)
+ self.assertEqual("foreign", msg.body)
try:
excluded.get(timeout=1)
self.fail("Received extra message")
except Empty: None
#check queue is empty
- self.assertEqual(0, channel.queue_query(queue="test-queue").message_count)
+ self.assertEqual(0, session.queue_query(queue="test-queue").message_count)
def test_consume_exclusive(self):
"""
Test that the exclusive flag is honoured in the consume method
"""
- channel = self.channel
+ session = self.session
#setup, declare a queue:
- channel.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
#check that an exclusive consumer prevents other consumer being created:
self.subscribe(destination="first", queue="test-queue-2", exclusive=True)
@@ -107,12 +108,12 @@ class MessageTests(TestBase):
except Closed, e:
self.assertChannelException(403, e.args[0])
- #open new channel and cleanup last consumer:
- channel = self.client.channel(2)
- channel.session_open()
+ #open new session and cleanup last consumer:
+ session = self.client.session(2)
+ session.session_open()
#check that an exclusive consumer cannot be created if a consumer already exists:
- self.subscribe(channel, destination="first", queue="test-queue-2")
+ self.subscribe(session, destination="first", queue="test-queue-2")
try:
self.subscribe(destination="second", queue="test-queue-2", exclusive=True)
self.fail("Expected exclusive consume request to fail due to previous consumer")
@@ -123,7 +124,7 @@ class MessageTests(TestBase):
"""
Test error conditions associated with the queue field of the consume method:
"""
- channel = self.channel
+ session = self.session
try:
#queue specified but doesn't exist:
self.subscribe(queue="invalid-queue", destination="")
@@ -131,11 +132,11 @@ class MessageTests(TestBase):
except Closed, e:
self.assertChannelException(404, e.args[0])
- channel = self.client.channel(2)
- channel.session_open()
+ session = self.client.session(2)
+ session.session_open()
try:
#queue not specified and none previously declared for channel:
- self.subscribe(channel, queue="", destination="")
+ self.subscribe(session, queue="", destination="")
self.fail("Expected failure when consuming from unspecified queue")
except Closed, e:
self.assertConnectionException(530, e.args[0])
@@ -144,9 +145,9 @@ class MessageTests(TestBase):
"""
Ensure unique consumer tags are enforced
"""
- channel = self.channel
+ session = self.session
#setup, declare a queue:
- channel.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True)
#check that attempts to use duplicate tags are detected and prevented:
self.subscribe(destination="first", queue="test-queue-3")
@@ -160,43 +161,48 @@ class MessageTests(TestBase):
"""
Test compliance of the basic.cancel method
"""
- channel = self.channel
+ session = self.session
#setup, declare a queue:
- channel.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True)
- self.subscribe(destination="my-consumer", queue="test-queue-4")
- channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One"))
+ session.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "One"))
+
+ session.message_subscribe(destination="my-consumer", queue="test-queue-4")
+ session.message_flow(destination="my-consumer", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="my-consumer", unit=1, value=0xFFFFFFFF)
+
+ #should flush here
#cancel should stop messages being delivered
- channel.message_cancel(destination="my-consumer")
- channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="Two"))
- myqueue = self.client.queue("my-consumer")
+ session.message_cancel(destination="my-consumer")
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "Two"))
+ myqueue = session.incoming("my-consumer")
msg = myqueue.get(timeout=1)
- self.assertEqual("One", msg.content.body)
+ self.assertEqual("One", msg.body)
try:
msg = myqueue.get(timeout=1)
self.fail("Got message after cancellation: " + msg)
except Empty: None
#cancellation of non-existant consumers should be handled without error
- channel.message_cancel(destination="my-consumer")
- channel.message_cancel(destination="this-never-existed")
+ session.message_cancel(destination="my-consumer")
+ session.message_cancel(destination="this-never-existed")
def test_ack(self):
"""
Test basic ack/recover behaviour
"""
- channel = self.channel
- channel.queue_declare(queue="test-ack-queue", exclusive=True, auto_delete=True)
+ session = self.conn.session("alternate-session", timeout=10)
+ session.queue_declare(queue="test-ack-queue", auto_delete=True)
- self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1)
- queue = self.client.queue("consumer_tag")
+ session.message_subscribe(queue = "test-ack-queue", destination = "consumer")
+ session.message_flow(destination="consumer", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="consumer", unit=1, value=0xFFFFFFFF)
+ queue = session.incoming("consumer")
- channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Two"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Three"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Four"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Five"))
+ delivery_properties = session.delivery_properties(routing_key="test-ack-queue")
+ for i in ["One", "Two", "Three", "Four", "Five"]:
+ session.message_transfer(message=Message(delivery_properties, i))
msg1 = queue.get(timeout=1)
msg2 = queue.get(timeout=1)
@@ -204,26 +210,36 @@ class MessageTests(TestBase):
msg4 = queue.get(timeout=1)
msg5 = queue.get(timeout=1)
- self.assertEqual("One", msg1.content.body)
- self.assertEqual("Two", msg2.content.body)
- self.assertEqual("Three", msg3.content.body)
- self.assertEqual("Four", msg4.content.body)
- self.assertEqual("Five", msg5.content.body)
+ self.assertEqual("One", msg1.body)
+ self.assertEqual("Two", msg2.body)
+ self.assertEqual("Three", msg3.body)
+ self.assertEqual("Four", msg4.body)
+ self.assertEqual("Five", msg5.body)
+
+ session.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four
+
+ #subscribe from second session here to ensure queue is not
+ #auto-deleted when alternate session closes (no need to ack on these):
+ self.session.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1)
- msg2.complete(cumulative=True)#One and Two
- msg4.complete(cumulative=False)
+ #now close the session, and see that the unacked messages are
+ #then redelivered to another subscriber:
+ session.close(timeout=10)
- channel.message_recover(requeue=False)
+ session = self.session
+ session.message_flow(destination="checker", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="checker", unit=1, value=0xFFFFFFFF)
+ queue = session.incoming("checker")
msg3b = queue.get(timeout=1)
msg5b = queue.get(timeout=1)
- self.assertEqual("Three", msg3b.content.body)
- self.assertEqual("Five", msg5b.content.body)
+ self.assertEqual("Three", msg3b.body)
+ self.assertEqual("Five", msg5b.body)
try:
extra = queue.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
+ self.fail("Got unexpected message: " + extra.body)
except Empty: None
@@ -231,27 +247,27 @@ class MessageTests(TestBase):
"""
Test recover behaviour
"""
- channel = self.channel
- channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
- channel.queue_bind(exchange="amq.fanout", queue="queue-a")
- channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
- channel.queue_bind(exchange="amq.fanout", queue="queue-b")
+ session = self.session
+ session.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
+ session.queue_bind(exchange="amq.fanout", queue="queue-a")
+ session.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
+ session.queue_bind(exchange="amq.fanout", queue="queue-b")
self.subscribe(queue="queue-a", destination="unconfirmed", confirm_mode=1)
self.subscribe(queue="queue-b", destination="confirmed", confirm_mode=0)
- confirmed = self.client.queue("confirmed")
- unconfirmed = self.client.queue("unconfirmed")
+ confirmed = session.incoming("confirmed")
+ unconfirmed = session.incoming("unconfirmed")
data = ["One", "Two", "Three", "Four", "Five"]
for d in data:
- channel.message_transfer(destination="amq.fanout", content=Content(body=d))
+ session.message_transfer(destination="amq.fanout", content=Content(body=d))
for q in [confirmed, unconfirmed]:
for d in data:
self.assertEqual(d, q.get(timeout=1).content.body)
self.assertEmpty(q)
- channel.message_recover(requeue=False)
+ session.message_recover(requeue=False)
self.assertEmpty(confirmed)
@@ -259,29 +275,29 @@ class MessageTests(TestBase):
msg = None
for d in data:
msg = unconfirmed.get(timeout=1)
- self.assertEqual(d, msg.content.body)
+ self.assertEqual(d, msg.body)
self.assertEqual(True, msg.content['redelivered'])
self.assertEmpty(unconfirmed)
- data.remove(msg.content.body)
+ data.remove(msg.body)
msg.complete(cumulative=False)
- channel.message_recover(requeue=False)
+ session.message_recover(requeue=False)
def test_recover_requeue(self):
"""
Test requeing on recovery
"""
- channel = self.channel
- channel.queue_declare(queue="test-requeue", exclusive=True, auto_delete=True)
+ session = self.session
+ session.queue_declare(queue="test-requeue", exclusive=True, auto_delete=True)
self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1)
- queue = self.client.queue("consumer_tag")
+ queue = session.incoming("consumer_tag")
- channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Two"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four"))
- channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five"))
+ session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One"))
+ session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Two"))
+ session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three"))
+ session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four"))
+ session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five"))
msg1 = queue.get(timeout=1)
msg2 = queue.get(timeout=1)
@@ -298,15 +314,15 @@ class MessageTests(TestBase):
msg2.complete(cumulative=True) #One and Two
msg4.complete(cumulative=False) #Four
- channel.message_cancel(destination="consumer_tag")
+ session.message_cancel(destination="consumer_tag")
#publish a new message
- channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Six"))
+ session.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Six"))
#requeue unacked messages (Three and Five)
- channel.message_recover(requeue=True)
+ session.message_recover(requeue=True)
self.subscribe(queue="test-requeue", destination="consumer_tag")
- queue2 = self.client.queue("consumer_tag")
+ queue2 = session.incoming("consumer_tag")
msg3b = queue2.get(timeout=1)
msg5b = queue2.get(timeout=1)
@@ -334,22 +350,22 @@ class MessageTests(TestBase):
Test that the prefetch count specified is honoured
"""
#setup: declare queue and subscribe
- channel = self.channel
- channel.queue_declare(queue="test-prefetch-count", exclusive=True, auto_delete=True)
+ session = self.session
+ session.queue_declare(queue="test-prefetch-count", exclusive=True, auto_delete=True)
subscription = self.subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1)
- queue = self.client.queue("consumer_tag")
+ queue = session.incoming("consumer_tag")
#set prefetch to 5:
- channel.message_qos(prefetch_count=5)
+ session.message_qos(prefetch_count=5)
#publish 10 messages:
for i in range(1, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-count"}, body="Message %d" % i))
+ session.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-count"}, body="Message %d" % i))
#only 5 messages should have been delivered:
for i in range(1, 6):
msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
+ self.assertEqual("Message %d" % i, msg.body)
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
@@ -360,7 +376,7 @@ class MessageTests(TestBase):
for i in range(6, 11):
msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
+ self.assertEqual("Message %d" % i, msg.body)
msg.complete()
@@ -376,22 +392,22 @@ class MessageTests(TestBase):
Test that the prefetch size specified is honoured
"""
#setup: declare queue and subscribe
- channel = self.channel
- channel.queue_declare(queue="test-prefetch-size", exclusive=True, auto_delete=True)
+ session = self.session
+ session.queue_declare(queue="test-prefetch-size", exclusive=True, auto_delete=True)
subscription = self.subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1)
- queue = self.client.queue("consumer_tag")
+ queue = session.incoming("consumer_tag")
#set prefetch to 50 bytes (each message is 9 or 10 bytes):
- channel.message_qos(prefetch_size=50)
+ session.message_qos(prefetch_size=50)
#publish 10 messages:
for i in range(1, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body="Message %d" % i))
+ session.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body="Message %d" % i))
#only 5 messages should have been delivered (i.e. 45 bytes worth):
for i in range(1, 6):
msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
+ self.assertEqual("Message %d" % i, msg.body)
try:
extra = queue.get(timeout=1)
@@ -403,7 +419,7 @@ class MessageTests(TestBase):
for i in range(6, 11):
msg = queue.get(timeout=1)
- self.assertEqual("Message %d" % i, msg.content.body)
+ self.assertEqual("Message %d" % i, msg.body)
msg.complete()
@@ -415,54 +431,58 @@ class MessageTests(TestBase):
#make sure that a single oversized message still gets delivered
large = "abcdefghijklmnopqrstuvwxyz"
large = large + "-" + large;
- channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body=large))
+ session.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body=large))
msg = queue.get(timeout=1)
- self.assertEqual(large, msg.content.body)
+ self.assertEqual(large, msg.body)
def test_reject(self):
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout")
- channel.queue_declare(queue = "r", exclusive=True, auto_delete=True)
- channel.queue_bind(queue = "r", exchange = "amq.fanout")
-
- self.subscribe(queue = "q", destination = "consumer", confirm_mode = 1)
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah"))
- msg = self.client.queue("consumer").get(timeout = 1)
- self.assertEquals(msg.content.body, "blah, blah")
- channel.message_reject([msg.command_id, msg.command_id])
-
- self.subscribe(queue = "r", destination = "checker")
- msg = self.client.queue("checker").get(timeout = 1)
- self.assertEquals(msg.content.body, "blah, blah")
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout")
+ session.queue_declare(queue = "r", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue = "r", exchange = "amq.fanout")
+
+ session.message_subscribe(queue = "q", destination = "consumer")
+ session.message_flow(destination="consumer", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="consumer", unit=1, value=0xFFFFFFFF)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "blah, blah"))
+ msg = session.incoming("consumer").get(timeout = 1)
+ self.assertEquals(msg.body, "blah, blah")
+ session.message_reject(RangedSet(msg.id))
+
+ session.message_subscribe(queue = "r", destination = "checker")
+ session.message_flow(destination="checker", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="checker", unit=1, value=0xFFFFFFFF)
+ msg = session.incoming("checker").get(timeout = 1)
+ self.assertEquals(msg.body, "blah, blah")
def test_credit_flow_messages(self):
"""
Test basic credit based flow control with unit = message
"""
#declare an exclusive queue
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
- channel.message_subscribe(queue = "q", destination = "c")
- channel.message_flow_mode(mode = 0, destination = "c")
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_flow_mode(mode = 0, destination = "c")
#send batch of messages to queue
for i in range(1, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
+ session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
#set message credit to finite amount (less than enough for all messages)
- channel.message_flow(unit = 0, value = 5, destination = "c")
+ session.message_flow(unit = 0, value = 5, destination = "c")
#set infinite byte credit
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
#check that expected number were received
- q = self.client.queue("c")
+ q = session.incoming("c")
for i in range(1, 6):
- self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+ self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
self.assertEmpty(q)
#increase credit again and check more are received
for i in range(6, 11):
- channel.message_flow(unit = 0, value = 1, destination = "c")
- self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+ session.message_flow(unit = 0, value = 1, destination = "c")
+ self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
self.assertEmpty(q)
def test_credit_flow_bytes(self):
@@ -470,32 +490,32 @@ class MessageTests(TestBase):
Test basic credit based flow control with unit = bytes
"""
#declare an exclusive queue
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
- channel.message_subscribe(queue = "q", destination = "c")
- channel.message_flow_mode(mode = 0, destination = "c")
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_flow_mode(mode = 0, destination = "c")
#send batch of messages to queue
for i in range(10):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
+ session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
#each message is currently interpreted as requiring msg_size bytes of credit
msg_size = 35
#set byte credit to finite amount (less than enough for all messages)
- channel.message_flow(unit = 1, value = msg_size*5, destination = "c")
+ session.message_flow(unit = 1, value = msg_size*5, destination = "c")
#set infinite message credit
- channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
+ session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
#check that expected number were received
- q = self.client.queue("c")
+ q = session.incoming("c")
for i in range(5):
- self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+ self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
self.assertEmpty(q)
#increase credit again and check more are received
for i in range(5):
- channel.message_flow(unit = 1, value = msg_size, destination = "c")
- self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+ session.message_flow(unit = 1, value = msg_size, destination = "c")
+ self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
self.assertEmpty(q)
@@ -504,30 +524,30 @@ class MessageTests(TestBase):
Test basic window based flow control with unit = message
"""
#declare an exclusive queue
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
- channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
- channel.message_flow_mode(mode = 1, destination = "c")
+ session.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
+ session.message_flow_mode(mode = 1, destination = "c")
#send batch of messages to queue
for i in range(1, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
+ session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
#set message credit to finite amount (less than enough for all messages)
- channel.message_flow(unit = 0, value = 5, destination = "c")
+ session.message_flow(unit = 0, value = 5, destination = "c")
#set infinite byte credit
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
#check that expected number were received
- q = self.client.queue("c")
+ q = session.incoming("c")
for i in range(1, 6):
msg = q.get(timeout = 1)
- self.assertDataEquals(channel, msg, "Message %d" % i)
+ self.assertDataEquals(session, msg, "Message %d" % i)
self.assertEmpty(q)
#acknowledge messages and check more are received
msg.complete(cumulative=True)
for i in range(6, 11):
- self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+ self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
self.assertEmpty(q)
@@ -536,291 +556,307 @@ class MessageTests(TestBase):
Test basic window based flow control with unit = bytes
"""
#declare an exclusive queue
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#create consumer (for now that defaults to infinite credit)
- channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
- channel.message_flow_mode(mode = 1, destination = "c")
+ session.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
+ session.message_flow_mode(mode = 1, destination = "c")
#send batch of messages to queue
for i in range(10):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
+ session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
#each message is currently interpreted as requiring msg_size bytes of credit
msg_size = 40
#set byte credit to finite amount (less than enough for all messages)
- channel.message_flow(unit = 1, value = msg_size*5, destination = "c")
+ session.message_flow(unit = 1, value = msg_size*5, destination = "c")
#set infinite message credit
- channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
+ session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
#check that expected number were received
- q = self.client.queue("c")
+ q = session.incoming("c")
msgs = []
for i in range(5):
msg = q.get(timeout = 1)
msgs.append(msg)
- self.assertDataEquals(channel, msg, "abcdefgh")
+ self.assertDataEquals(session, msg, "abcdefgh")
self.assertEmpty(q)
#ack each message individually and check more are received
for i in range(5):
msg = msgs.pop()
msg.complete(cumulative=False)
- self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+ self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
self.assertEmpty(q)
def test_subscribe_not_acquired(self):
"""
Test the not-acquired modes works as expected for a simple case
"""
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 6):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+ session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
self.subscribe(queue = "q", destination = "a", acquire_mode = 1)
self.subscribe(queue = "q", destination = "b", acquire_mode = 1)
for i in range(6, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+ session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
#both subscribers should see all messages
- qA = self.client.queue("a")
- qB = self.client.queue("b")
+ qA = session.incoming("a")
+ qB = session.incoming("b")
for i in range(1, 11):
for q in [qA, qB]:
msg = q.get(timeout = 1)
- self.assertEquals("Message %s" % i, msg.content.body)
+ self.assertEquals("Message %s" % i, msg.body)
msg.complete()
#messages should still be on the queue:
- self.assertEquals(10, channel.queue_query(queue = "q").message_count)
+ self.assertEquals(10, session.queue_query(queue = "q").message_count)
def test_acquire(self):
"""
Test explicit acquire function
"""
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+ #use fanout for now:
+ session.exchange_bind(exchange="amq.fanout", queue="q")
+ session.message_transfer(destination="amq.fanout", message=Message("acquire me"))
+ #session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
- self.subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1)
- msg = self.client.queue("a").get(timeout = 1)
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
+ session.message_flow(destination="a", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="a", unit=1, value=0xFFFFFFFF)
+ msg = session.incoming("a").get(timeout = 1)
+ self.assertEquals("acquire me", msg.body)
#message should still be on the queue:
- self.assertEquals(1, channel.queue_query(queue = "q").message_count)
+ self.assertEquals(1, session.queue_query(queue = "q").message_count)
- channel.message_acquire([msg.command_id, msg.command_id])
+ response = session.message_acquire(RangedSet(msg.id))
#check that we get notification (i.e. message_acquired)
- response = channel.control_queue.get(timeout=1)
self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
#message should have been removed from the queue:
- self.assertEquals(0, channel.queue_query(queue = "q").message_count)
- msg.complete()
-
-
+ self.assertEquals(0, session.queue_query(queue = "q").message_count)
+ session.message_accept(RangedSet(msg.id))
def test_release(self):
"""
Test explicit release function
"""
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me"))
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- self.subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1)
- msg = self.client.queue("a").get(timeout = 1)
- channel.message_cancel(destination = "a")
- channel.message_release([msg.command_id, msg.command_id])
- msg.complete()
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "release me"))
+
+ session.message_subscribe(queue = "q", destination = "a")
+ session.message_flow(destination="a", unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination="a", unit=1, value=0xFFFFFFFF)
+ msg = session.incoming("a").get(timeout = 1)
+ self.assertEquals("release me", msg.body)
+ session.message_cancel(destination = "a")
+ session.message_release(RangedSet(msg.id))
#message should not have been removed from the queue:
- self.assertEquals(1, channel.queue_query(queue = "q").message_count)
+ self.assertEquals(1, session.queue_query(queue = "q").message_count)
def test_release_ordering(self):
"""
Test order of released messages is as expected
"""
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range (1, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "released message %s" % (i)))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "released message %s" % (i)))
- channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
- channel.message_flow(unit = 0, value = 10, destination = "a")
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
- queue = self.client.queue("a")
+ session.message_subscribe(queue = "q", destination = "a")
+ session.message_flow(unit = 0, value = 10, destination = "a")
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ queue = session.incoming("a")
first = queue.get(timeout = 1)
- for i in range (2, 10):
- self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body)
+ for i in range(2, 10):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("released message %s" % (i), msg.body)
+
last = queue.get(timeout = 1)
self.assertEmpty(queue)
- channel.message_release([first.command_id, last.command_id])
- last.complete()#will re-allocate credit, as in window mode
- for i in range (1, 11):
- self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body)
+ released = RangedSet()
+ released.add(first.id, last.id)
+ session.message_release(released)
+
+ #TODO: may want to clean this up...
+ session.receiver._completed.add(first.id, last.id)
+ session.channel.session_completed(session.receiver._completed)
+
+ for i in range(1, 11):
+ self.assertEquals("released message %s" % (i), queue.get(timeout = 1).body)
def test_ranged_ack(self):
"""
Test acking of messages ranges
"""
- channel = self.channel
- channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ delivery_properties = session.delivery_properties(routing_key="q")
for i in range (1, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message %s" % (i)))
+ session.message_transfer(message=Message(delivery_properties, "message %s" % (i)))
- channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
- channel.message_flow(unit = 0, value = 10, destination = "a")
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
- queue = self.client.queue("a")
+ session.message_subscribe(queue = "q", destination = "a")
+ session.message_flow(unit = 0, value = 10, destination = "a")
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ queue = session.incoming("a")
for i in range (1, 11):
- self.assertEquals("message %s" % (i), queue.get(timeout = 1).content.body)
+ self.assertEquals("message %s" % (i), queue.get(timeout = 1).body)
self.assertEmpty(queue)
#ack all but the third message (command id 2)
- channel.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=[0,1,3,6,7,7,8,9])
- channel.message_recover()
+ session.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=[0,1,3,6,7,7,8,9])
+ session.message_recover()
self.assertEquals("message 3", queue.get(timeout = 1).content.body)
self.assertEmpty(queue)
def test_subscribe_not_acquired_2(self):
- channel = self.channel
+ session = self.session
#publish some messages
self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+ session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
#consume some of them
- channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
- channel.message_flow_mode(mode = 0, destination = "a")
- channel.message_flow(unit = 0, value = 5, destination = "a")
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
+ session.message_flow_mode(mode = 0, destination = "a")
+ session.message_flow(unit = 0, value = 5, destination = "a")
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
- queue = self.client.queue("a")
+ queue = session.incoming("a")
for i in range(1, 6):
msg = queue.get(timeout = 1)
- self.assertEquals("message-%d" % (i), msg.content.body)
+ self.assertEquals("message-%d" % (i), msg.body)
msg.complete()
self.assertEmpty(queue)
#now create a not-acquired subscriber
- channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+ session.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
#check it gets those not consumed
- queue = self.client.queue("b")
- channel.message_flow(unit = 0, value = 1, destination = "b")
+ queue = session.incoming("b")
+ session.message_flow(unit = 0, value = 1, destination = "b")
for i in range(6, 11):
msg = queue.get(timeout = 1)
- self.assertEquals("message-%d" % (i), msg.content.body)
+ self.assertEquals("message-%d" % (i), msg.body)
msg.complete()
- channel.message_flow(unit = 0, value = 1, destination = "b")
+ session.message_flow(unit = 0, value = 1, destination = "b")
self.assertEmpty(queue)
#check all 'browsed' messages are still on the queue
- self.assertEqual(5, channel.queue_query(queue="q").message_count)
+ self.assertEqual(5, session.queue_query(queue="q").message_count)
def test_subscribe_not_acquired_3(self):
- channel = self.channel
+ session = self.session
#publish some messages
self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 11):
- channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+ session.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
#create a not-acquired subscriber
- channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
- channel.message_flow(unit = 0, value = 10, destination = "a")
+ session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = 0, value = 10, destination = "a")
#browse through messages
- queue = self.client.queue("a")
+ queue = session.incoming("a")
for i in range(1, 11):
msg = queue.get(timeout = 1)
- self.assertEquals("message-%d" % (i), msg.content.body)
+ self.assertEquals("message-%d" % (i), msg.body)
if (i % 2):
#try to acquire every second message
- channel.message_acquire([msg.command_id, msg.command_id])
+ session.message_acquire([msg.command_id, msg.command_id])
#check that acquire succeeds
- response = channel.control_queue.get(timeout=1)
+ response = session.control_queue.get(timeout=1)
self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
msg.complete()
self.assertEmpty(queue)
#create a second not-acquired subscriber
- channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
- channel.message_flow(unit = 0, value = 1, destination = "b")
+ session.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+ session.message_flow(unit = 0, value = 1, destination = "b")
#check it gets those not consumed
- queue = self.client.queue("b")
+ queue = session.incoming("b")
for i in [2,4,6,8,10]:
msg = queue.get(timeout = 1)
- self.assertEquals("message-%d" % (i), msg.content.body)
+ self.assertEquals("message-%d" % (i), msg.body)
msg.complete()
- channel.message_flow(unit = 0, value = 1, destination = "b")
+ session.message_flow(unit = 0, value = 1, destination = "b")
self.assertEmpty(queue)
#check all 'browsed' messages are still on the queue
- self.assertEqual(5, channel.queue_query(queue="q").message_count)
+ self.assertEqual(5, session.queue_query(queue="q").message_count)
def test_release_unacquired(self):
- channel = self.channel
+ session = self.session
#create queue
- self.queue_declare(queue = "q", exclusive=True, auto_delete=True, durable=True)
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#send message
- channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "my-message"))
#create two 'browsers'
- channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
- channel.message_flow(unit = 0, value = 10, destination = "a")
- queueA = self.client.queue("a")
-
- channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
- channel.message_flow(unit = 0, value = 10, destination = "b")
- queueB = self.client.queue("b")
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode=1)
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = 0, value = 10, destination = "a")
+ queueA = session.incoming("a")
+
+ session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+ session.message_flow(unit = 0, value = 10, destination = "b")
+ queueB = session.incoming("b")
#have each browser release the message
msgA = queueA.get(timeout = 1)
- channel.message_release([msgA.command_id, msgA.command_id])
+ session.message_release(RangedSet(msgA.id))
msgB = queueB.get(timeout = 1)
- channel.message_release([msgB.command_id, msgB.command_id])
+ session.message_release(RangedSet(msgB.id))
#cancel browsers
- channel.message_cancel(destination = "a")
- channel.message_cancel(destination = "b")
+ session.message_cancel(destination = "a")
+ session.message_cancel(destination = "b")
#create consumer
- channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1, acquire_mode=0)
- channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
- channel.message_flow(unit = 0, value = 10, destination = "c")
- queueC = self.client.queue("c")
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+ session.message_flow(unit = 0, value = 10, destination = "c")
+ queueC = session.incoming("c")
#consume the message then ack it
msgC = queueC.get(timeout = 1)
- msgC.complete()
+ session.message_accept(RangedSet(msgC.id))
#ensure there are no other messages
self.assertEmpty(queueC)
def test_no_size(self):
self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- ch = self.channel
+ ch = self.session
ch.message_transfer(content=SizelessContent(properties={'routing_key' : "q"}, body="message-body"))
ch.message_subscribe(queue = "q", destination="d", confirm_mode = 0)
ch.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "d")
ch.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "d")
- queue = self.client.queue("d")
+ queue = session.incoming("d")
msg = queue.get(timeout = 3)
- self.assertEquals("message-body", msg.content.body)
+ self.assertEquals("message-body", msg.body)
- def assertDataEquals(self, channel, msg, expected):
- self.assertEquals(expected, msg.content.body)
+ def assertDataEquals(self, session, msg, expected):
+ self.assertEquals(expected, msg.body)
def assertEmpty(self, queue):
try:
diff --git a/python/tests_0-10/queue.py b/python/tests_0-10/queue.py
index 7b3590d11b..f192a2af90 100644
--- a/python/tests_0-10/queue.py
+++ b/python/tests_0-10/queue.py
@@ -28,66 +28,64 @@ class QueueTests(TestBase):
"""
Test that the purge method removes messages from the queue
"""
- channel = self.channel
+ session = self.session
#setup, declare a queue and add some messages to it:
- channel.exchange_declare(exchange="test-exchange", type="direct")
- channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
- channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
- channel.message_transfer(destination="test-exchange", content=Content("one", properties={'routing_key':"key"}))
- channel.message_transfer(destination="test-exchange", content=Content("two", properties={'routing_key':"key"}))
- channel.message_transfer(destination="test-exchange", content=Content("three", properties={'routing_key':"key"}))
+ session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+ session.message_transfer(content=Content("one", properties={'routing_key':"test-queue"}))
+ session.message_transfer(content=Content("two", properties={'routing_key':"test-queue"}))
+ session.message_transfer(content=Content("three", properties={'routing_key':"test-queue"}))
#check that the queue now reports 3 messages:
- channel.queue_declare(queue="test-queue")
- reply = channel.queue_query(queue="test-queue")
+ session.queue_declare(queue="test-queue")
+ reply = session.queue_query(queue="test-queue")
self.assertEqual(3, reply.message_count)
#now do the purge, then test that three messages are purged and the count drops to 0
- channel.queue_purge(queue="test-queue");
- reply = channel.queue_query(queue="test-queue")
+ session.queue_purge(queue="test-queue");
+ reply = session.queue_query(queue="test-queue")
self.assertEqual(0, reply.message_count)
#send a further message and consume it, ensuring that the other messages are really gone
- channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"}))
+ session.message_transfer(content=Content("four", properties={'routing_key':"test-queue"}))
self.subscribe(queue="test-queue", destination="tag")
queue = self.client.queue("tag")
msg = queue.get(timeout=1)
self.assertEqual("four", msg.content.body)
- #check error conditions (use new channels):
- channel = self.client.channel(2)
- channel.session_open()
+ #check error conditions (use new sessions):
+ session = self.client.session(2)
+ session.session_open()
try:
#queue specified but doesn't exist:
- channel.queue_purge(queue="invalid-queue")
+ session.queue_purge(queue="invalid-queue")
self.fail("Expected failure when purging non-existent queue")
except Closed, e:
self.assertChannelException(404, e.args[0])
- channel = self.client.channel(3)
- channel.session_open()
+ session = self.client.session(3)
+ session.session_open()
try:
#queue not specified and none previously declared for channel:
- channel.queue_purge()
+ session.queue_purge()
self.fail("Expected failure when purging unspecified queue")
except Closed, e:
self.assertConnectionException(530, e.args[0])
#cleanup
other = self.connect()
- channel = other.channel(1)
- channel.session_open()
- channel.exchange_delete(exchange="test-exchange")
+ session = other.session(1)
+ session.session_open()
+ session.exchange_delete(exchange="test-exchange")
def test_declare_exclusive(self):
"""
Test that the exclusive field is honoured in queue.declare
"""
- # TestBase.setUp has already opened channel(1)
- c1 = self.channel
+ # TestBase.setUp has already opened session(1)
+ c1 = self.session
# Here we open a second separate connection:
other = self.connect()
- c2 = other.channel(1)
+ c2 = other.session(1)
c2.session_open()
#declare an exclusive queue:
@@ -104,13 +102,13 @@ class QueueTests(TestBase):
"""
Test that the passive field is honoured in queue.declare
"""
- channel = self.channel
+ session = self.session
#declare an exclusive queue:
- channel.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="passive-queue-1", passive=True)
+ session.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="passive-queue-1", passive=True)
try:
#other connection should not be allowed to declare this:
- channel.queue_declare(queue="passive-queue-2", passive=True)
+ session.queue_declare(queue="passive-queue-2", passive=True)
self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -120,29 +118,29 @@ class QueueTests(TestBase):
"""
Test various permutations of the queue.bind method
"""
- channel = self.channel
- channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
+ session = self.session
+ session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
#straightforward case, both exchange & queue exist so no errors expected:
- channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")
+ session.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")
#use the queue name where the routing key is not specified:
- channel.queue_bind(queue="queue-1", exchange="amq.direct")
+ session.queue_bind(queue="queue-1", exchange="amq.direct")
#try and bind to non-existant exchange
try:
- channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
+ session.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
self.fail("Expected bind to non-existant exchange to fail")
except Closed, e:
self.assertChannelException(404, e.args[0])
- #need to reopen a channel:
- channel = self.client.channel(2)
- channel.session_open()
+ #need to reopen a session:
+ session = self.client.session(2)
+ session.session_open()
#try and bind non-existant queue:
try:
- channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
+ session.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
self.fail("Expected bind of non-existant queue to fail")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -161,10 +159,10 @@ class QueueTests(TestBase):
def unbind_test(self, exchange, routing_key="", args=None, headers={}):
#bind two queues and consume from them
- channel = self.channel
+ session = self.session
- channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
- channel.queue_declare(queue="queue-2", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="queue-2", exclusive=True, auto_delete=True)
self.subscribe(queue="queue-1", destination="queue-1")
self.subscribe(queue="queue-2", destination="queue-2")
@@ -172,18 +170,18 @@ class QueueTests(TestBase):
queue1 = self.client.queue("queue-1")
queue2 = self.client.queue("queue-2")
- channel.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
- channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)
+ session.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
+ session.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)
#send a message that will match both bindings
- channel.message_transfer(destination=exchange,
+ session.message_transfer(destination=exchange,
content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers}))
#unbind first queue
- channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
+ session.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
#send another message
- channel.message_transfer(destination=exchange,
+ session.message_transfer(destination=exchange,
content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers}))
#check one queue has both messages and the other has only one
@@ -205,26 +203,26 @@ class QueueTests(TestBase):
"""
Test core queue deletion behaviour
"""
- channel = self.channel
+ session = self.session
#straight-forward case:
- channel.queue_declare(queue="delete-me")
- channel.message_transfer(content=Content("a", properties={'routing_key':"delete-me"}))
- channel.message_transfer(content=Content("b", properties={'routing_key':"delete-me"}))
- channel.message_transfer(content=Content("c", properties={'routing_key':"delete-me"}))
- channel.queue_delete(queue="delete-me")
+ session.queue_declare(queue="delete-me")
+ session.message_transfer(content=Content("a", properties={'routing_key':"delete-me"}))
+ session.message_transfer(content=Content("b", properties={'routing_key':"delete-me"}))
+ session.message_transfer(content=Content("c", properties={'routing_key':"delete-me"}))
+ session.queue_delete(queue="delete-me")
#check that it has gone be declaring passively
try:
- channel.queue_declare(queue="delete-me", passive=True)
+ session.queue_declare(queue="delete-me", passive=True)
self.fail("Queue has not been deleted")
except Closed, e:
self.assertChannelException(404, e.args[0])
#check attempted deletion of non-existant queue is handled correctly:
- channel = self.client.channel(2)
- channel.session_open()
+ session = self.client.session(2)
+ session.session_open()
try:
- channel.queue_delete(queue="i-dont-exist", if_empty=True)
+ session.queue_delete(queue="i-dont-exist", if_empty=True)
self.fail("Expected delete of non-existant queue to fail")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -235,37 +233,37 @@ class QueueTests(TestBase):
"""
Test that if_empty field of queue_delete is honoured
"""
- channel = self.channel
+ session = self.session
#create a queue and add a message to it (use default binding):
- channel.queue_declare(queue="delete-me-2")
- channel.queue_declare(queue="delete-me-2", passive=True)
- channel.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"}))
+ session.queue_declare(queue="delete-me-2")
+ session.queue_declare(queue="delete-me-2", passive=True)
+ session.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"}))
#try to delete, but only if empty:
try:
- channel.queue_delete(queue="delete-me-2", if_empty=True)
+ session.queue_delete(queue="delete-me-2", if_empty=True)
self.fail("Expected delete if_empty to fail for non-empty queue")
except Closed, e:
self.assertChannelException(406, e.args[0])
#need new channel now:
- channel = self.client.channel(2)
- channel.session_open()
+ session = self.client.session(2)
+ session.session_open()
#empty queue:
- self.subscribe(channel, destination="consumer_tag", queue="delete-me-2")
+ self.subscribe(session, destination="consumer_tag", queue="delete-me-2")
queue = self.client.queue("consumer_tag")
msg = queue.get(timeout=1)
self.assertEqual("message", msg.content.body)
- channel.message_cancel(destination="consumer_tag")
+ session.message_cancel(destination="consumer_tag")
#retry deletion on empty queue:
- channel.queue_delete(queue="delete-me-2", if_empty=True)
+ session.queue_delete(queue="delete-me-2", if_empty=True)
#check that it has gone by declaring passively:
try:
- channel.queue_declare(queue="delete-me-2", passive=True)
+ session.queue_declare(queue="delete-me-2", passive=True)
self.fail("Queue has not been deleted")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -274,29 +272,29 @@ class QueueTests(TestBase):
"""
Test that if_unused field of queue_delete is honoured
"""
- channel = self.channel
+ session = self.channel
#create a queue and register a consumer:
- channel.queue_declare(queue="delete-me-3")
- channel.queue_declare(queue="delete-me-3", passive=True)
+ session.queue_declare(queue="delete-me-3")
+ session.queue_declare(queue="delete-me-3", passive=True)
self.subscribe(destination="consumer_tag", queue="delete-me-3")
- #need new channel now:
- channel2 = self.client.channel(2)
- channel2.session_open()
+ #need new session now:
+ session2 = self.client.session(2)
+ session2.session_open()
#try to delete, but only if empty:
try:
- channel2.queue_delete(queue="delete-me-3", if_unused=True)
+ session2.queue_delete(queue="delete-me-3", if_unused=True)
self.fail("Expected delete if_unused to fail for queue with existing consumer")
except Closed, e:
self.assertChannelException(406, e.args[0])
- channel.message_cancel(destination="consumer_tag")
- channel.queue_delete(queue="delete-me-3", if_unused=True)
+ session.message_cancel(destination="consumer_tag")
+ session.queue_delete(queue="delete-me-3", if_unused=True)
#check that it has gone by declaring passively:
try:
- channel.queue_declare(queue="delete-me-3", passive=True)
+ session.queue_declare(queue="delete-me-3", passive=True)
self.fail("Queue has not been deleted")
except Closed, e:
self.assertChannelException(404, e.args[0])
@@ -306,31 +304,31 @@ class QueueTests(TestBase):
"""
Test auto-deletion (of non-exclusive queues)
"""
- channel = self.channel
+ session = self.session
other = self.connect()
- channel2 = other.channel(1)
- channel2.session_open()
+ session2 = other.session(1)
+ session2.session_open()
- channel.queue_declare(queue="auto-delete-me", auto_delete=True)
+ session.queue_declare(queue="auto-delete-me", auto_delete=True)
- #consume from both channels
- reply = channel.basic_consume(queue="auto-delete-me")
- channel2.basic_consume(queue="auto-delete-me")
+ #consume from both sessions
+ reply = session.basic_consume(queue="auto-delete-me")
+ session2.basic_consume(queue="auto-delete-me")
#implicit cancel
- channel2.session_close()
+ session2.session_close()
#check it is still there
- channel.queue_declare(queue="auto-delete-me", passive=True)
+ session.queue_declare(queue="auto-delete-me", passive=True)
#explicit cancel => queue is now unused again:
- channel.basic_cancel(consumer_tag=reply.consumer_tag)
+ session.basic_cancel(consumer_tag=reply.consumer_tag)
#NOTE: this assumes there is no timeout in use
#check that it has gone be declaring passively
try:
- channel.queue_declare(queue="auto-delete-me", passive=True)
+ session.queue_declare(queue="auto-delete-me", passive=True)
self.fail("Expected queue to have been deleted")
except Closed, e:
self.assertChannelException(404, e.args[0])