summaryrefslogtreecommitdiff
path: root/kombu/tests/transport
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-06-15 19:32:40 +0200
committerAsk Solem <ask@celeryproject.org>2012-06-15 19:32:40 +0200
commit5e7a32440f803321a58453efb3ea0fde0d4497b2 (patch)
tree3fe4f3e58b85b18105141d799779f30d043ecd82 /kombu/tests/transport
parent8d5652600e9afc76c803958638821a12018b8803 (diff)
downloadkombu-5e7a32440f803321a58453efb3ea0fde0d4497b2.tar.gz
Use single quotes
Diffstat (limited to 'kombu/tests/transport')
-rw-r--r--kombu/tests/transport/test_amqplib.py44
-rw-r--r--kombu/tests/transport/test_base.py20
-rw-r--r--kombu/tests/transport/test_memory.py36
-rw-r--r--kombu/tests/transport/test_mongodb.py30
-rw-r--r--kombu/tests/transport/test_redis.py168
-rw-r--r--kombu/tests/transport/test_sqlalchemy.py10
-rw-r--r--kombu/tests/transport/test_transport.py12
-rw-r--r--kombu/tests/transport/virtual/test_base.py132
-rw-r--r--kombu/tests/transport/virtual/test_exchange.py92
-rw-r--r--kombu/tests/transport/virtual/test_scheduling.py18
10 files changed, 281 insertions, 281 deletions
diff --git a/kombu/tests/transport/test_amqplib.py b/kombu/tests/transport/test_amqplib.py
index 226e5b31..803af460 100644
--- a/kombu/tests/transport/test_amqplib.py
+++ b/kombu/tests/transport/test_amqplib.py
@@ -39,8 +39,8 @@ class test_Channel(TestCase):
self.assertFalse(self.channel.no_ack_consumers)
def test_prepare_message(self):
- x = self.channel.prepare_message("foobar", 10,
- "application/data", "utf-8",
+ x = self.channel.prepare_message('foobar', 10,
+ 'application/data', 'utf-8',
properties={})
self.assertTrue(x)
@@ -56,22 +56,22 @@ class test_Channel(TestCase):
self.assertIsNone(self.channel.connection)
def test_basic_consume_registers_ack_status(self):
- self.channel.wait_returns = "my-consumer-tag"
- self.channel.basic_consume("foo", no_ack=True)
- self.assertIn("my-consumer-tag", self.channel.no_ack_consumers)
+ self.channel.wait_returns = 'my-consumer-tag'
+ self.channel.basic_consume('foo', no_ack=True)
+ self.assertIn('my-consumer-tag', self.channel.no_ack_consumers)
- self.channel.wait_returns = "other-consumer-tag"
- self.channel.basic_consume("bar", no_ack=False)
- self.assertNotIn("other-consumer-tag", self.channel.no_ack_consumers)
+ self.channel.wait_returns = 'other-consumer-tag'
+ self.channel.basic_consume('bar', no_ack=False)
+ self.assertNotIn('other-consumer-tag', self.channel.no_ack_consumers)
- self.channel.basic_cancel("my-consumer-tag")
- self.assertNotIn("my-consumer-tag", self.channel.no_ack_consumers)
+ self.channel.basic_cancel('my-consumer-tag')
+ self.assertNotIn('my-consumer-tag', self.channel.no_ack_consumers)
class test_Transport(TestCase):
def setUp(self):
- self.connection = BrokerConnection("amqplib://")
+ self.connection = BrokerConnection('amqplib://')
self.transport = self.connection.transport
def test_create_channel(self):
@@ -92,13 +92,13 @@ class test_Transport(TestCase):
vars(self).update(kwargs)
self.transport.Connection = Conn
- self.transport.client.hostname = "localhost"
+ self.transport.client.hostname = 'localhost'
conn1 = self.transport.establish_connection()
- self.assertEqual(conn1.host, "127.0.0.1:5672")
+ self.assertEqual(conn1.host, '127.0.0.1:5672')
- self.transport.client.hostname = "example.com"
+ self.transport.client.hostname = 'example.com'
conn2 = self.transport.establish_connection()
- self.assertEqual(conn2.host, "example.com:5672")
+ self.assertEqual(conn2.host, 'example.com:5672')
def test_close_connection(self):
connection = Mock()
@@ -116,15 +116,15 @@ class test_Transport(TestCase):
connection.channels = {1: 1, 2: 2}
self.assertTrue(self.transport.verify_connection(connection))
- @mask_modules("ssl")
+ @mask_modules('ssl')
def test_import_no_ssl(self):
- pm = sys.modules.pop("kombu.transport.amqplib")
+ pm = sys.modules.pop('kombu.transport.amqplib')
try:
from kombu.transport.amqplib import SSLError
- self.assertEqual(SSLError.__module__, "kombu.transport.amqplib")
+ self.assertEqual(SSLError.__module__, 'kombu.transport.amqplib')
finally:
if pm is not None:
- sys.modules["kombu.transport.amqplib"] = pm
+ sys.modules['kombu.transport.amqplib'] = pm
class test_amqplib(TestCase):
@@ -135,8 +135,8 @@ class test_amqplib(TestCase):
Connection = MockConnection
c = BrokerConnection(port=None, transport=Transport).connect()
- self.assertEqual(c["host"],
- "127.0.0.1:%s" % (Transport.default_port, ))
+ self.assertEqual(c['host'],
+ '127.0.0.1:%s' % (Transport.default_port, ))
def test_custom_port(self):
@@ -144,4 +144,4 @@ class test_amqplib(TestCase):
Connection = MockConnection
c = BrokerConnection(port=1337, transport=Transport).connect()
- self.assertEqual(c["host"], "127.0.0.1:1337")
+ self.assertEqual(c['host'], '127.0.0.1:1337')
diff --git a/kombu/tests/transport/test_base.py b/kombu/tests/transport/test_base.py
index d8822b18..691e132c 100644
--- a/kombu/tests/transport/test_base.py
+++ b/kombu/tests/transport/test_base.py
@@ -11,13 +11,13 @@ from kombu.tests.utils import Mock
class test_StdChannel(TestCase):
def setUp(self):
- self.conn = BrokerConnection("memory://")
+ self.conn = BrokerConnection('memory://')
self.channel = self.conn.channel()
self.channel.queues.clear()
self.conn.connection.state.clear()
def test_Consumer(self):
- q = Queue("foo")
+ q = Queue('foo')
print(self.channel.queues)
cons = self.channel.Consumer(q)
self.assertIsInstance(cons, Consumer)
@@ -34,27 +34,27 @@ class test_StdChannel(TestCase):
def test_interface_after_reply_message_received(self):
self.assertIsNone(StdChannel().after_reply_message_received(
- Queue("foo")))
+ Queue('foo')))
class test_Message(TestCase):
def setUp(self):
- self.conn = BrokerConnection("memory://")
+ self.conn = BrokerConnection('memory://')
self.channel = self.conn.channel()
self.message = Message(self.channel, delivery_tag=313)
def test_ack_respects_no_ack_consumers(self):
- self.channel.no_ack_consumers = set(["abc"])
- self.message.delivery_info["consumer_tag"] = "abc"
+ self.channel.no_ack_consumers = set(['abc'])
+ self.message.delivery_info['consumer_tag'] = 'abc'
ack = self.channel.basic_ack = Mock()
self.message.ack()
- self.assertNotEqual(self.message._state, "ACK")
+ self.assertNotEqual(self.message._state, 'ACK')
self.assertFalse(ack.called)
def test_ack_missing_consumer_tag(self):
- self.channel.no_ack_consumers = set(["abc"])
+ self.channel.no_ack_consumers = set(['abc'])
self.message.delivery_info = {}
ack = self.channel.basic_ack = Mock()
@@ -63,7 +63,7 @@ class test_Message(TestCase):
def test_ack_not_no_ack(self):
self.channel.no_ack_consumers = set()
- self.message.delivery_info["consumer_tag"] = "abc"
+ self.message.delivery_info['consumer_tag'] = 'abc'
ack = self.channel.basic_ack = Mock()
self.message.ack()
@@ -76,7 +76,7 @@ class test_Message(TestCase):
def test_ack_log_error_when_error(self):
ack = self.message.ack = Mock()
- ack.side_effect = KeyError("foo")
+ ack.side_effect = KeyError('foo')
logger = Mock()
self.message.ack_log_error(logger, KeyError)
ack.assert_called_with()
diff --git a/kombu/tests/transport/test_memory.py b/kombu/tests/transport/test_memory.py
index 1942a611..2865d6a4 100644
--- a/kombu/tests/transport/test_memory.py
+++ b/kombu/tests/transport/test_memory.py
@@ -13,14 +13,14 @@ from kombu.tests.utils import TestCase
class test_MemoryTransport(TestCase):
def setUp(self):
- self.c = BrokerConnection(transport="memory")
- self.e = Exchange("test_transport_memory")
- self.q = Queue("test_transport_memory",
+ self.c = BrokerConnection(transport='memory')
+ self.e = Exchange('test_transport_memory')
+ self.q = Queue('test_transport_memory',
exchange=self.e,
- routing_key="test_transport_memory")
- self.q2 = Queue("test_transport_memory2",
+ routing_key='test_transport_memory')
+ self.q2 = Queue('test_transport_memory2',
exchange=self.e,
- routing_key="test_transport_memory2")
+ routing_key='test_transport_memory2')
def test_produce_consume_noack(self):
channel = self.c.channel()
@@ -28,7 +28,7 @@ class test_MemoryTransport(TestCase):
consumer = Consumer(channel, self.q, no_ack=True)
for i in range(10):
- producer.publish({"foo": i}, routing_key="test_transport_memory")
+ producer.publish({'foo': i}, routing_key='test_transport_memory')
_received = []
@@ -53,9 +53,9 @@ class test_MemoryTransport(TestCase):
self.q2(channel).declare()
for i in range(10):
- producer.publish({"foo": i}, routing_key="test_transport_memory")
+ producer.publish({'foo': i}, routing_key='test_transport_memory')
for i in range(10):
- producer.publish({"foo": i}, routing_key="test_transport_memory2")
+ producer.publish({'foo': i}, routing_key='test_transport_memory2')
_received1 = []
_received2 = []
@@ -82,15 +82,15 @@ class test_MemoryTransport(TestCase):
self.assertEqual(len(_received1) + len(_received2), 20)
# compression
- producer.publish({"compressed": True},
- routing_key="test_transport_memory",
- compression="zlib")
+ producer.publish({'compressed': True},
+ routing_key='test_transport_memory',
+ compression='zlib')
m = self.q(channel).get()
- self.assertDictEqual(m.payload, {"compressed": True})
+ self.assertDictEqual(m.payload, {'compressed': True})
# queue.delete
for i in range(10):
- producer.publish({"foo": i}, routing_key="test_transport_memory")
+ producer.publish({'foo': i}, routing_key='test_transport_memory')
self.assertTrue(self.q(channel).get())
self.q(channel).delete()
self.q(channel).declare()
@@ -98,7 +98,7 @@ class test_MemoryTransport(TestCase):
# queue.purge
for i in range(10):
- producer.publish({"foo": i}, routing_key="test_transport_memory2")
+ producer.publish({'foo': i}, routing_key='test_transport_memory2')
self.assertTrue(self.q2(channel).get())
self.q2(channel).purge()
self.assertIsNone(self.q2(channel).get())
@@ -122,7 +122,7 @@ class test_MemoryTransport(TestCase):
class Cycle(object):
def get(self, timeout=None):
- return ("foo", "foo"), c1
+ return ('foo', 'foo'), c1
self.c.transport.cycle = Cycle()
with self.assertRaises(KeyError):
@@ -132,6 +132,6 @@ class test_MemoryTransport(TestCase):
chan = self.c.channel()
chan.queues.clear()
- x = chan._queue_for("foo")
+ x = chan._queue_for('foo')
self.assertTrue(x)
- self.assertIs(chan._queue_for("foo"), x)
+ self.assertIs(chan._queue_for('foo'), x)
diff --git a/kombu/tests/transport/test_mongodb.py b/kombu/tests/transport/test_mongodb.py
index 43def853..1d4145be 100644
--- a/kombu/tests/transport/test_mongodb.py
+++ b/kombu/tests/transport/test_mongodb.py
@@ -15,52 +15,52 @@ class MockConnection(dict):
class test_mongodb(TestCase):
- @skip_if_not_module("pymongo")
+ @skip_if_not_module('pymongo')
def test_url_parser(self):
from kombu.transport import mongodb
from pymongo.errors import ConfigurationError
raise SkipTest(
- "Test is functional: it actually connects to mongod")
+ 'Test is functional: it actually connects to mongod')
class Transport(mongodb.Transport):
Connection = MockConnection
- url = "mongodb://"
+ url = 'mongodb://'
c = BrokerConnection(url, transport=Transport).connect()
client = c.channels[0].client
- self.assertEquals(client.name, "kombu_default")
- self.assertEquals(client.connection.host, "127.0.0.1")
+ self.assertEquals(client.name, 'kombu_default')
+ self.assertEquals(client.connection.host, '127.0.0.1')
- url = "mongodb://localhost"
+ url = 'mongodb://localhost'
c = BrokerConnection(url, transport=Transport).connect()
client = c.channels[0].client
- self.assertEquals(client.name, "kombu_default")
+ self.assertEquals(client.name, 'kombu_default')
- url = "mongodb://localhost/dbname"
+ url = 'mongodb://localhost/dbname'
c = BrokerConnection(url, transport=Transport).connect()
client = c.channels[0].client
- self.assertEquals(client.name, "dbname")
+ self.assertEquals(client.name, 'dbname')
- url = "mongodb://localhost,example.org:29017/dbname"
+ url = 'mongodb://localhost,example.org:29017/dbname'
c = BrokerConnection(url, transport=Transport).connect()
client = c.channels[0].client
nodes = client.connection.nodes
self.assertEquals(len(nodes), 2)
- self.assertTrue(("example.org", 29017) in nodes)
- self.assertEquals(client.name, "dbname")
+ self.assertTrue(('example.org', 29017) in nodes)
+ self.assertEquals(client.name, 'dbname')
# Passing options breaks kombu's _init_params method
- # url = "mongodb://localhost,localhost2:29017/dbname?safe=true"
+ # url = 'mongodb://localhost,localhost2:29017/dbname?safe=true'
# c = BrokerConnection(url, transport=Transport).connect()
# client = c.channels[0].client
- url = "mongodb://localhost:27017,localhost2:29017/dbname"
+ url = 'mongodb://localhost:27017,localhost2:29017/dbname'
c = BrokerConnection(url, transport=Transport).connect()
client = c.channels[0].client
- url = "mongodb://username:password@localhost/dbname"
+ url = 'mongodb://username:password@localhost/dbname'
c = BrokerConnection(url, transport=Transport).connect()
# Assuming there's no user 'username' with password 'password'
# configured in mongodb
diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py
index c14c8a9a..3d9fb361 100644
--- a/kombu/tests/transport/test_redis.py
+++ b/kombu/tests/transport/test_redis.py
@@ -52,7 +52,7 @@ class Client(object):
self.connection = self._sconnection(self)
def bgsave(self):
- self._called.append("BGSAVE")
+ self._called.append('BGSAVE')
if self.bgsave_raises_ResponseError:
raise ResponseError()
@@ -95,7 +95,7 @@ class Client(object):
cmd, queues = self.connection._sock.data.pop()
assert cmd == type
self.connection._sock.data = []
- if type == "BRPOP":
+ if type == 'BRPOP':
item = self.brpop(queues, 0.001)
if item:
return item
@@ -156,7 +156,7 @@ class Client(object):
self._sock.data.append((cmd, args))
def info(self):
- return {"foo": 1}
+ return {'foo': 1}
def pubsub(self, *args, **kwargs):
connection = self.connection
@@ -221,18 +221,18 @@ class test_Channel(TestCase):
self.channel = self.connection.channel()
def test_basic_consume_when_fanout_queue(self):
- self.channel.exchange_declare(exchange="txconfan", type="fanout")
- self.channel.queue_declare(queue="txconfanq")
- self.channel.queue_bind(queue="txconfanq", exchange="txconfan")
+ self.channel.exchange_declare(exchange='txconfan', type='fanout')
+ self.channel.queue_declare(queue='txconfanq')
+ self.channel.queue_bind(queue='txconfanq', exchange='txconfan')
- self.assertIn("txconfanq", self.channel._fanout_queues)
- self.channel.basic_consume("txconfanq", False, None, 1)
- self.assertIn("txconfanq", self.channel.active_fanout_queues)
- self.assertEqual(self.channel._fanout_to_queue.get("txconfan"),
- "txconfanq")
+ self.assertIn('txconfanq', self.channel._fanout_queues)
+ self.channel.basic_consume('txconfanq', False, None, 1)
+ self.assertIn('txconfanq', self.channel.active_fanout_queues)
+ self.assertEqual(self.channel._fanout_to_queue.get('txconfan'),
+ 'txconfanq')
def test_basic_cancel_unknown_delivery_tag(self):
- self.assertIsNone(self.channel.basic_cancel("txaseqwewq"))
+ self.assertIsNone(self.channel.basic_cancel('txaseqwewq'))
def test_subscribe_no_queues(self):
self.channel.subclient = Mock()
@@ -243,14 +243,14 @@ class test_Channel(TestCase):
def test_subscribe(self):
self.channel.subclient = Mock()
- self.channel.active_fanout_queues.add("a")
- self.channel.active_fanout_queues.add("b")
- self.channel._fanout_queues.update(a="a", b="b")
+ self.channel.active_fanout_queues.add('a')
+ self.channel.active_fanout_queues.add('b')
+ self.channel._fanout_queues.update(a='a', b='b')
self.channel._subscribe()
self.assertTrue(self.channel.subclient.subscribe.called)
s_args, _ = self.channel.subclient.subscribe.call_args
- self.assertItemsEqual(s_args[0], ["a", "b"])
+ self.assertItemsEqual(s_args[0], ['a', 'b'])
self.channel.subclient.connection._sock = None
self.channel._subscribe()
@@ -259,43 +259,43 @@ class test_Channel(TestCase):
def test_handle_unsubscribe_message(self):
s = self.channel.subclient
s.subscribed = True
- self.channel._handle_message(s, ["unsubscribe", "a", 0])
+ self.channel._handle_message(s, ['unsubscribe', 'a', 0])
self.assertFalse(s.subscribed)
def test_handle_pmessage_message(self):
self.assertDictEqual(self.channel._handle_message(
self.channel.subclient,
- ["pmessage", "pattern", "channel", "data"]),
- {"type": "pmessage",
- "pattern": "pattern",
- "channel": "channel",
- "data": "data"})
+ ['pmessage', 'pattern', 'channel', 'data']),
+ {'type': 'pmessage',
+ 'pattern': 'pattern',
+ 'channel': 'channel',
+ 'data': 'data'})
def test_handle_message(self):
self.assertDictEqual(self.channel._handle_message(
self.channel.subclient,
- ["type", "channel", "data"]),
- {"type": "type",
- "pattern": None,
- "channel": "channel",
- "data": "data"})
+ ['type', 'channel', 'data']),
+ {'type': 'type',
+ 'pattern': None,
+ 'channel': 'channel',
+ 'data': 'data'})
def test_brpop_start_but_no_queues(self):
self.assertIsNone(self.channel._brpop_start())
def test_receive(self):
s = self.channel.subclient = Mock()
- self.channel._fanout_to_queue["a"] = "b"
- s.parse_response.return_value = ["message", "a",
- dumps({"hello": "world"})]
+ self.channel._fanout_to_queue['a'] = 'b'
+ s.parse_response.return_value = ['message', 'a',
+ dumps({'hello': 'world'})]
payload, queue = self.channel._receive()
- self.assertDictEqual(payload, {"hello": "world"})
- self.assertEqual(queue, "b")
+ self.assertDictEqual(payload, {'hello': 'world'})
+ self.assertEqual(queue, 'b')
def test_receive_raises(self):
self.channel._in_listen = True
s = self.channel.subclient = Mock()
- s.parse_response.side_effect = KeyError("foo")
+ s.parse_response.side_effect = KeyError('foo')
with self.assertRaises(redis.Empty):
self.channel._receive()
@@ -310,14 +310,14 @@ class test_Channel(TestCase):
def test_receive_different_message_Type(self):
s = self.channel.subclient = Mock()
- s.parse_response.return_value = ["pmessage", "/foo/", 0, "data"]
+ s.parse_response.return_value = ['pmessage', '/foo/', 0, 'data']
with self.assertRaises(redis.Empty):
self.channel._receive()
def test_brpop_read_raises(self):
c = self.channel.client = Mock()
- c.parse_response.side_effect = KeyError("foo")
+ c.parse_response.side_effect = KeyError('foo')
with self.assertRaises(redis.Empty):
self.channel._brpop_read()
@@ -334,20 +334,20 @@ class test_Channel(TestCase):
def test_poll_error(self):
c = self.channel.client = Mock()
c.parse_response = Mock()
- self.channel._poll_error("BRPOP")
+ self.channel._poll_error('BRPOP')
- c.parse_response.assert_called_with("BRPOP")
+ c.parse_response.assert_called_with('BRPOP')
- c.parse_response.side_effect = KeyError("foo")
- self.assertIsNone(self.channel._poll_error("BRPOP"))
+ c.parse_response.side_effect = KeyError('foo')
+ self.assertIsNone(self.channel._poll_error('BRPOP'))
def test_put_fanout(self):
self.channel._in_poll = False
c = self.channel.client = Mock()
- body = {"hello": "world"}
- self.channel._put_fanout("exchange", body)
- c.publish.assert_called_with("exchange", dumps(body))
+ body = {'hello': 'world'}
+ self.channel._put_fanout('exchange', body)
+ c.publish.assert_called_with('exchange', dumps(body))
def test_delete(self):
x = self.channel
@@ -355,20 +355,20 @@ class test_Channel(TestCase):
delete = x.client.delete = Mock()
srem = x.client.srem = Mock()
- x._delete("queue", "exchange", "routing_key", None)
- delete.assert_has_call("queue")
- srem.assert_has_call(x.keyprefix_queue % ("exchange", ),
- x.sep.join(["routing_key", "", "queue"]))
+ x._delete('queue', 'exchange', 'routing_key', None)
+ delete.assert_has_call('queue')
+ srem.assert_has_call(x.keyprefix_queue % ('exchange', ),
+ x.sep.join(['routing_key', '', 'queue']))
def test_has_queue(self):
self.channel._in_poll = False
exists = self.channel.client.exists = Mock()
exists.return_value = True
- self.assertTrue(self.channel._has_queue("foo"))
- exists.assert_has_call("foo")
+ self.assertTrue(self.channel._has_queue('foo'))
+ exists.assert_has_call('foo')
exists.return_value = False
- self.assertFalse(self.channel._has_queue("foo"))
+ self.assertFalse(self.channel._has_queue('foo'))
def test_close_when_closed(self):
self.channel.closed = True
@@ -382,27 +382,27 @@ class test_Channel(TestCase):
c.connection.disconnect.assert_called_with()
def test_invalid_database_raises_ValueError(self):
- self.channel.connection.client.virtual_host = "xfeqwewkfk"
+ self.channel.connection.client.virtual_host = 'xfeqwewkfk'
with self.assertRaises(ValueError):
self.channel._create_client()
- @skip_if_not_module("redis")
+ @skip_if_not_module('redis')
def test_get_client(self):
import redis as R
KombuRedis = redis.Channel._get_client(self.channel)
self.assertTrue(KombuRedis)
- Rv = getattr(R, "__version__")
+ Rv = getattr(R, '__version__')
try:
- R.__version__ = "2.4.0"
+ R.__version__ = '2.4.0'
with self.assertRaises(VersionMismatch):
redis.Channel._get_client(self.channel)
finally:
if Rv is not None:
R.__version__ = Rv
- @skip_if_not_module("redis")
+ @skip_if_not_module('redis')
def test_get_response_error(self):
from redis.exceptions import ResponseError
self.assertIs(redis.Channel._get_response_error(self.channel),
@@ -421,19 +421,19 @@ class test_Channel(TestCase):
self.assertTrue(self.channel._avail_client)
cc.assert_called_with()
- @skip_if_not_module("redis")
+ @skip_if_not_module('redis')
def test_transport_get_errors(self):
self.assertTrue(redis.Transport._get_errors(self.connection.transport))
- @skip_if_not_module("redis")
+ @skip_if_not_module('redis')
def test_transport_get_errors_when_InvalidData_used(self):
from redis import exceptions
class ID(Exception):
pass
- DataError = getattr(exceptions, "DataError", None)
- InvalidData = getattr(exceptions, "InvalidData", None)
+ DataError = getattr(exceptions, 'DataError', None)
+ InvalidData = getattr(exceptions, 'InvalidData', None)
exceptions.InvalidData = ID
exceptions.DataError = None
try:
@@ -462,28 +462,28 @@ class test_Channel(TestCase):
# which raises a channel error so that the consumer/publisher
# can recover by redeclaring the required entities.
with self.assertRaises(InconsistencyError):
- self.channel.get_table("celery")
+ self.channel.get_table('celery')
class test_Redis(TestCase):
def setUp(self):
self.connection = BrokerConnection(transport=Transport)
- self.exchange = Exchange("test_Redis", type="direct")
- self.queue = Queue("test_Redis", self.exchange, "test_Redis")
+ self.exchange = Exchange('test_Redis', type='direct')
+ self.queue = Queue('test_Redis', self.exchange, 'test_Redis')
def tearDown(self):
self.connection.close()
def test_publish__get(self):
channel = self.connection.channel()
- producer = Producer(channel, self.exchange, routing_key="test_Redis")
+ producer = Producer(channel, self.exchange, routing_key='test_Redis')
self.queue(channel).declare()
- producer.publish({"hello": "world"})
+ producer.publish({'hello': 'world'})
self.assertDictEqual(self.queue(channel).get().payload,
- {"hello": "world"})
+ {'hello': 'world'})
self.assertIsNone(self.queue(channel).get())
self.assertIsNone(self.queue(channel).get())
self.assertIsNone(self.queue(channel).get())
@@ -491,10 +491,10 @@ class test_Redis(TestCase):
def test_publish__consume(self):
connection = BrokerConnection(transport=Transport)
channel = connection.channel()
- producer = Producer(channel, self.exchange, routing_key="test_Redis")
+ producer = Producer(channel, self.exchange, routing_key='test_Redis')
consumer = Consumer(channel, self.queue)
- producer.publish({"hello2": "world2"})
+ producer.publish({'hello2': 'world2'})
_received = []
def callback(message_data, message):
@@ -515,13 +515,13 @@ class test_Redis(TestCase):
def test_purge(self):
channel = self.connection.channel()
- producer = Producer(channel, self.exchange, routing_key="test_Redis")
+ producer = Producer(channel, self.exchange, routing_key='test_Redis')
self.queue(channel).declare()
for i in range(10):
- producer.publish({"hello": "world-%s" % (i, )})
+ producer.publish({'hello': 'world-%s' % (i, )})
- self.assertEqual(channel._size("test_Redis"), 10)
+ self.assertEqual(channel._size('test_Redis'), 10)
self.assertEqual(self.queue(channel).purge(), 10)
channel.close()
@@ -530,16 +530,16 @@ class test_Redis(TestCase):
transport=Transport).channel()
self.assertEqual(c1.client.db, 1)
- c2 = BrokerConnection(virtual_host="1",
+ c2 = BrokerConnection(virtual_host='1',
transport=Transport).channel()
self.assertEqual(c2.client.db, 1)
- c3 = BrokerConnection(virtual_host="/1",
+ c3 = BrokerConnection(virtual_host='/1',
transport=Transport).channel()
self.assertEqual(c3.client.db, 1)
with self.assertRaises(Exception):
- BrokerConnection(virtual_host="/foo",
+ BrokerConnection(virtual_host='/foo',
transport=Transport).channel()
def test_db_port(self):
@@ -574,7 +574,7 @@ class test_Redis(TestCase):
def test_get__Empty(self):
channel = self.connection.channel()
with self.assertRaises(Empty):
- channel._get("does-not-exist")
+ channel._get('does-not-exist')
channel.close()
def test_get_client(self):
@@ -610,7 +610,7 @@ def _redis_modules():
class ResponseError(Exception):
pass
- exceptions = types.ModuleType("redis.exceptions")
+ exceptions = types.ModuleType('redis.exceptions')
exceptions.ConnectionError = ConnectionError
exceptions.AuthenticationError = AuthenticationError
exceptions.InvalidData = InvalidData
@@ -620,7 +620,7 @@ def _redis_modules():
class Redis(object):
pass
- myredis = types.ModuleType("redis")
+ myredis = types.ModuleType('redis')
myredis.exceptions = exceptions
myredis.Redis = Redis
@@ -703,7 +703,7 @@ class test_MultiChannelPoller(TestCase):
self.assertEqual(p._register.call_count, 1)
channel.client.connection._sock = Mock()
- p._chan_to_sock[(channel, channel.client, "BRPOP")] = True
+ p._chan_to_sock[(channel, channel.client, 'BRPOP')] = True
channel._in_poll = True
p._register_BRPOP(channel)
self.assertEqual(channel._brpop_start.call_count, 1)
@@ -717,7 +717,7 @@ class test_MultiChannelPoller(TestCase):
p._register = Mock()
p._register_LISTEN(channel)
- p._register.assert_called_with(channel, channel.subclient, "LISTEN")
+ p._register.assert_called_with(channel, channel.subclient, 'LISTEN')
self.assertEqual(p._register.call_count, 1)
self.assertEqual(channel._subscribe.call_count, 1)
@@ -753,7 +753,7 @@ class test_MultiChannelPoller(TestCase):
p.get()
def test_get_brpop_qos_allow(self):
- p, channel = self.create_get(queues=["a_queue"])
+ p, channel = self.create_get(queues=['a_queue'])
channel.qos.can_consume.return_value = True
with self.assertRaises(redis.Empty):
@@ -762,7 +762,7 @@ class test_MultiChannelPoller(TestCase):
p._register_BRPOP.assert_called_with(channel)
def test_get_brpop_qos_disallow(self):
- p, channel = self.create_get(queues=["a_queue"])
+ p, channel = self.create_get(queues=['a_queue'])
channel.qos.can_consume.return_value = False
with self.assertRaises(redis.Empty):
@@ -771,7 +771,7 @@ class test_MultiChannelPoller(TestCase):
self.assertFalse(p._register_BRPOP.called)
def test_get_listen(self):
- p, channel = self.create_get(fanouts=["f_queue"])
+ p, channel = self.create_get(fanouts=['f_queue'])
with self.assertRaises(redis.Empty):
p.get()
@@ -780,19 +780,19 @@ class test_MultiChannelPoller(TestCase):
def test_get_receives_ERR(self):
p, channel = self.create_get(events=[(1, eventio.ERR)])
- p._fd_to_chan[1] = (channel, "BRPOP")
+ p._fd_to_chan[1] = (channel, 'BRPOP')
with self.assertRaises(redis.Empty):
p.get()
- channel._poll_error.assert_called_with("BRPOP")
+ channel._poll_error.assert_called_with('BRPOP')
def test_get_receives_multiple(self):
p, channel = self.create_get(events=[(1, eventio.ERR),
(1, eventio.ERR)])
- p._fd_to_chan[1] = (channel, "BRPOP")
+ p._fd_to_chan[1] = (channel, 'BRPOP')
with self.assertRaises(redis.Empty):
p.get()
- channel._poll_error.assert_called_with("BRPOP")
+ channel._poll_error.assert_called_with('BRPOP')
diff --git a/kombu/tests/transport/test_sqlalchemy.py b/kombu/tests/transport/test_sqlalchemy.py
index b827bc88..8fb8e6dd 100644
--- a/kombu/tests/transport/test_sqlalchemy.py
+++ b/kombu/tests/transport/test_sqlalchemy.py
@@ -14,15 +14,15 @@ class test_sqlalchemy(TestCase):
try:
import sqlalchemy # noqa
except ImportError:
- raise SkipTest("sqlalchemy not installed")
- with patch("kombu.transport.sqlalchemy.Channel._open"):
- url = "sqlalchemy+sqlite://celerydb.sqlite"
+ raise SkipTest('sqlalchemy not installed')
+ with patch('kombu.transport.sqlalchemy.Channel._open'):
+ url = 'sqlalchemy+sqlite://celerydb.sqlite'
BrokerConnection(url).connect()
- url = "sqla+sqlite://celerydb.sqlite"
+ url = 'sqla+sqlite://celerydb.sqlite'
BrokerConnection(url).connect()
# Should prevent regression fixed by f187ccd
- url = "sqlb+sqlite://celerydb.sqlite"
+ url = 'sqlb+sqlite://celerydb.sqlite'
with self.assertRaises(KeyError):
BrokerConnection(url).connect()
diff --git a/kombu/tests/transport/test_transport.py b/kombu/tests/transport/test_transport.py
index cc3eeea7..151f4b56 100644
--- a/kombu/tests/transport/test_transport.py
+++ b/kombu/tests/transport/test_transport.py
@@ -12,19 +12,19 @@ class test_transport(TestCase):
def test_resolve_transport__no_class_name(self):
with self.assertRaises(KeyError):
- transport.resolve_transport("nonexistant")
+ transport.resolve_transport('nonexistant')
def test_resolve_transport_when_callable(self):
self.assertTupleEqual(transport.resolve_transport(
- lambda: "kombu.transport.memory.Transport"),
- ("kombu.transport.memory", "Transport"))
+ lambda: 'kombu.transport.memory.Transport'),
+ ('kombu.transport.memory', 'Transport'))
class test_transport_gettoq(TestCase):
- @patch("warnings.warn")
+ @patch('warnings.warn')
def test_compat(self, warn):
- x = transport._ghettoq("Redis", "redis", "redis")
+ x = transport._ghettoq('Redis', 'redis', 'redis')
- self.assertEqual(x(), "kombu.transport.redis.Transport")
+ self.assertEqual(x(), 'kombu.transport.redis.Transport')
self.assertTrue(warn.called)
diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py
index 301d71e7..d4a5fe3f 100644
--- a/kombu/tests/transport/virtual/test_base.py
+++ b/kombu/tests/transport/virtual/test_base.py
@@ -16,20 +16,20 @@ from kombu.tests.utils import Mock, redirect_stdouts
def client(**kwargs):
- return BrokerConnection(transport="kombu.transport.virtual.Transport",
+ return BrokerConnection(transport='kombu.transport.virtual.Transport',
**kwargs)
def memory_client():
- return BrokerConnection(transport="memory")
+ return BrokerConnection(transport='memory')
class test_BrokerState(TestCase):
def test_constructor(self):
s = virtual.BrokerState()
- self.assertTrue(hasattr(s, "exchanges"))
- self.assertTrue(hasattr(s, "bindings"))
+ self.assertTrue(hasattr(s, 'exchanges'))
+ self.assertTrue(hasattr(s, 'bindings'))
t = virtual.BrokerState(exchanges=16, bindings=32)
self.assertEqual(t.exchanges, 16)
@@ -95,66 +95,66 @@ class test_QoS(TestCase):
self.assertFalse(stdout.getvalue())
def test_get(self):
- self.q._delivered["foo"] = 1
- self.assertEqual(self.q.get("foo"), 1)
+ self.q._delivered['foo'] = 1
+ self.assertEqual(self.q.get('foo'), 1)
class test_Message(TestCase):
def test_create(self):
c = client().channel()
- data = c.prepare_message("the quick brown fox...")
- tag = data["properties"]["delivery_tag"] = uuid()
+ data = c.prepare_message('the quick brown fox...')
+ tag = data['properties']['delivery_tag'] = uuid()
message = c.message_to_python(data)
self.assertIsInstance(message, virtual.Message)
self.assertIs(message, c.message_to_python(message))
self.assertEqual(message.body,
- "the quick brown fox...".encode("utf-8"))
+ 'the quick brown fox...'.encode('utf-8'))
self.assertTrue(message.delivery_tag, tag)
def test_create_no_body(self):
virtual.Message(Mock(), {
- "body": None,
- "properties": {"delivery_tag": 1}})
+ 'body': None,
+ 'properties': {'delivery_tag': 1}})
def test_serializable(self):
c = client().channel()
- data = c.prepare_message("the quick brown fox...")
- tag = data["properties"]["delivery_tag"] = uuid()
+ data = c.prepare_message('the quick brown fox...')
+ tag = data['properties']['delivery_tag'] = uuid()
message = c.message_to_python(data)
dict_ = message.serializable()
- self.assertEqual(dict_["body"],
- "the quick brown fox...".encode("utf-8"))
- self.assertEqual(dict_["properties"]["delivery_tag"], tag)
+ self.assertEqual(dict_['body'],
+ 'the quick brown fox...'.encode('utf-8'))
+ self.assertEqual(dict_['properties']['delivery_tag'], tag)
class test_AbstractChannel(TestCase):
def test_get(self):
with self.assertRaises(NotImplementedError):
- virtual.AbstractChannel()._get("queue")
+ virtual.AbstractChannel()._get('queue')
def test_put(self):
with self.assertRaises(NotImplementedError):
- virtual.AbstractChannel()._put("queue", "m")
+ virtual.AbstractChannel()._put('queue', 'm')
def test_size(self):
- self.assertEqual(virtual.AbstractChannel()._size("queue"), 0)
+ self.assertEqual(virtual.AbstractChannel()._size('queue'), 0)
def test_purge(self):
with self.assertRaises(NotImplementedError):
- virtual.AbstractChannel()._purge("queue")
+ virtual.AbstractChannel()._purge('queue')
def test_delete(self):
with self.assertRaises(NotImplementedError):
- virtual.AbstractChannel()._delete("queue")
+ virtual.AbstractChannel()._delete('queue')
def test_new_queue(self):
- self.assertIsNone(virtual.AbstractChannel()._new_queue("queue"))
+ self.assertIsNone(virtual.AbstractChannel()._new_queue('queue'))
def test_has_queue(self):
- self.assertTrue(virtual.AbstractChannel()._has_queue("queue"))
+ self.assertTrue(virtual.AbstractChannel()._has_queue('queue'))
def test_poll(self):
@@ -181,20 +181,20 @@ class test_Channel(TestCase):
def test_exchange_declare(self):
c = self.channel
- c.exchange_declare("test_exchange_declare", "direct",
+ c.exchange_declare('test_exchange_declare', 'direct',
durable=True, auto_delete=True)
- self.assertIn("test_exchange_declare", c.state.exchanges)
+ self.assertIn('test_exchange_declare', c.state.exchanges)
# can declare again with same values
- c.exchange_declare("test_exchange_declare", "direct",
+ c.exchange_declare('test_exchange_declare', 'direct',
durable=True, auto_delete=True)
- self.assertIn("test_exchange_declare", c.state.exchanges)
+ self.assertIn('test_exchange_declare', c.state.exchanges)
# using different values raises NotEquivalentError
with self.assertRaises(virtual.NotEquivalentError):
- c.exchange_declare("test_exchange_declare", "direct",
+ c.exchange_declare('test_exchange_declare', 'direct',
durable=False, auto_delete=True)
- def test_exchange_delete(self, ex="test_exchange_delete"):
+ def test_exchange_delete(self, ex='test_exchange_delete'):
class PurgeChannel(virtual.Channel):
purged = []
@@ -204,13 +204,13 @@ class test_Channel(TestCase):
c = PurgeChannel(self.channel.connection)
- c.exchange_declare(ex, "direct", durable=True, auto_delete=True)
+ c.exchange_declare(ex, 'direct', durable=True, auto_delete=True)
self.assertIn(ex, c.state.exchanges)
self.assertNotIn(ex, c.state.bindings) # no bindings yet
c.exchange_delete(ex)
self.assertNotIn(ex, c.state.exchanges)
- c.exchange_declare(ex, "direct", durable=True, auto_delete=True)
+ c.exchange_declare(ex, 'direct', durable=True, auto_delete=True)
c.queue_declare(ex)
c.queue_bind(ex, ex, ex)
self.assertTrue(c.state.bindings[ex])
@@ -218,7 +218,7 @@ class test_Channel(TestCase):
self.assertNotIn(ex, c.state.bindings)
self.assertIn(ex, c.purged)
- def test_queue_delete__if_empty(self, n="test_queue_delete__if_empty"):
+ def test_queue_delete__if_empty(self, n='test_queue_delete__if_empty'):
class PurgeChannel(virtual.Channel):
purged = []
size = 30
@@ -244,7 +244,7 @@ class test_Channel(TestCase):
self.assertNotIn(n, c.state.bindings)
self.assertIn(n, c.purged)
- def test_queue_purge(self, n="test_queue_purge"):
+ def test_queue_purge(self, n='test_queue_purge'):
class PurgeChannel(virtual.Channel):
purged = []
@@ -260,35 +260,35 @@ class test_Channel(TestCase):
self.assertIn(n, c.purged)
def test_basic_publish__get__consume__restore(self,
- n="test_basic_publish"):
+ n='test_basic_publish'):
c = memory_client().channel()
c.exchange_declare(n)
c.queue_declare(n)
c.queue_bind(n, n, n)
- c.queue_declare(n + "2")
- c.queue_bind(n + "2", n, n)
+ c.queue_declare(n + '2')
+ c.queue_bind(n + '2', n, n)
- m = c.prepare_message("nthex quick brown fox...")
+ m = c.prepare_message('nthex quick brown fox...')
c.basic_publish(m, n, n)
r1 = c.message_to_python(c.basic_get(n))
self.assertTrue(r1)
self.assertEqual(r1.body,
- "nthex quick brown fox...".encode("utf-8"))
+ 'nthex quick brown fox...'.encode('utf-8'))
self.assertIsNone(c.basic_get(n))
consumer_tag = uuid()
- c.basic_consume(n + "2", False, consumer_tag=consumer_tag,
+ c.basic_consume(n + '2', False, consumer_tag=consumer_tag,
callback=lambda *a: None)
- self.assertIn(n + "2", c._active_queues)
+ self.assertIn(n + '2', c._active_queues)
r2, _ = c.drain_events()
r2 = c.message_to_python(r2)
self.assertEqual(r2.body,
- "nthex quick brown fox...".encode("utf-8"))
- self.assertEqual(r2.delivery_info["exchange"], n)
- self.assertEqual(r2.delivery_info["routing_key"], n)
+ 'nthex quick brown fox...'.encode('utf-8'))
+ self.assertEqual(r2.delivery_info['exchange'], n)
+ self.assertEqual(r2.delivery_info['routing_key'], n)
with self.assertRaises(virtual.Empty):
c.drain_events()
c.basic_cancel(consumer_tag)
@@ -296,7 +296,7 @@ class test_Channel(TestCase):
c._restore(r2)
r3 = c.message_to_python(c.basic_get(n))
self.assertTrue(r3)
- self.assertEqual(r3.body, "nthex quick brown fox...".encode("utf-8"))
+ self.assertEqual(r3.body, 'nthex quick brown fox...'.encode('utf-8'))
self.assertIsNone(c.basic_get(n))
def test_basic_ack(self):
@@ -308,7 +308,7 @@ class test_Channel(TestCase):
self.was_acked = True
self.channel._qos = MockQoS(self.channel)
- self.channel.basic_ack("foo")
+ self.channel.basic_ack('foo')
self.assertTrue(self.channel._qos.was_acked)
def test_basic_recover__requeue(self):
@@ -336,8 +336,8 @@ class test_Channel(TestCase):
self.assertEqual(errors[0][1], 1)
self.assertFalse(q._delivered)
- @patch("kombu.transport.virtual.emergency_dump_state")
- @patch("kombu.transport.virtual.say")
+ @patch('kombu.transport.virtual.emergency_dump_state')
+ @patch('kombu.transport.virtual.say')
def test_restore_unacked_once_when_unrestored(self, say,
emergency_dump_state):
q = self.channel.qos
@@ -373,20 +373,20 @@ class test_Channel(TestCase):
self.was_rejected = True
self.channel._qos = MockQoS(self.channel)
- self.channel.basic_reject("foo")
+ self.channel.basic_reject('foo')
self.assertTrue(self.channel._qos.was_rejected)
def test_basic_qos(self):
self.channel.basic_qos(prefetch_count=128)
self.assertEqual(self.channel._qos.prefetch_count, 128)
- def test_lookup__undeliverable(self, n="test_lookup__undeliverable"):
+ def test_lookup__undeliverable(self, n='test_lookup__undeliverable'):
warnings.resetwarnings()
with catch_warnings(record=True) as log:
- self.assertListEqual(self.channel._lookup(n, n, "ae.undeliver"),
- ["ae.undeliver"])
+ self.assertListEqual(self.channel._lookup(n, n, 'ae.undeliver'),
+ ['ae.undeliver'])
self.assertTrue(log)
- self.assertIn("could not be delivered", log[0].message.args[0])
+ self.assertIn('could not be delivered', log[0].message.args[0])
def test_context(self):
x = self.channel.__enter__()
@@ -418,44 +418,44 @@ class test_Channel(TestCase):
c._get_many.assert_called_with(c._active_queues, timeout=10.0)
def test_get_exchanges(self):
- self.channel.exchange_declare(exchange="foo")
+ self.channel.exchange_declare(exchange='foo')
self.assertTrue(self.channel.get_exchanges())
def test_basic_cancel_not_in_active_queues(self):
c = self.channel
- c._consumers.add("x")
- c._tag_to_queue["x"] = "foo"
+ c._consumers.add('x')
+ c._tag_to_queue['x'] = 'foo'
c._active_queues = Mock()
c._active_queues.remove.side_effect = ValueError()
- c.basic_cancel("x")
- c._active_queues.remove.assert_called_with("foo")
+ c.basic_cancel('x')
+ c._active_queues.remove.assert_called_with('foo')
def test_basic_cancel_unknown_ctag(self):
- self.assertIsNone(self.channel.basic_cancel("unknown-tag"))
+ self.assertIsNone(self.channel.basic_cancel('unknown-tag'))
def test_list_bindings(self):
c = self.channel
- c.exchange_declare(exchange="foo")
- c.queue_declare(queue="q")
- c.queue_bind(queue="q", exchange="foo", routing_key="rk")
+ c.exchange_declare(exchange='foo')
+ c.queue_declare(queue='q')
+ c.queue_bind(queue='q', exchange='foo', routing_key='rk')
- self.assertIn(("q", "foo", "rk"), list(c.list_bindings()))
+ self.assertIn(('q', 'foo', 'rk'), list(c.list_bindings()))
def test_after_reply_message_received(self):
c = self.channel
c.queue_delete = Mock()
- c.after_reply_message_received("foo")
- c.queue_delete.assert_called_with("foo")
+ c.after_reply_message_received('foo')
+ c.queue_delete.assert_called_with('foo')
def test_queue_delete_unknown_queue(self):
- self.assertIsNone(self.channel.queue_delete("xiwjqjwel"))
+ self.assertIsNone(self.channel.queue_delete('xiwjqjwel'))
def test_queue_declare_passive(self):
has_queue = self.channel._has_queue = Mock()
has_queue.return_value = False
with self.assertRaises(StdChannelError):
- self.channel.queue_declare(queue="21wisdjwqe", passive=True)
+ self.channel.queue_declare(queue='21wisdjwqe', passive=True)
class test_Transport(TestCase):
diff --git a/kombu/tests/transport/virtual/test_exchange.py b/kombu/tests/transport/virtual/test_exchange.py
index d33b9efc..50498021 100644
--- a/kombu/tests/transport/virtual/test_exchange.py
+++ b/kombu/tests/transport/virtual/test_exchange.py
@@ -20,54 +20,54 @@ class ExchangeCase(TestCase):
class test_Direct(ExchangeCase):
type = exchange.DirectExchange
- table = [("rFoo", None, "qFoo"),
- ("rFoo", None, "qFox"),
- ("rBar", None, "qBar"),
- ("rBaz", None, "qBaz")]
+ table = [('rFoo', None, 'qFoo'),
+ ('rFoo', None, 'qFox'),
+ ('rBar', None, 'qBar'),
+ ('rBaz', None, 'qBaz')]
def test_lookup(self):
self.assertListEqual(self.e.lookup(
- self.table, "eFoo", "rFoo", None),
- ["qFoo", "qFox"])
+ self.table, 'eFoo', 'rFoo', None),
+ ['qFoo', 'qFox'])
self.assertListEqual(self.e.lookup(
- self.table, "eMoz", "rMoz", "DEFAULT"),
+ self.table, 'eMoz', 'rMoz', 'DEFAULT'),
[])
self.assertListEqual(self.e.lookup(
- self.table, "eBar", "rBar", None),
- ["qBar"])
+ self.table, 'eBar', 'rBar', None),
+ ['qBar'])
class test_Fanout(ExchangeCase):
type = exchange.FanoutExchange
- table = [(None, None, "qFoo"),
- (None, None, "qFox"),
- (None, None, "qBar")]
+ table = [(None, None, 'qFoo'),
+ (None, None, 'qFox'),
+ (None, None, 'qBar')]
def test_lookup(self):
self.assertListEqual(self.e.lookup(
- self.table, "eFoo", "rFoo", None),
- ["qFoo", "qFox", "qBar"])
+ self.table, 'eFoo', 'rFoo', None),
+ ['qFoo', 'qFox', 'qBar'])
def test_deliver_when_fanout_supported(self):
self.e.channel = Mock()
self.e.channel.supports_fanout = True
message = Mock()
- self.e.deliver(message, "exchange", None)
- self.e.channel._put_fanout.assert_called_with("exchange", message)
+ self.e.deliver(message, 'exchange', None)
+ self.e.channel._put_fanout.assert_called_with('exchange', message)
def test_deliver_when_fanout_unsupported(self):
self.e.channel = Mock()
self.e.channel.supports_fanout = False
- self.e.deliver(Mock(), "exchange", None)
+ self.e.deliver(Mock(), 'exchange', None)
self.assertFalse(self.e.channel._put_fanout.called)
class test_Topic(ExchangeCase):
type = exchange.TopicExchange
- table = [("stock.#", None, "rFoo"),
- ("stock.us.*", None, "rBar")]
+ table = [('stock.#', None, 'rFoo'),
+ ('stock.us.*', None, 'rBar')]
def setUp(self):
super(test_Topic, self).setUp()
@@ -75,32 +75,32 @@ class test_Topic(ExchangeCase):
for rkey, _, queue in self.table]
def test_prepare_bind(self):
- x = self.e.prepare_bind("qFoo", "eFoo", "stock.#", {})
- self.assertTupleEqual(x, ("stock.#", r'^stock\..*?$', "qFoo"))
+ x = self.e.prepare_bind('qFoo', 'eFoo', 'stock.#', {})
+ self.assertTupleEqual(x, ('stock.#', r'^stock\..*?$', 'qFoo'))
def test_lookup(self):
self.assertListEqual(self.e.lookup(
- self.table, "eFoo", "stock.us.nasdaq", None),
- ["rFoo", "rBar"])
+ self.table, 'eFoo', 'stock.us.nasdaq', None),
+ ['rFoo', 'rBar'])
self.assertTrue(self.e._compiled)
self.assertListEqual(self.e.lookup(
- self.table, "eFoo", "stock.europe.OSE", None),
- ["rFoo"])
+ self.table, 'eFoo', 'stock.europe.OSE', None),
+ ['rFoo'])
self.assertListEqual(self.e.lookup(
- self.table, "eFoo", "stockxeuropexOSE", None),
+ self.table, 'eFoo', 'stockxeuropexOSE', None),
[])
self.assertListEqual(self.e.lookup(
- self.table, "eFoo", "candy.schleckpulver.snap_crackle", None),
+ self.table, 'eFoo', 'candy.schleckpulver.snap_crackle', None),
[])
def test_deliver(self):
self.e.channel = Mock()
- self.e.channel._lookup.return_value = ("a", "b")
+ self.e.channel._lookup.return_value = ('a', 'b')
message = Mock()
- self.e.deliver(message, "exchange", "rkey")
+ self.e.deliver(message, 'exchange', 'rkey')
- expected = [(("a", message), {}),
- (("b", message), {})]
+ expected = [(('a', message), {}),
+ (('b', message), {})]
self.assertListEqual(self.e.channel._put.call_args_list, expected)
@@ -109,32 +109,32 @@ class test_ExchangeType(ExchangeCase):
def test_lookup(self):
with self.assertRaises(NotImplementedError):
- self.e.lookup([], "eFoo", "rFoo", None)
+ self.e.lookup([], 'eFoo', 'rFoo', None)
def test_prepare_bind(self):
- self.assertTupleEqual(self.e.prepare_bind("qFoo", "eFoo", "rFoo", {}),
- ("rFoo", None, "qFoo"))
+ self.assertTupleEqual(self.e.prepare_bind('qFoo', 'eFoo', 'rFoo', {}),
+ ('rFoo', None, 'qFoo'))
def test_equivalent(self):
- e1 = dict(type="direct",
+ e1 = dict(type='direct',
durable=True,
auto_delete=True,
arguments={})
self.assertTrue(
- self.e.equivalent(e1, "eFoo", "direct", True, True, {}))
+ self.e.equivalent(e1, 'eFoo', 'direct', True, True, {}))
self.assertFalse(
- self.e.equivalent(e1, "eFoo", "topic", True, True, {}))
+ self.e.equivalent(e1, 'eFoo', 'topic', True, True, {}))
self.assertFalse(
- self.e.equivalent(e1, "eFoo", "direct", False, True, {}))
+ self.e.equivalent(e1, 'eFoo', 'direct', False, True, {}))
self.assertFalse(
- self.e.equivalent(e1, "eFoo", "direct", True, False, {}))
+ self.e.equivalent(e1, 'eFoo', 'direct', True, False, {}))
self.assertFalse(
- self.e.equivalent(e1, "eFoo", "direct", True, True, {
- "expires": 3000}))
- e2 = dict(e1, arguments={"expires": 3000})
+ self.e.equivalent(e1, 'eFoo', 'direct', True, True, {
+ 'expires': 3000}))
+ e2 = dict(e1, arguments={'expires': 3000})
self.assertTrue(
- self.e.equivalent(e2, "eFoo", "direct", True, True, {
- "expires": 3000}))
+ self.e.equivalent(e2, 'eFoo', 'direct', True, True, {
+ 'expires': 3000}))
self.assertFalse(
- self.e.equivalent(e2, "eFoo", "direct", True, True, {
- "expires": 6000}))
+ self.e.equivalent(e2, 'eFoo', 'direct', True, True, {
+ 'expires': 6000}))
diff --git a/kombu/tests/transport/virtual/test_scheduling.py b/kombu/tests/transport/virtual/test_scheduling.py
index 6cae66f1..afbcd061 100644
--- a/kombu/tests/transport/virtual/test_scheduling.py
+++ b/kombu/tests/transport/virtual/test_scheduling.py
@@ -20,12 +20,12 @@ def consume(fun, n):
class test_FairCycle(TestCase):
def test_cycle(self):
- resources = ["a", "b", "c", "d", "e"]
+ resources = ['a', 'b', 'c', 'd', 'e']
def echo(r, timeout=None):
return r
- # cycle should be ["a", "b", "c", "d", "e", ... repeat]
+ # cycle should be ['a', 'b', 'c', 'd', 'e', ... repeat]
cycle = FairCycle(echo, resources, MyEmpty)
for i in range(len(resources)):
self.assertEqual(cycle.get(), (resources[i],
@@ -35,21 +35,21 @@ class test_FairCycle(TestCase):
resources[i]))
def test_cycle_breaks(self):
- resources = ["a", "b", "c", "d", "e"]
+ resources = ['a', 'b', 'c', 'd', 'e']
def echo(r):
- if r == "c":
+ if r == 'c':
raise MyEmpty(r)
return r
cycle = FairCycle(echo, resources, MyEmpty)
self.assertEqual(consume(cycle.get, len(resources)),
- [("a", "a"), ("b", "b"), ("d", "d"),
- ("e", "e"), ("a", "a")])
+ [('a', 'a'), ('b', 'b'), ('d', 'd'),
+ ('e', 'e'), ('a', 'a')])
self.assertEqual(consume(cycle.get, len(resources)),
- [("b", "b"), ("d", "d"), ("e", "e"),
- ("a", "a"), ("b", "b")])
- cycle2 = FairCycle(echo, ["c", "c"], MyEmpty)
+ [('b', 'b'), ('d', 'd'), ('e', 'e'),
+ ('a', 'a'), ('b', 'b')])
+ cycle2 = FairCycle(echo, ['c', 'c'], MyEmpty)
with self.assertRaises(MyEmpty):
consume(cycle2.get, 3)