summaryrefslogtreecommitdiff
path: root/funtests/test_channel.py
diff options
context:
space:
mode:
Diffstat (limited to 'funtests/test_channel.py')
-rwxr-xr-xfuntests/test_channel.py56
1 files changed, 27 insertions, 29 deletions
diff --git a/funtests/test_channel.py b/funtests/test_channel.py
index 503a48e..10b860f 100755
--- a/funtests/test_channel.py
+++ b/funtests/test_channel.py
@@ -22,15 +22,8 @@ Test amqp.channel module
import sys
import unittest
-try:
- bytes
-except NameError:
- # Python 2.5 and lower
- bytes = str
-
import settings
-
from amqp import ChannelError, Connection, Message, FrameSyntaxError
@@ -163,14 +156,19 @@ class TestChannel(unittest.TestCase):
self.assertTrue(isinstance(msg2.body, bytes))
self.assertEqual(msg2.body, u'hello w\xf6rld'.encode('latin_1'))
- def test_exception(self):
- """
- Check that Channel exceptions are actually raised as Python
- exceptions.
+ def test_queue_delete_empty(self):
+ self.assertFalse(
+ self.ch.queue_delete('bogus_queue_that_does_not_exist')
+ )
- """
+ def test_survives_channel_error(self):
with self.assertRaises(ChannelError):
- self.ch.queue_delete('bogus_queue_that_does_not_exist')
+ self.ch.queue_declare('krjqheewq_bogus', passive=True)
+ self.ch.queue_declare('funtest_survive')
+ self.ch.queue_declare('funtest_survive', passive=True)
+ self.assertEqual(
+ 0, self.ch.queue_delete('funtest_survive'),
+ )
def test_invalid_header(self):
"""
@@ -181,7 +179,7 @@ class TestChannel(unittest.TestCase):
"""
qname, _, _ = self.ch.queue_declare()
- msg = Message(application_headers={'test': None})
+ msg = Message(application_headers={'test': object()})
self.assertRaises(
FrameSyntaxError, self.ch.basic_publish, msg, routing_key=qname,
@@ -248,10 +246,10 @@ class TestChannel(unittest.TestCase):
content_type='text/plain',
application_headers={'foo': 7, 'bar': 'baz'})
- self.ch.basic_publish(msg, 'unittest.fanout')
- self.ch.basic_publish(msg, 'unittest.fanout', mandatory=True)
- self.ch.basic_publish(msg, 'unittest.fanout', mandatory=True)
- self.ch.basic_publish(msg, 'unittest.fanout', mandatory=True)
+ self.ch.basic_publish(msg, 'funtest.fanout')
+ self.ch.basic_publish(msg, 'funtest.fanout', mandatory=True)
+ self.ch.basic_publish(msg, 'funtest.fanout', mandatory=True)
+ self.ch.basic_publish(msg, 'funtest.fanout', mandatory=True)
self.ch.close()
#
@@ -264,38 +262,37 @@ class TestChannel(unittest.TestCase):
Network configuration is as follows (-> is forwards to :
source_exchange -> dest_exchange -> queue
The test checks that once the message is publish to the
- destination exchange(unittest.topic_dest) it is delivered to the queue.
+ destination exchange(funtest.topic_dest) it is delivered to the queue.
"""
test_routing_key = 'unit_test__key'
- dest_exchange = 'unittest.topic_dest_bind'
- source_exchange = 'unittest.topic_source_bind'
+ dest_exchange = 'funtest.topic_dest_bind'
+ source_exchange = 'funtest.topic_source_bind'
self.ch.exchange_declare(dest_exchange, 'topic', auto_delete=True)
self.ch.exchange_declare(source_exchange, 'topic', auto_delete=True)
qname, _, _ = self.ch.queue_declare()
- self.ch.exchange_bind(destination = dest_exchange,
- source = source_exchange,
- routing_key = test_routing_key)
+ self.ch.exchange_bind(destination=dest_exchange,
+ source=source_exchange,
+ routing_key=test_routing_key)
self.ch.queue_bind(qname, dest_exchange,
routing_key=test_routing_key)
- msg = Message('unittest message',
+ msg = Message('funtest message',
content_type='text/plain',
application_headers={'foo': 7, 'bar': 'baz'})
-
self.ch.basic_publish(msg, source_exchange,
- routing_key = test_routing_key)
+ routing_key=test_routing_key)
msg2 = self.ch.basic_get(qname, no_ack=True)
self.assertEqual(msg, msg2)
def test_exchange_unbind(self):
- dest_exchange = 'unittest.topic_dest_unbind'
- source_exchange = 'unittest.topic_source_unbind'
+ dest_exchange = 'funtest.topic_dest_unbind'
+ source_exchange = 'funtest.topic_source_unbind'
test_routing_key = 'unit_test__key'
self.ch.exchange_declare(dest_exchange,
@@ -311,6 +308,7 @@ class TestChannel(unittest.TestCase):
source=source_exchange,
routing_key=test_routing_key)
+
def main():
suite = unittest.TestLoader().loadTestsFromTestCase(TestChannel)
unittest.TextTestRunner(**settings.test_args).run(suite)