summaryrefslogtreecommitdiff
path: root/kombu
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-06-15 19:32:40 +0200
committerAsk Solem <ask@celeryproject.org>2012-06-15 19:32:40 +0200
commit5e7a32440f803321a58453efb3ea0fde0d4497b2 (patch)
tree3fe4f3e58b85b18105141d799779f30d043ecd82 /kombu
parent8d5652600e9afc76c803958638821a12018b8803 (diff)
downloadkombu-5e7a32440f803321a58453efb3ea0fde0d4497b2.tar.gz
Use single quotes
Diffstat (limited to 'kombu')
-rw-r--r--kombu/__init__.py60
-rw-r--r--kombu/abstract.py8
-rw-r--r--kombu/clocks.py2
-rw-r--r--kombu/common.py26
-rw-r--r--kombu/compat.py22
-rw-r--r--kombu/compression.py8
-rw-r--r--kombu/connection.py134
-rw-r--r--kombu/entity.py102
-rw-r--r--kombu/exceptions.py8
-rw-r--r--kombu/log.py22
-rw-r--r--kombu/messaging.py16
-rw-r--r--kombu/mixins.py24
-rw-r--r--kombu/pidbox.py52
-rw-r--r--kombu/pools.py10
-rw-r--r--kombu/serialization.py36
-rw-r--r--kombu/simple.py6
-rw-r--r--kombu/syn.py12
-rw-r--r--kombu/tests/__init__.py32
-rw-r--r--kombu/tests/compat.py20
-rw-r--r--kombu/tests/mocks.py50
-rw-r--r--kombu/tests/test_common.py90
-rw-r--r--kombu/tests/test_compat.py164
-rw-r--r--kombu/tests/test_compression.py22
-rw-r--r--kombu/tests/test_connection.py160
-rw-r--r--kombu/tests/test_entities.py122
-rw-r--r--kombu/tests/test_log.py82
-rw-r--r--kombu/tests/test_messaging.py276
-rw-r--r--kombu/tests/test_pidbox.py98
-rw-r--r--kombu/tests/test_pools.py30
-rw-r--r--kombu/tests/test_serialization.py82
-rw-r--r--kombu/tests/test_simple.py44
-rw-r--r--kombu/tests/test_utils.py26
-rw-r--r--kombu/tests/transport/test_amqplib.py44
-rw-r--r--kombu/tests/transport/test_base.py20
-rw-r--r--kombu/tests/transport/test_memory.py36
-rw-r--r--kombu/tests/transport/test_mongodb.py30
-rw-r--r--kombu/tests/transport/test_redis.py168
-rw-r--r--kombu/tests/transport/test_sqlalchemy.py10
-rw-r--r--kombu/tests/transport/test_transport.py12
-rw-r--r--kombu/tests/transport/virtual/test_base.py132
-rw-r--r--kombu/tests/transport/virtual/test_exchange.py92
-rw-r--r--kombu/tests/transport/virtual/test_scheduling.py18
-rw-r--r--kombu/tests/utilities/test_encoding.py34
-rw-r--r--kombu/tests/utilities/test_functional.py6
-rw-r--r--kombu/tests/utils.py18
-rw-r--r--kombu/transport/SQS.py42
-rw-r--r--kombu/transport/__init__.py62
-rw-r--r--kombu/transport/amqplib.py30
-rw-r--r--kombu/transport/base.py54
-rw-r--r--kombu/transport/beanstalk.py14
-rw-r--r--kombu/transport/couchdb.py18
-rw-r--r--kombu/transport/django/__init__.py12
-rw-r--r--kombu/transport/django/management/commands/clean_kombu_messages.py4
-rw-r--r--kombu/transport/django/managers.py2
-rw-r--r--kombu/transport/django/models.py18
-rw-r--r--kombu/transport/librabbitmq.py28
-rw-r--r--kombu/transport/memory.py6
-rw-r--r--kombu/transport/mongodb.py94
-rw-r--r--kombu/transport/pika.py44
-rw-r--r--kombu/transport/pika2.py44
-rw-r--r--kombu/transport/redis.py94
-rw-r--r--kombu/transport/sqlalchemy/__init__.py6
-rw-r--r--kombu/transport/sqlalchemy/models.py24
-rw-r--r--kombu/transport/virtual/__init__.py140
-rw-r--r--kombu/transport/virtual/exchange.py30
-rw-r--r--kombu/transport/virtual/scheduling.py2
-rw-r--r--kombu/transport/zookeeper.py9
-rw-r--r--kombu/utils/__init__.py30
-rw-r--r--kombu/utils/amq_manager.py10
-rw-r--r--kombu/utils/compat.py8
-rw-r--r--kombu/utils/debug.py20
-rw-r--r--kombu/utils/encoding.py20
-rw-r--r--kombu/utils/eventio.py4
-rw-r--r--kombu/utils/finalize.py4
-rw-r--r--kombu/utils/functional.py4
-rw-r--r--kombu/utils/limits.py2
-rw-r--r--kombu/utils/url.py2
77 files changed, 1673 insertions, 1674 deletions
diff --git a/kombu/__init__.py b/kombu/__init__.py
index 7aefc76f..8d29715c 100644
--- a/kombu/__init__.py
+++ b/kombu/__init__.py
@@ -2,11 +2,11 @@
from __future__ import absolute_import
VERSION = (2, 2, 1)
-__version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
-__author__ = "Ask Solem"
-__contact__ = "ask@celeryproject.org"
-__homepage__ = "http://kombu.readthedocs.org"
-__docformat__ = "restructuredtext en"
+__version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:])
+__author__ = 'Ask Solem'
+__contact__ = 'ask@celeryproject.org'
+__homepage__ = 'http://kombu.readthedocs.org'
+__docformat__ = 'restructuredtext en'
# -eof meta-
@@ -16,20 +16,20 @@ import sys
if sys.version_info < (2, 5): # pragma: no cover
if sys.version_info >= (2, 4):
raise Exception(
- "Python 2.4 is not supported by this version. "
- "Please use Kombu versions 1.x.")
+ 'Python 2.4 is not supported by this version. '
+ 'Please use Kombu versions 1.x.')
else:
- raise Exception("Kombu requires Python versions 2.5 or later.")
+ raise Exception('Kombu requires Python versions 2.5 or later.')
# Lazy loading.
# - See werkzeug/__init__.py for the rationale behind this.
from types import ModuleType
all_by_module = {
- "kombu.connection": ["BrokerConnection", "Connection"],
- "kombu.entity": ["Exchange", "Queue"],
- "kombu.messaging": ["Consumer", "Producer"],
- "kombu.pools": ["connections", "producers"],
+ 'kombu.connection': ['BrokerConnection', 'Connection'],
+ 'kombu.entity': ['Exchange', 'Queue'],
+ 'kombu.messaging': ['Consumer', 'Producer'],
+ 'kombu.pools': ['connections', 'producers'],
}
object_origins = {}
@@ -50,36 +50,36 @@ class module(ModuleType):
def __dir__(self):
result = list(new_module.__all__)
- result.extend(("__file__", "__path__", "__doc__", "__all__",
- "__docformat__", "__name__", "__path__", "VERSION",
- "__package__", "__version__", "__author__",
- "__contact__", "__homepage__", "__docformat__"))
+ result.extend(('__file__', '__path__', '__doc__', '__all__',
+ '__docformat__', '__name__', '__path__', 'VERSION',
+ '__package__', '__version__', '__author__',
+ '__contact__', '__homepage__', '__docformat__'))
return result
# 2.5 does not define __package__
try:
package = __package__
except NameError:
- package = "kombu"
+ package = 'kombu'
# keep a reference to this module so that it's not garbage collected
old_module = sys.modules[__name__]
new_module = sys.modules[__name__] = module(__name__)
new_module.__dict__.update({
- "__file__": __file__,
- "__path__": __path__,
- "__doc__": __doc__,
- "__all__": tuple(object_origins),
- "__version__": __version__,
- "__author__": __author__,
- "__contact__": __contact__,
- "__homepage__": __homepage__,
- "__docformat__": __docformat__,
- "__package__": package,
- "VERSION": VERSION})
+ '__file__': __file__,
+ '__path__': __path__,
+ '__doc__': __doc__,
+ '__all__': tuple(object_origins),
+ '__version__': __version__,
+ '__author__': __author__,
+ '__contact__': __contact__,
+ '__homepage__': __homepage__,
+ '__docformat__': __docformat__,
+ '__package__': package,
+ 'VERSION': VERSION})
-if os.environ.get("KOMBU_LOG_DEBUG"):
- os.environ.update(KOMBU_LOG_CHANNEL="1", KOMBU_LOG_CONNECTION="1")
+if os.environ.get('KOMBU_LOG_DEBUG'):
+ os.environ.update(KOMBU_LOG_CHANNEL='1', KOMBU_LOG_CONNECTION='1')
from .utils import debug
debug.setup_logging()
diff --git a/kombu/abstract.py b/kombu/abstract.py
index 80a2d5bb..daa4077f 100644
--- a/kombu/abstract.py
+++ b/kombu/abstract.py
@@ -15,7 +15,7 @@ from copy import copy
from .connection import maybe_channel
from .exceptions import NotBoundError
-__all__ = ["Object", "MaybeChannelBound"]
+__all__ = ['Object', 'MaybeChannelBound']
def unpickle_dict(cls, kwargs):
@@ -96,11 +96,11 @@ class MaybeChannelBound(Object):
"""Callback called when the class is bound."""
pass
- def __repr__(self, item=""):
+ def __repr__(self, item=''):
if self.is_bound:
- return "<bound %s of %s>" % (item or self.__class__.__name__,
+ return '<bound %s of %s>' % (item or self.__class__.__name__,
self.channel)
- return "<unbound %s>" % (item, )
+ return '<unbound %s>' % (item, )
@property
def is_bound(self):
diff --git a/kombu/clocks.py b/kombu/clocks.py
index dbcae198..a45bc767 100644
--- a/kombu/clocks.py
+++ b/kombu/clocks.py
@@ -13,7 +13,7 @@ from __future__ import with_statement
from threading import Lock
-__all__ = ["LamportClock"]
+__all__ = ['LamportClock']
class LamportClock(object):
diff --git a/kombu/common.py b/kombu/common.py
index b06002f9..87ebb200 100644
--- a/kombu/common.py
+++ b/kombu/common.py
@@ -23,11 +23,11 @@ from .log import Log
from .messaging import Consumer as _Consumer
from .utils import uuid
-__all__ = ["Broadcast", "maybe_declare", "uuid",
- "itermessages", "send_reply", "isend_reply",
- "collect_replies", "insured", "ipublish"]
+__all__ = ['Broadcast', 'maybe_declare', 'uuid',
+ 'itermessages', 'send_reply', 'isend_reply',
+ 'collect_replies', 'insured', 'ipublish']
-insured_logger = Log("kombu.insurance")
+insured_logger = Log('kombu.insurance')
class Broadcast(Queue):
@@ -47,10 +47,10 @@ class Broadcast(Queue):
def __init__(self, name=None, queue=None, **kwargs):
return super(Broadcast, self).__init__(
- name=queue or "bcast.%s" % (uuid(), ),
- **dict({"alias": name,
- "auto_delete": True,
- "exchange": Exchange(name, type="fanout"),
+ name=queue or 'bcast.%s' % (uuid(), ),
+ **dict({'alias': name,
+ 'auto_delete': True,
+ 'exchange': Exchange(name, type='fanout'),
}, **kwargs))
@@ -150,9 +150,9 @@ def send_reply(exchange, req, msg, producer=None, **props):
serializer = serialization.registry.type_to_name[content_type]
maybe_declare(exchange, producer.channel)
producer.publish(msg, exchange=exchange,
- **dict({"routing_key": req.properties["reply_to"],
- "correlation_id": req.properties.get("correlation_id"),
- "serializer": serializer},
+ **dict({'routing_key': req.properties['reply_to'],
+ 'correlation_id': req.properties.get('correlation_id'),
+ 'serializer': serializer},
**props))
@@ -162,7 +162,7 @@ def isend_reply(pool, exchange, req, msg, props, **retry_policy):
def collect_replies(conn, channel, queue, *args, **kwargs):
- no_ack = kwargs.setdefault("no_ack", True)
+ no_ack = kwargs.setdefault('no_ack', True)
received = False
for body, message in itermessages(conn, channel, queue, *args, **kwargs):
if not no_ack:
@@ -175,7 +175,7 @@ def collect_replies(conn, channel, queue, *args, **kwargs):
def _ensure_errback(exc, interval):
insured_logger.error(
- "Connection error: %r. Retry in %ss\n" % (exc, interval),
+ 'Connection error: %r. Retry in %ss\n' % (exc, interval),
exc_info=True)
diff --git a/kombu/compat.py b/kombu/compat.py
index 96fa026a..88874aad 100644
--- a/kombu/compat.py
+++ b/kombu/compat.py
@@ -17,7 +17,7 @@ from itertools import count
from . import messaging
from .entity import Exchange, Queue
-__all__ = ["Publisher", "Consumer"]
+__all__ = ['Publisher', 'Consumer']
def _iterconsume(connection, consumer, no_ack=False, limit=None):
@@ -29,9 +29,9 @@ def _iterconsume(connection, consumer, no_ack=False, limit=None):
class Publisher(messaging.Producer):
- exchange = ""
- exchange_type = "direct"
- routing_key = ""
+ exchange = ''
+ exchange_type = 'direct'
+ routing_key = ''
durable = True
auto_delete = False
_closed = False
@@ -78,14 +78,14 @@ class Publisher(messaging.Producer):
class Consumer(messaging.Consumer):
- queue = ""
- exchange = ""
- routing_key = ""
- exchange_type = "direct"
+ queue = ''
+ exchange = ''
+ routing_key = ''
+ exchange_type = 'direct'
durable = True
exclusive = False
auto_delete = False
- exchange_type = "direct"
+ exchange_type = 'direct'
_closed = False
def __init__(self, connection, queue=None, exchange=None,
@@ -146,12 +146,12 @@ class Consumer(messaging.Consumer):
return message
def process_next(self):
- raise NotImplementedError("Use fetch(enable_callbacks=True)")
+ raise NotImplementedError('Use fetch(enable_callbacks=True)')
def discard_all(self, filterfunc=None):
if filterfunc is not None:
raise NotImplementedError(
- "discard_all does not implement filters")
+ 'discard_all does not implement filters')
return self.purge()
def iterconsume(self, limit=None, no_ack=None):
diff --git a/kombu/compression.py b/kombu/compression.py
index dcf583d8..5c88fee4 100644
--- a/kombu/compression.py
+++ b/kombu/compression.py
@@ -18,8 +18,8 @@ _aliases = {}
_encoders = {}
_decoders = {}
-__all__ = ["register", "encoders", "get_encoder",
- "get_decoder", "compress", "decompress"]
+__all__ = ['register', 'encoders', 'get_encoder',
+ 'get_decoder', 'compress', 'decompress']
def register(encoder, decoder, content_type, aliases=[]):
@@ -75,7 +75,7 @@ def decompress(body, content_type):
register(zlib.compress,
zlib.decompress,
- "application/x-gzip", aliases=["gzip", "zlib"])
+ 'application/x-gzip', aliases=['gzip', 'zlib'])
try:
import bz2
except ImportError:
@@ -83,4 +83,4 @@ except ImportError:
else:
register(bz2.compress,
bz2.decompress,
- "application/x-bz2", aliases=["bzip2", "bzip"])
+ 'application/x-bz2', aliases=['bzip2', 'bzip'])
diff --git a/kombu/connection.py b/kombu/connection.py
index 5395ed2e..8f4cc765 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -29,12 +29,12 @@ from .utils import cached_property, retry_over_time
from .utils.compat import OrderedDict, LifoQueue as _LifoQueue
from .utils.url import parse_url
-_LOG_CONNECTION = os.environ.get("KOMBU_LOG_CONNECTION", False)
-_LOG_CHANNEL = os.environ.get("KOMBU_LOG_CHANNEL", False)
+_LOG_CONNECTION = os.environ.get('KOMBU_LOG_CONNECTION', False)
+_LOG_CHANNEL = os.environ.get('KOMBU_LOG_CHANNEL', False)
-__all__ = ["parse_url", "BrokerConnection", "Resource",
- "ConnectionPool", "ChannelPool"]
-URI_PASSTHROUGH = frozenset(["sqla", "sqlalchemy"])
+__all__ = ['parse_url', 'BrokerConnection', 'Resource',
+ 'ConnectionPool', 'ChannelPool']
+URI_PASSTHROUGH = frozenset(['sqla', 'sqlalchemy'])
logger = get_logger(__name__)
@@ -73,7 +73,7 @@ class BrokerConnection(object):
"""
port = None
- virtual_host = "/"
+ virtual_host = '/'
connect_timeout = 5
_closed = None
@@ -91,29 +91,29 @@ class BrokerConnection(object):
#: after a call to :meth:`drain_nowait`.
more_to_read = False
- def __init__(self, hostname="localhost", userid=None,
+ def __init__(self, hostname='localhost', userid=None,
password=None, virtual_host=None, port=None, insist=False,
ssl=False, transport=None, connect_timeout=5,
transport_options=None, login_method=None, uri_prefix=None,
**kwargs):
# have to spell the args out, just to get nice docstrings :(
- params = {"hostname": hostname, "userid": userid,
- "password": password, "virtual_host": virtual_host,
- "port": port, "insist": insist, "ssl": ssl,
- "transport": transport, "connect_timeout": connect_timeout,
- "login_method": login_method}
- if hostname and "://" in hostname \
+ params = {'hostname': hostname, 'userid': userid,
+ 'password': password, 'virtual_host': virtual_host,
+ 'port': port, 'insist': insist, 'ssl': ssl,
+ 'transport': transport, 'connect_timeout': connect_timeout,
+ 'login_method': login_method}
+ if hostname and '://' in hostname \
and transport not in URI_PASSTHROUGH:
- if '+' in hostname[:hostname.index("://")]:
+ if '+' in hostname[:hostname.index('://')]:
# e.g. sqla+mysql://root:masterkey@localhost/
- params["transport"], params["hostname"] = hostname.split('+')
- self.uri_prefix = params["transport"]
+ params['transport'], params['hostname'] = hostname.split('+')
+ self.uri_prefix = params['transport']
else:
params.update(parse_url(hostname))
self._init_params(**params)
# backend_cls argument will be removed shortly.
- self.transport_cls = self.transport_cls or kwargs.get("backend_cls")
+ self.transport_cls = self.transport_cls or kwargs.get('backend_cls')
if transport_options is None:
transport_options = {}
@@ -140,9 +140,9 @@ class BrokerConnection(object):
self.ssl = ssl
self.transport_cls = transport
- def _debug(self, msg, ident="[Kombu connection:0x%(id)x] ", **kwargs):
+ def _debug(self, msg, ident='[Kombu connection:0x%(id)x] ', **kwargs):
if self._logger: # pragma: no cover
- logger.debug((ident + unicode(msg)) % {"id": id(self)},
+ logger.debug((ident + unicode(msg)) % {'id': id(self)},
**kwargs)
def connect(self):
@@ -152,12 +152,12 @@ class BrokerConnection(object):
def channel(self):
"""Request a new channel."""
- self._debug("create channel")
+ self._debug('create channel')
chan = self.transport.create_channel(self.connection)
if _LOG_CHANNEL: # pragma: no cover
from .utils.debug import Logwrapped
- return Logwrapped(chan, "kombu.channel",
- "[Kombu channel:%(channel_id)s] ")
+ return Logwrapped(chan, 'kombu.channel',
+ '[Kombu channel:%(channel_id)s] ')
return chan
def drain_events(self, **kwargs):
@@ -208,7 +208,7 @@ class BrokerConnection(object):
if self._transport:
self._transport.client = None
self._transport = None
- self._debug("closed")
+ self._debug('closed')
self._closed = True
def release(self):
@@ -298,7 +298,7 @@ class BrokerConnection(object):
try:
return fun(*args, **kwargs)
except self.connection_errors + self.channel_errors, exc:
- self._debug("ensure got exception: %r" % (exc, ),
+ self._debug('ensure got exception: %r' % (exc, ),
exc_info=True)
if got_connection:
raise
@@ -360,7 +360,7 @@ class BrokerConnection(object):
def __call__(self, *args, **kwargs):
if channels[0] is None:
self.revive(create_channel())
- kwargs["channel"] = channels[0]
+ kwargs['channel'] = channels[0]
return fun(*args, **kwargs), channels[0]
revive = Revival()
@@ -383,24 +383,24 @@ class BrokerConnection(object):
return self.__class__(**dict(self._info(), **kwargs))
def _info(self):
- transport_cls = self.transport_cls or "amqp"
- transport_cls = {AMQP_ALIAS: "amqp"}.get(transport_cls, transport_cls)
+ transport_cls = self.transport_cls or 'amqp'
+ transport_cls = {AMQP_ALIAS: 'amqp'}.get(transport_cls, transport_cls)
D = self.transport.default_connection_params
hostname = self.hostname
if self.uri_prefix:
- hostname = "%s+%s" % (self.uri_prefix, hostname)
- info = (("hostname", hostname or D.get("hostname")),
- ("userid", self.userid or D.get("userid")),
- ("password", self.password or D.get("password")),
- ("virtual_host", self.virtual_host or D.get("virtual_host")),
- ("port", self.port or D.get("port")),
- ("insist", self.insist),
- ("ssl", self.ssl),
- ("transport", transport_cls),
- ("connect_timeout", self.connect_timeout),
- ("transport_options", self.transport_options),
- ("login_method", self.login_method or D.get("login_method")),
- ("uri_prefix", self.uri_prefix))
+ hostname = '%s+%s' % (self.uri_prefix, hostname)
+ info = (('hostname', hostname or D.get('hostname')),
+ ('userid', self.userid or D.get('userid')),
+ ('password', self.password or D.get('password')),
+ ('virtual_host', self.virtual_host or D.get('virtual_host')),
+ ('port', self.port or D.get('port')),
+ ('insist', self.insist),
+ ('ssl', self.ssl),
+ ('transport', transport_cls),
+ ('connect_timeout', self.connect_timeout),
+ ('transport_options', self.transport_options),
+ ('login_method', self.login_method or D.get('login_method')),
+ ('uri_prefix', self.uri_prefix))
return info
def info(self):
@@ -408,36 +408,36 @@ class BrokerConnection(object):
return OrderedDict(self._info())
def __eqhash__(self):
- return hash("%s|%s|%s|%s|%s|%s" % (
+ return hash('%s|%s|%s|%s|%s|%s' % (
self.transport_cls, self.hostname, self.userid,
self.password, self.virtual_host, self.port))
def as_uri(self, include_password=False):
if self.transport_cls in URI_PASSTHROUGH:
- return self.transport_cls + '+' + (self.hostname or "localhost")
- quoteS = partial(quote, safe="") # strict quote
+ return self.transport_cls + '+' + (self.hostname or 'localhost')
+ quoteS = partial(quote, safe='') # strict quote
fields = self.info()
- port = fields["port"]
- userid = fields["userid"]
- password = fields["password"]
- transport = fields["transport"]
- url = "%s://" % transport
+ port = fields['port']
+ userid = fields['userid']
+ password = fields['password']
+ transport = fields['transport']
+ url = '%s://' % transport
if userid:
url += quoteS(userid)
if include_password and password:
url += ':' + quoteS(password)
url += '@'
- url += quoteS(fields["hostname"])
+ url += quoteS(fields['hostname'])
# If the transport equals 'mongodb' the
# hostname contains a full mongodb connection
# URI. Let pymongo retreive the port from there.
- if port and transport != "mongodb":
+ if port and transport != 'mongodb':
url += ':' + str(port)
- url += '/' + quote(fields["virtual_host"])
+ url += '/' + quote(fields['virtual_host'])
if self.uri_prefix:
- return "%s+%s" % (self.uri_prefix, url)
+ return '%s+%s' % (self.uri_prefix, url)
return url
def Pool(self, limit=None, preload=None):
@@ -545,14 +545,14 @@ class BrokerConnection(object):
exchange_opts, **kwargs)
def _establish_connection(self):
- self._debug("establishing connection...")
+ self._debug('establishing connection...')
conn = self.transport.establish_connection()
- self._debug("connection established: %r" % (conn, ))
+ self._debug('connection established: %r' % (conn, ))
return conn
def __repr__(self):
"""``x.__repr__() <==> repr(x)``"""
- return "<BrokerConnection: %s at 0x%x>" % (self.as_uri(), id(self))
+ return '<BrokerConnection: %s at 0x%x>' % (self.as_uri(), id(self))
def __copy__(self):
"""``x.__copy__() <==> copy(x)``"""
@@ -602,7 +602,7 @@ class BrokerConnection(object):
@property
def host(self):
"""The host as a host name/port pair separated by colon."""
- return ":".join([self.hostname, str(self.port)])
+ return ':'.join([self.hostname, str(self.port)])
@property
def transport(self):
@@ -633,7 +633,7 @@ class BrokerConnection(object):
@property
def is_evented(self):
- return getattr(self.transport, "on_poll_start", None)
+ return getattr(self.transport, 'on_poll_start', None)
Connection = BrokerConnection
@@ -649,7 +649,7 @@ class Resource(object):
self.setup()
def setup(self):
- raise NotImplementedError("subclass responsibility")
+ raise NotImplementedError('subclass responsibility')
def _add_when_empty(self):
if self.limit and len(self._dirty) >= self.limit:
@@ -740,7 +740,7 @@ class Resource(object):
except AttributeError: # Issue #78
pass
- mutex = getattr(resource, "mutex", None)
+ mutex = getattr(resource, 'mutex', None)
if mutex:
mutex.acquire()
try:
@@ -757,7 +757,7 @@ class Resource(object):
if mutex:
mutex.release()
- if os.environ.get("KOMBU_DEBUG_POOL"): # pragma: no cover
+ if os.environ.get('KOMBU_DEBUG_POOL'): # pragma: no cover
_orig_acquire = acquire
_orig_release = release
@@ -766,20 +766,20 @@ class Resource(object):
def acquire(self, *args, **kwargs): # noqa
import traceback
id = self._next_resource_id = self._next_resource_id + 1
- print("+%s ACQUIRE %s" % (id, self.__class__.__name__, ))
+ print('+%s ACQUIRE %s' % (id, self.__class__.__name__, ))
r = self._orig_acquire(*args, **kwargs)
r._resource_id = id
- print("-%s ACQUIRE %s" % (id, self.__class__.__name__, ))
- if not hasattr(r, "acquired_by"):
+ print('-%s ACQUIRE %s' % (id, self.__class__.__name__, ))
+ if not hasattr(r, 'acquired_by'):
r.acquired_by = []
r.acquired_by.append(traceback.format_stack())
return r
def release(self, resource): # noqa
id = resource._resource_id
- print("+%s RELEASE %s" % (id, self.__class__.__name__, ))
+ print('+%s RELEASE %s' % (id, self.__class__.__name__, ))
r = self._orig_release(resource)
- print("-%s RELEASE %s" % (id, self.__class__.__name__, ))
+ print('-%s RELEASE %s' % (id, self.__class__.__name__, ))
self._next_resource_id -= 1
return r
@@ -796,7 +796,7 @@ class ConnectionPool(Resource):
return self.connection.clone()
def release_resource(self, resource):
- resource._debug("released")
+ resource._debug('released')
def close_resource(self, resource):
resource._close()
@@ -819,7 +819,7 @@ class ConnectionPool(Resource):
def prepare(self, resource):
if callable(resource):
resource = resource()
- resource._debug("acquired")
+ resource._debug('acquired')
return resource
diff --git a/kombu/entity.py b/kombu/entity.py
index 4cfb08dd..c63ee1c4 100644
--- a/kombu/entity.py
+++ b/kombu/entity.py
@@ -14,10 +14,10 @@ from .abstract import MaybeChannelBound
TRANSIENT_DELIVERY_MODE = 1
PERSISTENT_DELIVERY_MODE = 2
-DELIVERY_MODES = {"transient": TRANSIENT_DELIVERY_MODE,
- "persistent": PERSISTENT_DELIVERY_MODE}
+DELIVERY_MODES = {'transient': TRANSIENT_DELIVERY_MODE,
+ 'persistent': PERSISTENT_DELIVERY_MODE}
-__all__ = ["Exchange", "Queue"]
+__all__ = ['Exchange', 'Queue']
class Exchange(MaybeChannelBound):
@@ -118,27 +118,27 @@ class Exchange(MaybeChannelBound):
TRANSIENT_DELIVERY_MODE = TRANSIENT_DELIVERY_MODE
PERSISTENT_DELIVERY_MODE = PERSISTENT_DELIVERY_MODE
- name = ""
- type = "direct"
+ name = ''
+ type = 'direct'
durable = True
auto_delete = False
delivery_mode = PERSISTENT_DELIVERY_MODE
- attrs = (("name", None),
- ("type", None),
- ("arguments", None),
- ("durable", bool),
- ("auto_delete", bool),
- ("delivery_mode", lambda m: DELIVERY_MODES.get(m) or m))
+ attrs = (('name', None),
+ ('type', None),
+ ('arguments', None),
+ ('durable', bool),
+ ('auto_delete', bool),
+ ('delivery_mode', lambda m: DELIVERY_MODES.get(m) or m))
- def __init__(self, name="", type="", channel=None, **kwargs):
+ def __init__(self, name='', type='', channel=None, **kwargs):
super(Exchange, self).__init__(**kwargs)
self.name = name or self.name
self.type = type or self.type
self.maybe_bind(channel)
def __hash__(self):
- return hash("E|%s" % (self.name, ))
+ return hash('E|%s' % (self.name, ))
def declare(self, nowait=False, passive=False):
"""Declare the exchange.
@@ -188,7 +188,7 @@ class Exchange(MaybeChannelBound):
"""
properties = {} if properties is None else properties
dm = delivery_mode or self.delivery_mode
- properties["delivery_mode"] = \
+ properties['delivery_mode'] = \
DELIVERY_MODES[dm] if (dm != 2 and dm != 1) else dm
return self.channel.prepare_message(body,
properties=properties,
@@ -239,7 +239,7 @@ class Exchange(MaybeChannelBound):
return False
def __repr__(self):
- return super(Exchange, self).__repr__("Exchange %s(%s)" % (self.name,
+ return super(Exchange, self).__repr__('Exchange %s(%s)' % (self.name,
self.type))
@property
@@ -341,27 +341,27 @@ class Queue(MaybeChannelBound):
generated queue names.
"""
- name = ""
- exchange = Exchange("")
- routing_key = ""
+ name = ''
+ exchange = Exchange('')
+ routing_key = ''
durable = True
exclusive = False
auto_delete = False
no_ack = False
- attrs = (("name", None),
- ("exchange", None),
- ("routing_key", None),
- ("queue_arguments", None),
- ("binding_arguments", None),
- ("durable", bool),
- ("exclusive", bool),
- ("auto_delete", bool),
- ("no_ack", None),
- ("alias", None))
-
- def __init__(self, name="", exchange=None, routing_key="", channel=None,
+ attrs = (('name', None),
+ ('exchange', None),
+ ('routing_key', None),
+ ('queue_arguments', None),
+ ('binding_arguments', None),
+ ('durable', bool),
+ ('exclusive', bool),
+ ('auto_delete', bool),
+ ('no_ack', None),
+ ('alias', None))
+
+ def __init__(self, name='', exchange=None, routing_key='', channel=None,
**kwargs):
super(Queue, self).__init__(**kwargs)
self.name = name or self.name
@@ -373,7 +373,7 @@ class Queue(MaybeChannelBound):
self.maybe_bind(channel)
def __hash__(self):
- return hash("Q|%s" % (self.name, ))
+ return hash('Q|%s' % (self.name, ))
def when_bound(self):
if self.exchange:
@@ -437,7 +437,7 @@ class Queue(MaybeChannelBound):
no_ack = self.no_ack if no_ack is None else no_ack
message = self.channel.basic_get(queue=self.name, no_ack=no_ack)
if message is not None:
- m2p = getattr(self.channel, "message_to_python", None)
+ m2p = getattr(self.channel, 'message_to_python', None)
if m2p:
message = m2p(message)
return message
@@ -518,7 +518,7 @@ class Queue(MaybeChannelBound):
def __repr__(self):
return super(Queue, self).__repr__(
- "Queue %s -> %s -> %s" % (self.name,
+ 'Queue %s -> %s -> %s' % (self.name,
self.exchange,
self.routing_key))
@@ -528,32 +528,32 @@ class Queue(MaybeChannelBound):
@classmethod
def from_dict(self, queue, **options):
- binding_key = options.get("binding_key") or options.get("routing_key")
+ binding_key = options.get('binding_key') or options.get('routing_key')
- e_durable = options.get("exchange_durable")
+ e_durable = options.get('exchange_durable')
if e_durable is None:
- e_durable = options.get("durable")
+ e_durable = options.get('durable')
- e_auto_delete = options.get("exchange_auto_delete")
+ e_auto_delete = options.get('exchange_auto_delete')
if e_auto_delete is None:
- e_auto_delete = options.get("auto_delete")
+ e_auto_delete = options.get('auto_delete')
- q_durable = options.get("queue_durable")
+ q_durable = options.get('queue_durable')
if q_durable is None:
- q_durable = options.get("durable")
+ q_durable = options.get('durable')
- q_auto_delete = options.get("queue_auto_delete")
+ q_auto_delete = options.get('queue_auto_delete')
if q_auto_delete is None:
- q_auto_delete = options.get("auto_delete")
+ q_auto_delete = options.get('auto_delete')
- e_arguments = options.get("exchange_arguments")
- q_arguments = options.get("queue_arguments")
- b_arguments = options.get("binding_arguments")
+ e_arguments = options.get('exchange_arguments')
+ q_arguments = options.get('queue_arguments')
+ b_arguments = options.get('binding_arguments')
- exchange = Exchange(options.get("exchange"),
- type=options.get("exchange_type"),
- delivery_mode=options.get("delivery_mode"),
- routing_key=options.get("routing_key"),
+ exchange = Exchange(options.get('exchange'),
+ type=options.get('exchange_type'),
+ delivery_mode=options.get('delivery_mode'),
+ routing_key=options.get('routing_key'),
durable=e_durable,
auto_delete=e_auto_delete,
arguments=e_arguments)
@@ -561,8 +561,8 @@ class Queue(MaybeChannelBound):
exchange=exchange,
routing_key=binding_key,
durable=q_durable,
- exclusive=options.get("exclusive"),
+ exclusive=options.get('exclusive'),
auto_delete=q_auto_delete,
- no_ack=options.get("no_ack"),
+ no_ack=options.get('no_ack'),
queue_arguments=q_arguments,
binding_arguments=b_arguments)
diff --git a/kombu/exceptions.py b/kombu/exceptions.py
index 43eb409c..887d2bcb 100644
--- a/kombu/exceptions.py
+++ b/kombu/exceptions.py
@@ -12,10 +12,10 @@ from __future__ import absolute_import
import socket
-__all__ = ["NotBoundError", "MessageStateError", "TimeoutError",
- "LimitExceeded", "ConnectionLimitExceeded",
- "ChannelLimitExceeded", "StdChannelError", "VersionMismatch",
- "SerializerNotInstalled"]
+__all__ = ['NotBoundError', 'MessageStateError', 'TimeoutError',
+ 'LimitExceeded', 'ConnectionLimitExceeded',
+ 'ChannelLimitExceeded', 'StdChannelError', 'VersionMismatch',
+ 'SerializerNotInstalled']
TimeoutError = socket.timeout
diff --git a/kombu/log.py b/kombu/log.py
index 5b5cedee..76655942 100644
--- a/kombu/log.py
+++ b/kombu/log.py
@@ -9,12 +9,12 @@ from .utils.compat import WatchedFileHandler
from .utils.encoding import safe_repr, safe_str
from .utils.functional import maybe_promise
-__all__ = ["LogMixin", "LOG_LEVELS", "get_loglevel", "setup_logging"]
+__all__ = ['LogMixin', 'LOG_LEVELS', 'get_loglevel', 'setup_logging']
LOG_LEVELS = dict(logging._levelNames)
-LOG_LEVELS["FATAL"] = logging.FATAL
-LOG_LEVELS[logging.FATAL] = "FATAL"
-DISABLE_TRACEBACKS = os.environ.get("DISABLE_TRACEBACKS")
+LOG_LEVELS['FATAL'] = logging.FATAL
+LOG_LEVELS[logging.FATAL] = 'FATAL'
+DISABLE_TRACEBACKS = os.environ.get('DISABLE_TRACEBACKS')
class NullHandler(logging.Handler):
@@ -48,7 +48,7 @@ def naive_format_parts(fmt):
for i, e in enumerate(l[1:]):
if not e or not l[i - 1]:
yield
- elif e[0] in ["r", "s"]:
+ elif e[0] in ['r', 's']:
yield e[0]
@@ -80,13 +80,13 @@ class LogMixin(object):
return self._error(logging.CRITICAL, *args, **kwargs)
def _error(self, severity, *args, **kwargs):
- kwargs.setdefault("exc_info", True)
+ kwargs.setdefault('exc_info', True)
if DISABLE_TRACEBACKS:
- kwargs.pop("exc_info", None)
+ kwargs.pop('exc_info', None)
return self.log(severity, *args, **kwargs)
def annotate(self, text):
- return "%s - %s" % (self.logger_name, text)
+ return '%s - %s' % (self.logger_name, text)
def log(self, severity, *args, **kwargs):
if self.logger.isEnabledFor(severity):
@@ -98,7 +98,7 @@ class LogMixin(object):
*list(safeify_format(args[0], *expand)), **kwargs)
else:
return self.logger.log(severity,
- self.annotate(" ".join(map(safe_str, args))),
+ self.annotate(' '.join(map(safe_str, args))),
**kwargs)
def get_logger(self):
@@ -139,10 +139,10 @@ class Log(LogMixin):
def setup_logging(loglevel=None, logfile=None):
logger = logging.getLogger()
- loglevel = get_loglevel(loglevel or "ERROR")
+ loglevel = get_loglevel(loglevel or 'ERROR')
logfile = logfile if logfile else sys.__stderr__
if not logger.handlers:
- if hasattr(logfile, "write"):
+ if hasattr(logfile, 'write'):
handler = logging.StreamHandler(logfile)
else:
handler = WatchedFileHandler(logfile)
diff --git a/kombu/messaging.py b/kombu/messaging.py
index 63d03cf2..46fb0060 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -18,7 +18,7 @@ from .compression import compress
from .serialization import encode
from .utils import maybe_list
-__all__ = ["Exchange", "Queue", "Producer", "Consumer"]
+__all__ = ['Exchange', 'Queue', 'Producer', 'Consumer']
class Producer(object):
@@ -46,7 +46,7 @@ class Producer(object):
exchange = None
# Default routing key.
- routing_key = ""
+ routing_key = ''
#: Default serializer to use. Default is JSON.
serializer = None
@@ -68,7 +68,7 @@ class Producer(object):
self.channel = channel
self.exchange = exchange or self.exchange
if self.exchange is None:
- self.exchange = Exchange("")
+ self.exchange = Exchange('')
self.routing_key = routing_key or self.routing_key
self.serializer = serializer or self.serializer
self.compression = compression or self.compression
@@ -171,7 +171,7 @@ class Producer(object):
if self.auto_declare:
self.declare()
if self.on_return:
- self.channel.events["basic_return"].append(self.on_return)
+ self.channel.events['basic_return'].append(self.on_return)
def __enter__(self):
return self
@@ -206,7 +206,7 @@ class Producer(object):
content_encoding = 'binary'
if compression:
- body, headers["compression"] = compress(body, compression)
+ body, headers['compression'] = compress(body, compression)
return body, content_type, content_encoding
@@ -446,7 +446,7 @@ class Consumer(object):
"""
callbacks = self.callbacks
if not callbacks:
- raise NotImplementedError("Consumer does not have any callback")
+ raise NotImplementedError('Consumer does not have any callback')
[callback(body, message) for callback in callbacks]
def _basic_consume(self, queue, consumer_tag=None,
@@ -466,7 +466,7 @@ class Consumer(object):
def _receive_callback(self, message):
channel = self.channel
try:
- m2p = getattr(channel, "message_to_python", None)
+ m2p = getattr(channel, 'message_to_python', None)
if m2p:
message = m2p(message)
decoded = message.decode()
@@ -478,7 +478,7 @@ class Consumer(object):
self.receive(decoded, message)
def __repr__(self):
- return "<Consumer: %s>" % (self.queues, )
+ return '<Consumer: %s>' % (self.queues, )
@property
def connection(self):
diff --git a/kombu/mixins.py b/kombu/mixins.py
index 5df8b5e7..25168b03 100644
--- a/kombu/mixins.py
+++ b/kombu/mixins.py
@@ -23,7 +23,7 @@ from .utils import cached_property, nested
from .utils.encoding import safe_repr
from .utils.limits import TokenBucket
-__all__ = ["ConsumerMixin"]
+__all__ = ['ConsumerMixin']
class ConsumerMixin(LogMixin):
@@ -45,7 +45,7 @@ class ConsumerMixin(LogMixin):
class Worker(ConsumerMixin):
- task_queue = Queue("tasks", Exchange("tasks"), "tasks"))
+ task_queue = Queue('tasks', Exchange('tasks'), 'tasks'))
def __init__(self, connection):
self.connection = None
@@ -55,7 +55,7 @@ class ConsumerMixin(LogMixin):
callback=[self.on_task])]
def on_task(self, body, message):
- print("Got task: %r" % (body, ))
+ print('Got task: %r' % (body, ))
message.ack()
**Additional handler methods**:
@@ -129,7 +129,7 @@ class ConsumerMixin(LogMixin):
should_stop = False
def get_consumers(self, Consumer, channel):
- raise NotImplementedError("Subclass responsibility")
+ raise NotImplementedError('Subclass responsibility')
def on_connection_revived(self):
pass
@@ -151,8 +151,8 @@ class ConsumerMixin(LogMixin):
message.ack()
def on_connection_error(self, exc, interval):
- self.error("Broker connection error: %r. "
- "Trying again in %s seconds.", exc, interval)
+ self.error('Broker connection error: %r. '
+ 'Trying again in %s seconds.', exc, interval)
@contextmanager
def extra_context(self, connection, channel):
@@ -165,8 +165,8 @@ class ConsumerMixin(LogMixin):
for _ in self.consume(limit=None):
pass
except self.connection.connection_errors:
- self.error("Connection to broker lost. "
- "Trying to re-establish the connection...")
+ self.error('Connection to broker lost. '
+ 'Trying to re-establish the connection...')
def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs):
elapsed = 0
@@ -189,7 +189,7 @@ class ConsumerMixin(LogMixin):
else:
yield
elapsed = 0
- self.debug("consume exiting")
+ self.debug('consume exiting')
def maybe_conn_error(self, fun):
"""Applies function but ignores any connection or channel
@@ -212,15 +212,15 @@ class ConsumerMixin(LogMixin):
def Consumer(self):
with self.establish_connection() as conn:
self.on_connection_revived()
- self.info("Connected to %s", conn.as_uri())
+ self.info('Connected to %s', conn.as_uri())
channel = conn.default_channel
cls = partial(Consumer, channel,
on_decode_error=self.on_decode_error)
with self._consume_from(*self.get_consumers(cls, channel)) as c:
yield conn, channel, c
- self.debug("Consumers cancelled")
+ self.debug('Consumers cancelled')
self.on_consume_end(conn, channel)
- self.debug("Connection closed")
+ self.debug('Connection closed')
def _consume_from(self, *consumers):
return nested(*consumers)
diff --git a/kombu/pidbox.py b/kombu/pidbox.py
index edc39af1..faf21d3b 100644
--- a/kombu/pidbox.py
+++ b/kombu/pidbox.py
@@ -20,7 +20,7 @@ from .entity import Exchange, Queue
from .messaging import Consumer, Producer
from .utils import kwdict, uuid
-__all__ = ["Node", "Mailbox"]
+__all__ = ['Node', 'Mailbox']
class Node(object):
@@ -51,7 +51,7 @@ class Node(object):
self.handlers = handlers
def Consumer(self, channel=None, **options):
- options.setdefault("no_ack", True)
+ options.setdefault('no_ack', True)
return Consumer(channel or self.channel,
[self.mailbox.get_queue(self.hostname)],
**options)
@@ -69,10 +69,10 @@ class Node(object):
def dispatch_from_message(self, message):
message = dict(message)
- method = message["method"]
- destination = message.get("destination")
- reply_to = message.get("reply_to")
- arguments = message.get("arguments")
+ method = message['method']
+ destination = message.get('destination')
+ reply_to = message.get('reply_to')
+ arguments = message.get('arguments')
if not destination or self.hostname in destination:
return self.dispatch(method, arguments, reply_to)
@@ -84,12 +84,12 @@ class Node(object):
except SystemExit:
raise
except Exception, exc:
- reply = {"error": repr(exc)}
+ reply = {'error': repr(exc)}
if reply_to:
self.reply({self.hostname: reply},
- exchange=reply_to["exchange"],
- routing_key=reply_to["routing_key"])
+ exchange=reply_to['exchange'],
+ routing_key=reply_to['routing_key'])
return reply
def handle(self, method, arguments={}):
@@ -111,8 +111,8 @@ class Node(object):
class Mailbox(object):
node_cls = Node
- exchange_fmt = "%s.pidbox"
- reply_exchange_fmt = "reply.%s.pidbox"
+ exchange_fmt = '%s.pidbox'
+ reply_exchange_fmt = 'reply.%s.pidbox'
#: Name of application.
namespace = None
@@ -121,7 +121,7 @@ class Mailbox(object):
connection = None
#: Exchange type (usually direct, or fanout for broadcast).
- type = "direct"
+ type = 'direct'
#: mailbox exchange (init by constructor).
exchange = None
@@ -129,7 +129,7 @@ class Mailbox(object):
#: exchange to send replies to.
reply_exchange = None
- def __init__(self, namespace, type="direct", connection=None):
+ def __init__(self, namespace, type='direct', connection=None):
self.namespace = namespace
self.connection = connection
self.type = type
@@ -166,22 +166,22 @@ class Mailbox(object):
channel=channel)
def get_reply_queue(self, ticket):
- return Queue("%s.%s" % (ticket, self.reply_exchange.name),
+ return Queue('%s.%s' % (ticket, self.reply_exchange.name),
exchange=self.reply_exchange,
routing_key=ticket,
durable=False,
auto_delete=True)
def get_queue(self, hostname):
- return Queue("%s.%s.pidbox" % (hostname, self.namespace),
+ return Queue('%s.%s.pidbox' % (hostname, self.namespace),
exchange=self.exchange,
durable=False,
auto_delete=True)
def _publish_reply(self, reply, exchange, routing_key, channel=None):
chan = channel or self.connection.default_channel
- exchange = Exchange(exchange, exchange_type="direct",
- delivery_mode="transient",
+ exchange = Exchange(exchange, exchange_type='direct',
+ delivery_mode='transient',
durable=False)
producer = Producer(chan, exchange=exchange,
auto_declare=True)
@@ -189,12 +189,12 @@ class Mailbox(object):
def _publish(self, type, arguments, destination=None, reply_ticket=None,
channel=None):
- message = {"method": type,
- "arguments": arguments,
- "destination": destination}
+ message = {'method': type,
+ 'arguments': arguments,
+ 'destination': destination}
if reply_ticket:
- message["reply_to"] = {"exchange": self.reply_exchange.name,
- "routing_key": reply_ticket}
+ message['reply_to'] = {'exchange': self.reply_exchange.name,
+ 'routing_key': reply_ticket}
chan = channel or self.connection.default_channel
producer = Producer(chan, exchange=self.exchange)
producer.publish(message)
@@ -203,7 +203,7 @@ class Mailbox(object):
reply=False, timeout=1, limit=None, callback=None, channel=None):
if destination is not None and \
not isinstance(destination, (list, tuple)):
- raise ValueError("destination must be a list/tuple not %s" % (
+ raise ValueError('destination must be a list/tuple not %s' % (
type(destination)))
arguments = arguments or {}
@@ -253,10 +253,10 @@ class Mailbox(object):
return Exchange(self.exchange_fmt % namespace,
type=type,
durable=False,
- delivery_mode="transient")
+ delivery_mode='transient')
def _get_reply_exchange(self, namespace):
return Exchange(self.reply_exchange_fmt % namespace,
- type="direct",
+ type='direct',
durable=False,
- delivery_mode="transient")
+ delivery_mode='transient')
diff --git a/kombu/pools.py b/kombu/pools.py
index 4f8bcbb9..4d4b7e99 100644
--- a/kombu/pools.py
+++ b/kombu/pools.py
@@ -18,13 +18,13 @@ from .connection import Resource
from .messaging import Producer
from .utils import EqualityDict
-__all__ = ["ProducerPool", "PoolGroup", "register_group",
- "connections", "producers", "get_limit", "set_limit", "reset"]
+__all__ = ['ProducerPool', 'PoolGroup', 'register_group',
+ 'connections', 'producers', 'get_limit', 'set_limit', 'reset']
_limit = [200]
_used = [False]
_groups = []
use_global_limit = object()
-disable_limit_protection = os.environ.get("KOMBU_DISABLE_LIMIT_PROTECTION")
+disable_limit_protection = os.environ.get('KOMBU_DISABLE_LIMIT_PROTECTION')
class ProducerPool(Resource):
@@ -32,7 +32,7 @@ class ProducerPool(Resource):
def __init__(self, connections, *args, **kwargs):
self.connections = connections
- self.Producer = kwargs.pop("Producer", None) or self.Producer
+ self.Producer = kwargs.pop('Producer', None) or self.Producer
super(ProducerPool, self).__init__(*args, **kwargs)
def _acquire_connection(self):
@@ -70,7 +70,7 @@ class PoolGroup(EqualityDict):
self.limit = limit
def create(self, resource, limit):
- raise NotImplementedError("PoolGroups must define ``create``")
+ raise NotImplementedError('PoolGroups must define ``create``')
def __missing__(self, resource):
limit = self.limit
diff --git a/kombu/serialization.py b/kombu/serialization.py
index 3070c854..f0f89966 100644
--- a/kombu/serialization.py
+++ b/kombu/serialization.py
@@ -22,11 +22,11 @@ except ImportError: # pragma: no cover
from .exceptions import SerializerNotInstalled
from .utils.encoding import bytes_to_str, str_to_bytes, bytes_t
-__all__ = ["pickle", "encode", "decode",
- "register", "unregister"]
-SKIP_DECODE = frozenset(["binary", "ascii-8bit"])
+__all__ = ['pickle', 'encode', 'decode',
+ 'register', 'unregister']
+SKIP_DECODE = frozenset(['binary', 'ascii-8bit'])
-if sys.platform.startswith("java"): # pragma: no cover
+if sys.platform.startswith('java'): # pragma: no cover
def _decode(t, coding):
return codecs.getdecoder(coding)(t)[0]
@@ -99,7 +99,7 @@ class SerializerRegistry(object):
self.type_to_name.pop(content_type, None)
except KeyError:
raise SerializerNotInstalled(
- "No encoder/decoder installed for %s" % name)
+ 'No encoder/decoder installed for %s' % name)
def _set_default_serializer(self, name):
"""
@@ -117,14 +117,14 @@ class SerializerRegistry(object):
self._default_encode) = self._encoders[name]
except KeyError:
raise SerializerNotInstalled(
- "No encoder installed for %s" % name)
+ 'No encoder installed for %s' % name)
def encode(self, data, serializer=None):
- if serializer == "raw":
+ if serializer == 'raw':
return raw_encode(data)
if serializer and not self._encoders.get(serializer):
raise SerializerNotInstalled(
- "No encoder installed for %s" % serializer)
+ 'No encoder installed for %s' % serializer)
# If a raw string was sent, assume binary encoding
# (it's likely either ASCII or a raw binary file, and a character
@@ -132,12 +132,12 @@ class SerializerRegistry(object):
if not serializer and isinstance(data, bytes_t):
# In Python 3+, this would be "bytes"; allow binary data to be
# sent as a message without getting encoder errors
- return "application/data", "binary", data
+ return 'application/data', 'binary', data
# For Unicode objects, force it into a string
if not serializer and isinstance(data, unicode):
- payload = data.encode("utf-8")
- return "text/plain", "utf-8", payload
+ payload = data.encode('utf-8')
+ return 'text/plain', 'utf-8', payload
if serializer:
content_type, content_encoding, encoder = \
@@ -153,7 +153,7 @@ class SerializerRegistry(object):
def decode(self, data, content_type, content_encoding, force=False):
if content_type in self._disabled_content_types and not force:
raise SerializerNotInstalled(
- "Content-type %r has been disabled." % (content_type, ))
+ 'Content-type %r has been disabled.' % (content_type, ))
content_type = content_type or 'application/data'
content_encoding = (content_encoding or 'utf-8').lower()
@@ -230,7 +230,7 @@ decode = registry.decode
"""
.. function:: register(name, encoder, decoder, content_type,
- content_encoding="utf-8"):
+ content_encoding='utf-8'):
Register a new encoder/decoder.
:param name: A convenience name for the serialization method.
@@ -252,7 +252,7 @@ decode = registry.decode
the `decoder` method will be returning. Will usually be
utf-8`, `us-ascii`, or `binary`.
- """
+"""
register = registry.register
@@ -262,7 +262,7 @@ register = registry.register
:param name: Registered serialization method name.
- """
+"""
unregister = registry.unregister
@@ -306,7 +306,7 @@ def register_yaml():
"""In case a client receives a yaml message, but yaml
isn't installed."""
raise SerializerNotInstalled(
- "No decoder installed for YAML. Install the PyYAML library")
+ 'No decoder installed for YAML. Install the PyYAML library')
registry.register('yaml', None, not_available, 'application/x-yaml')
@@ -340,8 +340,8 @@ def register_msgpack():
"""In case a client receives a msgpack message, but yaml
isn't installed."""
raise SerializerNotInstalled(
- "No decoder installed for msgpack. "
- "Install the msgpack library")
+ 'No decoder installed for msgpack. '
+ 'Install the msgpack library')
registry.register('msgpack', None, not_available,
'application/x-msgpack')
diff --git a/kombu/simple.py b/kombu/simple.py
index 7d101ea5..3b5b6ed6 100644
--- a/kombu/simple.py
+++ b/kombu/simple.py
@@ -20,7 +20,7 @@ from . import entity
from . import messaging
from .connection import maybe_channel
-__all__ = ["SimpleQueue", "SimpleBuffer"]
+__all__ = ['SimpleQueue', 'SimpleBuffer']
class SimpleBase(object):
@@ -113,7 +113,7 @@ class SimpleQueue(SimpleBase):
if no_ack is None:
no_ack = self.no_ack
if not isinstance(queue, entity.Queue):
- exchange = entity.Exchange(name, "direct", **exchange_opts)
+ exchange = entity.Exchange(name, 'direct', **exchange_opts)
queue = entity.Queue(name, exchange, name, **queue_opts)
else:
name = queue.name
@@ -132,5 +132,5 @@ class SimpleBuffer(SimpleQueue):
queue_opts = dict(durable=False,
auto_delete=True)
exchange_opts = dict(durable=False,
- delivery_mode="transient",
+ delivery_mode='transient',
auto_delete=True)
diff --git a/kombu/syn.py b/kombu/syn.py
index 2ec281b9..7983f41c 100644
--- a/kombu/syn.py
+++ b/kombu/syn.py
@@ -10,7 +10,7 @@ from __future__ import absolute_import
import sys
-__all__ = ["detect_environment"]
+__all__ = ['detect_environment']
_environment = None
@@ -25,28 +25,28 @@ def select_blocking_method(type):
def _detect_environment():
## -eventlet-
- if "eventlet" in sys.modules:
+ if 'eventlet' in sys.modules:
try:
from eventlet.patcher import is_monkey_patched as is_eventlet
import socket
if is_eventlet(socket):
- return "eventlet"
+ return 'eventlet'
except ImportError:
pass
# -gevent-
- if "gevent" in sys.modules:
+ if 'gevent' in sys.modules:
try:
from gevent import socket as _gsocket
import socket
if socket.socket is _gsocket.socket:
- return "gevent"
+ return 'gevent'
except ImportError:
pass
- return "default"
+ return 'default'
def detect_environment():
diff --git a/kombu/tests/__init__.py b/kombu/tests/__init__.py
index ca31361a..730d9404 100644
--- a/kombu/tests/__init__.py
+++ b/kombu/tests/__init__.py
@@ -9,31 +9,31 @@ from kombu.exceptions import VersionMismatch
# avoid json implementation inconsistencies.
try:
import json # noqa
- anyjson.force_implementation("json")
+ anyjson.force_implementation('json')
except ImportError:
- anyjson.force_implementation("simplejson")
+ anyjson.force_implementation('simplejson')
def find_distribution_modules(name=__name__, file=__file__):
- current_dist_depth = len(name.split(".")) - 1
+ current_dist_depth = len(name.split('.')) - 1
current_dist = os.path.join(os.path.dirname(file),
*([os.pardir] * current_dist_depth))
abs = os.path.abspath(current_dist)
dist_name = os.path.basename(abs)
for dirpath, dirnames, filenames in os.walk(abs):
- package = (dist_name + dirpath[len(abs):]).replace("/", ".")
- if "__init__.py" in filenames:
+ package = (dist_name + dirpath[len(abs):]).replace('/', '.')
+ if '__init__.py' in filenames:
yield package
for filename in filenames:
- if filename.endswith(".py") and filename != "__init__.py":
- yield ".".join([package, filename])[:-3]
+ if filename.endswith('.py') and filename != '__init__.py':
+ yield '.'.join([package, filename])[:-3]
def import_all_modules(name=__name__, file=__file__, skip=[]):
for module in find_distribution_modules(name, file):
if module not in skip:
- print("preimporting %r for coverage..." % (module, ))
+ print('preimporting %r for coverage...' % (module, ))
try:
__import__(module)
except (ImportError, VersionMismatch, AttributeError):
@@ -41,8 +41,8 @@ def import_all_modules(name=__name__, file=__file__, skip=[]):
def is_in_coverage():
- return (os.environ.get("COVER_ALL_MODULES") or
- "--with-coverage3" in sys.argv)
+ return (os.environ.get('COVER_ALL_MODULES') or
+ '--with-coverage3' in sys.argv)
def setup_django_env():
@@ -52,12 +52,12 @@ def setup_django_env():
return
if not settings.configured:
- settings.configure(DATABASES={"default": {
- "ENGINE": "django.db.backends.sqlite3",
- "NAME": ":memory:"}},
- DATABASE_ENGINE="sqlite3",
- DATABASE_NAME=":memory:",
- INSTALLED_APPS=("kombu.transport.django", ))
+ settings.configure(DATABASES={'default': {
+ 'ENGINE': 'django.db.backends.sqlite3',
+ 'NAME': ':memory:'}},
+ DATABASE_ENGINE='sqlite3',
+ DATABASE_NAME=':memory:',
+ INSTALLED_APPS=('kombu.transport.django', ))
def setup():
diff --git a/kombu/tests/compat.py b/kombu/tests/compat.py
index 4cfe6b79..391b7f1a 100644
--- a/kombu/tests/compat.py
+++ b/kombu/tests/compat.py
@@ -7,8 +7,8 @@ class WarningMessage(object):
"""Holds the result of a single showwarning() call."""
- _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
- "line")
+ _WARNING_DETAILS = ('message', 'category', 'filename', 'lineno', 'file',
+ 'line')
def __init__(self, message, category, filename, lineno, file=None,
line=None):
@@ -19,8 +19,8 @@ class WarningMessage(object):
self._category_name = category and category.__name__ or None
def __str__(self):
- return ("{message : %r, category : %r, filename : %r, lineno : %s, "
- "line : %r}" % (self.message, self._category_name,
+ return ('{message : %r, category : %r, filename : %r, lineno : %s, '
+ 'line : %r}' % (self.message, self._category_name,
self.filename, self.lineno, self.line))
@@ -50,21 +50,21 @@ class catch_warnings(object):
"""
self._record = record
- self._module = module is None and sys.modules["warnings"] or module
+ self._module = module is None and sys.modules['warnings'] or module
self._entered = False
def __repr__(self):
args = []
if self._record:
- args.append("record=True")
+ args.append('record=True')
if self._module is not sys.modules['warnings']:
- args.append("module=%r" % self._module)
+ args.append('module=%r' % self._module)
name = type(self).__name__
- return "%s(%s)" % (name, ", ".join(args))
+ return '%s(%s)' % (name, ', '.join(args))
def __enter__(self):
if self._entered:
- raise RuntimeError("Cannot enter %r twice" % self)
+ raise RuntimeError('Cannot enter %r twice' % self)
self._entered = True
self._filters = self._module.filters
self._module.filters = self._filters[:]
@@ -82,6 +82,6 @@ class catch_warnings(object):
def __exit__(self, *exc_info):
if not self._entered:
- raise RuntimeError("Cannot exit %r without entering first" % self)
+ raise RuntimeError('Cannot exit %r without entering first' % self)
self._module.filters = self._filters
self._module.showwarning = self._showwarning
diff --git a/kombu/tests/mocks.py b/kombu/tests/mocks.py
index 8d81bc90..dfe172f0 100644
--- a/kombu/tests/mocks.py
+++ b/kombu/tests/mocks.py
@@ -10,7 +10,7 @@ from kombu.transport import base
class Message(base.Message):
def __init__(self, *args, **kwargs):
- self.throw_decode_error = kwargs.get("throw_decode_error", False)
+ self.throw_decode_error = kwargs.get('throw_decode_error', False)
super(Message, self).__init__(*args, **kwargs)
def decode(self):
@@ -28,7 +28,7 @@ class Channel(base.StdChannel):
self.called = []
self.deliveries = count(1).next
self.to_deliver = []
- self.events = {"basic_return": []}
+ self.events = {'basic_return': []}
def _called(self, name):
self.called.append(name)
@@ -37,11 +37,11 @@ class Channel(base.StdChannel):
return key in self.called
def exchange_declare(self, *args, **kwargs):
- self._called("exchange_declare")
+ self._called('exchange_declare')
def prepare_message(self, message_data, properties={}, priority=0,
content_type=None, content_encoding=None, headers=None):
- self._called("prepare_message")
+ self._called('prepare_message')
return dict(body=message_data,
headers=headers,
properties=properties,
@@ -49,69 +49,69 @@ class Channel(base.StdChannel):
content_type=content_type,
content_encoding=content_encoding)
- def basic_publish(self, message, exchange="", routing_key="",
+ def basic_publish(self, message, exchange='', routing_key='',
mandatory=False, immediate=False, **kwargs):
- self._called("basic_publish")
+ self._called('basic_publish')
return message, exchange, routing_key
def exchange_delete(self, *args, **kwargs):
- self._called("exchange_delete")
+ self._called('exchange_delete')
def queue_declare(self, *args, **kwargs):
- self._called("queue_declare")
+ self._called('queue_declare')
def queue_bind(self, *args, **kwargs):
- self._called("queue_bind")
+ self._called('queue_bind')
def queue_unbind(self, *args, **kwargs):
- self._called("queue_unbind")
+ self._called('queue_unbind')
def queue_delete(self, queue, if_unused=False, if_empty=False, **kwargs):
- self._called("queue_delete")
+ self._called('queue_delete')
def basic_get(self, *args, **kwargs):
- self._called("basic_get")
+ self._called('basic_get')
try:
return self.to_deliver.pop()
except IndexError:
pass
def queue_purge(self, *args, **kwargs):
- self._called("queue_purge")
+ self._called('queue_purge')
def basic_consume(self, *args, **kwargs):
- self._called("basic_consume")
+ self._called('basic_consume')
def basic_cancel(self, *args, **kwargs):
- self._called("basic_cancel")
+ self._called('basic_cancel')
def basic_ack(self, *args, **kwargs):
- self._called("basic_ack")
+ self._called('basic_ack')
def basic_recover(self, requeue=False):
- self._called("basic_recover")
+ self._called('basic_recover')
def close(self):
- self._called("close")
+ self._called('close')
def message_to_python(self, message, *args, **kwargs):
- self._called("message_to_python")
+ self._called('message_to_python')
return Message(self, body=anyjson.dumps(message),
delivery_tag=self.deliveries(),
throw_decode_error=self.throw_decode_error,
- content_type="application/json", content_encoding="utf-8")
+ content_type='application/json', content_encoding='utf-8')
def flow(self, active):
- self._called("flow")
+ self._called('flow')
def basic_reject(self, delivery_tag, requeue=False):
if requeue:
- return self._called("basic_reject:requeue")
- return self._called("basic_reject")
+ return self._called('basic_reject:requeue')
+ return self._called('basic_reject')
def basic_qos(self, prefetch_size=0, prefetch_count=0,
apply_global=False):
- self._called("basic_qos")
+ self._called('basic_qos')
class Connection(object):
@@ -133,7 +133,7 @@ class Transport(base.Transport):
return connection.channel()
def drain_events(self, connection, **kwargs):
- return "event"
+ return 'event'
def close_connection(self, connection):
connection.connected = False
diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py
index 452c26a8..01937885 100644
--- a/kombu/tests/test_common.py
+++ b/kombu/tests/test_common.py
@@ -16,16 +16,16 @@ from .utils import ContextMock, Mock, MockPool
class test_Broadcast(TestCase):
def test_arguments(self):
- q = Broadcast(name="test_Broadcast")
- self.assertTrue(q.name.startswith("bcast."))
- self.assertEqual(q.alias, "test_Broadcast")
+ q = Broadcast(name='test_Broadcast')
+ self.assertTrue(q.name.startswith('bcast.'))
+ self.assertEqual(q.alias, 'test_Broadcast')
self.assertTrue(q.auto_delete)
- self.assertEqual(q.exchange.name, "test_Broadcast")
- self.assertEqual(q.exchange.type, "fanout")
+ self.assertEqual(q.exchange.name, 'test_Broadcast')
+ self.assertEqual(q.exchange.type, 'fanout')
- q = Broadcast("test_Broadcast", "explicit_queue_name")
- self.assertEqual(q.name, "explicit_queue_name")
- self.assertEqual(q.exchange.name, "test_Broadcast")
+ q = Broadcast('test_Broadcast', 'explicit_queue_name')
+ self.assertEqual(q.name, 'explicit_queue_name')
+ self.assertEqual(q.exchange.name, 'test_Broadcast')
class test_maybe_declare(TestCase):
@@ -69,26 +69,26 @@ class test_replies(TestCase):
def test_send_reply(self):
req = Mock()
- req.content_type = "application/json"
- req.properties = {"reply_to": "hello",
- "correlation_id": "world"}
+ req.content_type = 'application/json'
+ req.properties = {'reply_to': 'hello',
+ 'correlation_id': 'world'}
exchange = Mock()
exchange.is_bound = True
producer = Mock()
producer.channel.connection.client.declared_entities = set()
- send_reply(exchange, req, {"hello": "world"}, producer)
+ send_reply(exchange, req, {'hello': 'world'}, producer)
self.assertTrue(producer.publish.call_count)
args = producer.publish.call_args
- self.assertDictEqual(args[0][0], {"hello": "world"})
- self.assertDictEqual(args[1], {"exchange": exchange,
- "routing_key": "hello",
- "correlation_id": "world",
- "serializer": "json"})
+ self.assertDictEqual(args[0][0], {'hello': 'world'})
+ self.assertDictEqual(args[1], {'exchange': exchange,
+ 'routing_key': 'hello',
+ 'correlation_id': 'world',
+ 'serializer': 'json'})
exchange.declare.assert_called_with()
- @patch("kombu.common.ipublish")
+ @patch('kombu.common.ipublish')
def test_isend_reply(self, ipublish):
pool, exchange, req, msg, props = (Mock(), Mock(), Mock(),
Mock(), Mock())
@@ -97,7 +97,7 @@ class test_replies(TestCase):
ipublish.assert_called_with(pool, send_reply,
(exchange, req, msg), props)
- @patch("kombu.common.itermessages")
+ @patch('kombu.common.itermessages')
def test_collect_replies_with_ack(self, itermessages):
conn, channel, queue = Mock(), Mock(), Mock()
body, message = Mock(), Mock()
@@ -113,7 +113,7 @@ class test_replies(TestCase):
channel.after_reply_message_received.assert_called_with(queue.name)
- @patch("kombu.common.itermessages")
+ @patch('kombu.common.itermessages')
def test_collect_replies_no_ack(self, itermessages):
conn, channel, queue = Mock(), Mock(), Mock()
body, message = Mock(), Mock()
@@ -124,7 +124,7 @@ class test_replies(TestCase):
itermessages.assert_called_with(conn, channel, queue, no_ack=True)
self.assertFalse(message.ack.called)
- @patch("kombu.common.itermessages")
+ @patch('kombu.common.itermessages')
def test_collect_replies_no_replies(self, itermessages):
conn, channel, queue = Mock(), Mock(), Mock()
itermessages.return_value = []
@@ -137,9 +137,9 @@ class test_replies(TestCase):
class test_insured(TestCase):
- @patch("kombu.common.insured_logger")
+ @patch('kombu.common.insured_logger')
def test_ensure_errback(self, insured_logger):
- common._ensure_errback("foo", 30)
+ common._ensure_errback('foo', 30)
self.assertTrue(insured_logger.error.called)
def test_revive_connection(self):
@@ -158,7 +158,7 @@ class test_insured(TestCase):
common.revive_producer(Mock(), channel, None)
- def get_insured_mocks(self, insured_returns=("works", "ignored")):
+ def get_insured_mocks(self, insured_returns=('works', 'ignored')):
conn = ContextMock()
pool = MockPool(conn)
fun = Mock()
@@ -169,28 +169,28 @@ class test_insured(TestCase):
def test_insured(self):
conn, pool, fun, insured = self.get_insured_mocks()
- ret = common.insured(pool, fun, (2, 2), {"foo": "bar"})
- self.assertEqual(ret, "works")
+ ret = common.insured(pool, fun, (2, 2), {'foo': 'bar'})
+ self.assertEqual(ret, 'works')
conn.ensure_connection.assert_called_with(
errback=common._ensure_errback)
self.assertTrue(insured.called)
i_args, i_kwargs = insured.call_args
self.assertTupleEqual(i_args, (2, 2))
- self.assertDictEqual(i_kwargs, {"foo": "bar",
- "connection": conn})
+ self.assertDictEqual(i_kwargs, {'foo': 'bar',
+ 'connection': conn})
self.assertTrue(conn.autoretry.called)
ar_args, ar_kwargs = conn.autoretry.call_args
self.assertTupleEqual(ar_args, (fun, conn.default_channel))
- self.assertTrue(ar_kwargs.get("on_revive"))
- self.assertTrue(ar_kwargs.get("errback"))
+ self.assertTrue(ar_kwargs.get('on_revive'))
+ self.assertTrue(ar_kwargs.get('errback'))
def test_insured_custom_errback(self):
conn, pool, fun, insured = self.get_insured_mocks()
custom_errback = Mock()
- common.insured(pool, fun, (2, 2), {"foo": "bar"},
+ common.insured(pool, fun, (2, 2), {'foo': 'bar'},
errback=custom_errback)
conn.ensure_connection.assert_called_with(errback=custom_errback)
@@ -206,26 +206,26 @@ class test_insured(TestCase):
def test_ipublish(self):
producer, pool, fun, ensure_returns = self.get_ipublish_args()
- ensure_returns.return_value = "works"
+ ensure_returns.return_value = 'works'
- ret = common.ipublish(pool, fun, (2, 2), {"foo": "bar"})
- self.assertEqual(ret, "works")
+ ret = common.ipublish(pool, fun, (2, 2), {'foo': 'bar'})
+ self.assertEqual(ret, 'works')
self.assertTrue(producer.connection.ensure.called)
e_args, e_kwargs = producer.connection.ensure.call_args
self.assertTupleEqual(e_args, (producer, fun))
- self.assertTrue(e_kwargs.get("on_revive"))
- self.assertEqual(e_kwargs.get("errback"), common._ensure_errback)
+ self.assertTrue(e_kwargs.get('on_revive'))
+ self.assertEqual(e_kwargs.get('errback'), common._ensure_errback)
- ensure_returns.assert_called_with(2, 2, foo="bar", producer=producer)
+ ensure_returns.assert_called_with(2, 2, foo='bar', producer=producer)
def test_ipublish_with_custom_errback(self):
producer, pool, fun, _ = self.get_ipublish_args()
errback = Mock()
- common.ipublish(pool, fun, (2, 2), {"foo": "bar"}, errback=errback)
+ common.ipublish(pool, fun, (2, 2), {'foo': 'bar'}, errback=errback)
_, e_kwargs = producer.connection.ensure.call_args
- self.assertEqual(e_kwargs.get("errback"), errback)
+ self.assertEqual(e_kwargs.get('errback'), errback)
class MockConsumer(object):
@@ -254,17 +254,17 @@ class test_itermessages(TestCase):
raise socket.timeout()
for consumer in MockConsumer.consumers:
for callback in consumer.callbacks:
- callback("body", "message")
+ callback('body', 'message')
def test_default(self):
conn = self.MockConnection()
channel = Mock()
channel.connection.client = conn
- it = common.itermessages(conn, channel, "q", limit=1,
+ it = common.itermessages(conn, channel, 'q', limit=1,
Consumer=MockConsumer)
ret = it.next()
- self.assertTupleEqual(ret, ("body", "message"))
+ self.assertTupleEqual(ret, ('body', 'message'))
with self.assertRaises(StopIteration):
it.next()
@@ -274,19 +274,19 @@ class test_itermessages(TestCase):
conn.should_raise_timeout = True
channel = Mock()
channel.connection.client = conn
- it = common.itermessages(conn, channel, "q", limit=1,
+ it = common.itermessages(conn, channel, 'q', limit=1,
Consumer=MockConsumer)
with self.assertRaises(StopIteration):
it.next()
- @patch("kombu.common.deque")
+ @patch('kombu.common.deque')
def test_when_raises_IndexError(self, deque):
deque_instance = deque.return_value = Mock()
deque_instance.popleft.side_effect = IndexError()
conn = self.MockConnection()
channel = Mock()
- it = common.itermessages(conn, channel, "q", limit=1,
+ it = common.itermessages(conn, channel, 'q', limit=1,
Consumer=MockConsumer)
with self.assertRaises(StopIteration):
diff --git a/kombu/tests/test_compat.py b/kombu/tests/test_compat.py
index 588ac8be..36a60d39 100644
--- a/kombu/tests/test_compat.py
+++ b/kombu/tests/test_compat.py
@@ -38,44 +38,44 @@ class test_misc(TestCase):
self.assertEqual(list(it2), [2, 3, 4, 5, 6, 7, 8, 9, 10, 11])
def test_Queue_from_dict(self):
- defs = {"binding_key": "foo.#",
- "exchange": "fooex",
- "exchange_type": "topic",
- "durable": True,
- "auto_delete": False}
-
- q1 = Queue.from_dict("foo", **dict(defs))
- self.assertEqual(q1.name, "foo")
- self.assertEqual(q1.routing_key, "foo.#")
- self.assertEqual(q1.exchange.name, "fooex")
- self.assertEqual(q1.exchange.type, "topic")
+ defs = {'binding_key': 'foo.#',
+ 'exchange': 'fooex',
+ 'exchange_type': 'topic',
+ 'durable': True,
+ 'auto_delete': False}
+
+ q1 = Queue.from_dict('foo', **dict(defs))
+ self.assertEqual(q1.name, 'foo')
+ self.assertEqual(q1.routing_key, 'foo.#')
+ self.assertEqual(q1.exchange.name, 'fooex')
+ self.assertEqual(q1.exchange.type, 'topic')
self.assertTrue(q1.durable)
self.assertTrue(q1.exchange.durable)
self.assertFalse(q1.auto_delete)
self.assertFalse(q1.exchange.auto_delete)
- q2 = Queue.from_dict("foo", **dict(defs,
+ q2 = Queue.from_dict('foo', **dict(defs,
exchange_durable=False))
self.assertTrue(q2.durable)
self.assertFalse(q2.exchange.durable)
- q3 = Queue.from_dict("foo", **dict(defs,
+ q3 = Queue.from_dict('foo', **dict(defs,
exchange_auto_delete=True))
self.assertFalse(q3.auto_delete)
self.assertTrue(q3.exchange.auto_delete)
- q4 = Queue.from_dict("foo", **dict(defs,
+ q4 = Queue.from_dict('foo', **dict(defs,
queue_durable=False))
self.assertFalse(q4.durable)
self.assertTrue(q4.exchange.durable)
- q5 = Queue.from_dict("foo", **dict(defs,
+ q5 = Queue.from_dict('foo', **dict(defs,
queue_auto_delete=True))
self.assertTrue(q5.auto_delete)
self.assertFalse(q5.exchange.auto_delete)
- self.assertEqual(Queue.from_dict("foo", **dict(defs)),
- Queue.from_dict("foo", **dict(defs)))
+ self.assertEqual(Queue.from_dict('foo', **dict(defs)),
+ Queue.from_dict('foo', **dict(defs)))
class test_Publisher(TestCase):
@@ -85,44 +85,44 @@ class test_Publisher(TestCase):
def test_constructor(self):
pub = compat.Publisher(self.connection,
- exchange="test_Publisher_constructor",
- routing_key="rkey")
+ exchange='test_Publisher_constructor',
+ routing_key='rkey')
self.assertIsInstance(pub.backend, Channel)
- self.assertEqual(pub.exchange.name, "test_Publisher_constructor")
+ self.assertEqual(pub.exchange.name, 'test_Publisher_constructor')
self.assertTrue(pub.exchange.durable)
self.assertFalse(pub.exchange.auto_delete)
- self.assertEqual(pub.exchange.type, "direct")
+ self.assertEqual(pub.exchange.type, 'direct')
pub2 = compat.Publisher(self.connection,
- exchange="test_Publisher_constructor2",
- routing_key="rkey",
+ exchange='test_Publisher_constructor2',
+ routing_key='rkey',
auto_delete=True,
durable=False)
self.assertTrue(pub2.exchange.auto_delete)
self.assertFalse(pub2.exchange.durable)
- explicit = Exchange("test_Publisher_constructor_explicit",
- type="topic")
+ explicit = Exchange('test_Publisher_constructor_explicit',
+ type='topic')
pub3 = compat.Publisher(self.connection,
exchange=explicit)
self.assertEqual(pub3.exchange, explicit)
compat.Publisher(self.connection,
- exchange="test_Publisher_constructor3",
+ exchange='test_Publisher_constructor3',
channel=self.connection.default_channel)
def test_send(self):
pub = compat.Publisher(self.connection,
- exchange="test_Publisher_send",
- routing_key="rkey")
- pub.send({"foo": "bar"})
- self.assertIn("basic_publish", pub.backend)
+ exchange='test_Publisher_send',
+ routing_key='rkey')
+ pub.send({'foo': 'bar'})
+ self.assertIn('basic_publish', pub.backend)
pub.close()
def test__enter__exit__(self):
pub = compat.Publisher(self.connection,
- exchange="test_Publisher_send",
- routing_key="rkey")
+ exchange='test_Publisher_send',
+ routing_key='rkey')
x = pub.__enter__()
self.assertIs(x, pub)
x.__exit__()
@@ -134,15 +134,15 @@ class test_Consumer(TestCase):
def setUp(self):
self.connection = Connection(transport=Transport)
- @patch("kombu.compat._iterconsume")
- def test_iterconsume_calls__iterconsume(self, it, n="test_iterconsume"):
+ @patch('kombu.compat._iterconsume')
+ def test_iterconsume_calls__iterconsume(self, it, n='test_iterconsume'):
c = compat.Consumer(self.connection, queue=n, exchange=n)
c.iterconsume(limit=10, no_ack=True)
it.assert_called_with(c.connection, c, True, 10)
- def test_constructor(self, n="test_Consumer_constructor"):
+ def test_constructor(self, n='test_Consumer_constructor'):
c = compat.Consumer(self.connection, queue=n, exchange=n,
- routing_key="rkey")
+ routing_key='rkey')
self.assertIsInstance(c.backend, Channel)
q = c.queues[0]
self.assertTrue(q.durable)
@@ -152,9 +152,9 @@ class test_Consumer(TestCase):
self.assertEqual(q.name, n)
self.assertEqual(q.exchange.name, n)
- c2 = compat.Consumer(self.connection, queue=n + "2",
- exchange=n + "2",
- routing_key="rkey", durable=False,
+ c2 = compat.Consumer(self.connection, queue=n + '2',
+ exchange=n + '2',
+ routing_key='rkey', durable=False,
auto_delete=True, exclusive=True)
q2 = c2.queues[0]
self.assertFalse(q2.durable)
@@ -162,78 +162,78 @@ class test_Consumer(TestCase):
self.assertTrue(q2.auto_delete)
self.assertTrue(q2.exchange.auto_delete)
- def test__enter__exit__(self, n="test__enter__exit__"):
+ def test__enter__exit__(self, n='test__enter__exit__'):
c = compat.Consumer(self.connection, queue=n, exchange=n,
- routing_key="rkey")
+ routing_key='rkey')
x = c.__enter__()
self.assertIs(x, c)
x.__exit__()
self.assertTrue(c._closed)
- def test_revive(self, n="test_revive"):
+ def test_revive(self, n='test_revive'):
c = compat.Consumer(self.connection, queue=n, exchange=n)
with self.connection.channel() as c2:
c.revive(c2)
self.assertIs(c.backend, c2)
- def test__iter__(self, n="test__iter__"):
+ def test__iter__(self, n='test__iter__'):
c = compat.Consumer(self.connection, queue=n, exchange=n)
c.iterqueue = Mock()
c.__iter__()
c.iterqueue.assert_called_with(infinite=True)
- def test_iter(self, n="test_iterqueue"):
+ def test_iter(self, n='test_iterqueue'):
c = compat.Consumer(self.connection, queue=n, exchange=n,
- routing_key="rkey")
+ routing_key='rkey')
c.close()
- def test_process_next(self, n="test_process_next"):
+ def test_process_next(self, n='test_process_next'):
c = compat.Consumer(self.connection, queue=n, exchange=n,
- routing_key="rkey")
+ routing_key='rkey')
with self.assertRaises(NotImplementedError):
c.process_next()
c.close()
- def test_iterconsume(self, n="test_iterconsume"):
+ def test_iterconsume(self, n='test_iterconsume'):
c = compat.Consumer(self.connection, queue=n, exchange=n,
- routing_key="rkey")
+ routing_key='rkey')
c.close()
- def test_discard_all(self, n="test_discard_all"):
+ def test_discard_all(self, n='test_discard_all'):
c = compat.Consumer(self.connection, queue=n, exchange=n,
- routing_key="rkey")
+ routing_key='rkey')
c.discard_all()
- self.assertIn("queue_purge", c.backend)
+ self.assertIn('queue_purge', c.backend)
- def test_fetch(self, n="test_fetch"):
+ def test_fetch(self, n='test_fetch'):
c = compat.Consumer(self.connection, queue=n, exchange=n,
- routing_key="rkey")
+ routing_key='rkey')
self.assertIsNone(c.fetch())
self.assertIsNone(c.fetch(no_ack=True))
- self.assertIn("basic_get", c.backend)
+ self.assertIn('basic_get', c.backend)
callback_called = [False]
def receive(payload, message):
callback_called[0] = True
- c.backend.to_deliver.append("42")
- self.assertEqual(c.fetch().payload, "42")
- c.backend.to_deliver.append("46")
+ c.backend.to_deliver.append('42')
+ self.assertEqual(c.fetch().payload, '42')
+ c.backend.to_deliver.append('46')
c.register_callback(receive)
- self.assertEqual(c.fetch(enable_callbacks=True).payload, "46")
+ self.assertEqual(c.fetch(enable_callbacks=True).payload, '46')
self.assertTrue(callback_called[0])
- def test_discard_all_filterfunc_not_supported(self, n="xjf21j21"):
+ def test_discard_all_filterfunc_not_supported(self, n='xjf21j21'):
c = compat.Consumer(self.connection, queue=n, exchange=n,
- routing_key="rkey")
+ routing_key='rkey')
with self.assertRaises(NotImplementedError):
c.discard_all(filterfunc=lambda x: x)
c.close()
- def test_wait(self, n="test_wait"):
+ def test_wait(self, n='test_wait'):
class C(compat.Consumer):
@@ -242,11 +242,11 @@ class test_Consumer(TestCase):
yield i
c = C(self.connection, queue=n, exchange=n,
- routing_key="rkey")
+ routing_key='rkey')
self.assertEqual(c.wait(10), range(10))
c.close()
- def test_iterqueue(self, n="test_iterqueue"):
+ def test_iterqueue(self, n='test_iterqueue'):
i = [0]
class C(compat.Consumer):
@@ -257,7 +257,7 @@ class test_Consumer(TestCase):
return z
c = C(self.connection, queue=n, exchange=n,
- routing_key="rkey")
+ routing_key='rkey')
self.assertEqual(list(c.iterqueue(limit=10)), range(10))
c.close()
@@ -267,14 +267,14 @@ class test_ConsumerSet(TestCase):
def setUp(self):
self.connection = Connection(transport=Transport)
- @patch("kombu.compat._iterconsume")
- def test_iterconsume(self, _iterconsume, n="test_iterconsume"):
+ @patch('kombu.compat._iterconsume')
+ def test_iterconsume(self, _iterconsume, n='test_iterconsume'):
c = compat.Consumer(self.connection, queue=n, exchange=n)
cs = compat.ConsumerSet(self.connection, consumers=[c])
cs.iterconsume(limit=10, no_ack=True)
_iterconsume.assert_called_with(c.connection, cs, True, 10)
- def test_revive(self, n="test_revive"):
+ def test_revive(self, n='test_revive'):
c = compat.Consumer(self.connection, queue=n, exchange=n)
cs = compat.ConsumerSet(self.connection, consumers=[c])
@@ -282,11 +282,11 @@ class test_ConsumerSet(TestCase):
cs.revive(c2)
self.assertIs(cs.backend, c2)
- def test_constructor(self, prefix="0daf8h21"):
- dcon = {"%s.xyx" % prefix: {"exchange": "%s.xyx" % prefix,
- "routing_key": "xyx"},
- "%s.xyz" % prefix: {"exchange": "%s.xyz" % prefix,
- "routing_key": "xyz"}}
+ def test_constructor(self, prefix='0daf8h21'):
+ dcon = {'%s.xyx' % prefix: {'exchange': '%s.xyx' % prefix,
+ 'routing_key': 'xyx'},
+ '%s.xyz' % prefix: {'exchange': '%s.xyz' % prefix,
+ 'routing_key': 'xyz'}}
consumers = [compat.Consumer(self.connection, queue=prefix + str(i),
exchange=prefix + str(i))
for i in range(3)]
@@ -297,25 +297,25 @@ class test_ConsumerSet(TestCase):
self.assertEqual(len(c2.queues), 2)
c.add_consumer(compat.Consumer(self.connection,
- queue=prefix + "xaxxxa",
- exchange=prefix + "xaxxxa"))
+ queue=prefix + 'xaxxxa',
+ exchange=prefix + 'xaxxxa'))
self.assertEqual(len(c.queues), 4)
for cq in c.queues:
self.assertIs(cq.channel, c.channel)
- c2.add_consumer_from_dict({"%s.xxx" % prefix: {
- "exchange": "%s.xxx" % prefix,
- "routing_key": "xxx"}})
+ c2.add_consumer_from_dict({'%s.xxx' % prefix: {
+ 'exchange': '%s.xxx' % prefix,
+ 'routing_key': 'xxx'}})
self.assertEqual(len(c2.queues), 3)
for c2q in c2.queues:
self.assertIs(c2q.channel, c2.channel)
c.discard_all()
- self.assertEqual(c.channel.called.count("queue_purge"), 4)
+ self.assertEqual(c.channel.called.count('queue_purge'), 4)
c.consume()
c.close()
c2.close()
- self.assertIn("basic_cancel", c.channel)
- self.assertIn("close", c.channel)
- self.assertIn("close", c2.channel)
+ self.assertIn('basic_cancel', c.channel)
+ self.assertIn('close', c.channel)
+ self.assertIn('close', c2.channel)
diff --git a/kombu/tests/test_compression.py b/kombu/tests/test_compression.py
index 97fcd17d..2df3b388 100644
--- a/kombu/tests/test_compression.py
+++ b/kombu/tests/test_compression.py
@@ -20,34 +20,34 @@ class test_compression(TestCase):
else:
self.has_bzip2 = True
- @mask_modules("bz2")
+ @mask_modules('bz2')
def test_no_bz2(self):
- c = sys.modules.pop("kombu.compression")
+ c = sys.modules.pop('kombu.compression')
try:
import kombu.compression
- self.assertFalse(hasattr(kombu.compression, "bz2"))
+ self.assertFalse(hasattr(kombu.compression, 'bz2'))
finally:
if c is not None:
- sys.modules["kombu.compression"] = c
+ sys.modules['kombu.compression'] = c
def test_encoders(self):
encoders = compression.encoders()
- self.assertIn("application/x-gzip", encoders)
+ self.assertIn('application/x-gzip', encoders)
if self.has_bzip2:
- self.assertIn("application/x-bz2", encoders)
+ self.assertIn('application/x-bz2', encoders)
def test_compress__decompress__zlib(self):
- text = "The Quick Brown Fox Jumps Over The Lazy Dog"
- c, ctype = compression.compress(text, "zlib")
+ text = 'The Quick Brown Fox Jumps Over The Lazy Dog'
+ c, ctype = compression.compress(text, 'zlib')
self.assertNotEqual(text, c)
d = compression.decompress(c, ctype)
self.assertEqual(d, text)
def test_compress__decompress__bzip2(self):
if not self.has_bzip2:
- raise SkipTest("bzip2 not available")
- text = "The Brown Quick Fox Over The Lazy Dog Jumps"
- c, ctype = compression.compress(text, "bzip2")
+ raise SkipTest('bzip2 not available')
+ text = 'The Brown Quick Fox Over The Lazy Dog Jumps'
+ c, ctype = compression.compress(text, 'bzip2')
self.assertNotEqual(text, c)
d = compression.decompress(c, ctype)
self.assertEqual(d, text)
diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py
index 8a4719b2..eb464855 100644
--- a/kombu/tests/test_connection.py
+++ b/kombu/tests/test_connection.py
@@ -16,15 +16,15 @@ from .utils import Mock, skip_if_not_module
class test_connection_utils(TestCase):
def setUp(self):
- self.url = "amqp://user:pass@localhost:5672/my/vhost"
- self.nopass = "amqp://user@localhost:5672/my/vhost"
+ self.url = 'amqp://user:pass@localhost:5672/my/vhost'
+ self.nopass = 'amqp://user@localhost:5672/my/vhost'
self.expected = {
- "transport": "amqp",
- "userid": "user",
- "password": "pass",
- "hostname": "localhost",
- "port": 5672,
- "virtual_host": "my/vhost",
+ 'transport': 'amqp',
+ 'userid': 'user',
+ 'password': 'pass',
+ 'hostname': 'localhost',
+ 'port': 5672,
+ 'virtual_host': 'my/vhost',
}
def test_parse_url(self):
@@ -32,8 +32,8 @@ class test_connection_utils(TestCase):
self.assertDictEqual(result, self.expected)
def test_parse_url_mongodb(self):
- result = parse_url("mongodb://example.com/")
- self.assertEqual(result["hostname"], "example.com/")
+ result = parse_url('mongodb://example.com/')
+ self.assertEqual(result['hostname'], 'example.com/')
def test_parse_generated_as_uri(self):
conn = BrokerConnection(self.url)
@@ -44,14 +44,14 @@ class test_connection_utils(TestCase):
self.assertEqual(conn.as_uri(), self.nopass)
self.assertEqual(conn.as_uri(include_password=True), self.url)
- @skip_if_not_module("pymongo")
+ @skip_if_not_module('pymongo')
def test_as_uri_when_mongodb(self):
- x = BrokerConnection("mongodb://localhost")
+ x = BrokerConnection('mongodb://localhost')
self.assertTrue(x.as_uri())
def test_bogus_scheme(self):
with self.assertRaises(KeyError):
- BrokerConnection("bogus://localhost:7421").transport
+ BrokerConnection('bogus://localhost:7421').transport
def assert_info(self, conn, **fields):
info = conn.info()
@@ -63,79 +63,79 @@ class test_connection_utils(TestCase):
C = BrokerConnection
self.assert_info(
- C("amqp://user:pass@host:10000/vhost"),
- userid="user", password="pass", hostname="host",
- port=10000, virtual_host="vhost")
+ C('amqp://user:pass@host:10000/vhost'),
+ userid='user', password='pass', hostname='host',
+ port=10000, virtual_host='vhost')
self.assert_info(
- C("amqp://user%61:%61pass@ho%61st:10000/v%2fhost"),
- userid="usera", password="apass",
- hostname="hoast", port=10000,
- virtual_host="v/host")
+ C('amqp://user%61:%61pass@ho%61st:10000/v%2fhost'),
+ userid='usera', password='apass',
+ hostname='hoast', port=10000,
+ virtual_host='v/host')
self.assert_info(
- C("amqp://"),
- userid="guest", password="guest",
- hostname="localhost", port=5672,
- virtual_host="/")
+ C('amqp://'),
+ userid='guest', password='guest',
+ hostname='localhost', port=5672,
+ virtual_host='/')
self.assert_info(
- C("amqp://:@/"),
- userid="guest", password="guest",
- hostname="localhost", port=5672,
- virtual_host="/")
+ C('amqp://:@/'),
+ userid='guest', password='guest',
+ hostname='localhost', port=5672,
+ virtual_host='/')
self.assert_info(
- C("amqp://user@/"),
- userid="user", password="guest",
- hostname="localhost", port=5672,
- virtual_host="/")
+ C('amqp://user@/'),
+ userid='user', password='guest',
+ hostname='localhost', port=5672,
+ virtual_host='/')
self.assert_info(
- C("amqp://user:pass@/"),
- userid="user", password="pass",
- hostname="localhost", port=5672,
- virtual_host="/")
+ C('amqp://user:pass@/'),
+ userid='user', password='pass',
+ hostname='localhost', port=5672,
+ virtual_host='/')
self.assert_info(
- C("amqp://host"),
- userid="guest", password="guest",
- hostname="host", port=5672,
- virtual_host="/")
+ C('amqp://host'),
+ userid='guest', password='guest',
+ hostname='host', port=5672,
+ virtual_host='/')
self.assert_info(
- C("amqp://:10000"),
- userid="guest", password="guest",
- hostname="localhost", port=10000,
- virtual_host="/")
+ C('amqp://:10000'),
+ userid='guest', password='guest',
+ hostname='localhost', port=10000,
+ virtual_host='/')
self.assert_info(
- C("amqp:///vhost"),
- userid="guest", password="guest",
- hostname="localhost", port=5672,
- virtual_host="vhost")
+ C('amqp:///vhost'),
+ userid='guest', password='guest',
+ hostname='localhost', port=5672,
+ virtual_host='vhost')
self.assert_info(
- C("amqp://host/"),
- userid="guest", password="guest",
- hostname="host", port=5672,
- virtual_host="/")
+ C('amqp://host/'),
+ userid='guest', password='guest',
+ hostname='host', port=5672,
+ virtual_host='/')
self.assert_info(
- C("amqp://host/%2f"),
- userid="guest", password="guest",
- hostname="host", port=5672,
- virtual_host="/")
+ C('amqp://host/%2f'),
+ userid='guest', password='guest',
+ hostname='host', port=5672,
+ virtual_host='/')
def test_url_IPV6(self):
C = BrokerConnection
raise SkipTest("urllib can't parse ipv6 urls")
self.assert_info(
- C("amqp://[::1]"),
- userid="guest", password="guest",
- hostname="[::1]", port=5672,
- virtual_host="/")
+ C('amqp://[::1]'),
+ userid='guest', password='guest',
+ hostname='[::1]', port=5672,
+ virtual_host='/')
class test_Connection(TestCase):
@@ -147,10 +147,10 @@ class test_Connection(TestCase):
conn = self.conn
conn.connect()
self.assertTrue(conn.connection.connected)
- self.assertEqual(conn.host, "localhost:5672")
+ self.assertEqual(conn.host, 'localhost:5672')
channel = conn.channel()
self.assertTrue(channel.open)
- self.assertEqual(conn.drain_events(), "event")
+ self.assertEqual(conn.drain_events(), 'event')
_connection = conn.connection
conn.close()
self.assertFalse(_connection.connected)
@@ -175,7 +175,7 @@ class test_Connection(TestCase):
connection_errors = (_CustomError, )
def close_connection(self, connection):
- raise _CustomError("foo")
+ raise _CustomError('foo')
conn = BrokerConnection(transport=MyTransport)
conn.connect()
@@ -196,7 +196,7 @@ class test_Connection(TestCase):
def connection_errors(self):
return (KeyError, )
- conn = Conn("memory://")
+ conn = Conn('memory://')
conn._default_channel = Mock()
conn._default_channel.close.side_effect = KeyError()
@@ -216,17 +216,17 @@ class test_Connection(TestCase):
def test_ensure_success(self):
def publish():
- return "foobar"
+ return 'foobar'
ensured = self.conn.ensure(None, publish)
- self.assertEqual(ensured(), "foobar")
+ self.assertEqual(ensured(), 'foobar')
def test_ensure_failure(self):
class _CustomError(Exception):
pass
def publish():
- raise _CustomError("bar")
+ raise _CustomError('bar')
ensured = self.conn.ensure(None, publish)
with self.assertRaises(_CustomError):
@@ -237,7 +237,7 @@ class test_Connection(TestCase):
pass
def publish():
- raise _ConnectionError("failed connection")
+ raise _ConnectionError('failed connection')
self.conn.transport.connection_errors = (_ConnectionError,)
ensured = self.conn.ensure(self.conn, publish)
@@ -246,13 +246,13 @@ class test_Connection(TestCase):
def test_autoretry(self):
myfun = Mock()
- myfun.__name__ = "test_autoretry"
+ myfun.__name__ = 'test_autoretry'
self.conn.transport.connection_errors = (KeyError, )
def on_call(*args, **kwargs):
myfun.side_effect = None
- raise KeyError("foo")
+ raise KeyError('foo')
myfun.side_effect = on_call
insured = self.conn.autoretry(myfun)
@@ -262,18 +262,18 @@ class test_Connection(TestCase):
def test_SimpleQueue(self):
conn = self.conn
- q = conn.SimpleQueue("foo")
+ q = conn.SimpleQueue('foo')
self.assertIs(q.channel, conn.default_channel)
chan = conn.channel()
- q2 = conn.SimpleQueue("foo", channel=chan)
+ q2 = conn.SimpleQueue('foo', channel=chan)
self.assertIs(q2.channel, chan)
def test_SimpleBuffer(self):
conn = self.conn
- q = conn.SimpleBuffer("foo")
+ q = conn.SimpleBuffer('foo')
self.assertIs(q.channel, conn.default_channel)
chan = conn.channel()
- q2 = conn.SimpleBuffer("foo", channel=chan)
+ q2 = conn.SimpleBuffer('foo', channel=chan)
self.assertIs(q2.channel, chan)
def test_Producer(self):
@@ -313,7 +313,7 @@ class test_Connection(TestCase):
class test_Connection_with_transport_options(TestCase):
- transport_options = {"pool_recycler": 3600, "echo": True}
+ transport_options = {'pool_recycler': 3600, 'echo': True}
def setUp(self):
self.conn = BrokerConnection(port=5672, transport=Transport,
@@ -334,7 +334,7 @@ class ResourceCase(TestCase):
abstract = True
def create_resource(self, limit, preload):
- raise NotImplementedError("subclass responsibility")
+ raise NotImplementedError('subclass responsibility')
def assertState(self, P, avail, dirty):
self.assertEqual(P._resource.qsize(), avail)
@@ -400,7 +400,7 @@ class ResourceCase(TestCase):
return
P = self.create_resource(10, 10)
cr = P.close_resource = Mock()
- cr.side_effect = AttributeError("x")
+ cr.side_effect = AttributeError('x')
P.acquire()
self.assertTrue(P._dirty)
@@ -450,7 +450,7 @@ class test_ConnectionPool(ResourceCase):
def test_prepare_not_callable(self):
P = self.create_resource(None, None)
- conn = BrokerConnection("memory://")
+ conn = BrokerConnection('memory://')
self.assertIs(P.prepare(conn), conn)
def test_acquire_channel(self):
@@ -472,7 +472,7 @@ class test_ChannelPool(ResourceCase):
self.assertTrue(q[0].basic_consume)
self.assertTrue(q[1].basic_consume)
with self.assertRaises(AttributeError):
- getattr(q[2], "basic_consume")
+ getattr(q[2], 'basic_consume')
def test_setup_no_limit(self):
P = self.create_resource(None, None)
@@ -481,6 +481,6 @@ class test_ChannelPool(ResourceCase):
def test_prepare_not_callable(self):
P = self.create_resource(10, 0)
- conn = BrokerConnection("memory://")
+ conn = BrokerConnection('memory://')
chan = conn.default_channel
self.assertIs(P.prepare(chan), chan)
diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py
index 2ad1df1e..95892edc 100644
--- a/kombu/tests/test_entities.py
+++ b/kombu/tests/test_entities.py
@@ -17,36 +17,36 @@ def get_conn():
class test_Exchange(TestCase):
def test_bound(self):
- exchange = Exchange("foo", "direct")
+ exchange = Exchange('foo', 'direct')
self.assertFalse(exchange.is_bound)
- self.assertIn("<unbound", repr(exchange))
+ self.assertIn('<unbound', repr(exchange))
chan = get_conn().channel()
bound = exchange.bind(chan)
self.assertTrue(bound.is_bound)
self.assertIs(bound.channel, chan)
- self.assertIn("<bound", repr(bound))
+ self.assertIn('<bound', repr(bound))
def test_hash(self):
- self.assertEqual(hash(Exchange("a")), hash(Exchange("a")))
- self.assertNotEqual(hash(Exchange("a")), hash(Exchange("b")))
+ self.assertEqual(hash(Exchange('a')), hash(Exchange('a')))
+ self.assertNotEqual(hash(Exchange('a')), hash(Exchange('b')))
def test_can_cache_declaration(self):
- self.assertTrue(Exchange("a", durable=True).can_cache_declaration)
- self.assertFalse(Exchange("a", durable=False).can_cache_declaration)
+ self.assertTrue(Exchange('a', durable=True).can_cache_declaration)
+ self.assertFalse(Exchange('a', durable=False).can_cache_declaration)
def test_eq(self):
- e1 = Exchange("foo", "direct")
- e2 = Exchange("foo", "direct")
+ e1 = Exchange('foo', 'direct')
+ e2 = Exchange('foo', 'direct')
self.assertEqual(e1, e2)
- e3 = Exchange("foo", "topic")
+ e3 = Exchange('foo', 'topic')
self.assertNotEqual(e1, e3)
self.assertFalse(e1.__eq__(True))
def test_revive(self):
- exchange = Exchange("foo", "direct")
+ exchange = Exchange('foo', 'direct')
conn = get_conn()
chan = conn.channel()
@@ -65,63 +65,63 @@ class test_Exchange(TestCase):
self.assertIs(bound._channel, chan2)
def test_assert_is_bound(self):
- exchange = Exchange("foo", "direct")
+ exchange = Exchange('foo', 'direct')
with self.assertRaises(NotBoundError):
exchange.declare()
conn = get_conn()
chan = conn.channel()
exchange.bind(chan).declare()
- self.assertIn("exchange_declare", chan)
+ self.assertIn('exchange_declare', chan)
def test_set_transient_delivery_mode(self):
- exc = Exchange("foo", "direct", delivery_mode="transient")
+ exc = Exchange('foo', 'direct', delivery_mode='transient')
self.assertEqual(exc.delivery_mode, Exchange.TRANSIENT_DELIVERY_MODE)
def test_set_persistent_delivery_mode(self):
- exc = Exchange("foo", "direct", delivery_mode="persistent")
+ exc = Exchange('foo', 'direct', delivery_mode='persistent')
self.assertEqual(exc.delivery_mode, Exchange.PERSISTENT_DELIVERY_MODE)
def test_bind_at_instantiation(self):
- self.assertTrue(Exchange("foo", channel=get_conn().channel()).is_bound)
+ self.assertTrue(Exchange('foo', channel=get_conn().channel()).is_bound)
def test_create_message(self):
chan = get_conn().channel()
- Exchange("foo", channel=chan).Message({"foo": "bar"})
- self.assertIn("prepare_message", chan)
+ Exchange('foo', channel=chan).Message({'foo': 'bar'})
+ self.assertIn('prepare_message', chan)
def test_publish(self):
chan = get_conn().channel()
- Exchange("foo", channel=chan).publish("the quick brown fox")
- self.assertIn("basic_publish", chan)
+ Exchange('foo', channel=chan).publish('the quick brown fox')
+ self.assertIn('basic_publish', chan)
def test_delete(self):
chan = get_conn().channel()
- Exchange("foo", channel=chan).delete()
- self.assertIn("exchange_delete", chan)
+ Exchange('foo', channel=chan).delete()
+ self.assertIn('exchange_delete', chan)
def test__repr__(self):
- b = Exchange("foo", "topic")
- self.assertIn("foo(topic)", repr(b))
- self.assertIn("Exchange", repr(b))
+ b = Exchange('foo', 'topic')
+ self.assertIn('foo(topic)', repr(b))
+ self.assertIn('Exchange', repr(b))
class test_Queue(TestCase):
def setUp(self):
- self.exchange = Exchange("foo", "direct")
+ self.exchange = Exchange('foo', 'direct')
def test_hash(self):
- self.assertEqual(hash(Queue("a")), hash(Queue("a")))
- self.assertNotEqual(hash(Queue("a")), hash(Queue("b")))
+ self.assertEqual(hash(Queue('a')), hash(Queue('a')))
+ self.assertNotEqual(hash(Queue('a')), hash(Queue('b')))
def test_when_bound_but_no_exchange(self):
- q = Queue("a")
+ q = Queue('a')
q.exchange = None
self.assertIsNone(q.when_bound())
def test_declare_but_no_exchange(self):
- q = Queue("a")
+ q = Queue('a')
q.queue_declare = Mock()
q.queue_bind = Mock()
q.exchange = None
@@ -131,29 +131,29 @@ class test_Queue(TestCase):
q.queue_bind.assert_called_with(False)
def test_can_cache_declaration(self):
- self.assertTrue(Queue("a", durable=True).can_cache_declaration)
- self.assertFalse(Queue("a", durable=False).can_cache_declaration)
+ self.assertTrue(Queue('a', durable=True).can_cache_declaration)
+ self.assertFalse(Queue('a', durable=False).can_cache_declaration)
def test_eq(self):
- q1 = Queue("xxx", Exchange("xxx", "direct"), "xxx")
- q2 = Queue("xxx", Exchange("xxx", "direct"), "xxx")
+ q1 = Queue('xxx', Exchange('xxx', 'direct'), 'xxx')
+ q2 = Queue('xxx', Exchange('xxx', 'direct'), 'xxx')
self.assertEqual(q1, q2)
self.assertFalse(q1.__eq__(True))
- q3 = Queue("yyy", Exchange("xxx", "direct"), "xxx")
+ q3 = Queue('yyy', Exchange('xxx', 'direct'), 'xxx')
self.assertNotEqual(q1, q3)
def test_exclusive_implies_auto_delete(self):
self.assertTrue(
- Queue("foo", self.exchange, exclusive=True).auto_delete)
+ Queue('foo', self.exchange, exclusive=True).auto_delete)
def test_binds_at_instantiation(self):
- self.assertTrue(Queue("foo", self.exchange,
+ self.assertTrue(Queue('foo', self.exchange,
channel=get_conn().channel()).is_bound)
def test_also_binds_exchange(self):
chan = get_conn().channel()
- b = Queue("foo", self.exchange)
+ b = Queue('foo', self.exchange)
self.assertFalse(b.is_bound)
self.assertFalse(b.exchange.is_bound)
b = b.bind(chan)
@@ -164,49 +164,49 @@ class test_Queue(TestCase):
def test_declare(self):
chan = get_conn().channel()
- b = Queue("foo", self.exchange, "foo", channel=chan)
+ b = Queue('foo', self.exchange, 'foo', channel=chan)
self.assertTrue(b.is_bound)
b.declare()
- self.assertIn("exchange_declare", chan)
- self.assertIn("queue_declare", chan)
- self.assertIn("queue_bind", chan)
+ self.assertIn('exchange_declare', chan)
+ self.assertIn('queue_declare', chan)
+ self.assertIn('queue_bind', chan)
def test_get(self):
- b = Queue("foo", self.exchange, "foo", channel=get_conn().channel())
+ b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel())
b.get()
- self.assertIn("basic_get", b.channel)
+ self.assertIn('basic_get', b.channel)
def test_purge(self):
- b = Queue("foo", self.exchange, "foo", channel=get_conn().channel())
+ b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel())
b.purge()
- self.assertIn("queue_purge", b.channel)
+ self.assertIn('queue_purge', b.channel)
def test_consume(self):
- b = Queue("foo", self.exchange, "foo", channel=get_conn().channel())
- b.consume("fifafo", None)
- self.assertIn("basic_consume", b.channel)
+ b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel())
+ b.consume('fifafo', None)
+ self.assertIn('basic_consume', b.channel)
def test_cancel(self):
- b = Queue("foo", self.exchange, "foo", channel=get_conn().channel())
- b.cancel("fifafo")
- self.assertIn("basic_cancel", b.channel)
+ b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel())
+ b.cancel('fifafo')
+ self.assertIn('basic_cancel', b.channel)
def test_delete(self):
- b = Queue("foo", self.exchange, "foo", channel=get_conn().channel())
+ b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel())
b.delete()
- self.assertIn("queue_delete", b.channel)
+ self.assertIn('queue_delete', b.channel)
def test_unbind(self):
- b = Queue("foo", self.exchange, "foo", channel=get_conn().channel())
+ b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel())
b.unbind()
- self.assertIn("queue_unbind", b.channel)
+ self.assertIn('queue_unbind', b.channel)
def test_as_dict(self):
- q = Queue("foo", self.exchange, "rk")
+ q = Queue('foo', self.exchange, 'rk')
d = q.as_dict(recurse=True)
- self.assertEqual(d["exchange"]["name"], self.exchange.name)
+ self.assertEqual(d['exchange']['name'], self.exchange.name)
def test__repr__(self):
- b = Queue("foo", self.exchange, "foo")
- self.assertIn("foo", repr(b))
- self.assertIn("Queue", repr(b))
+ b = Queue('foo', self.exchange, 'foo')
+ self.assertIn('foo', repr(b))
+ self.assertIn('Queue', repr(b))
diff --git a/kombu/tests/test_log.py b/kombu/tests/test_log.py
index 5ca7d1ed..62dd1b35 100644
--- a/kombu/tests/test_log.py
+++ b/kombu/tests/test_log.py
@@ -15,119 +15,119 @@ class test_NullHandler(TestCase):
def test_emit(self):
h = log.NullHandler()
- h.emit("record")
+ h.emit('record')
class test_get_logger(TestCase):
def test_when_string(self):
- l = log.get_logger("foo")
+ l = log.get_logger('foo')
- self.assertIs(l, logging.getLogger("foo"))
+ self.assertIs(l, logging.getLogger('foo'))
h1 = l.handlers[0]
self.assertIsInstance(h1, log.NullHandler)
def test_when_logger(self):
- l = log.get_logger(logging.getLogger("foo"))
+ l = log.get_logger(logging.getLogger('foo'))
h1 = l.handlers[0]
self.assertIsInstance(h1, log.NullHandler)
def test_with_custom_handler(self):
- l = logging.getLogger("bar")
+ l = logging.getLogger('bar')
handler = log.NullHandler()
l.addHandler(handler)
- l = log.get_logger("bar")
+ l = log.get_logger('bar')
self.assertIs(l.handlers[0], handler)
def test_anon_logger(self):
- l = log.anon_logger("test_anon_logger")
+ l = log.anon_logger('test_anon_logger')
self.assertIsInstance(l.handlers[0], log.NullHandler)
def test_get_loglevel(self):
- self.assertEqual(log.get_loglevel("DEBUG"), logging.DEBUG)
- self.assertEqual(log.get_loglevel("ERROR"), logging.ERROR)
+ self.assertEqual(log.get_loglevel('DEBUG'), logging.DEBUG)
+ self.assertEqual(log.get_loglevel('ERROR'), logging.ERROR)
self.assertEqual(log.get_loglevel(logging.INFO), logging.INFO)
class test_safe_format(TestCase):
def test_formatting(self):
- fmt = "The %r jumped over the %s"
- args = ["frog", "elephant"]
+ fmt = 'The %r jumped over the %s'
+ args = ['frog', 'elephant']
res = list(log.safeify_format(fmt, *args))
- self.assertListEqual(res, ["'frog'", "elephant"])
+ self.assertListEqual(res, ["'frog'", 'elephant'])
class test_LogMixin(TestCase):
def setUp(self):
- self.log = log.Log("Log", Mock())
+ self.log = log.Log('Log', Mock())
self.logger = self.log.logger
def test_debug(self):
- self.log.debug("debug")
- self.logger.log.assert_called_with(logging.DEBUG, "Log - debug")
+ self.log.debug('debug')
+ self.logger.log.assert_called_with(logging.DEBUG, 'Log - debug')
def test_info(self):
- self.log.info("info")
- self.logger.log.assert_called_with(logging.INFO, "Log - info")
+ self.log.info('info')
+ self.logger.log.assert_called_with(logging.INFO, 'Log - info')
def test_warning(self):
- self.log.warn("warning")
- self.logger.log.assert_called_with(logging.WARN, "Log - warning")
+ self.log.warn('warning')
+ self.logger.log.assert_called_with(logging.WARN, 'Log - warning')
def test_error(self):
- self.log.error("error", exc_info="exc")
- self.logger.log.assert_called_with(logging.ERROR, "Log - error",
- exc_info="exc")
+ self.log.error('error', exc_info='exc')
+ self.logger.log.assert_called_with(logging.ERROR, 'Log - error',
+ exc_info='exc')
def test_critical(self):
- self.log.critical("crit", exc_info="exc")
- self.logger.log.assert_called_with(logging.CRITICAL, "Log - crit",
- exc_info="exc")
+ self.log.critical('crit', exc_info='exc')
+ self.logger.log.assert_called_with(logging.CRITICAL, 'Log - crit',
+ exc_info='exc')
def test_error_when_DISABLE_TRACEBACKS(self):
log.DISABLE_TRACEBACKS = True
try:
- self.log.error("error")
- self.logger.log.assert_called_with(logging.ERROR, "Log - error")
+ self.log.error('error')
+ self.logger.log.assert_called_with(logging.ERROR, 'Log - error')
finally:
log.DISABLE_TRACEBACKS = False
def test_get_loglevel(self):
- self.assertEqual(self.log.get_loglevel("DEBUG"), logging.DEBUG)
- self.assertEqual(self.log.get_loglevel("ERROR"), logging.ERROR)
+ self.assertEqual(self.log.get_loglevel('DEBUG'), logging.DEBUG)
+ self.assertEqual(self.log.get_loglevel('ERROR'), logging.ERROR)
self.assertEqual(self.log.get_loglevel(logging.INFO), logging.INFO)
def test_is_enabled_for(self):
self.logger.isEnabledFor.return_value = True
- self.assertTrue(self.log.is_enabled_for("DEBUG"))
+ self.assertTrue(self.log.is_enabled_for('DEBUG'))
self.logger.isEnabledFor.assert_called_with(logging.DEBUG)
def test_LogMixin_get_logger(self):
self.assertIs(log.LogMixin().get_logger(),
- logging.getLogger("LogMixin"))
+ logging.getLogger('LogMixin'))
def test_Log_get_logger(self):
- self.assertIs(log.Log("test_Log").get_logger(),
- logging.getLogger("test_Log"))
+ self.assertIs(log.Log('test_Log').get_logger(),
+ logging.getLogger('test_Log'))
def test_log_when_not_enabled(self):
self.logger.isEnabledFor.return_value = False
- self.log.debug("debug")
+ self.log.debug('debug')
self.assertFalse(self.logger.log.called)
def test_log_with_format(self):
- self.log.debug("Host %r removed", "example.com")
+ self.log.debug('Host %r removed', 'example.com')
self.logger.log.assert_called_with(logging.DEBUG,
- "Log - Host %s removed", "'example.com'")
+ 'Log - Host %s removed', "'example.com'")
class test_setup_logging(TestCase):
- @patch("logging.getLogger")
+ @patch('logging.getLogger')
def test_set_up_default_values(self, getLogger):
logger = logging.getLogger.return_value = Mock()
logger.handlers = []
@@ -140,18 +140,18 @@ class test_setup_logging(TestCase):
self.assertIsInstance(handler, logging.StreamHandler)
self.assertIs(handler.stream, sys.__stderr__)
- @patch("logging.getLogger")
- @patch("kombu.log.WatchedFileHandler")
+ @patch('logging.getLogger')
+ @patch('kombu.log.WatchedFileHandler')
def test_setup_custom_values(self, getLogger, WatchedFileHandler):
logger = logging.getLogger.return_value = Mock()
logger.handlers = []
- log.setup_logging(loglevel=logging.DEBUG, logfile="/var/logfile")
+ log.setup_logging(loglevel=logging.DEBUG, logfile='/var/logfile')
logger.setLevel.assert_called_with(logging.DEBUG)
self.assertTrue(logger.addHandler.called)
self.assertTrue(WatchedFileHandler.called)
- @patch("logging.getLogger")
+ @patch('logging.getLogger')
def test_logger_already_setup(self, getLogger):
logger = logging.getLogger.return_value = Mock()
logger.handlers = [Mock()]
diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py
index 1c74d74f..b5385d71 100644
--- a/kombu/tests/test_messaging.py
+++ b/kombu/tests/test_messaging.py
@@ -18,20 +18,20 @@ from .utils import Mock
class test_Producer(TestCase):
def setUp(self):
- self.exchange = Exchange("foo", "direct")
+ self.exchange = Exchange('foo', 'direct')
self.connection = BrokerConnection(transport=Transport)
self.connection.connect()
self.assertTrue(self.connection.connection.connected)
self.assertFalse(self.exchange.is_bound)
- @patch("kombu.common.maybe_declare")
+ @patch('kombu.common.maybe_declare')
def test_maybe_declare(self, maybe_declare):
p = self.connection.Producer()
- q = Queue("foo")
+ q = Queue('foo')
p.maybe_declare(q)
maybe_declare.assert_called_with(q, p.channel, False)
- @patch("kombu.common.maybe_declare")
+ @patch('kombu.common.maybe_declare')
def test_maybe_declare_when_entity_false(self, maybe_declare):
p = self.connection.Producer()
p.maybe_declare(None)
@@ -41,92 +41,92 @@ class test_Producer(TestCase):
channel = self.connection.channel()
p = Producer(channel, self.exchange, auto_declare=True)
self.assertIsNot(p.exchange, self.exchange,
- "creates Exchange clone at bind")
+ 'creates Exchange clone at bind')
self.assertTrue(p.exchange.is_bound)
- self.assertIn("exchange_declare", channel,
- "auto_declare declares exchange")
+ self.assertIn('exchange_declare', channel,
+ 'auto_declare declares exchange')
def test_manual_declare(self):
channel = self.connection.channel()
p = Producer(channel, self.exchange, auto_declare=False)
self.assertTrue(p.exchange.is_bound)
- self.assertNotIn("exchange_declare", channel,
- "auto_declare=False does not declare exchange")
+ self.assertNotIn('exchange_declare', channel,
+ 'auto_declare=False does not declare exchange')
p.declare()
- self.assertIn("exchange_declare", channel,
- "p.declare() declares exchange")
+ self.assertIn('exchange_declare', channel,
+ 'p.declare() declares exchange')
def test_prepare(self):
- message = {u"the quick brown fox": u"jumps over the lazy dog"}
+ message = {u'the quick brown fox': u'jumps over the lazy dog'}
channel = self.connection.channel()
- p = Producer(channel, self.exchange, serializer="json")
+ p = Producer(channel, self.exchange, serializer='json')
m, ctype, cencoding = p._prepare(message, headers={})
self.assertDictEqual(message, anyjson.loads(m))
- self.assertEqual(ctype, "application/json")
- self.assertEqual(cencoding, "utf-8")
+ self.assertEqual(ctype, 'application/json')
+ self.assertEqual(cencoding, 'utf-8')
def test_prepare_compression(self):
- message = {u"the quick brown fox": u"jumps over the lazy dog"}
+ message = {u'the quick brown fox': u'jumps over the lazy dog'}
channel = self.connection.channel()
- p = Producer(channel, self.exchange, serializer="json")
+ p = Producer(channel, self.exchange, serializer='json')
headers = {}
- m, ctype, cencoding = p._prepare(message, compression="zlib",
+ m, ctype, cencoding = p._prepare(message, compression='zlib',
headers=headers)
- self.assertEqual(ctype, "application/json")
- self.assertEqual(cencoding, "utf-8")
- self.assertEqual(headers["compression"], "application/x-gzip")
+ self.assertEqual(ctype, 'application/json')
+ self.assertEqual(cencoding, 'utf-8')
+ self.assertEqual(headers['compression'], 'application/x-gzip')
import zlib
self.assertEqual(anyjson.loads(
- zlib.decompress(m).decode("utf-8")), message)
+ zlib.decompress(m).decode('utf-8')), message)
def test_prepare_custom_content_type(self):
- message = "the quick brown fox".encode("utf-8")
+ message = 'the quick brown fox'.encode('utf-8')
channel = self.connection.channel()
- p = Producer(channel, self.exchange, serializer="json")
- m, ctype, cencoding = p._prepare(message, content_type="custom")
+ p = Producer(channel, self.exchange, serializer='json')
+ m, ctype, cencoding = p._prepare(message, content_type='custom')
self.assertEqual(m, message)
- self.assertEqual(ctype, "custom")
- self.assertEqual(cencoding, "binary")
- m, ctype, cencoding = p._prepare(message, content_type="custom",
- content_encoding="alien")
+ self.assertEqual(ctype, 'custom')
+ self.assertEqual(cencoding, 'binary')
+ m, ctype, cencoding = p._prepare(message, content_type='custom',
+ content_encoding='alien')
self.assertEqual(m, message)
- self.assertEqual(ctype, "custom")
- self.assertEqual(cencoding, "alien")
+ self.assertEqual(ctype, 'custom')
+ self.assertEqual(cencoding, 'alien')
def test_prepare_is_already_unicode(self):
- message = u"the quick brown fox"
+ message = u'the quick brown fox'
channel = self.connection.channel()
- p = Producer(channel, self.exchange, serializer="json")
- m, ctype, cencoding = p._prepare(message, content_type="text/plain")
- self.assertEqual(m, message.encode("utf-8"))
- self.assertEqual(ctype, "text/plain")
- self.assertEqual(cencoding, "utf-8")
- m, ctype, cencoding = p._prepare(message, content_type="text/plain",
- content_encoding="utf-8")
- self.assertEqual(m, message.encode("utf-8"))
- self.assertEqual(ctype, "text/plain")
- self.assertEqual(cencoding, "utf-8")
+ p = Producer(channel, self.exchange, serializer='json')
+ m, ctype, cencoding = p._prepare(message, content_type='text/plain')
+ self.assertEqual(m, message.encode('utf-8'))
+ self.assertEqual(ctype, 'text/plain')
+ self.assertEqual(cencoding, 'utf-8')
+ m, ctype, cencoding = p._prepare(message, content_type='text/plain',
+ content_encoding='utf-8')
+ self.assertEqual(m, message.encode('utf-8'))
+ self.assertEqual(ctype, 'text/plain')
+ self.assertEqual(cencoding, 'utf-8')
def test_publish_with_Exchange_instance(self):
p = self.connection.Producer()
p.exchange.publish = Mock()
- p.publish("hello", exchange=Exchange("foo"))
- self.assertEqual(p.exchange.publish.call_args[0][4], "foo")
+ p.publish('hello', exchange=Exchange('foo'))
+ self.assertEqual(p.exchange.publish.call_args[0][4], 'foo')
def test_publish_retry_with_declare(self):
p = self.connection.Producer()
p.maybe_declare = Mock()
ensure = p.connection.ensure = Mock()
- ex = Exchange("foo")
- p.publish("hello", exchange=ex, declare=[ex], retry=True,
- retry_policy={"step": 4})
+ ex = Exchange('foo')
+ p.publish('hello', exchange=ex, declare=[ex], retry=True,
+ retry_policy={'step': 4})
p.maybe_declare.assert_called_with(ex, True, step=4)
ensure.assert_called_with(p, p.exchange.publish, step=4)
def test_revive_when_channel_is_connection(self):
p = self.connection.Producer()
p.exchange = Mock()
- new_conn = BrokerConnection("memory://")
+ new_conn = BrokerConnection('memory://')
defchan = new_conn.default_channel
p.revive(new_conn)
@@ -148,20 +148,20 @@ class test_Producer(TestCase):
def test_publish(self):
channel = self.connection.channel()
- p = Producer(channel, self.exchange, serializer="json")
- message = {u"the quick brown fox": u"jumps over the lazy dog"}
- ret = p.publish(message, routing_key="process")
- self.assertIn("prepare_message", channel)
- self.assertIn("basic_publish", channel)
+ p = Producer(channel, self.exchange, serializer='json')
+ message = {u'the quick brown fox': u'jumps over the lazy dog'}
+ ret = p.publish(message, routing_key='process')
+ self.assertIn('prepare_message', channel)
+ self.assertIn('basic_publish', channel)
m, exc, rkey = ret
- self.assertDictEqual(message, anyjson.loads(m["body"]))
- self.assertDictContainsSubset({"content_type": "application/json",
- "content_encoding": "utf-8",
- "priority": 0}, m)
- self.assertDictContainsSubset({"delivery_mode": 2}, m["properties"])
+ self.assertDictEqual(message, anyjson.loads(m['body']))
+ self.assertDictContainsSubset({'content_type': 'application/json',
+ 'content_encoding': 'utf-8',
+ 'priority': 0}, m)
+ self.assertDictContainsSubset({'delivery_mode': 2}, m['properties'])
self.assertEqual(exc, p.exchange.name)
- self.assertEqual(rkey, "process")
+ self.assertEqual(rkey, 'process')
def test_no_exchange(self):
chan = self.connection.channel()
@@ -183,7 +183,7 @@ class test_Producer(TestCase):
pass
p = Producer(chan, on_return=on_return)
- self.assertTrue(on_return in chan.events["basic_return"])
+ self.assertTrue(on_return in chan.events['basic_return'])
self.assertTrue(p.on_return)
@@ -193,11 +193,11 @@ class test_Consumer(TestCase):
self.connection = BrokerConnection(transport=Transport)
self.connection.connect()
self.assertTrue(self.connection.connection.connected)
- self.exchange = Exchange("foo", "direct")
+ self.exchange = Exchange('foo', 'direct')
def test_set_no_ack(self):
channel = self.connection.channel()
- queue = Queue("qname", self.exchange, "rkey")
+ queue = Queue('qname', self.exchange, 'rkey')
consumer = Consumer(channel, queue, auto_declare=True, no_ack=True)
self.assertTrue(consumer.no_ack)
@@ -224,30 +224,30 @@ class test_Consumer(TestCase):
def test_consuming_from(self):
consumer = self.connection.Consumer()
- consumer.queues[:] = [Queue("a"), Queue("b")]
- self.assertFalse(consumer.consuming_from(Queue("c")))
- self.assertFalse(consumer.consuming_from("c"))
- self.assertTrue(consumer.consuming_from(Queue("a")))
- self.assertTrue(consumer.consuming_from(Queue("b")))
- self.assertTrue(consumer.consuming_from("b"))
+ consumer.queues[:] = [Queue('a'), Queue('b')]
+ self.assertFalse(consumer.consuming_from(Queue('c')))
+ self.assertFalse(consumer.consuming_from('c'))
+ self.assertTrue(consumer.consuming_from(Queue('a')))
+ self.assertTrue(consumer.consuming_from(Queue('b')))
+ self.assertTrue(consumer.consuming_from('b'))
def test_receive_callback_without_m2p(self):
channel = self.connection.channel()
c = channel.Consumer()
- m2p = getattr(channel, "message_to_python")
+ m2p = getattr(channel, 'message_to_python')
channel.message_to_python = None
try:
message = Mock()
- message.decode.return_value = "Hello"
+ message.decode.return_value = 'Hello'
recv = c.receive = Mock()
c._receive_callback(message)
- recv.assert_called_with("Hello", message)
+ recv.assert_called_with('Hello', message)
finally:
channel.message_to_python = m2p
def test_set_callbacks(self):
channel = self.connection.channel()
- queue = Queue("qname", self.exchange, "rkey")
+ queue = Queue('qname', self.exchange, 'rkey')
callbacks = [lambda x, y: x,
lambda x, y: x]
consumer = Consumer(channel, queue, auto_declare=True,
@@ -256,7 +256,7 @@ class test_Consumer(TestCase):
def test_auto_declare(self):
channel = self.connection.channel()
- queue = Queue("qname", self.exchange, "rkey")
+ queue = Queue('qname', self.exchange, 'rkey')
consumer = Consumer(channel, queue, auto_declare=True)
consumer.consume()
consumer.consume() # twice is a noop
@@ -265,12 +265,12 @@ class test_Consumer(TestCase):
self.assertTrue(consumer.queues[0].exchange.is_bound)
self.assertIsNot(consumer.queues[0].exchange, self.exchange)
- for meth in ("exchange_declare",
- "queue_declare",
- "queue_bind",
- "basic_consume"):
+ for meth in ('exchange_declare',
+ 'queue_declare',
+ 'queue_bind',
+ 'basic_consume'):
self.assertIn(meth, channel)
- self.assertEqual(channel.called.count("basic_consume"), 1)
+ self.assertEqual(channel.called.count('basic_consume'), 1)
self.assertTrue(consumer._active_tags)
consumer.cancel_by_queue(queue.name)
@@ -279,93 +279,93 @@ class test_Consumer(TestCase):
def test_manual_declare(self):
channel = self.connection.channel()
- queue = Queue("qname", self.exchange, "rkey")
+ queue = Queue('qname', self.exchange, 'rkey')
consumer = Consumer(channel, queue, auto_declare=False)
self.assertIsNot(consumer.queues[0], queue)
self.assertTrue(consumer.queues[0].is_bound)
self.assertTrue(consumer.queues[0].exchange.is_bound)
self.assertIsNot(consumer.queues[0].exchange, self.exchange)
- for meth in ("exchange_declare",
- "queue_declare",
- "basic_consume"):
+ for meth in ('exchange_declare',
+ 'queue_declare',
+ 'basic_consume'):
self.assertNotIn(meth, channel)
consumer.declare()
- for meth in ("exchange_declare",
- "queue_declare",
- "queue_bind"):
+ for meth in ('exchange_declare',
+ 'queue_declare',
+ 'queue_bind'):
self.assertIn(meth, channel)
- self.assertNotIn("basic_consume", channel)
+ self.assertNotIn('basic_consume', channel)
consumer.consume()
- self.assertIn("basic_consume", channel)
+ self.assertIn('basic_consume', channel)
def test_consume__cancel(self):
channel = self.connection.channel()
- queue = Queue("qname", self.exchange, "rkey")
+ queue = Queue('qname', self.exchange, 'rkey')
consumer = Consumer(channel, queue, auto_declare=True)
consumer.consume()
consumer.cancel()
- self.assertIn("basic_cancel", channel)
+ self.assertIn('basic_cancel', channel)
self.assertFalse(consumer._active_tags)
def test___enter____exit__(self):
channel = self.connection.channel()
- queue = Queue("qname", self.exchange, "rkey")
+ queue = Queue('qname', self.exchange, 'rkey')
consumer = Consumer(channel, queue, auto_declare=True)
context = consumer.__enter__()
self.assertIs(context, consumer)
self.assertTrue(consumer._active_tags)
res = consumer.__exit__(None, None, None)
self.assertFalse(res)
- self.assertIn("basic_cancel", channel)
+ self.assertIn('basic_cancel', channel)
self.assertFalse(consumer._active_tags)
def test_flow(self):
channel = self.connection.channel()
- queue = Queue("qname", self.exchange, "rkey")
+ queue = Queue('qname', self.exchange, 'rkey')
consumer = Consumer(channel, queue, auto_declare=True)
consumer.flow(False)
- self.assertIn("flow", channel)
+ self.assertIn('flow', channel)
def test_qos(self):
channel = self.connection.channel()
- queue = Queue("qname", self.exchange, "rkey")
+ queue = Queue('qname', self.exchange, 'rkey')
consumer = Consumer(channel, queue, auto_declare=True)
consumer.qos(30, 10, False)
- self.assertIn("basic_qos", channel)
+ self.assertIn('basic_qos', channel)
def test_purge(self):
channel = self.connection.channel()
- b1 = Queue("qname1", self.exchange, "rkey")
- b2 = Queue("qname2", self.exchange, "rkey")
- b3 = Queue("qname3", self.exchange, "rkey")
- b4 = Queue("qname4", self.exchange, "rkey")
+ b1 = Queue('qname1', self.exchange, 'rkey')
+ b2 = Queue('qname2', self.exchange, 'rkey')
+ b3 = Queue('qname3', self.exchange, 'rkey')
+ b4 = Queue('qname4', self.exchange, 'rkey')
consumer = Consumer(channel, [b1, b2, b3, b4], auto_declare=True)
consumer.purge()
- self.assertEqual(channel.called.count("queue_purge"), 4)
+ self.assertEqual(channel.called.count('queue_purge'), 4)
def test_multiple_queues(self):
channel = self.connection.channel()
- b1 = Queue("qname1", self.exchange, "rkey")
- b2 = Queue("qname2", self.exchange, "rkey")
- b3 = Queue("qname3", self.exchange, "rkey")
- b4 = Queue("qname4", self.exchange, "rkey")
+ b1 = Queue('qname1', self.exchange, 'rkey')
+ b2 = Queue('qname2', self.exchange, 'rkey')
+ b3 = Queue('qname3', self.exchange, 'rkey')
+ b4 = Queue('qname4', self.exchange, 'rkey')
consumer = Consumer(channel, [b1, b2, b3, b4])
consumer.consume()
- self.assertEqual(channel.called.count("exchange_declare"), 4)
- self.assertEqual(channel.called.count("queue_declare"), 4)
- self.assertEqual(channel.called.count("queue_bind"), 4)
- self.assertEqual(channel.called.count("basic_consume"), 4)
+ self.assertEqual(channel.called.count('exchange_declare'), 4)
+ self.assertEqual(channel.called.count('queue_declare'), 4)
+ self.assertEqual(channel.called.count('queue_bind'), 4)
+ self.assertEqual(channel.called.count('basic_consume'), 4)
self.assertEqual(len(consumer._active_tags), 4)
consumer.cancel()
- self.assertEqual(channel.called.count("basic_cancel"), 4)
+ self.assertEqual(channel.called.count('basic_cancel'), 4)
self.assertFalse(len(consumer._active_tags))
def test_receive_callback(self):
channel = self.connection.channel()
- b1 = Queue("qname1", self.exchange, "rkey")
+ b1 = Queue('qname1', self.exchange, 'rkey')
consumer = Consumer(channel, [b1])
received = []
@@ -375,15 +375,15 @@ class test_Consumer(TestCase):
message.payload # trigger cache
consumer.register_callback(callback)
- consumer._receive_callback({u"foo": u"bar"})
+ consumer._receive_callback({u'foo': u'bar'})
- self.assertIn("basic_ack", channel)
- self.assertIn("message_to_python", channel)
- self.assertEqual(received[0], {u"foo": u"bar"})
+ self.assertIn('basic_ack', channel)
+ self.assertIn('message_to_python', channel)
+ self.assertEqual(received[0], {u'foo': u'bar'})
def test_basic_ack_twice(self):
channel = self.connection.channel()
- b1 = Queue("qname1", self.exchange, "rkey")
+ b1 = Queue('qname1', self.exchange, 'rkey')
consumer = Consumer(channel, [b1])
def callback(message_data, message):
@@ -392,23 +392,23 @@ class test_Consumer(TestCase):
consumer.register_callback(callback)
with self.assertRaises(MessageStateError):
- consumer._receive_callback({"foo": "bar"})
+ consumer._receive_callback({'foo': 'bar'})
def test_basic_reject(self):
channel = self.connection.channel()
- b1 = Queue("qname1", self.exchange, "rkey")
+ b1 = Queue('qname1', self.exchange, 'rkey')
consumer = Consumer(channel, [b1])
def callback(message_data, message):
message.reject()
consumer.register_callback(callback)
- consumer._receive_callback({"foo": "bar"})
- self.assertIn("basic_reject", channel)
+ consumer._receive_callback({'foo': 'bar'})
+ self.assertIn('basic_reject', channel)
def test_basic_reject_twice(self):
channel = self.connection.channel()
- b1 = Queue("qname1", self.exchange, "rkey")
+ b1 = Queue('qname1', self.exchange, 'rkey')
consumer = Consumer(channel, [b1])
def callback(message_data, message):
@@ -417,24 +417,24 @@ class test_Consumer(TestCase):
consumer.register_callback(callback)
with self.assertRaises(MessageStateError):
- consumer._receive_callback({"foo": "bar"})
- self.assertIn("basic_reject", channel)
+ consumer._receive_callback({'foo': 'bar'})
+ self.assertIn('basic_reject', channel)
def test_basic_reject__requeue(self):
channel = self.connection.channel()
- b1 = Queue("qname1", self.exchange, "rkey")
+ b1 = Queue('qname1', self.exchange, 'rkey')
consumer = Consumer(channel, [b1])
def callback(message_data, message):
message.requeue()
consumer.register_callback(callback)
- consumer._receive_callback({"foo": "bar"})
- self.assertIn("basic_reject:requeue", channel)
+ consumer._receive_callback({'foo': 'bar'})
+ self.assertIn('basic_reject:requeue', channel)
def test_basic_reject__requeue_twice(self):
channel = self.connection.channel()
- b1 = Queue("qname1", self.exchange, "rkey")
+ b1 = Queue('qname1', self.exchange, 'rkey')
consumer = Consumer(channel, [b1])
def callback(message_data, message):
@@ -443,28 +443,28 @@ class test_Consumer(TestCase):
consumer.register_callback(callback)
with self.assertRaises(MessageStateError):
- consumer._receive_callback({"foo": "bar"})
- self.assertIn("basic_reject:requeue", channel)
+ consumer._receive_callback({'foo': 'bar'})
+ self.assertIn('basic_reject:requeue', channel)
def test_receive_without_callbacks_raises(self):
channel = self.connection.channel()
- b1 = Queue("qname1", self.exchange, "rkey")
+ b1 = Queue('qname1', self.exchange, 'rkey')
consumer = Consumer(channel, [b1])
with self.assertRaises(NotImplementedError):
consumer.receive(1, 2)
def test_decode_error(self):
channel = self.connection.channel()
- b1 = Queue("qname1", self.exchange, "rkey")
+ b1 = Queue('qname1', self.exchange, 'rkey')
consumer = Consumer(channel, [b1])
consumer.channel.throw_decode_error = True
with self.assertRaises(ValueError):
- consumer._receive_callback({"foo": "bar"})
+ consumer._receive_callback({'foo': 'bar'})
def test_on_decode_error_callback(self):
channel = self.connection.channel()
- b1 = Queue("qname1", self.exchange, "rkey")
+ b1 = Queue('qname1', self.exchange, 'rkey')
thrown = []
def on_decode_error(msg, exc):
@@ -472,23 +472,23 @@ class test_Consumer(TestCase):
consumer = Consumer(channel, [b1], on_decode_error=on_decode_error)
consumer.channel.throw_decode_error = True
- consumer._receive_callback({"foo": "bar"})
+ consumer._receive_callback({'foo': 'bar'})
self.assertTrue(thrown)
m, exc = thrown[0]
- self.assertEqual(anyjson.loads(m), {"foo": "bar"})
+ self.assertEqual(anyjson.loads(m), {'foo': 'bar'})
self.assertIsInstance(exc, ValueError)
def test_recover(self):
channel = self.connection.channel()
- b1 = Queue("qname1", self.exchange, "rkey")
+ b1 = Queue('qname1', self.exchange, 'rkey')
consumer = Consumer(channel, [b1])
consumer.recover()
- self.assertIn("basic_recover", channel)
+ self.assertIn('basic_recover', channel)
def test_revive(self):
channel = self.connection.channel()
- b1 = Queue("qname1", self.exchange, "rkey")
+ b1 = Queue('qname1', self.exchange, 'rkey')
consumer = Consumer(channel, [b1])
channel2 = self.connection.channel()
consumer.revive(channel2)
@@ -498,7 +498,7 @@ class test_Consumer(TestCase):
def test__repr__(self):
channel = self.connection.channel()
- b1 = Queue("qname1", self.exchange, "rkey")
+ b1 = Queue('qname1', self.exchange, 'rkey')
self.assertTrue(repr(Consumer(channel, [b1])))
def test_connection_property_handles_AttributeError(self):
diff --git a/kombu/tests/test_pidbox.py b/kombu/tests/test_pidbox.py
index 28ac977a..72d974ea 100644
--- a/kombu/tests/test_pidbox.py
+++ b/kombu/tests/test_pidbox.py
@@ -14,32 +14,32 @@ from .utils import Mock
class test_Mailbox(TestCase):
def _handler(self, state):
- return self.stats["var"]
+ return self.stats['var']
def setUp(self):
class Mailbox(pidbox.Mailbox):
def _collect(self, *args, **kwargs):
- return "COLLECTED"
+ return 'COLLECTED'
- self.mailbox = Mailbox("test_pidbox")
- self.connection = BrokerConnection(transport="memory")
- self.state = {"var": 1}
- self.handlers = {"mymethod": self._handler}
+ self.mailbox = Mailbox('test_pidbox')
+ self.connection = BrokerConnection(transport='memory')
+ self.state = {'var': 1}
+ self.handlers = {'mymethod': self._handler}
self.bound = self.mailbox(self.connection)
self.default_chan = self.connection.channel()
- self.node = self.bound.Node("test_pidbox", state=self.state,
+ self.node = self.bound.Node('test_pidbox', state=self.state,
handlers=self.handlers,
channel=self.default_chan)
def test_reply__collect(self):
- mailbox = pidbox.Mailbox("test_reply__collect")(self.connection)
+ mailbox = pidbox.Mailbox('test_reply__collect')(self.connection)
exchange = mailbox.reply_exchange.name
ticket = uuid()
mailbox.get_reply_queue(ticket)(self.connection.channel()).declare()
- mailbox._publish_reply({"foo": "bar"}, exchange, ticket)
+ mailbox._publish_reply({'foo': 'bar'}, exchange, ticket)
_callback_called = [False]
def callback(body):
@@ -48,14 +48,14 @@ class test_Mailbox(TestCase):
channel = self.connection.channel()
reply = mailbox._collect(ticket, limit=1, callback=callback,
channel=channel)
- self.assertEqual(reply, [{"foo": "bar"}])
+ self.assertEqual(reply, [{'foo': 'bar'}])
self.assertTrue(_callback_called[0])
ticket = uuid()
mailbox.get_reply_queue(ticket)(self.connection.channel()).declare()
- mailbox._publish_reply({"biz": "boz"}, exchange, ticket)
+ mailbox._publish_reply({'biz': 'boz'}, exchange, ticket)
reply = mailbox._collect(ticket, limit=1, channel=channel)
- self.assertEqual(reply, [{"biz": "boz"}])
+ self.assertEqual(reply, [{'biz': 'boz'}])
de = mailbox.connection.drain_events = Mock()
de.side_effect = socket.timeout
@@ -77,7 +77,7 @@ class test_Mailbox(TestCase):
self.assertTrue(self.handlers)
# No initial handlers
- node2 = self.bound.Node("test_pidbox2", state=self.state)
+ node2 = self.bound.Node('test_pidbox2', state=self.state)
self.assertDictEqual(node2.handlers, {})
def test_Node_consumer(self):
@@ -91,26 +91,26 @@ class test_Mailbox(TestCase):
self.assertFalse(consumer2.no_ack)
def test_handler(self):
- node = self.bound.Node("test_handler", state=self.state)
+ node = self.bound.Node('test_handler', state=self.state)
@node.handler
def my_handler_name(state):
return 42
- self.assertIn("my_handler_name", node.handlers)
+ self.assertIn('my_handler_name', node.handlers)
def test_dispatch(self):
- node = self.bound.Node("test_dispatch", state=self.state)
+ node = self.bound.Node('test_dispatch', state=self.state)
@node.handler
def my_handler_name(state, x=None, y=None):
return x + y
- self.assertEqual(node.dispatch("my_handler_name",
- arguments={"x": 10, "y": 10}), 20)
+ self.assertEqual(node.dispatch('my_handler_name',
+ arguments={'x': 10, 'y': 10}), 20)
def test_dispatch_raising_SystemExit(self):
- node = self.bound.Node("test_dispatch_raising_SystemExit",
+ node = self.bound.Node('test_dispatch_raising_SystemExit',
state=self.state)
@node.handler
@@ -118,18 +118,18 @@ class test_Mailbox(TestCase):
raise SystemExit
with self.assertRaises(SystemExit):
- node.dispatch("my_handler_name")
+ node.dispatch('my_handler_name')
def test_dispatch_raising(self):
- node = self.bound.Node("test_dispatch_raising", state=self.state)
+ node = self.bound.Node('test_dispatch_raising', state=self.state)
@node.handler
def my_handler_name(state):
- raise KeyError("foo")
+ raise KeyError('foo')
- res = node.dispatch("my_handler_name")
- self.assertIn("error", res)
- self.assertIn("KeyError", res["error"])
+ res = node.dispatch('my_handler_name')
+ self.assertIn('error', res)
+ self.assertIn('KeyError', res['error'])
def test_dispatch_replies(self):
_replied = [False]
@@ -137,16 +137,16 @@ class test_Mailbox(TestCase):
def reply(data, **options):
_replied[0] = True
- node = self.bound.Node("test_dispatch", state=self.state)
+ node = self.bound.Node('test_dispatch', state=self.state)
node.reply = reply
@node.handler
def my_handler_name(state, x=None, y=None):
return x + y
- node.dispatch("my_handler_name",
- arguments={"x": 10, "y": 10},
- reply_to={"exchange": "foo", "routing_key": "bar"})
+ node.dispatch('my_handler_name',
+ arguments={'x': 10, 'y': 10},
+ reply_to={'exchange': 'foo', 'routing_key': 'bar'})
self.assertTrue(_replied[0])
def test_reply(self):
@@ -157,34 +157,34 @@ class test_Mailbox(TestCase):
mailbox = self.mailbox(self.connection)
mailbox._publish_reply = publish_reply
- node = mailbox.Node("test_reply")
+ node = mailbox.Node('test_reply')
@node.handler
def my_handler_name(state):
return 42
- node.dispatch("my_handler_name",
- reply_to={"exchange": "exchange",
- "routing_key": "rkey"})
+ node.dispatch('my_handler_name',
+ reply_to={'exchange': 'exchange',
+ 'routing_key': 'rkey'})
data, exchange, routing_key = _replied[0]
- self.assertEqual(data, {"test_reply": 42})
- self.assertEqual(exchange, "exchange")
- self.assertEqual(routing_key, "rkey")
+ self.assertEqual(data, {'test_reply': 42})
+ self.assertEqual(exchange, 'exchange')
+ self.assertEqual(routing_key, 'rkey')
def test_handle_message(self):
- node = self.bound.Node("test_dispatch_from_message")
+ node = self.bound.Node('test_dispatch_from_message')
@node.handler
def my_handler_name(state, x=None, y=None):
return x * y
- body = {"method": "my_handler_name",
- "arguments": {"x": 64, "y": 64}}
+ body = {'method': 'my_handler_name',
+ 'arguments': {'x': 64, 'y': 64}}
self.assertEqual(node.handle_message(body, None), 64 * 64)
# message not for me should not be processed.
- body["destination"] = ["some_other_node"]
+ body['destination'] = ['some_other_node']
self.assertIsNone(node.handle_message(body, None))
def test_listen(self):
@@ -194,27 +194,27 @@ class test_Mailbox(TestCase):
self.assertEqual(consumer.channel, self.default_chan)
def test_cast(self):
- self.bound.cast(["somenode"], "mymethod")
+ self.bound.cast(['somenode'], 'mymethod')
consumer = self.node.Consumer()
self.assertIsCast(self.get_next(consumer))
def test_abcast(self):
- self.bound.abcast("mymethod")
+ self.bound.abcast('mymethod')
consumer = self.node.Consumer()
self.assertIsCast(self.get_next(consumer))
def test_call_destination_must_be_sequence(self):
with self.assertRaises(ValueError):
- self.bound.call("some_node", "mymethod")
+ self.bound.call('some_node', 'mymethod')
def test_call(self):
- self.assertEqual(self.bound.call(["some_node"], "mymethod"),
- "COLLECTED")
+ self.assertEqual(self.bound.call(['some_node'], 'mymethod'),
+ 'COLLECTED')
consumer = self.node.Consumer()
self.assertIsCall(self.get_next(consumer))
def test_multi_call(self):
- self.assertEqual(self.bound.multi_call("mymethod"), "COLLECTED")
+ self.assertEqual(self.bound.multi_call('mymethod'), 'COLLECTED')
consumer = self.node.Consumer()
self.assertIsCall(self.get_next(consumer))
@@ -224,8 +224,8 @@ class test_Mailbox(TestCase):
return m.payload
def assertIsCast(self, message):
- self.assertTrue(message["method"])
+ self.assertTrue(message['method'])
def assertIsCall(self, message):
- self.assertTrue(message["method"])
- self.assertTrue(message["reply_to"])
+ self.assertTrue(message['method'])
+ self.assertTrue(message['reply_to'])
diff --git a/kombu/tests/test_pools.py b/kombu/tests/test_pools.py
index f1a9b44d..ce11f584 100644
--- a/kombu/tests/test_pools.py
+++ b/kombu/tests/test_pools.py
@@ -100,24 +100,24 @@ class test_PoolGroup(TestCase):
def test_getitem_using_global_limit(self):
pools._used[0] = False
g = self.MyGroup(limit=pools.use_global_limit)
- res = g["foo"]
- self.assertTupleEqual(res, ("foo", pools.get_limit()))
+ res = g['foo']
+ self.assertTupleEqual(res, ('foo', pools.get_limit()))
self.assertTrue(pools._used[0])
def test_getitem_using_custom_limit(self):
pools._used[0] = True
g = self.MyGroup(limit=102456)
- res = g["foo"]
- self.assertTupleEqual(res, ("foo", 102456))
+ res = g['foo']
+ self.assertTupleEqual(res, ('foo', 102456))
def test_delitem(self):
g = self.MyGroup()
- g["foo"]
- del(g["foo"])
- self.assertNotIn("foo", g)
+ g['foo']
+ del(g['foo'])
+ self.assertNotIn('foo', g)
def test_Connections(self):
- conn = Connection("memory://")
+ conn = Connection('memory://')
p = pools.connections[conn]
self.assertTrue(p)
self.assertIsInstance(p, ConnectionPool)
@@ -125,7 +125,7 @@ class test_PoolGroup(TestCase):
self.assertEqual(p.limit, pools.get_limit())
def test_Producers(self):
- conn = Connection("memory://")
+ conn = Connection('memory://')
p = pools.producers[conn]
self.assertTrue(p)
self.assertIsInstance(p, pools.ProducerPool)
@@ -134,7 +134,7 @@ class test_PoolGroup(TestCase):
self.assertEqual(p.limit, pools.get_limit())
def test_all_groups(self):
- conn = Connection("memory://")
+ conn = Connection('memory://')
pools.connections[conn]
self.assertTrue(list(pools._all_pools()))
@@ -148,7 +148,7 @@ class test_PoolGroup(TestCase):
def clear(self):
self.clear_called = True
- p1 = pools.connections["foo"] = Mock()
+ p1 = pools.connections['foo'] = Mock()
g1 = MyGroup()
pools._groups.append(g1)
@@ -156,7 +156,7 @@ class test_PoolGroup(TestCase):
p1.force_close_all.assert_called_with()
self.assertTrue(g1.clear_called)
- p1 = pools.connections["foo"] = Mock()
+ p1 = pools.connections['foo'] = Mock()
p1.force_close_all.side_effect = KeyError()
pools.reset()
@@ -166,7 +166,7 @@ class test_PoolGroup(TestCase):
limit = pools.get_limit()
self.assertEqual(limit, 34576)
- pools.connections[Connection("memory://")]
+ pools.connections[Connection('memory://')]
pools.set_limit(limit + 1)
self.assertEqual(pools.get_limit(), limit + 1)
limit = pools.get_limit()
@@ -181,8 +181,8 @@ class test_PoolGroup(TestCase):
class test_fun_PoolGroup(TestCase):
def test_connections_behavior(self):
- c1u = "memory://localhost:123"
- c2u = "memory://localhost:124"
+ c1u = 'memory://localhost:123'
+ c2u = 'memory://localhost:124'
c1 = Connection(c1u)
c2 = Connection(c2u)
c3 = Connection(c1u)
diff --git a/kombu/tests/test_serialization.py b/kombu/tests/test_serialization.py
index b5ad35b1..c5e0eccd 100644
--- a/kombu/tests/test_serialization.py
+++ b/kombu/tests/test_serialization.py
@@ -22,19 +22,19 @@ latin_string_as_utf8 = latin_string.encode('utf-8')
# For serialization tests
-py_data = {"string": "The quick brown fox jumps over the lazy dog",
- "int": 10,
- "float": 3.14159265,
- "unicode": u"Thé quick brown fox jumps over thé lazy dog",
- "list": ["george", "jerry", "elaine", "cosmo"],
+py_data = {'string': 'The quick brown fox jumps over the lazy dog',
+ 'int': 10,
+ 'float': 3.14159265,
+ 'unicode': u'Thé quick brown fox jumps over thé lazy dog',
+ 'list': ['george', 'jerry', 'elaine', 'cosmo'],
}
# JSON serialization tests
-json_data = ('{"int": 10, "float": 3.1415926500000002, '
+json_data = ('''{"int": 10, "float": 3.1415926500000002, '
'"list": ["george", "jerry", "elaine", "cosmo"], '
'"string": "The quick brown fox jumps over the lazy '
'dog", "unicode": "Th\\u00e9 quick brown fox jumps over '
- 'th\\u00e9 lazy dog"}')
+ 'th\\u00e9 lazy dog"}''')
# Pickle serialization tests
pickle_data = pickle.dumps(py_data)
@@ -49,9 +49,9 @@ yaml_data = ('float: 3.1415926500000002\nint: 10\n'
msgpack_py_data = dict(py_data)
# msgpack only supports tuples
-msgpack_py_data["list"] = tuple(msgpack_py_data["list"])
+msgpack_py_data['list'] = tuple(msgpack_py_data['list'])
# Unicode chars are lost in transmit :(
-msgpack_py_data["unicode"] = 'Th quick brown fox jumps over th lazy dog'
+msgpack_py_data['unicode'] = 'Th quick brown fox jumps over th lazy dog'
msgpack_data = ('\x85\xa3int\n\xa5float\xcb@\t!\xfbS\xc8\xd4\xf1\xa4list'
'\x94\xa6george\xa5jerry\xa6elaine\xa5cosmo\xa6string\xda'
'\x00+The quick brown fox jumps over the lazy dog\xa7unicode'
@@ -59,11 +59,11 @@ msgpack_data = ('\x85\xa3int\n\xa5float\xcb@\t!\xfbS\xc8\xd4\xf1\xa4list'
def say(m):
- sys.stderr.write("%s\n" % (m, ))
+ sys.stderr.write('%s\n' % (m, ))
-registry.register('testS', lambda s: s, lambda s: "decoded",
- "application/testS", "utf-8")
+registry.register('testS', lambda s: s, lambda s: 'decoded',
+ 'application/testS', 'utf-8')
class test_Serialization(TestCase):
@@ -71,32 +71,32 @@ class test_Serialization(TestCase):
def test_disable(self):
disabled = registry._disabled_content_types
try:
- registry.disable("testS")
- self.assertIn("application/testS", disabled)
+ registry.disable('testS')
+ self.assertIn('application/testS', disabled)
disabled.clear()
- registry.disable("application/testS")
- self.assertIn("application/testS", disabled)
+ registry.disable('application/testS')
+ self.assertIn('application/testS', disabled)
finally:
disabled.clear()
def test_decode_when_disabled(self):
disabled = registry._disabled_content_types
try:
- registry.disable("testS")
+ registry.disable('testS')
with self.assertRaises(SerializerNotInstalled):
- registry.decode("xxd", "application/testS", "utf-8",
+ registry.decode('xxd', 'application/testS', 'utf-8',
force=False)
- ret = registry.decode("xxd", "application/testS", "utf-8",
+ ret = registry.decode('xxd', 'application/testS', 'utf-8',
force=True)
- self.assertEqual(ret, "decoded")
+ self.assertEqual(ret, 'decoded')
finally:
disabled.clear()
def test_decode_when_data_is_None(self):
- registry.decode(None, "application/testS", "utf-8")
+ registry.decode(None, 'application/testS', 'utf-8')
def test_content_type_decoding(self):
self.assertEqual(unicode_string,
@@ -123,13 +123,13 @@ class test_Serialization(TestCase):
content_encoding='binary'))
def test_content_type_encoding(self):
- # Using the "raw" serializer
+ # Using the 'raw' serializer
self.assertEqual(unicode_string_as_utf8,
registry.encode(
- unicode_string, serializer="raw")[-1])
+ unicode_string, serializer='raw')[-1])
self.assertEqual(latin_string_as_utf8,
registry.encode(
- latin_string, serializer="raw")[-1])
+ latin_string, serializer='raw')[-1])
# And again w/o a specific serializer to check the
# code where we force unicode objects into a string.
self.assertEqual(unicode_string_as_utf8,
@@ -146,7 +146,7 @@ class test_Serialization(TestCase):
def test_json_encode(self):
self.assertEqual(registry.decode(
- registry.encode(py_data, serializer="json")[-1],
+ registry.encode(py_data, serializer='json')[-1],
content_type='application/json',
content_encoding='utf-8'),
registry.decode(
@@ -167,7 +167,7 @@ class test_Serialization(TestCase):
def test_msgpack_encode(self):
register_msgpack()
self.assertEqual(registry.decode(
- registry.encode(msgpack_py_data, serializer="msgpack")[-1],
+ registry.encode(msgpack_py_data, serializer='msgpack')[-1],
content_type='application/x-msgpack',
content_encoding='binary'),
registry.decode(
@@ -188,7 +188,7 @@ class test_Serialization(TestCase):
def test_yaml_encode(self):
register_yaml()
self.assertEqual(registry.decode(
- registry.encode(py_data, serializer="yaml")[-1],
+ registry.encode(py_data, serializer='yaml')[-1],
content_type='application/x-yaml',
content_encoding='utf-8'),
registry.decode(
@@ -206,41 +206,41 @@ class test_Serialization(TestCase):
def test_pickle_encode(self):
self.assertEqual(pickle_data,
registry.encode(py_data,
- serializer="pickle")[-1])
+ serializer='pickle')[-1])
def test_register(self):
register(None, None, None, None)
def test_unregister(self):
with self.assertRaises(SerializerNotInstalled):
- unregister("nonexisting")
- registry.encode("foo", serializer="pickle")
- unregister("pickle")
+ unregister('nonexisting')
+ registry.encode('foo', serializer='pickle')
+ unregister('pickle')
with self.assertRaises(SerializerNotInstalled):
- registry.encode("foo", serializer="pickle")
+ registry.encode('foo', serializer='pickle')
register_pickle()
def test_set_default_serializer_missing(self):
with self.assertRaises(SerializerNotInstalled):
- registry._set_default_serializer("nonexisting")
+ registry._set_default_serializer('nonexisting')
def test_encode_missing(self):
with self.assertRaises(SerializerNotInstalled):
- registry.encode("foo", serializer="nonexisting")
+ registry.encode('foo', serializer='nonexisting')
def test_raw_encode(self):
- self.assertTupleEqual(raw_encode("foo".encode("utf-8")),
- ("application/data", "binary",
- "foo".encode("utf-8")))
+ self.assertTupleEqual(raw_encode('foo'.encode('utf-8')),
+ ('application/data', 'binary',
+ 'foo'.encode('utf-8')))
- @mask_modules("yaml")
+ @mask_modules('yaml')
def test_register_yaml__no_yaml(self):
register_yaml()
with self.assertRaises(SerializerNotInstalled):
- decode("foo", "application/x-yaml", "utf-8")
+ decode('foo', 'application/x-yaml', 'utf-8')
- @mask_modules("msgpack")
+ @mask_modules('msgpack')
def test_register_msgpack__no_msgpack(self):
register_msgpack()
with self.assertRaises(SerializerNotInstalled):
- decode("foo", "application/x-msgpack", "utf-8")
+ decode('foo', 'application/x-msgpack', 'utf-8')
diff --git a/kombu/tests/test_simple.py b/kombu/tests/test_simple.py
index 130708bf..7580e650 100644
--- a/kombu/tests/test_simple.py
+++ b/kombu/tests/test_simple.py
@@ -17,7 +17,7 @@ class SimpleBase(TestCase):
if not isinstance(q, Queue):
q = self.__class__.__name__
if name:
- q = "%s.%s" % (q, name)
+ q = '%s.%s' % (q, name)
return self._Queue(q, *args, **kwargs)
def _Queue(self, *args, **kwargs):
@@ -25,7 +25,7 @@ class SimpleBase(TestCase):
def setUp(self):
if not self.abstract:
- self.connection = BrokerConnection(transport="memory")
+ self.connection = BrokerConnection(transport='memory')
self.q = self.Queue(None, no_ack=True)
def tearDown(self):
@@ -36,42 +36,42 @@ class SimpleBase(TestCase):
def test_produce__consume(self):
if self.abstract:
return
- q = self.Queue("test_produce__consume", no_ack=True)
+ q = self.Queue('test_produce__consume', no_ack=True)
- q.put({"hello": "Simple"})
+ q.put({'hello': 'Simple'})
- self.assertEqual(q.get(timeout=1).payload, {"hello": "Simple"})
+ self.assertEqual(q.get(timeout=1).payload, {'hello': 'Simple'})
with self.assertRaises(Empty):
q.get(timeout=0.1)
def test_produce__basic_get(self):
if self.abstract:
return
- q = self.Queue("test_produce__basic_get", no_ack=True)
- q.put({"hello": "SimpleSync"})
- self.assertEqual(q.get_nowait().payload, {"hello": "SimpleSync"})
+ q = self.Queue('test_produce__basic_get', no_ack=True)
+ q.put({'hello': 'SimpleSync'})
+ self.assertEqual(q.get_nowait().payload, {'hello': 'SimpleSync'})
with self.assertRaises(Empty):
q.get_nowait()
- q.put({"hello": "SimpleSync"})
- self.assertEqual(q.get(block=False).payload, {"hello": "SimpleSync"})
+ q.put({'hello': 'SimpleSync'})
+ self.assertEqual(q.get(block=False).payload, {'hello': 'SimpleSync'})
with self.assertRaises(Empty):
q.get(block=False)
def test_clear(self):
if self.abstract:
return
- q = self.Queue("test_clear", no_ack=True)
+ q = self.Queue('test_clear', no_ack=True)
for i in range(10):
- q.put({"hello": "SimplePurge%d" % (i, )})
+ q.put({'hello': 'SimplePurge%d' % (i, )})
self.assertEqual(q.clear(), 10)
def test_enter_exit(self):
if self.abstract:
return
- q = self.Queue("test_enter_exit")
+ q = self.Queue('test_enter_exit')
q.close = Mock()
self.assertIs(q.__enter__(), q)
@@ -81,10 +81,10 @@ class SimpleBase(TestCase):
def test_qsize(self):
if self.abstract:
return
- q = self.Queue("test_clear", no_ack=True)
+ q = self.Queue('test_clear', no_ack=True)
for i in range(10):
- q.put({"hello": "SimplePurge%d" % (i, )})
+ q.put({'hello': 'SimplePurge%d' % (i, )})
self.assertEqual(q.qsize(), 10)
self.assertEqual(len(q), 10)
@@ -93,17 +93,17 @@ class SimpleBase(TestCase):
if self.abstract:
return
channel = self.connection.channel()
- q = self.Queue("test_autoclose", no_ack=True, channel=channel)
+ q = self.Queue('test_autoclose', no_ack=True, channel=channel)
q.close()
def test_custom_Queue(self):
if self.abstract:
return
n = self.__class__.__name__
- exchange = Exchange("%s-test.custom.Queue" % (n, ))
- queue = Queue("%s-test.custom.Queue" % (n, ),
+ exchange = Exchange('%s-test.custom.Queue' % (n, ))
+ queue = Queue('%s-test.custom.Queue' % (n, ),
exchange,
- "my.routing.key")
+ 'my.routing.key')
q = self.Queue(queue)
self.assertEqual(q.consumer.queues[0], queue)
@@ -112,7 +112,7 @@ class SimpleBase(TestCase):
def test_bool(self):
if self.abstract:
return
- q = self.Queue("test_nonzero")
+ q = self.Queue('test_nonzero')
self.assertTrue(q)
@@ -123,7 +123,7 @@ class test_SimpleQueue(SimpleBase):
return self.connection.SimpleQueue(*args, **kwargs)
def test_is_ack(self):
- q = self.Queue("test_is_no_ack")
+ q = self.Queue('test_is_no_ack')
self.assertFalse(q.no_ack)
@@ -134,5 +134,5 @@ class test_SimpleBuffer(SimpleBase):
return self.connection.SimpleBuffer(*args, **kwargs)
def test_is_no_ack(self):
- q = self.Queue("test_is_no_ack")
+ q = self.Queue('test_is_no_ack')
self.assertTrue(q.no_ack)
diff --git a/kombu/tests/test_utils.py b/kombu/tests/test_utils.py
index e9d5ac1c..9bf7f5d8 100644
--- a/kombu/tests/test_utils.py
+++ b/kombu/tests/test_utils.py
@@ -50,11 +50,11 @@ class test_utils(TestCase):
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0])
def test_reprkwargs(self):
- self.assertTrue(utils.reprkwargs({"foo": "bar", 1: 2, u"k": "v"}))
+ self.assertTrue(utils.reprkwargs({'foo': 'bar', 1: 2, u'k': 'v'}))
def test_reprcall(self):
- self.assertTrue(utils.reprcall("add",
- (2, 2), {"copy": True}))
+ self.assertTrue(utils.reprcall('add',
+ (2, 2), {'copy': True}))
class test_UUID(TestCase):
@@ -71,9 +71,9 @@ class test_UUID(TestCase):
@skip_if_module('__pypy__')
def test_uuid_without_ctypes(self):
- old_utils = sys.modules.pop("kombu.utils")
+ old_utils = sys.modules.pop('kombu.utils')
- @mask_modules("ctypes")
+ @mask_modules('ctypes')
def with_ctypes_masked():
from kombu.utils import ctypes, uuid
@@ -85,7 +85,7 @@ class test_UUID(TestCase):
try:
with_ctypes_masked()
finally:
- sys.modules["celery.utils"] = old_utils
+ sys.modules['celery.utils'] = old_utils
class test_Misc(TestCase):
@@ -95,8 +95,8 @@ class test_Misc(TestCase):
def f(**kwargs):
return kwargs
- kw = {u"foo": "foo",
- u"bar": "bar"}
+ kw = {u'foo': 'foo',
+ u'bar': 'bar'}
self.assertTrue(f(**utils.kwdict(kw)))
@@ -118,8 +118,8 @@ class test_emergency_dump_state(TestCase):
def test_dump(self, stdout, stderr):
fh = MyBytesIO()
- utils.emergency_dump_state({"foo": "bar"}, open_file=lambda n, m: fh)
- self.assertDictEqual(pickle.loads(fh.getvalue()), {"foo": "bar"})
+ utils.emergency_dump_state({'foo': 'bar'}, open_file=lambda n, m: fh)
+ self.assertDictEqual(pickle.loads(fh.getvalue()), {'foo': 'bar'})
self.assertTrue(stderr.getvalue())
self.assertFalse(stdout.getvalue())
@@ -128,9 +128,9 @@ class test_emergency_dump_state(TestCase):
fh = MyStringIO()
def raise_something(*args, **kwargs):
- raise KeyError("foo")
+ raise KeyError('foo')
- utils.emergency_dump_state({"foo": "bar"}, open_file=lambda n, m: fh,
+ utils.emergency_dump_state({'foo': 'bar'}, open_file=lambda n, m: fh,
dump=raise_something)
self.assertIn("'foo': 'bar'", fh.getvalue())
self.assertTrue(stderr.getvalue())
@@ -213,7 +213,7 @@ class test_cached_property(TestCase):
def foo(self, value):
self.xx = 10
- desc = X.__dict__["foo"]
+ desc = X.__dict__['foo']
self.assertIs(X.foo, desc)
self.assertIs(desc.__get__(None), desc)
diff --git a/kombu/tests/transport/test_amqplib.py b/kombu/tests/transport/test_amqplib.py
index 226e5b31..803af460 100644
--- a/kombu/tests/transport/test_amqplib.py
+++ b/kombu/tests/transport/test_amqplib.py
@@ -39,8 +39,8 @@ class test_Channel(TestCase):
self.assertFalse(self.channel.no_ack_consumers)
def test_prepare_message(self):
- x = self.channel.prepare_message("foobar", 10,
- "application/data", "utf-8",
+ x = self.channel.prepare_message('foobar', 10,
+ 'application/data', 'utf-8',
properties={})
self.assertTrue(x)
@@ -56,22 +56,22 @@ class test_Channel(TestCase):
self.assertIsNone(self.channel.connection)
def test_basic_consume_registers_ack_status(self):
- self.channel.wait_returns = "my-consumer-tag"
- self.channel.basic_consume("foo", no_ack=True)
- self.assertIn("my-consumer-tag", self.channel.no_ack_consumers)
+ self.channel.wait_returns = 'my-consumer-tag'
+ self.channel.basic_consume('foo', no_ack=True)
+ self.assertIn('my-consumer-tag', self.channel.no_ack_consumers)
- self.channel.wait_returns = "other-consumer-tag"
- self.channel.basic_consume("bar", no_ack=False)
- self.assertNotIn("other-consumer-tag", self.channel.no_ack_consumers)
+ self.channel.wait_returns = 'other-consumer-tag'
+ self.channel.basic_consume('bar', no_ack=False)
+ self.assertNotIn('other-consumer-tag', self.channel.no_ack_consumers)
- self.channel.basic_cancel("my-consumer-tag")
- self.assertNotIn("my-consumer-tag", self.channel.no_ack_consumers)
+ self.channel.basic_cancel('my-consumer-tag')
+ self.assertNotIn('my-consumer-tag', self.channel.no_ack_consumers)
class test_Transport(TestCase):
def setUp(self):
- self.connection = BrokerConnection("amqplib://")
+ self.connection = BrokerConnection('amqplib://')
self.transport = self.connection.transport
def test_create_channel(self):
@@ -92,13 +92,13 @@ class test_Transport(TestCase):
vars(self).update(kwargs)
self.transport.Connection = Conn
- self.transport.client.hostname = "localhost"
+ self.transport.client.hostname = 'localhost'
conn1 = self.transport.establish_connection()
- self.assertEqual(conn1.host, "127.0.0.1:5672")
+ self.assertEqual(conn1.host, '127.0.0.1:5672')
- self.transport.client.hostname = "example.com"
+ self.transport.client.hostname = 'example.com'
conn2 = self.transport.establish_connection()
- self.assertEqual(conn2.host, "example.com:5672")
+ self.assertEqual(conn2.host, 'example.com:5672')
def test_close_connection(self):
connection = Mock()
@@ -116,15 +116,15 @@ class test_Transport(TestCase):
connection.channels = {1: 1, 2: 2}
self.assertTrue(self.transport.verify_connection(connection))
- @mask_modules("ssl")
+ @mask_modules('ssl')
def test_import_no_ssl(self):
- pm = sys.modules.pop("kombu.transport.amqplib")
+ pm = sys.modules.pop('kombu.transport.amqplib')
try:
from kombu.transport.amqplib import SSLError
- self.assertEqual(SSLError.__module__, "kombu.transport.amqplib")
+ self.assertEqual(SSLError.__module__, 'kombu.transport.amqplib')
finally:
if pm is not None:
- sys.modules["kombu.transport.amqplib"] = pm
+ sys.modules['kombu.transport.amqplib'] = pm
class test_amqplib(TestCase):
@@ -135,8 +135,8 @@ class test_amqplib(TestCase):
Connection = MockConnection
c = BrokerConnection(port=None, transport=Transport).connect()
- self.assertEqual(c["host"],
- "127.0.0.1:%s" % (Transport.default_port, ))
+ self.assertEqual(c['host'],
+ '127.0.0.1:%s' % (Transport.default_port, ))
def test_custom_port(self):
@@ -144,4 +144,4 @@ class test_amqplib(TestCase):
Connection = MockConnection
c = BrokerConnection(port=1337, transport=Transport).connect()
- self.assertEqual(c["host"], "127.0.0.1:1337")
+ self.assertEqual(c['host'], '127.0.0.1:1337')
diff --git a/kombu/tests/transport/test_base.py b/kombu/tests/transport/test_base.py
index d8822b18..691e132c 100644
--- a/kombu/tests/transport/test_base.py
+++ b/kombu/tests/transport/test_base.py
@@ -11,13 +11,13 @@ from kombu.tests.utils import Mock
class test_StdChannel(TestCase):
def setUp(self):
- self.conn = BrokerConnection("memory://")
+ self.conn = BrokerConnection('memory://')
self.channel = self.conn.channel()
self.channel.queues.clear()
self.conn.connection.state.clear()
def test_Consumer(self):
- q = Queue("foo")
+ q = Queue('foo')
print(self.channel.queues)
cons = self.channel.Consumer(q)
self.assertIsInstance(cons, Consumer)
@@ -34,27 +34,27 @@ class test_StdChannel(TestCase):
def test_interface_after_reply_message_received(self):
self.assertIsNone(StdChannel().after_reply_message_received(
- Queue("foo")))
+ Queue('foo')))
class test_Message(TestCase):
def setUp(self):
- self.conn = BrokerConnection("memory://")
+ self.conn = BrokerConnection('memory://')
self.channel = self.conn.channel()
self.message = Message(self.channel, delivery_tag=313)
def test_ack_respects_no_ack_consumers(self):
- self.channel.no_ack_consumers = set(["abc"])
- self.message.delivery_info["consumer_tag"] = "abc"
+ self.channel.no_ack_consumers = set(['abc'])
+ self.message.delivery_info['consumer_tag'] = 'abc'
ack = self.channel.basic_ack = Mock()
self.message.ack()
- self.assertNotEqual(self.message._state, "ACK")
+ self.assertNotEqual(self.message._state, 'ACK')
self.assertFalse(ack.called)
def test_ack_missing_consumer_tag(self):
- self.channel.no_ack_consumers = set(["abc"])
+ self.channel.no_ack_consumers = set(['abc'])
self.message.delivery_info = {}
ack = self.channel.basic_ack = Mock()
@@ -63,7 +63,7 @@ class test_Message(TestCase):
def test_ack_not_no_ack(self):
self.channel.no_ack_consumers = set()
- self.message.delivery_info["consumer_tag"] = "abc"
+ self.message.delivery_info['consumer_tag'] = 'abc'
ack = self.channel.basic_ack = Mock()
self.message.ack()
@@ -76,7 +76,7 @@ class test_Message(TestCase):
def test_ack_log_error_when_error(self):
ack = self.message.ack = Mock()
- ack.side_effect = KeyError("foo")
+ ack.side_effect = KeyError('foo')
logger = Mock()
self.message.ack_log_error(logger, KeyError)
ack.assert_called_with()
diff --git a/kombu/tests/transport/test_memory.py b/kombu/tests/transport/test_memory.py
index 1942a611..2865d6a4 100644
--- a/kombu/tests/transport/test_memory.py
+++ b/kombu/tests/transport/test_memory.py
@@ -13,14 +13,14 @@ from kombu.tests.utils import TestCase
class test_MemoryTransport(TestCase):
def setUp(self):
- self.c = BrokerConnection(transport="memory")
- self.e = Exchange("test_transport_memory")
- self.q = Queue("test_transport_memory",
+ self.c = BrokerConnection(transport='memory')
+ self.e = Exchange('test_transport_memory')
+ self.q = Queue('test_transport_memory',
exchange=self.e,
- routing_key="test_transport_memory")
- self.q2 = Queue("test_transport_memory2",
+ routing_key='test_transport_memory')
+ self.q2 = Queue('test_transport_memory2',
exchange=self.e,
- routing_key="test_transport_memory2")
+ routing_key='test_transport_memory2')
def test_produce_consume_noack(self):
channel = self.c.channel()
@@ -28,7 +28,7 @@ class test_MemoryTransport(TestCase):
consumer = Consumer(channel, self.q, no_ack=True)
for i in range(10):
- producer.publish({"foo": i}, routing_key="test_transport_memory")
+ producer.publish({'foo': i}, routing_key='test_transport_memory')
_received = []
@@ -53,9 +53,9 @@ class test_MemoryTransport(TestCase):
self.q2(channel).declare()
for i in range(10):
- producer.publish({"foo": i}, routing_key="test_transport_memory")
+ producer.publish({'foo': i}, routing_key='test_transport_memory')
for i in range(10):
- producer.publish({"foo": i}, routing_key="test_transport_memory2")
+ producer.publish({'foo': i}, routing_key='test_transport_memory2')
_received1 = []
_received2 = []
@@ -82,15 +82,15 @@ class test_MemoryTransport(TestCase):
self.assertEqual(len(_received1) + len(_received2), 20)
# compression
- producer.publish({"compressed": True},
- routing_key="test_transport_memory",
- compression="zlib")
+ producer.publish({'compressed': True},
+ routing_key='test_transport_memory',
+ compression='zlib')
m = self.q(channel).get()
- self.assertDictEqual(m.payload, {"compressed": True})
+ self.assertDictEqual(m.payload, {'compressed': True})
# queue.delete
for i in range(10):
- producer.publish({"foo": i}, routing_key="test_transport_memory")
+ producer.publish({'foo': i}, routing_key='test_transport_memory')
self.assertTrue(self.q(channel).get())
self.q(channel).delete()
self.q(channel).declare()
@@ -98,7 +98,7 @@ class test_MemoryTransport(TestCase):
# queue.purge
for i in range(10):
- producer.publish({"foo": i}, routing_key="test_transport_memory2")
+ producer.publish({'foo': i}, routing_key='test_transport_memory2')
self.assertTrue(self.q2(channel).get())
self.q2(channel).purge()
self.assertIsNone(self.q2(channel).get())
@@ -122,7 +122,7 @@ class test_MemoryTransport(TestCase):
class Cycle(object):
def get(self, timeout=None):
- return ("foo", "foo"), c1
+ return ('foo', 'foo'), c1
self.c.transport.cycle = Cycle()
with self.assertRaises(KeyError):
@@ -132,6 +132,6 @@ class test_MemoryTransport(TestCase):
chan = self.c.channel()
chan.queues.clear()
- x = chan._queue_for("foo")
+ x = chan._queue_for('foo')
self.assertTrue(x)
- self.assertIs(chan._queue_for("foo"), x)
+ self.assertIs(chan._queue_for('foo'), x)
diff --git a/kombu/tests/transport/test_mongodb.py b/kombu/tests/transport/test_mongodb.py
index 43def853..1d4145be 100644
--- a/kombu/tests/transport/test_mongodb.py
+++ b/kombu/tests/transport/test_mongodb.py
@@ -15,52 +15,52 @@ class MockConnection(dict):
class test_mongodb(TestCase):
- @skip_if_not_module("pymongo")
+ @skip_if_not_module('pymongo')
def test_url_parser(self):
from kombu.transport import mongodb
from pymongo.errors import ConfigurationError
raise SkipTest(
- "Test is functional: it actually connects to mongod")
+ 'Test is functional: it actually connects to mongod')
class Transport(mongodb.Transport):
Connection = MockConnection
- url = "mongodb://"
+ url = 'mongodb://'
c = BrokerConnection(url, transport=Transport).connect()
client = c.channels[0].client
- self.assertEquals(client.name, "kombu_default")
- self.assertEquals(client.connection.host, "127.0.0.1")
+ self.assertEquals(client.name, 'kombu_default')
+ self.assertEquals(client.connection.host, '127.0.0.1')
- url = "mongodb://localhost"
+ url = 'mongodb://localhost'
c = BrokerConnection(url, transport=Transport).connect()
client = c.channels[0].client
- self.assertEquals(client.name, "kombu_default")
+ self.assertEquals(client.name, 'kombu_default')
- url = "mongodb://localhost/dbname"
+ url = 'mongodb://localhost/dbname'
c = BrokerConnection(url, transport=Transport).connect()
client = c.channels[0].client
- self.assertEquals(client.name, "dbname")
+ self.assertEquals(client.name, 'dbname')
- url = "mongodb://localhost,example.org:29017/dbname"
+ url = 'mongodb://localhost,example.org:29017/dbname'
c = BrokerConnection(url, transport=Transport).connect()
client = c.channels[0].client
nodes = client.connection.nodes
self.assertEquals(len(nodes), 2)
- self.assertTrue(("example.org", 29017) in nodes)
- self.assertEquals(client.name, "dbname")
+ self.assertTrue(('example.org', 29017) in nodes)
+ self.assertEquals(client.name, 'dbname')
# Passing options breaks kombu's _init_params method
- # url = "mongodb://localhost,localhost2:29017/dbname?safe=true"
+ # url = 'mongodb://localhost,localhost2:29017/dbname?safe=true'
# c = BrokerConnection(url, transport=Transport).connect()
# client = c.channels[0].client
- url = "mongodb://localhost:27017,localhost2:29017/dbname"
+ url = 'mongodb://localhost:27017,localhost2:29017/dbname'
c = BrokerConnection(url, transport=Transport).connect()
client = c.channels[0].client
- url = "mongodb://username:password@localhost/dbname"
+ url = 'mongodb://username:password@localhost/dbname'
c = BrokerConnection(url, transport=Transport).connect()
# Assuming there's no user 'username' with password 'password'
# configured in mongodb
diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py
index c14c8a9a..3d9fb361 100644
--- a/kombu/tests/transport/test_redis.py
+++ b/kombu/tests/transport/test_redis.py
@@ -52,7 +52,7 @@ class Client(object):
self.connection = self._sconnection(self)
def bgsave(self):
- self._called.append("BGSAVE")
+ self._called.append('BGSAVE')
if self.bgsave_raises_ResponseError:
raise ResponseError()
@@ -95,7 +95,7 @@ class Client(object):
cmd, queues = self.connection._sock.data.pop()
assert cmd == type
self.connection._sock.data = []
- if type == "BRPOP":
+ if type == 'BRPOP':
item = self.brpop(queues, 0.001)
if item:
return item
@@ -156,7 +156,7 @@ class Client(object):
self._sock.data.append((cmd, args))
def info(self):
- return {"foo": 1}
+ return {'foo': 1}
def pubsub(self, *args, **kwargs):
connection = self.connection
@@ -221,18 +221,18 @@ class test_Channel(TestCase):
self.channel = self.connection.channel()
def test_basic_consume_when_fanout_queue(self):
- self.channel.exchange_declare(exchange="txconfan", type="fanout")
- self.channel.queue_declare(queue="txconfanq")
- self.channel.queue_bind(queue="txconfanq", exchange="txconfan")
+ self.channel.exchange_declare(exchange='txconfan', type='fanout')
+ self.channel.queue_declare(queue='txconfanq')
+ self.channel.queue_bind(queue='txconfanq', exchange='txconfan')
- self.assertIn("txconfanq", self.channel._fanout_queues)
- self.channel.basic_consume("txconfanq", False, None, 1)
- self.assertIn("txconfanq", self.channel.active_fanout_queues)
- self.assertEqual(self.channel._fanout_to_queue.get("txconfan"),
- "txconfanq")
+ self.assertIn('txconfanq', self.channel._fanout_queues)
+ self.channel.basic_consume('txconfanq', False, None, 1)
+ self.assertIn('txconfanq', self.channel.active_fanout_queues)
+ self.assertEqual(self.channel._fanout_to_queue.get('txconfan'),
+ 'txconfanq')
def test_basic_cancel_unknown_delivery_tag(self):
- self.assertIsNone(self.channel.basic_cancel("txaseqwewq"))
+ self.assertIsNone(self.channel.basic_cancel('txaseqwewq'))
def test_subscribe_no_queues(self):
self.channel.subclient = Mock()
@@ -243,14 +243,14 @@ class test_Channel(TestCase):
def test_subscribe(self):
self.channel.subclient = Mock()
- self.channel.active_fanout_queues.add("a")
- self.channel.active_fanout_queues.add("b")
- self.channel._fanout_queues.update(a="a", b="b")
+ self.channel.active_fanout_queues.add('a')
+ self.channel.active_fanout_queues.add('b')
+ self.channel._fanout_queues.update(a='a', b='b')
self.channel._subscribe()
self.assertTrue(self.channel.subclient.subscribe.called)
s_args, _ = self.channel.subclient.subscribe.call_args
- self.assertItemsEqual(s_args[0], ["a", "b"])
+ self.assertItemsEqual(s_args[0], ['a', 'b'])
self.channel.subclient.connection._sock = None
self.channel._subscribe()
@@ -259,43 +259,43 @@ class test_Channel(TestCase):
def test_handle_unsubscribe_message(self):
s = self.channel.subclient
s.subscribed = True
- self.channel._handle_message(s, ["unsubscribe", "a", 0])
+ self.channel._handle_message(s, ['unsubscribe', 'a', 0])
self.assertFalse(s.subscribed)
def test_handle_pmessage_message(self):
self.assertDictEqual(self.channel._handle_message(
self.channel.subclient,
- ["pmessage", "pattern", "channel", "data"]),
- {"type": "pmessage",
- "pattern": "pattern",
- "channel": "channel",
- "data": "data"})
+ ['pmessage', 'pattern', 'channel', 'data']),
+ {'type': 'pmessage',
+ 'pattern': 'pattern',
+ 'channel': 'channel',
+ 'data': 'data'})
def test_handle_message(self):
self.assertDictEqual(self.channel._handle_message(
self.channel.subclient,
- ["type", "channel", "data"]),
- {"type": "type",
- "pattern": None,
- "channel": "channel",
- "data": "data"})
+ ['type', 'channel', 'data']),
+ {'type': 'type',
+ 'pattern': None,
+ 'channel': 'channel',
+ 'data': 'data'})
def test_brpop_start_but_no_queues(self):
self.assertIsNone(self.channel._brpop_start())
def test_receive(self):
s = self.channel.subclient = Mock()
- self.channel._fanout_to_queue["a"] = "b"
- s.parse_response.return_value = ["message", "a",
- dumps({"hello": "world"})]
+ self.channel._fanout_to_queue['a'] = 'b'
+ s.parse_response.return_value = ['message', 'a',
+ dumps({'hello': 'world'})]
payload, queue = self.channel._receive()
- self.assertDictEqual(payload, {"hello": "world"})
- self.assertEqual(queue, "b")
+ self.assertDictEqual(payload, {'hello': 'world'})
+ self.assertEqual(queue, 'b')
def test_receive_raises(self):
self.channel._in_listen = True
s = self.channel.subclient = Mock()
- s.parse_response.side_effect = KeyError("foo")
+ s.parse_response.side_effect = KeyError('foo')
with self.assertRaises(redis.Empty):
self.channel._receive()
@@ -310,14 +310,14 @@ class test_Channel(TestCase):
def test_receive_different_message_Type(self):
s = self.channel.subclient = Mock()
- s.parse_response.return_value = ["pmessage", "/foo/", 0, "data"]
+ s.parse_response.return_value = ['pmessage', '/foo/', 0, 'data']
with self.assertRaises(redis.Empty):
self.channel._receive()
def test_brpop_read_raises(self):
c = self.channel.client = Mock()
- c.parse_response.side_effect = KeyError("foo")
+ c.parse_response.side_effect = KeyError('foo')
with self.assertRaises(redis.Empty):
self.channel._brpop_read()
@@ -334,20 +334,20 @@ class test_Channel(TestCase):
def test_poll_error(self):
c = self.channel.client = Mock()
c.parse_response = Mock()
- self.channel._poll_error("BRPOP")
+ self.channel._poll_error('BRPOP')
- c.parse_response.assert_called_with("BRPOP")
+ c.parse_response.assert_called_with('BRPOP')
- c.parse_response.side_effect = KeyError("foo")
- self.assertIsNone(self.channel._poll_error("BRPOP"))
+ c.parse_response.side_effect = KeyError('foo')
+ self.assertIsNone(self.channel._poll_error('BRPOP'))
def test_put_fanout(self):
self.channel._in_poll = False
c = self.channel.client = Mock()
- body = {"hello": "world"}
- self.channel._put_fanout("exchange", body)
- c.publish.assert_called_with("exchange", dumps(body))
+ body = {'hello': 'world'}
+ self.channel._put_fanout('exchange', body)
+ c.publish.assert_called_with('exchange', dumps(body))
def test_delete(self):
x = self.channel
@@ -355,20 +355,20 @@ class test_Channel(TestCase):
delete = x.client.delete = Mock()
srem = x.client.srem = Mock()
- x._delete("queue", "exchange", "routing_key", None)
- delete.assert_has_call("queue")
- srem.assert_has_call(x.keyprefix_queue % ("exchange", ),
- x.sep.join(["routing_key", "", "queue"]))
+ x._delete('queue', 'exchange', 'routing_key', None)
+ delete.assert_has_call('queue')
+ srem.assert_has_call(x.keyprefix_queue % ('exchange', ),
+ x.sep.join(['routing_key', '', 'queue']))
def test_has_queue(self):
self.channel._in_poll = False
exists = self.channel.client.exists = Mock()
exists.return_value = True
- self.assertTrue(self.channel._has_queue("foo"))
- exists.assert_has_call("foo")
+ self.assertTrue(self.channel._has_queue('foo'))
+ exists.assert_has_call('foo')
exists.return_value = False
- self.assertFalse(self.channel._has_queue("foo"))
+ self.assertFalse(self.channel._has_queue('foo'))
def test_close_when_closed(self):
self.channel.closed = True
@@ -382,27 +382,27 @@ class test_Channel(TestCase):
c.connection.disconnect.assert_called_with()
def test_invalid_database_raises_ValueError(self):
- self.channel.connection.client.virtual_host = "xfeqwewkfk"
+ self.channel.connection.client.virtual_host = 'xfeqwewkfk'
with self.assertRaises(ValueError):
self.channel._create_client()
- @skip_if_not_module("redis")
+ @skip_if_not_module('redis')
def test_get_client(self):
import redis as R
KombuRedis = redis.Channel._get_client(self.channel)
self.assertTrue(KombuRedis)
- Rv = getattr(R, "__version__")
+ Rv = getattr(R, '__version__')
try:
- R.__version__ = "2.4.0"
+ R.__version__ = '2.4.0'
with self.assertRaises(VersionMismatch):
redis.Channel._get_client(self.channel)
finally:
if Rv is not None:
R.__version__ = Rv
- @skip_if_not_module("redis")
+ @skip_if_not_module('redis')
def test_get_response_error(self):
from redis.exceptions import ResponseError
self.assertIs(redis.Channel._get_response_error(self.channel),
@@ -421,19 +421,19 @@ class test_Channel(TestCase):
self.assertTrue(self.channel._avail_client)
cc.assert_called_with()
- @skip_if_not_module("redis")
+ @skip_if_not_module('redis')
def test_transport_get_errors(self):
self.assertTrue(redis.Transport._get_errors(self.connection.transport))
- @skip_if_not_module("redis")
+ @skip_if_not_module('redis')
def test_transport_get_errors_when_InvalidData_used(self):
from redis import exceptions
class ID(Exception):
pass
- DataError = getattr(exceptions, "DataError", None)
- InvalidData = getattr(exceptions, "InvalidData", None)
+ DataError = getattr(exceptions, 'DataError', None)
+ InvalidData = getattr(exceptions, 'InvalidData', None)
exceptions.InvalidData = ID
exceptions.DataError = None
try:
@@ -462,28 +462,28 @@ class test_Channel(TestCase):
# which raises a channel error so that the consumer/publisher
# can recover by redeclaring the required entities.
with self.assertRaises(InconsistencyError):
- self.channel.get_table("celery")
+ self.channel.get_table('celery')
class test_Redis(TestCase):
def setUp(self):
self.connection = BrokerConnection(transport=Transport)
- self.exchange = Exchange("test_Redis", type="direct")
- self.queue = Queue("test_Redis", self.exchange, "test_Redis")
+ self.exchange = Exchange('test_Redis', type='direct')
+ self.queue = Queue('test_Redis', self.exchange, 'test_Redis')
def tearDown(self):
self.connection.close()
def test_publish__get(self):
channel = self.connection.channel()
- producer = Producer(channel, self.exchange, routing_key="test_Redis")
+ producer = Producer(channel, self.exchange, routing_key='test_Redis')
self.queue(channel).declare()
- producer.publish({"hello": "world"})
+ producer.publish({'hello': 'world'})
self.assertDictEqual(self.queue(channel).get().payload,
- {"hello": "world"})
+ {'hello': 'world'})
self.assertIsNone(self.queue(channel).get())
self.assertIsNone(self.queue(channel).get())
self.assertIsNone(self.queue(channel).get())
@@ -491,10 +491,10 @@ class test_Redis(TestCase):
def test_publish__consume(self):
connection = BrokerConnection(transport=Transport)
channel = connection.channel()
- producer = Producer(channel, self.exchange, routing_key="test_Redis")
+ producer = Producer(channel, self.exchange, routing_key='test_Redis')
consumer = Consumer(channel, self.queue)
- producer.publish({"hello2": "world2"})
+ producer.publish({'hello2': 'world2'})
_received = []
def callback(message_data, message):
@@ -515,13 +515,13 @@ class test_Redis(TestCase):
def test_purge(self):
channel = self.connection.channel()
- producer = Producer(channel, self.exchange, routing_key="test_Redis")
+ producer = Producer(channel, self.exchange, routing_key='test_Redis')
self.queue(channel).declare()
for i in range(10):
- producer.publish({"hello": "world-%s" % (i, )})
+ producer.publish({'hello': 'world-%s' % (i, )})
- self.assertEqual(channel._size("test_Redis"), 10)
+ self.assertEqual(channel._size('test_Redis'), 10)
self.assertEqual(self.queue(channel).purge(), 10)
channel.close()
@@ -530,16 +530,16 @@ class test_Redis(TestCase):
transport=Transport).channel()
self.assertEqual(c1.client.db, 1)
- c2 = BrokerConnection(virtual_host="1",
+ c2 = BrokerConnection(virtual_host='1',
transport=Transport).channel()
self.assertEqual(c2.client.db, 1)
- c3 = BrokerConnection(virtual_host="/1",
+ c3 = BrokerConnection(virtual_host='/1',
transport=Transport).channel()
self.assertEqual(c3.client.db, 1)
with self.assertRaises(Exception):
- BrokerConnection(virtual_host="/foo",
+ BrokerConnection(virtual_host='/foo',
transport=Transport).channel()
def test_db_port(self):
@@ -574,7 +574,7 @@ class test_Redis(TestCase):
def test_get__Empty(self):
channel = self.connection.channel()
with self.assertRaises(Empty):
- channel._get("does-not-exist")
+ channel._get('does-not-exist')
channel.close()
def test_get_client(self):
@@ -610,7 +610,7 @@ def _redis_modules():
class ResponseError(Exception):
pass
- exceptions = types.ModuleType("redis.exceptions")
+ exceptions = types.ModuleType('redis.exceptions')
exceptions.ConnectionError = ConnectionError
exceptions.AuthenticationError = AuthenticationError
exceptions.InvalidData = InvalidData
@@ -620,7 +620,7 @@ def _redis_modules():
class Redis(object):
pass
- myredis = types.ModuleType("redis")
+ myredis = types.ModuleType('redis')
myredis.exceptions = exceptions
myredis.Redis = Redis
@@ -703,7 +703,7 @@ class test_MultiChannelPoller(TestCase):
self.assertEqual(p._register.call_count, 1)
channel.client.connection._sock = Mock()
- p._chan_to_sock[(channel, channel.client, "BRPOP")] = True
+ p._chan_to_sock[(channel, channel.client, 'BRPOP')] = True
channel._in_poll = True
p._register_BRPOP(channel)
self.assertEqual(channel._brpop_start.call_count, 1)
@@ -717,7 +717,7 @@ class test_MultiChannelPoller(TestCase):
p._register = Mock()
p._register_LISTEN(channel)
- p._register.assert_called_with(channel, channel.subclient, "LISTEN")
+ p._register.assert_called_with(channel, channel.subclient, 'LISTEN')
self.assertEqual(p._register.call_count, 1)
self.assertEqual(channel._subscribe.call_count, 1)
@@ -753,7 +753,7 @@ class test_MultiChannelPoller(TestCase):
p.get()
def test_get_brpop_qos_allow(self):
- p, channel = self.create_get(queues=["a_queue"])
+ p, channel = self.create_get(queues=['a_queue'])
channel.qos.can_consume.return_value = True
with self.assertRaises(redis.Empty):
@@ -762,7 +762,7 @@ class test_MultiChannelPoller(TestCase):
p._register_BRPOP.assert_called_with(channel)
def test_get_brpop_qos_disallow(self):
- p, channel = self.create_get(queues=["a_queue"])
+ p, channel = self.create_get(queues=['a_queue'])
channel.qos.can_consume.return_value = False
with self.assertRaises(redis.Empty):
@@ -771,7 +771,7 @@ class test_MultiChannelPoller(TestCase):
self.assertFalse(p._register_BRPOP.called)
def test_get_listen(self):
- p, channel = self.create_get(fanouts=["f_queue"])
+ p, channel = self.create_get(fanouts=['f_queue'])
with self.assertRaises(redis.Empty):
p.get()
@@ -780,19 +780,19 @@ class test_MultiChannelPoller(TestCase):
def test_get_receives_ERR(self):
p, channel = self.create_get(events=[(1, eventio.ERR)])
- p._fd_to_chan[1] = (channel, "BRPOP")
+ p._fd_to_chan[1] = (channel, 'BRPOP')
with self.assertRaises(redis.Empty):
p.get()
- channel._poll_error.assert_called_with("BRPOP")
+ channel._poll_error.assert_called_with('BRPOP')
def test_get_receives_multiple(self):
p, channel = self.create_get(events=[(1, eventio.ERR),
(1, eventio.ERR)])
- p._fd_to_chan[1] = (channel, "BRPOP")
+ p._fd_to_chan[1] = (channel, 'BRPOP')
with self.assertRaises(redis.Empty):
p.get()
- channel._poll_error.assert_called_with("BRPOP")
+ channel._poll_error.assert_called_with('BRPOP')
diff --git a/kombu/tests/transport/test_sqlalchemy.py b/kombu/tests/transport/test_sqlalchemy.py
index b827bc88..8fb8e6dd 100644
--- a/kombu/tests/transport/test_sqlalchemy.py
+++ b/kombu/tests/transport/test_sqlalchemy.py
@@ -14,15 +14,15 @@ class test_sqlalchemy(TestCase):
try:
import sqlalchemy # noqa
except ImportError:
- raise SkipTest("sqlalchemy not installed")
- with patch("kombu.transport.sqlalchemy.Channel._open"):
- url = "sqlalchemy+sqlite://celerydb.sqlite"
+ raise SkipTest('sqlalchemy not installed')
+ with patch('kombu.transport.sqlalchemy.Channel._open'):
+ url = 'sqlalchemy+sqlite://celerydb.sqlite'
BrokerConnection(url).connect()
- url = "sqla+sqlite://celerydb.sqlite"
+ url = 'sqla+sqlite://celerydb.sqlite'
BrokerConnection(url).connect()
# Should prevent regression fixed by f187ccd
- url = "sqlb+sqlite://celerydb.sqlite"
+ url = 'sqlb+sqlite://celerydb.sqlite'
with self.assertRaises(KeyError):
BrokerConnection(url).connect()
diff --git a/kombu/tests/transport/test_transport.py b/kombu/tests/transport/test_transport.py
index cc3eeea7..151f4b56 100644
--- a/kombu/tests/transport/test_transport.py
+++ b/kombu/tests/transport/test_transport.py
@@ -12,19 +12,19 @@ class test_transport(TestCase):
def test_resolve_transport__no_class_name(self):
with self.assertRaises(KeyError):
- transport.resolve_transport("nonexistant")
+ transport.resolve_transport('nonexistant')
def test_resolve_transport_when_callable(self):
self.assertTupleEqual(transport.resolve_transport(
- lambda: "kombu.transport.memory.Transport"),
- ("kombu.transport.memory", "Transport"))
+ lambda: 'kombu.transport.memory.Transport'),
+ ('kombu.transport.memory', 'Transport'))
class test_transport_gettoq(TestCase):
- @patch("warnings.warn")
+ @patch('warnings.warn')
def test_compat(self, warn):
- x = transport._ghettoq("Redis", "redis", "redis")
+ x = transport._ghettoq('Redis', 'redis', 'redis')
- self.assertEqual(x(), "kombu.transport.redis.Transport")
+ self.assertEqual(x(), 'kombu.transport.redis.Transport')
self.assertTrue(warn.called)
diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py
index 301d71e7..d4a5fe3f 100644
--- a/kombu/tests/transport/virtual/test_base.py
+++ b/kombu/tests/transport/virtual/test_base.py
@@ -16,20 +16,20 @@ from kombu.tests.utils import Mock, redirect_stdouts
def client(**kwargs):
- return BrokerConnection(transport="kombu.transport.virtual.Transport",
+ return BrokerConnection(transport='kombu.transport.virtual.Transport',
**kwargs)
def memory_client():
- return BrokerConnection(transport="memory")
+ return BrokerConnection(transport='memory')
class test_BrokerState(TestCase):
def test_constructor(self):
s = virtual.BrokerState()
- self.assertTrue(hasattr(s, "exchanges"))
- self.assertTrue(hasattr(s, "bindings"))
+ self.assertTrue(hasattr(s, 'exchanges'))
+ self.assertTrue(hasattr(s, 'bindings'))
t = virtual.BrokerState(exchanges=16, bindings=32)
self.assertEqual(t.exchanges, 16)
@@ -95,66 +95,66 @@ class test_QoS(TestCase):
self.assertFalse(stdout.getvalue())
def test_get(self):
- self.q._delivered["foo"] = 1
- self.assertEqual(self.q.get("foo"), 1)
+ self.q._delivered['foo'] = 1
+ self.assertEqual(self.q.get('foo'), 1)
class test_Message(TestCase):
def test_create(self):
c = client().channel()
- data = c.prepare_message("the quick brown fox...")
- tag = data["properties"]["delivery_tag"] = uuid()
+ data = c.prepare_message('the quick brown fox...')
+ tag = data['properties']['delivery_tag'] = uuid()
message = c.message_to_python(data)
self.assertIsInstance(message, virtual.Message)
self.assertIs(message, c.message_to_python(message))
self.assertEqual(message.body,
- "the quick brown fox...".encode("utf-8"))
+ 'the quick brown fox...'.encode('utf-8'))
self.assertTrue(message.delivery_tag, tag)
def test_create_no_body(self):
virtual.Message(Mock(), {
- "body": None,
- "properties": {"delivery_tag": 1}})
+ 'body': None,
+ 'properties': {'delivery_tag': 1}})
def test_serializable(self):
c = client().channel()
- data = c.prepare_message("the quick brown fox...")
- tag = data["properties"]["delivery_tag"] = uuid()
+ data = c.prepare_message('the quick brown fox...')
+ tag = data['properties']['delivery_tag'] = uuid()
message = c.message_to_python(data)
dict_ = message.serializable()
- self.assertEqual(dict_["body"],
- "the quick brown fox...".encode("utf-8"))
- self.assertEqual(dict_["properties"]["delivery_tag"], tag)
+ self.assertEqual(dict_['body'],
+ 'the quick brown fox...'.encode('utf-8'))
+ self.assertEqual(dict_['properties']['delivery_tag'], tag)
class test_AbstractChannel(TestCase):
def test_get(self):
with self.assertRaises(NotImplementedError):
- virtual.AbstractChannel()._get("queue")
+ virtual.AbstractChannel()._get('queue')
def test_put(self):
with self.assertRaises(NotImplementedError):
- virtual.AbstractChannel()._put("queue", "m")
+ virtual.AbstractChannel()._put('queue', 'm')
def test_size(self):
- self.assertEqual(virtual.AbstractChannel()._size("queue"), 0)
+ self.assertEqual(virtual.AbstractChannel()._size('queue'), 0)
def test_purge(self):
with self.assertRaises(NotImplementedError):
- virtual.AbstractChannel()._purge("queue")
+ virtual.AbstractChannel()._purge('queue')
def test_delete(self):
with self.assertRaises(NotImplementedError):
- virtual.AbstractChannel()._delete("queue")
+ virtual.AbstractChannel()._delete('queue')
def test_new_queue(self):
- self.assertIsNone(virtual.AbstractChannel()._new_queue("queue"))
+ self.assertIsNone(virtual.AbstractChannel()._new_queue('queue'))
def test_has_queue(self):
- self.assertTrue(virtual.AbstractChannel()._has_queue("queue"))
+ self.assertTrue(virtual.AbstractChannel()._has_queue('queue'))
def test_poll(self):
@@ -181,20 +181,20 @@ class test_Channel(TestCase):
def test_exchange_declare(self):
c = self.channel
- c.exchange_declare("test_exchange_declare", "direct",
+ c.exchange_declare('test_exchange_declare', 'direct',
durable=True, auto_delete=True)
- self.assertIn("test_exchange_declare", c.state.exchanges)
+ self.assertIn('test_exchange_declare', c.state.exchanges)
# can declare again with same values
- c.exchange_declare("test_exchange_declare", "direct",
+ c.exchange_declare('test_exchange_declare', 'direct',
durable=True, auto_delete=True)
- self.assertIn("test_exchange_declare", c.state.exchanges)
+ self.assertIn('test_exchange_declare', c.state.exchanges)
# using different values raises NotEquivalentError
with self.assertRaises(virtual.NotEquivalentError):
- c.exchange_declare("test_exchange_declare", "direct",
+ c.exchange_declare('test_exchange_declare', 'direct',
durable=False, auto_delete=True)
- def test_exchange_delete(self, ex="test_exchange_delete"):
+ def test_exchange_delete(self, ex='test_exchange_delete'):
class PurgeChannel(virtual.Channel):
purged = []
@@ -204,13 +204,13 @@ class test_Channel(TestCase):
c = PurgeChannel(self.channel.connection)
- c.exchange_declare(ex, "direct", durable=True, auto_delete=True)
+ c.exchange_declare(ex, 'direct', durable=True, auto_delete=True)
self.assertIn(ex, c.state.exchanges)
self.assertNotIn(ex, c.state.bindings) # no bindings yet
c.exchange_delete(ex)
self.assertNotIn(ex, c.state.exchanges)
- c.exchange_declare(ex, "direct", durable=True, auto_delete=True)
+ c.exchange_declare(ex, 'direct', durable=True, auto_delete=True)
c.queue_declare(ex)
c.queue_bind(ex, ex, ex)
self.assertTrue(c.state.bindings[ex])
@@ -218,7 +218,7 @@ class test_Channel(TestCase):
self.assertNotIn(ex, c.state.bindings)
self.assertIn(ex, c.purged)
- def test_queue_delete__if_empty(self, n="test_queue_delete__if_empty"):
+ def test_queue_delete__if_empty(self, n='test_queue_delete__if_empty'):
class PurgeChannel(virtual.Channel):
purged = []
size = 30
@@ -244,7 +244,7 @@ class test_Channel(TestCase):
self.assertNotIn(n, c.state.bindings)
self.assertIn(n, c.purged)
- def test_queue_purge(self, n="test_queue_purge"):
+ def test_queue_purge(self, n='test_queue_purge'):
class PurgeChannel(virtual.Channel):
purged = []
@@ -260,35 +260,35 @@ class test_Channel(TestCase):
self.assertIn(n, c.purged)
def test_basic_publish__get__consume__restore(self,
- n="test_basic_publish"):
+ n='test_basic_publish'):
c = memory_client().channel()
c.exchange_declare(n)
c.queue_declare(n)
c.queue_bind(n, n, n)
- c.queue_declare(n + "2")
- c.queue_bind(n + "2", n, n)
+ c.queue_declare(n + '2')
+ c.queue_bind(n + '2', n, n)
- m = c.prepare_message("nthex quick brown fox...")
+ m = c.prepare_message('nthex quick brown fox...')
c.basic_publish(m, n, n)
r1 = c.message_to_python(c.basic_get(n))
self.assertTrue(r1)
self.assertEqual(r1.body,
- "nthex quick brown fox...".encode("utf-8"))
+ 'nthex quick brown fox...'.encode('utf-8'))
self.assertIsNone(c.basic_get(n))
consumer_tag = uuid()
- c.basic_consume(n + "2", False, consumer_tag=consumer_tag,
+ c.basic_consume(n + '2', False, consumer_tag=consumer_tag,
callback=lambda *a: None)
- self.assertIn(n + "2", c._active_queues)
+ self.assertIn(n + '2', c._active_queues)
r2, _ = c.drain_events()
r2 = c.message_to_python(r2)
self.assertEqual(r2.body,
- "nthex quick brown fox...".encode("utf-8"))
- self.assertEqual(r2.delivery_info["exchange"], n)
- self.assertEqual(r2.delivery_info["routing_key"], n)
+ 'nthex quick brown fox...'.encode('utf-8'))
+ self.assertEqual(r2.delivery_info['exchange'], n)
+ self.assertEqual(r2.delivery_info['routing_key'], n)
with self.assertRaises(virtual.Empty):
c.drain_events()
c.basic_cancel(consumer_tag)
@@ -296,7 +296,7 @@ class test_Channel(TestCase):
c._restore(r2)
r3 = c.message_to_python(c.basic_get(n))
self.assertTrue(r3)
- self.assertEqual(r3.body, "nthex quick brown fox...".encode("utf-8"))
+ self.assertEqual(r3.body, 'nthex quick brown fox...'.encode('utf-8'))
self.assertIsNone(c.basic_get(n))
def test_basic_ack(self):
@@ -308,7 +308,7 @@ class test_Channel(TestCase):
self.was_acked = True
self.channel._qos = MockQoS(self.channel)
- self.channel.basic_ack("foo")
+ self.channel.basic_ack('foo')
self.assertTrue(self.channel._qos.was_acked)
def test_basic_recover__requeue(self):
@@ -336,8 +336,8 @@ class test_Channel(TestCase):
self.assertEqual(errors[0][1], 1)
self.assertFalse(q._delivered)
- @patch("kombu.transport.virtual.emergency_dump_state")
- @patch("kombu.transport.virtual.say")
+ @patch('kombu.transport.virtual.emergency_dump_state')
+ @patch('kombu.transport.virtual.say')
def test_restore_unacked_once_when_unrestored(self, say,
emergency_dump_state):
q = self.channel.qos
@@ -373,20 +373,20 @@ class test_Channel(TestCase):
self.was_rejected = True
self.channel._qos = MockQoS(self.channel)
- self.channel.basic_reject("foo")
+ self.channel.basic_reject('foo')
self.assertTrue(self.channel._qos.was_rejected)
def test_basic_qos(self):
self.channel.basic_qos(prefetch_count=128)
self.assertEqual(self.channel._qos.prefetch_count, 128)
- def test_lookup__undeliverable(self, n="test_lookup__undeliverable"):
+ def test_lookup__undeliverable(self, n='test_lookup__undeliverable'):
warnings.resetwarnings()
with catch_warnings(record=True) as log:
- self.assertListEqual(self.channel._lookup(n, n, "ae.undeliver"),
- ["ae.undeliver"])
+ self.assertListEqual(self.channel._lookup(n, n, 'ae.undeliver'),
+ ['ae.undeliver'])
self.assertTrue(log)
- self.assertIn("could not be delivered", log[0].message.args[0])
+ self.assertIn('could not be delivered', log[0].message.args[0])
def test_context(self):
x = self.channel.__enter__()
@@ -418,44 +418,44 @@ class test_Channel(TestCase):
c._get_many.assert_called_with(c._active_queues, timeout=10.0)
def test_get_exchanges(self):
- self.channel.exchange_declare(exchange="foo")
+ self.channel.exchange_declare(exchange='foo')
self.assertTrue(self.channel.get_exchanges())
def test_basic_cancel_not_in_active_queues(self):
c = self.channel
- c._consumers.add("x")
- c._tag_to_queue["x"] = "foo"
+ c._consumers.add('x')
+ c._tag_to_queue['x'] = 'foo'
c._active_queues = Mock()
c._active_queues.remove.side_effect = ValueError()
- c.basic_cancel("x")
- c._active_queues.remove.assert_called_with("foo")
+ c.basic_cancel('x')
+ c._active_queues.remove.assert_called_with('foo')
def test_basic_cancel_unknown_ctag(self):
- self.assertIsNone(self.channel.basic_cancel("unknown-tag"))
+ self.assertIsNone(self.channel.basic_cancel('unknown-tag'))
def test_list_bindings(self):
c = self.channel
- c.exchange_declare(exchange="foo")
- c.queue_declare(queue="q")
- c.queue_bind(queue="q", exchange="foo", routing_key="rk")
+ c.exchange_declare(exchange='foo')
+ c.queue_declare(queue='q')
+ c.queue_bind(queue='q', exchange='foo', routing_key='rk')
- self.assertIn(("q", "foo", "rk"), list(c.list_bindings()))
+ self.assertIn(('q', 'foo', 'rk'), list(c.list_bindings()))
def test_after_reply_message_received(self):
c = self.channel
c.queue_delete = Mock()
- c.after_reply_message_received("foo")
- c.queue_delete.assert_called_with("foo")
+ c.after_reply_message_received('foo')
+ c.queue_delete.assert_called_with('foo')
def test_queue_delete_unknown_queue(self):
- self.assertIsNone(self.channel.queue_delete("xiwjqjwel"))
+ self.assertIsNone(self.channel.queue_delete('xiwjqjwel'))
def test_queue_declare_passive(self):
has_queue = self.channel._has_queue = Mock()
has_queue.return_value = False
with self.assertRaises(StdChannelError):
- self.channel.queue_declare(queue="21wisdjwqe", passive=True)
+ self.channel.queue_declare(queue='21wisdjwqe', passive=True)
class test_Transport(TestCase):
diff --git a/kombu/tests/transport/virtual/test_exchange.py b/kombu/tests/transport/virtual/test_exchange.py
index d33b9efc..50498021 100644
--- a/kombu/tests/transport/virtual/test_exchange.py
+++ b/kombu/tests/transport/virtual/test_exchange.py
@@ -20,54 +20,54 @@ class ExchangeCase(TestCase):
class test_Direct(ExchangeCase):
type = exchange.DirectExchange
- table = [("rFoo", None, "qFoo"),
- ("rFoo", None, "qFox"),
- ("rBar", None, "qBar"),
- ("rBaz", None, "qBaz")]
+ table = [('rFoo', None, 'qFoo'),
+ ('rFoo', None, 'qFox'),
+ ('rBar', None, 'qBar'),
+ ('rBaz', None, 'qBaz')]
def test_lookup(self):
self.assertListEqual(self.e.lookup(
- self.table, "eFoo", "rFoo", None),
- ["qFoo", "qFox"])
+ self.table, 'eFoo', 'rFoo', None),
+ ['qFoo', 'qFox'])
self.assertListEqual(self.e.lookup(
- self.table, "eMoz", "rMoz", "DEFAULT"),
+ self.table, 'eMoz', 'rMoz', 'DEFAULT'),
[])
self.assertListEqual(self.e.lookup(
- self.table, "eBar", "rBar", None),
- ["qBar"])
+ self.table, 'eBar', 'rBar', None),
+ ['qBar'])
class test_Fanout(ExchangeCase):
type = exchange.FanoutExchange
- table = [(None, None, "qFoo"),
- (None, None, "qFox"),
- (None, None, "qBar")]
+ table = [(None, None, 'qFoo'),
+ (None, None, 'qFox'),
+ (None, None, 'qBar')]
def test_lookup(self):
self.assertListEqual(self.e.lookup(
- self.table, "eFoo", "rFoo", None),
- ["qFoo", "qFox", "qBar"])
+ self.table, 'eFoo', 'rFoo', None),
+ ['qFoo', 'qFox', 'qBar'])
def test_deliver_when_fanout_supported(self):
self.e.channel = Mock()
self.e.channel.supports_fanout = True
message = Mock()
- self.e.deliver(message, "exchange", None)
- self.e.channel._put_fanout.assert_called_with("exchange", message)
+ self.e.deliver(message, 'exchange', None)
+ self.e.channel._put_fanout.assert_called_with('exchange', message)
def test_deliver_when_fanout_unsupported(self):
self.e.channel = Mock()
self.e.channel.supports_fanout = False
- self.e.deliver(Mock(), "exchange", None)
+ self.e.deliver(Mock(), 'exchange', None)
self.assertFalse(self.e.channel._put_fanout.called)
class test_Topic(ExchangeCase):
type = exchange.TopicExchange
- table = [("stock.#", None, "rFoo"),
- ("stock.us.*", None, "rBar")]
+ table = [('stock.#', None, 'rFoo'),
+ ('stock.us.*', None, 'rBar')]
def setUp(self):
super(test_Topic, self).setUp()
@@ -75,32 +75,32 @@ class test_Topic(ExchangeCase):
for rkey, _, queue in self.table]
def test_prepare_bind(self):
- x = self.e.prepare_bind("qFoo", "eFoo", "stock.#", {})
- self.assertTupleEqual(x, ("stock.#", r'^stock\..*?$', "qFoo"))
+ x = self.e.prepare_bind('qFoo', 'eFoo', 'stock.#', {})
+ self.assertTupleEqual(x, ('stock.#', r'^stock\..*?$', 'qFoo'))
def test_lookup(self):
self.assertListEqual(self.e.lookup(
- self.table, "eFoo", "stock.us.nasdaq", None),
- ["rFoo", "rBar"])
+ self.table, 'eFoo', 'stock.us.nasdaq', None),
+ ['rFoo', 'rBar'])
self.assertTrue(self.e._compiled)
self.assertListEqual(self.e.lookup(
- self.table, "eFoo", "stock.europe.OSE", None),
- ["rFoo"])
+ self.table, 'eFoo', 'stock.europe.OSE', None),
+ ['rFoo'])
self.assertListEqual(self.e.lookup(
- self.table, "eFoo", "stockxeuropexOSE", None),
+ self.table, 'eFoo', 'stockxeuropexOSE', None),
[])
self.assertListEqual(self.e.lookup(
- self.table, "eFoo", "candy.schleckpulver.snap_crackle", None),
+ self.table, 'eFoo', 'candy.schleckpulver.snap_crackle', None),
[])
def test_deliver(self):
self.e.channel = Mock()
- self.e.channel._lookup.return_value = ("a", "b")
+ self.e.channel._lookup.return_value = ('a', 'b')
message = Mock()
- self.e.deliver(message, "exchange", "rkey")
+ self.e.deliver(message, 'exchange', 'rkey')
- expected = [(("a", message), {}),
- (("b", message), {})]
+ expected = [(('a', message), {}),
+ (('b', message), {})]
self.assertListEqual(self.e.channel._put.call_args_list, expected)
@@ -109,32 +109,32 @@ class test_ExchangeType(ExchangeCase):
def test_lookup(self):
with self.assertRaises(NotImplementedError):
- self.e.lookup([], "eFoo", "rFoo", None)
+ self.e.lookup([], 'eFoo', 'rFoo', None)
def test_prepare_bind(self):
- self.assertTupleEqual(self.e.prepare_bind("qFoo", "eFoo", "rFoo", {}),
- ("rFoo", None, "qFoo"))
+ self.assertTupleEqual(self.e.prepare_bind('qFoo', 'eFoo', 'rFoo', {}),
+ ('rFoo', None, 'qFoo'))
def test_equivalent(self):
- e1 = dict(type="direct",
+ e1 = dict(type='direct',
durable=True,
auto_delete=True,
arguments={})
self.assertTrue(
- self.e.equivalent(e1, "eFoo", "direct", True, True, {}))
+ self.e.equivalent(e1, 'eFoo', 'direct', True, True, {}))
self.assertFalse(
- self.e.equivalent(e1, "eFoo", "topic", True, True, {}))
+ self.e.equivalent(e1, 'eFoo', 'topic', True, True, {}))
self.assertFalse(
- self.e.equivalent(e1, "eFoo", "direct", False, True, {}))
+ self.e.equivalent(e1, 'eFoo', 'direct', False, True, {}))
self.assertFalse(
- self.e.equivalent(e1, "eFoo", "direct", True, False, {}))
+ self.e.equivalent(e1, 'eFoo', 'direct', True, False, {}))
self.assertFalse(
- self.e.equivalent(e1, "eFoo", "direct", True, True, {
- "expires": 3000}))
- e2 = dict(e1, arguments={"expires": 3000})
+ self.e.equivalent(e1, 'eFoo', 'direct', True, True, {
+ 'expires': 3000}))
+ e2 = dict(e1, arguments={'expires': 3000})
self.assertTrue(
- self.e.equivalent(e2, "eFoo", "direct", True, True, {
- "expires": 3000}))
+ self.e.equivalent(e2, 'eFoo', 'direct', True, True, {
+ 'expires': 3000}))
self.assertFalse(
- self.e.equivalent(e2, "eFoo", "direct", True, True, {
- "expires": 6000}))
+ self.e.equivalent(e2, 'eFoo', 'direct', True, True, {
+ 'expires': 6000}))
diff --git a/kombu/tests/transport/virtual/test_scheduling.py b/kombu/tests/transport/virtual/test_scheduling.py
index 6cae66f1..afbcd061 100644
--- a/kombu/tests/transport/virtual/test_scheduling.py
+++ b/kombu/tests/transport/virtual/test_scheduling.py
@@ -20,12 +20,12 @@ def consume(fun, n):
class test_FairCycle(TestCase):
def test_cycle(self):
- resources = ["a", "b", "c", "d", "e"]
+ resources = ['a', 'b', 'c', 'd', 'e']
def echo(r, timeout=None):
return r
- # cycle should be ["a", "b", "c", "d", "e", ... repeat]
+ # cycle should be ['a', 'b', 'c', 'd', 'e', ... repeat]
cycle = FairCycle(echo, resources, MyEmpty)
for i in range(len(resources)):
self.assertEqual(cycle.get(), (resources[i],
@@ -35,21 +35,21 @@ class test_FairCycle(TestCase):
resources[i]))
def test_cycle_breaks(self):
- resources = ["a", "b", "c", "d", "e"]
+ resources = ['a', 'b', 'c', 'd', 'e']
def echo(r):
- if r == "c":
+ if r == 'c':
raise MyEmpty(r)
return r
cycle = FairCycle(echo, resources, MyEmpty)
self.assertEqual(consume(cycle.get, len(resources)),
- [("a", "a"), ("b", "b"), ("d", "d"),
- ("e", "e"), ("a", "a")])
+ [('a', 'a'), ('b', 'b'), ('d', 'd'),
+ ('e', 'e'), ('a', 'a')])
self.assertEqual(consume(cycle.get, len(resources)),
- [("b", "b"), ("d", "d"), ("e", "e"),
- ("a", "a"), ("b", "b")])
- cycle2 = FairCycle(echo, ["c", "c"], MyEmpty)
+ [('b', 'b'), ('d', 'd'), ('e', 'e'),
+ ('a', 'a'), ('b', 'b')])
+ cycle2 = FairCycle(echo, ['c', 'c'], MyEmpty)
with self.assertRaises(MyEmpty):
consume(cycle2.get, 3)
diff --git a/kombu/tests/utilities/test_encoding.py b/kombu/tests/utilities/test_encoding.py
index 336ed037..eb19d1ad 100644
--- a/kombu/tests/utilities/test_encoding.py
+++ b/kombu/tests/utilities/test_encoding.py
@@ -15,24 +15,24 @@ from kombu.tests.utils import TestCase
@contextmanager
def clean_encoding():
- old_encoding = sys.modules.pop("kombu.utils.encoding", None)
+ old_encoding = sys.modules.pop('kombu.utils.encoding', None)
import kombu.utils.encoding
yield kombu.utils.encoding
if old_encoding:
- sys.modules["kombu.utils.encoding"] = old_encoding
+ sys.modules['kombu.utils.encoding'] = old_encoding
class test_default_encoding(TestCase):
- @patch("sys.getfilesystemencoding")
+ @patch('sys.getfilesystemencoding')
def test_default(self, getfilesystemencoding):
- getfilesystemencoding.return_value = "ascii"
+ getfilesystemencoding.return_value = 'ascii'
with clean_encoding() as encoding:
enc = encoding.default_encoding()
- if sys.platform.startswith("java"):
- self.assertEqual(enc, "utf-8")
+ if sys.platform.startswith('java'):
+ self.assertEqual(enc, 'utf-8')
else:
- self.assertEqual(enc, "ascii")
+ self.assertEqual(enc, 'ascii')
getfilesystemencoding.assert_called_with()
@@ -40,32 +40,32 @@ class test_encoding_utils(TestCase):
def setUp(self):
if sys.version_info >= (3, 0):
- raise SkipTest("not relevant on py3k")
+ raise SkipTest('not relevant on py3k')
def test_str_to_bytes(self):
with clean_encoding() as e:
- self.assertIsInstance(e.str_to_bytes(u"foobar"), str)
- self.assertIsInstance(e.str_to_bytes("foobar"), str)
+ self.assertIsInstance(e.str_to_bytes(u'foobar'), str)
+ self.assertIsInstance(e.str_to_bytes('foobar'), str)
def test_from_utf8(self):
with clean_encoding() as e:
- self.assertIsInstance(e.from_utf8(u"foobar"), str)
+ self.assertIsInstance(e.from_utf8(u'foobar'), str)
def test_default_encode(self):
with clean_encoding() as e:
- self.assertTrue(e.default_encode("foo"))
+ self.assertTrue(e.default_encode('foo'))
class test_safe_str(TestCase):
def test_when_str(self):
- self.assertEqual(safe_str("foo"), "foo")
+ self.assertEqual(safe_str('foo'), 'foo')
def test_when_unicode(self):
- self.assertIsInstance(safe_str(u"foo"), str)
+ self.assertIsInstance(safe_str(u'foo'), str)
def test_when_containing_high_chars(self):
- s = u"The quiæk fåx jømps øver the lazy dåg"
+ s = u'The quiæk fåx jømps øver the lazy dåg'
res = safe_str(s)
self.assertIsInstance(res, str)
@@ -78,6 +78,6 @@ class test_safe_str(TestCase):
class O(object):
def __repr__(self):
- raise KeyError("foo")
+ raise KeyError('foo')
- self.assertIn("<Unrepresentable", safe_str(O()))
+ self.assertIn('<Unrepresentable', safe_str(O()))
diff --git a/kombu/tests/utilities/test_functional.py b/kombu/tests/utilities/test_functional.py
index 841a9bda..c84a6854 100644
--- a/kombu/tests/utilities/test_functional.py
+++ b/kombu/tests/utilities/test_functional.py
@@ -14,11 +14,11 @@ def double(x):
class test_promise(TestCase):
def test__str__(self):
- self.assertEqual(str(promise(lambda: "the quick brown fox")),
- "the quick brown fox")
+ self.assertEqual(str(promise(lambda: 'the quick brown fox')),
+ 'the quick brown fox')
def test__repr__(self):
- self.assertEqual(repr(promise(lambda: "fi fa fo")),
+ self.assertEqual(repr(promise(lambda: 'fi fa fo')),
"'fi fa fo'")
def test_evaluate(self):
diff --git a/kombu/tests/utils.py b/kombu/tests/utils.py
index 2a98d3d0..5f743957 100644
--- a/kombu/tests/utils.py
+++ b/kombu/tests/utils.py
@@ -21,14 +21,14 @@ except AttributeError:
class TestCase(unittest.TestCase):
- if not hasattr(unittest.TestCase, "assertItemsEqual"):
+ if not hasattr(unittest.TestCase, 'assertItemsEqual'):
assertItemsEqual = unittest.TestCase.assertSameElements
class Mock(mock.Mock):
def __init__(self, *args, **kwargs):
- attrs = kwargs.pop("attrs", None) or {}
+ attrs = kwargs.pop('attrs', None) or {}
super(Mock, self).__init__(*args, **kwargs)
for attr_name, attr_value in attrs.items():
setattr(self, attr_name, attr_value)
@@ -94,12 +94,12 @@ def mask_modules(*modnames):
For example:
- >>> @missing_modules("sys"):
+ >>> @missing_modules('sys'):
>>> def foo():
... try:
... import sys
... except ImportError:
- ... print("sys not found")
+ ... print('sys not found')
sys not found
>>> import sys
@@ -116,7 +116,7 @@ def mask_modules(*modnames):
def myimp(name, *args, **kwargs):
if name in modnames:
- raise ImportError("No module named %s" % name)
+ raise ImportError('No module named %s' % name)
else:
return realimport(name, *args, **kwargs)
@@ -137,7 +137,7 @@ def skip_if_environ(env_var_name):
@wraps(fun)
def _skips_if_environ(*args, **kwargs):
if os.environ.get(env_var_name):
- raise SkipTest("SKIP %s: %s set\n" % (
+ raise SkipTest('SKIP %s: %s set\n' % (
fun.__name__, env_var_name))
return fun(*args, **kwargs)
@@ -152,7 +152,7 @@ def skip_if_module(module):
def _skip_if_module(*args, **kwargs):
try:
__import__(module)
- raise SkipTest("SKIP %s: %s available\n" % (
+ raise SkipTest('SKIP %s: %s available\n' % (
fun.__name__, module))
except ImportError:
pass
@@ -168,7 +168,7 @@ def skip_if_not_module(module):
try:
__import__(module)
except ImportError:
- raise SkipTest("SKIP %s: %s available\n" % (
+ raise SkipTest('SKIP %s: %s available\n' % (
fun.__name__, module))
return fun(*args, **kwargs)
return _skip_if_not_module
@@ -176,4 +176,4 @@ def skip_if_not_module(module):
def skip_if_quick(fun):
- return skip_if_environ("QUICKTEST")(fun)
+ return skip_if_environ('QUICKTEST')(fun)
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index d4174ccd..05f8e477 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -61,7 +61,7 @@ class Table(Domain):
"""
item = self.get_queue(queue)
if item:
- return item, item["id"]
+ return item, item['id']
id = uuid()
return self.new_item(id), id
@@ -69,9 +69,9 @@ class Table(Domain):
if queue not in self._already_bound:
binding, id = self.create_binding(queue)
binding.update(exchange=exchange,
- routing_key=routing_key or "",
- pattern=pattern or "",
- queue=queue or "",
+ routing_key=routing_key or '',
+ pattern=pattern or '',
+ queue=queue or '',
id=id)
binding.save()
self._already_bound.add(queue)
@@ -86,7 +86,7 @@ class Table(Domain):
def exchange_delete(self, exchange):
"""Delete all routes for `exchange`."""
for item in self.routes_for(exchange):
- self.delete_item(item["id"])
+ self.delete_item(item['id'])
def get_item(self, item_name):
"""Uses `consistent_read` by default."""
@@ -109,7 +109,7 @@ class Table(Domain):
return item
def get_exchanges(self):
- return list(set(i["exchange"] for i in self.select()))
+ return list(set(i['exchange'] for i in self.select()))
def _get_queue_item(self, queue):
return self._try_first("""WHERE queue = '%s' limit 1""" % queue)
@@ -117,14 +117,14 @@ class Table(Domain):
def _get_queue_id(self, queue):
item = self._get_queue_item(queue)
if item:
- return item["id"]
+ return item['id']
class Channel(virtual.Channel):
Table = Table
- default_region = "us-east-1"
- domain_format = "kombu%(vhost)s"
+ default_region = 'us-east-1'
+ domain_format = 'kombu%(vhost)s'
_sdb = None
_sqs = None
_queue_cache = {}
@@ -184,7 +184,7 @@ class Channel(virtual.Channel):
"""
if self.supports_fanout:
- return [(r["routing_key"], r["pattern"], r["queue"])
+ return [(r['routing_key'], r['pattern'], r['queue'])
for r in self.table.routes_for(exchange)]
return super(Channel, self).get_table(exchange)
@@ -222,7 +222,7 @@ class Channel(virtual.Channel):
def _put_fanout(self, exchange, message, **kwargs):
"""Deliver fanout message to all queues in ``exchange``."""
for route in self.table.routes_for(exchange):
- self._put(route["queue"], message, **kwargs)
+ self._put(route['queue'], message, **kwargs)
def _get(self, queue):
"""Try to retrieve a single message off ``queue``."""
@@ -234,19 +234,19 @@ class Channel(virtual.Channel):
if queue in self._noack_queues:
q.delete_message(m)
else:
- payload["properties"]["delivery_info"].update({
- "sqs_message": m, "sqs_queue": q, })
+ payload['properties']['delivery_info'].update({
+ 'sqs_message': m, 'sqs_queue': q, })
return payload
raise Empty()
def basic_ack(self, delivery_tag):
delivery_info = self.qos.get(delivery_tag).delivery_info
try:
- queue = delivery_info["sqs_queue"]
+ queue = delivery_info['sqs_queue']
except KeyError:
pass
else:
- queue.delete_message(delivery_info["sqs_message"])
+ queue.delete_message(delivery_info['sqs_message'])
super(Channel, self).basic_ack(delivery_tag)
def _size(self, queue):
@@ -308,8 +308,8 @@ class Channel(virtual.Channel):
@property
def table(self):
name = self.entity_name(self.domain_format % {
- "vhost": self.conninfo.virtual_host})
- d = self.sdb.get_object("CreateDomain", {"DomainName": name},
+ 'vhost': self.conninfo.virtual_host})
+ d = self.sdb.get_object('CreateDomain', {'DomainName': name},
self.Table)
d.name = name
return d
@@ -324,19 +324,19 @@ class Channel(virtual.Channel):
@cached_property
def visibility_timeout(self):
- return self.transport_options.get("visibility_timeout")
+ return self.transport_options.get('visibility_timeout')
@cached_property
def queue_name_prefix(self):
- return self.transport_options.get("queue_name_prefix", '')
+ return self.transport_options.get('queue_name_prefix', '')
@cached_property
def supports_fanout(self):
- return self.transport_options.get("sdb_persistence", False)
+ return self.transport_options.get('sdb_persistence', False)
@cached_property
def region(self):
- return self.transport_options.get("region") or self.default_region
+ return self.transport_options.get('region') or self.default_region
class Transport(virtual.Transport):
diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py
index 04fd1dcd..7605cd83 100644
--- a/kombu/transport/__init__.py
+++ b/kombu/transport/__init__.py
@@ -14,15 +14,15 @@ import sys
from kombu.syn import detect_environment
-DEFAULT_TRANSPORT = "amqp"
+DEFAULT_TRANSPORT = 'amqp'
-AMQP_TRANSPORT = "kombu.transport.amqplib.Transport"
-AMQP_ALIAS = "librabbitmq"
-if detect_environment() == "default":
+AMQP_TRANSPORT = 'kombu.transport.amqplib.Transport'
+AMQP_ALIAS = 'librabbitmq'
+if detect_environment() == 'default':
try:
import librabbitmq # noqa
- AMQP_TRANSPORT = "kombu.transport.librabbitmq.Transport" # noqa
- AMQP_ALIAS = "amqp" # noqa
+ AMQP_TRANSPORT = 'kombu.transport.librabbitmq.Transport' # noqa
+ AMQP_ALIAS = 'amqp' # noqa
except ImportError:
pass
@@ -33,8 +33,8 @@ def _ghettoq(name, new, alias=None):
def __inner():
import warnings
_new = callable(xxx) and xxx() or xxx
- gtransport = "ghettoq.taproot.%s" % name
- ktransport = "kombu.transport.%s.Transport" % _new
+ gtransport = 'ghettoq.taproot.%s' % name
+ ktransport = 'kombu.transport.%s.Transport' % _new
this = alias or name
warnings.warn("""
Ghettoq does not work with Kombu, but there is now a built-in version
@@ -48,27 +48,27 @@ def _ghettoq(name, new, alias=None):
TRANSPORT_ALIASES = {
- "amqp": AMQP_TRANSPORT,
- "amqplib": "kombu.transport.amqplib.Transport",
- "librabbitmq": "kombu.transport.librabbitmq.Transport",
- "pika": "kombu.transport.pika2.Transport",
- "oldpika": "kombu.transport.pika.SyncTransport",
- "memory": "kombu.transport.memory.Transport",
- "redis": "kombu.transport.redis.Transport",
- "SQS": "kombu.transport.SQS.Transport",
- "sqs": "kombu.transport.SQS.Transport",
- "beanstalk": "kombu.transport.beanstalk.Transport",
- "mongodb": "kombu.transport.mongodb.Transport",
- "couchdb": "kombu.transport.couchdb.Transport",
- "zookeeper": "kombu.transport.zookeeper.Transport",
- "django": "kombu.transport.django.Transport",
- "sqlalchemy": "kombu.transport.sqlalchemy.Transport",
- "sqla": "kombu.transport.sqlalchemy.Transport",
- "ghettoq.taproot.Redis": _ghettoq("Redis", "redis", "redis"),
- "ghettoq.taproot.Database": _ghettoq("Database", "django", "django"),
- "ghettoq.taproot.MongoDB": _ghettoq("MongoDB", "mongodb"),
- "ghettoq.taproot.Beanstalk": _ghettoq("Beanstalk", "beanstalk"),
- "ghettoq.taproot.CouchDB": _ghettoq("CouchDB", "couchdb"),
+ 'amqp': AMQP_TRANSPORT,
+ 'amqplib': 'kombu.transport.amqplib.Transport',
+ 'librabbitmq': 'kombu.transport.librabbitmq.Transport',
+ 'pika': 'kombu.transport.pika2.Transport',
+ 'oldpika': 'kombu.transport.pika.SyncTransport',
+ 'memory': 'kombu.transport.memory.Transport',
+ 'redis': 'kombu.transport.redis.Transport',
+ 'SQS': 'kombu.transport.SQS.Transport',
+ 'sqs': 'kombu.transport.SQS.Transport',
+ 'beanstalk': 'kombu.transport.beanstalk.Transport',
+ 'mongodb': 'kombu.transport.mongodb.Transport',
+ 'couchdb': 'kombu.transport.couchdb.Transport',
+ 'zookeeper': 'kombu.transport.zookeeper.Transport',
+ 'django': 'kombu.transport.django.Transport',
+ 'sqlalchemy': 'kombu.transport.sqlalchemy.Transport',
+ 'sqla': 'kombu.transport.sqlalchemy.Transport',
+ 'ghettoq.taproot.Redis': _ghettoq('Redis', 'redis', 'redis'),
+ 'ghettoq.taproot.Database': _ghettoq('Database', 'django', 'django'),
+ 'ghettoq.taproot.MongoDB': _ghettoq('MongoDB', 'mongodb'),
+ 'ghettoq.taproot.Beanstalk': _ghettoq('Beanstalk', 'beanstalk'),
+ 'ghettoq.taproot.CouchDB': _ghettoq('CouchDB', 'couchdb'),
}
_transport_cache = {}
@@ -78,9 +78,9 @@ def resolve_transport(transport=None):
transport = TRANSPORT_ALIASES.get(transport, transport)
if callable(transport):
transport = transport()
- transport_module_name, _, transport_cls_name = transport.rpartition(".")
+ transport_module_name, _, transport_cls_name = transport.rpartition('.')
if not transport_module_name:
- raise KeyError("No such transport: %s" % (transport, ))
+ raise KeyError('No such transport: %s' % (transport, ))
return transport_module_name, transport_cls_name
diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py
index 3ba6d1b7..f87269af 100644
--- a/kombu/transport/amqplib.py
+++ b/kombu/transport/amqplib.py
@@ -31,12 +31,12 @@ from kombu.utils.amq_manager import get_manager
from . import base
DEFAULT_PORT = 5672
-HAS_MSG_PEEK = hasattr(socket, "MSG_PEEK")
+HAS_MSG_PEEK = hasattr(socket, 'MSG_PEEK')
# amqplib's handshake mistakenly identifies as protocol version 1191,
# this breaks in RabbitMQ tip, which no longer falls back to
# 0-8 for unknown ids.
-transport.AMQP_PROTOCOL_HEADER = str_to_bytes("AMQP\x01\x01\x08\x00")
+transport.AMQP_PROTOCOL_HEADER = str_to_bytes('AMQP\x01\x01\x08\x00')
# - fixes warnings when socket is not connected.
@@ -80,8 +80,8 @@ class Connection(amqp.Connection): # pragma: no cover
routing_key = args.read_shortstr()
exc = AMQPChannelException(reply_code, reply_text, (50, 60))
- if channel.events["basic_return"]:
- for callback in channel.events["basic_return"]:
+ if channel.events['basic_return']:
+ for callback in channel.events['basic_return']:
callback(exc, exchange, routing_key, msg)
else:
raise exc
@@ -129,7 +129,7 @@ class Connection(amqp.Connection): # pragma: no cover
return self.method_reader.read_method()
except SSLError, exc:
# http://bugs.python.org/issue10272
- if "timed out" in str(exc):
+ if 'timed out' in str(exc):
raise socket.timeout()
raise
finally:
@@ -186,17 +186,17 @@ class Message(base.Message):
super(Message, self).__init__(channel,
body=msg.body,
delivery_tag=msg.delivery_tag,
- content_type=props.get("content_type"),
- content_encoding=props.get("content_encoding"),
+ content_type=props.get('content_type'),
+ content_encoding=props.get('content_encoding'),
delivery_info=msg.delivery_info,
properties=msg.properties,
- headers=props.get("application_headers") or {},
+ headers=props.get('application_headers') or {},
**kwargs)
class Channel(_Channel, base.StdChannel):
Message = Message
- events = {"basic_return": []}
+ events = {'basic_return': []}
def __init__(self, *args, **kwargs):
self.no_ack_consumers = set()
@@ -224,7 +224,7 @@ class Channel(_Channel, base.StdChannel):
def basic_consume(self, *args, **kwargs):
consumer_tag = super(Channel, self).basic_consume(*args, **kwargs)
- if kwargs["no_ack"]:
+ if kwargs['no_ack']:
self.no_ack_consumers.add(consumer_tag)
return consumer_tag
@@ -267,8 +267,8 @@ class Transport(base.Transport):
for name, default_value in self.default_connection_params.items():
if not getattr(conninfo, name, None):
setattr(conninfo, name, default_value)
- if conninfo.hostname == "localhost":
- conninfo.hostname = "127.0.0.1"
+ if conninfo.hostname == 'localhost':
+ conninfo.hostname = '127.0.0.1'
conn = self.Connection(host=conninfo.host,
userid=conninfo.userid,
password=conninfo.password,
@@ -314,9 +314,9 @@ class Transport(base.Transport):
@property
def default_connection_params(self):
- return {"userid": "guest", "password": "guest",
- "port": self.default_port,
- "hostname": "localhost", "login_method": "AMQPLAIN"}
+ return {'userid': 'guest', 'password': 'guest',
+ 'port': self.default_port,
+ 'hostname': 'localhost', 'login_method': 'AMQPLAIN'}
def get_manager(self, *args, **kwargs):
return get_manager(self.client, *args, **kwargs)
diff --git a/kombu/transport/base.py b/kombu/transport/base.py
index 970a1819..8ae44419 100644
--- a/kombu/transport/base.py
+++ b/kombu/transport/base.py
@@ -15,7 +15,7 @@ from kombu.exceptions import MessageStateError
from kombu.serialization import decode
from kombu.utils import cached_property
-ACKNOWLEDGED_STATES = frozenset(["ACK", "REJECTED", "REQUEUED"])
+ACKNOWLEDGED_STATES = frozenset(['ACK', 'REJECTED', 'REQUEUED'])
class StdChannel(object):
@@ -30,7 +30,7 @@ class StdChannel(object):
return Producer(self, *args, **kwargs)
def get_bindings(self):
- raise NotImplementedError("%r does not implement list_bindings" % (
+ raise NotImplementedError('%r does not implement list_bindings' % (
self.__class__, ))
def after_reply_message_received(self, queue):
@@ -47,12 +47,12 @@ class StdChannel(object):
class Message(object):
"""Base class for received messages."""
- __slots__ = ("_state", "channel", "delivery_tag",
- "content_type", "content_encoding",
- "delivery_info", "headers",
- "properties", "body",
- "_decoded_cache",
- "MessageStateError", "__dict__")
+ __slots__ = ('_state', 'channel', 'delivery_tag',
+ 'content_type', 'content_encoding',
+ 'delivery_info', 'headers',
+ 'properties', 'body',
+ '_decoded_cache',
+ 'MessageStateError', '__dict__')
MessageStateError = MessageStateError
def __init__(self, channel, body=None, delivery_tag=None,
@@ -67,10 +67,10 @@ class Message(object):
self.headers = headers or {}
self.properties = properties or {}
self._decoded_cache = None
- self._state = "RECEIVED"
+ self._state = 'RECEIVED'
try:
- body = decompress(body, self.headers["compression"])
+ body = decompress(body, self.headers['compression'])
except KeyError:
pass
if postencode and isinstance(body, unicode):
@@ -87,7 +87,7 @@ class Message(object):
"""
if self.channel.no_ack_consumers is not None:
try:
- consumer_tag = self.delivery_info["consumer_tag"]
+ consumer_tag = self.delivery_info['consumer_tag']
except KeyError:
pass
else:
@@ -95,9 +95,9 @@ class Message(object):
return
if self.acknowledged:
raise self.MessageStateError(
- "Message already acknowledged with state: %s" % self._state)
+ 'Message already acknowledged with state: %s' % self._state)
self.channel.basic_ack(self.delivery_tag)
- self._state = "ACK"
+ self._state = 'ACK'
def ack_log_error(self, logger, errors):
try:
@@ -124,9 +124,9 @@ class Message(object):
"""
if self.acknowledged:
raise self.MessageStateError(
- "Message already acknowledged with state: %s" % self._state)
+ 'Message already acknowledged with state: %s' % self._state)
self.channel.basic_reject(self.delivery_tag, requeue=False)
- self._state = "REJECTED"
+ self._state = 'REJECTED'
def requeue(self):
"""Reject this message and put it back on the queue.
@@ -140,9 +140,9 @@ class Message(object):
"""
if self.acknowledged:
raise self.MessageStateError(
- "Message already acknowledged with state: %s" % self._state)
+ 'Message already acknowledged with state: %s' % self._state)
self.channel.basic_reject(self.delivery_tag, requeue=True)
- self._state = "REQUEUED"
+ self._state = 'REQUEUED'
def decode(self):
"""Deserialize the message body, returning the original
@@ -170,7 +170,7 @@ class Management(object):
def get_bindings(self):
raise NotImplementedError(
- "Your transport does not implement list_bindings")
+ 'Your transport does not implement list_bindings')
class Transport(object):
@@ -196,31 +196,31 @@ class Transport(object):
#: Type of driver, can be used to separate transports
#: using the AMQP protocol (driver_type: 'amqp'),
#: Redis (driver_type: 'redis'), etc...
- driver_type = "N/A"
+ driver_type = 'N/A'
- #: Name of driver library (e.g. "amqplib", "redis", "beanstalkc").
- driver_name = "N/A"
+ #: Name of driver library (e.g. 'amqplib', 'redis', 'beanstalkc').
+ driver_name = 'N/A'
def __init__(self, client, **kwargs):
self.client = client
def establish_connection(self):
- raise NotImplementedError("Subclass responsibility")
+ raise NotImplementedError('Subclass responsibility')
def close_connection(self, connection):
- raise NotImplementedError("Subclass responsibility")
+ raise NotImplementedError('Subclass responsibility')
def create_channel(self, connection):
- raise NotImplementedError("Subclass responsibility")
+ raise NotImplementedError('Subclass responsibility')
def close_channel(self, connection):
- raise NotImplementedError("Subclass responsibility")
+ raise NotImplementedError('Subclass responsibility')
def drain_events(self, connection, **kwargs):
- raise NotImplementedError("Subclass responsibility")
+ raise NotImplementedError('Subclass responsibility')
def driver_version(self):
- return "N/A"
+ return 'N/A'
def eventmap(self, connection):
"""Map of fd -> event handler for event based use.
diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py
index b748104c..6745bc1e 100644
--- a/kombu/transport/beanstalk.py
+++ b/kombu/transport/beanstalk.py
@@ -22,7 +22,7 @@ from . import virtual
DEFAULT_PORT = 11300
-__author__ = "David Ziegler <david.ziegler@gmail.com>"
+__author__ = 'David Ziegler <david.ziegler@gmail.com>'
class Channel(virtual.Channel):
@@ -33,7 +33,7 @@ class Channel(virtual.Channel):
if job:
try:
item = loads(job.body)
- dest = job.stats()["tube"]
+ dest = job.stats()['tube']
except Exception:
job.bury()
else:
@@ -44,10 +44,10 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
extra = {}
- priority = message["properties"]["delivery_info"]["priority"]
- ttr = message["properties"].get("ttr")
+ priority = message['properties']['delivery_info']['priority']
+ ttr = message['properties'].get('ttr')
if ttr is not None:
- extra["ttr"] = ttr
+ extra['ttr'] = ttr
self.client.use(queue)
self.client.put(dumps(message), priority=priority, **extra)
@@ -135,8 +135,8 @@ class Transport(virtual.Transport):
IOError,
beanstalkc.SocketError,
beanstalkc.BeanstalkcException)
- driver_type = "beanstalk"
- driver_name = "beanstalkc"
+ driver_type = 'beanstalk'
+ driver_name = 'beanstalkc'
def driver_version(self):
return beanstalkc.__version__
diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py
index 8a342444..d0c672b1 100644
--- a/kombu/transport/couchdb.py
+++ b/kombu/transport/couchdb.py
@@ -23,15 +23,15 @@ from kombu.utils import uuid4
from . import virtual
DEFAULT_PORT = 5984
-DEFAULT_DATABASE = "kombu_default"
+DEFAULT_DATABASE = 'kombu_default'
-__author__ = "David Clymer <david@zettazebra.com>"
+__author__ = 'David Clymer <david@zettazebra.com>'
def create_message_view(db):
from couchdb import design
- view = design.ViewDefinition("kombu", "messages", """
+ view = design.ViewDefinition('kombu', 'messages', """
function (doc) {
if (doc.queue && doc.payload)
emit(doc.queue, doc);
@@ -58,7 +58,7 @@ class Channel(virtual.Channel):
item = result.rows[0].value
self.client.delete(item)
- return loads(item["payload"])
+ return loads(item['payload'])
def _purge(self, queue):
result = self._query(queue)
@@ -72,8 +72,8 @@ class Channel(virtual.Channel):
def _open(self):
conninfo = self.connection.client
dbname = conninfo.virtual_host
- proto = conninfo.ssl and "https" or "http"
- if not dbname or dbname == "/":
+ proto = conninfo.ssl and 'https' or 'http'
+ if not dbname or dbname == '/':
dbname = DEFAULT_DATABASE
port = conninfo.port or DEFAULT_PORT
server = couchdb.Server('%s://%s:%s/' % (proto,
@@ -94,7 +94,7 @@ class Channel(virtual.Channel):
# if the message view is not yet set up, we'll need it now.
create_message_view(self.client)
self.view_created = True
- return self.client.view("kombu/messages", key=queue, **kwargs)
+ return self.client.view('kombu/messages', key=queue, **kwargs)
@property
def client(self):
@@ -118,8 +118,8 @@ class Transport(virtual.Transport):
couchdb.PreconditionFailed,
couchdb.ResourceConflict,
couchdb.ResourceNotFound)
- driver_type = "couchdb"
- driver_name = "couchdb"
+ driver_type = 'couchdb'
+ driver_name = 'couchdb'
def driver_version(self):
return couchdb.__version__
diff --git a/kombu/transport/django/__init__.py b/kombu/transport/django/__init__.py
index 88e0d4a5..b277432b 100644
--- a/kombu/transport/django/__init__.py
+++ b/kombu/transport/django/__init__.py
@@ -14,10 +14,10 @@ from kombu.exceptions import StdChannelError
from .models import Queue
VERSION = (1, 0, 0)
-__version__ = ".".join(map(str, VERSION))
+__version__ = '.'.join(map(str, VERSION))
-POLLING_INTERVAL = getattr(settings, "KOMBU_POLLING_INTERVAL",
- getattr(settings, "DJKOMBU_POLLING_INTERVAL", 5.0))
+POLLING_INTERVAL = getattr(settings, 'KOMBU_POLLING_INTERVAL',
+ getattr(settings, 'DJKOMBU_POLLING_INTERVAL', 5.0))
class Channel(virtual.Channel):
@@ -31,7 +31,7 @@ class Channel(virtual.Channel):
def basic_consume(self, queue, *args, **kwargs):
qinfo = self.state.bindings[queue]
exchange = qinfo[0]
- if self.typeof(exchange).type == "fanout":
+ if self.typeof(exchange).type == 'fanout':
return
super(Channel, self).basic_consume(queue, *args, **kwargs)
@@ -62,8 +62,8 @@ class Transport(virtual.Transport):
channel_errors = (StdChannelError,
errors.ObjectDoesNotExist,
errors.MultipleObjectsReturned)
- driver_type = "sql"
- driver_name = "django"
+ driver_type = 'sql'
+ driver_name = 'django'
def driver_version(self):
import django
diff --git a/kombu/transport/django/management/commands/clean_kombu_messages.py b/kombu/transport/django/management/commands/clean_kombu_messages.py
index 05eb59c1..5facee96 100644
--- a/kombu/transport/django/management/commands/clean_kombu_messages.py
+++ b/kombu/transport/django/management/commands/clean_kombu_messages.py
@@ -17,6 +17,6 @@ class Command(BaseCommand):
count = Message.objects.filter(visible=False).count()
- print("Removing %s invisible %s... " % (
- count, pluralize("message", count)))
+ print('Removing %s invisible %s... ' % (
+ count, pluralize('message', count)))
Message.objects.cleanup()
diff --git a/kombu/transport/django/managers.py b/kombu/transport/django/managers.py
index ea96d5ce..162f6544 100644
--- a/kombu/transport/django/managers.py
+++ b/kombu/transport/django/managers.py
@@ -69,7 +69,7 @@ class MessageManager(models.Manager):
def cleanup(self):
cursor = self.connection_for_write().cursor()
try:
- cursor.execute("DELETE FROM %s WHERE visible=%%s" % (
+ cursor.execute('DELETE FROM %s WHERE visible=%%s' % (
self.model._meta.db_table, ), (False, ))
except:
transaction.rollback_unless_managed()
diff --git a/kombu/transport/django/models.py b/kombu/transport/django/models.py
index ef6984f2..f6af9cb6 100644
--- a/kombu/transport/django/models.py
+++ b/kombu/transport/django/models.py
@@ -7,26 +7,26 @@ from .managers import QueueManager, MessageManager
class Queue(models.Model):
- name = models.CharField(_("name"), max_length=200, unique=True)
+ name = models.CharField(_('name'), max_length=200, unique=True)
objects = QueueManager()
class Meta:
- db_table = "djkombu_queue"
- verbose_name = _("queue")
- verbose_name_plural = _("queues")
+ db_table = 'djkombu_queue'
+ verbose_name = _('queue')
+ verbose_name_plural = _('queues')
class Message(models.Model):
visible = models.BooleanField(default=True, db_index=True)
sent_at = models.DateTimeField(null=True, blank=True, db_index=True,
auto_now_add=True)
- payload = models.TextField(_("payload"), null=False)
- queue = models.ForeignKey(Queue, related_name="messages")
+ payload = models.TextField(_('payload'), null=False)
+ queue = models.ForeignKey(Queue, related_name='messages')
objects = MessageManager()
class Meta:
- db_table = "djkombu_message"
- verbose_name = _("message")
- verbose_name_plural = _("messages")
+ db_table = 'djkombu_message'
+ verbose_name = _('message')
+ verbose_name_plural = _('messages')
diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py
index 48a2a3e6..e0d349b7 100644
--- a/kombu/transport/librabbitmq.py
+++ b/kombu/transport/librabbitmq.py
@@ -36,10 +36,10 @@ class Message(base.Message):
body=body,
delivery_info=info,
properties=props,
- delivery_tag=info["delivery_tag"],
- content_type=props["content_type"],
- content_encoding=props["content_encoding"],
- headers=props.get("headers"))
+ delivery_tag=info['delivery_tag'],
+ content_type=props['content_type'],
+ content_encoding=props['content_encoding'],
+ headers=props.get('headers'))
class Channel(amqp.Channel, base.StdChannel):
@@ -50,10 +50,10 @@ class Channel(amqp.Channel, base.StdChannel):
properties=None):
"""Encapsulate data into a AMQP message."""
properties = properties if properties is not None else {}
- properties.update({"content_type": content_type,
- "content_encoding": content_encoding,
- "headers": headers,
- "priority": priority})
+ properties.update({'content_type': content_type,
+ 'content_encoding': content_encoding,
+ 'headers': headers,
+ 'priority': priority})
return body, properties
@@ -71,14 +71,14 @@ class Transport(base.Transport):
IOError,
OSError)
channel_errors = (StdChannelError, ChannelError, )
- driver_type = "amqp"
- driver_name = "librabbitmq"
+ driver_type = 'amqp'
+ driver_name = 'librabbitmq'
nb_keep_draining = True
def __init__(self, client, **kwargs):
self.client = client
- self.default_port = kwargs.get("default_port") or self.default_port
+ self.default_port = kwargs.get('default_port') or self.default_port
def driver_version(self):
return amqp.__version__
@@ -125,6 +125,6 @@ class Transport(base.Transport):
@property
def default_connection_params(self):
- return {"userid": "guest", "password": "guest",
- "port": self.default_port,
- "hostname": "localhost", "login_method": "AMQPLAIN"}
+ return {'userid': 'guest', 'password': 'guest',
+ 'port': self.default_port,
+ 'hostname': 'localhost', 'login_method': 'AMQPLAIN'}
diff --git a/kombu/transport/memory.py b/kombu/transport/memory.py
index 9c5f71f8..b3ed61cf 100644
--- a/kombu/transport/memory.py
+++ b/kombu/transport/memory.py
@@ -59,8 +59,8 @@ class Transport(virtual.Transport):
#: memory backend state is global.
state = virtual.BrokerState()
- driver_type = "memory"
- driver_name = "memory"
+ driver_type = 'memory'
+ driver_name = 'memory'
def driver_version(self):
- return "N/A"
+ return 'N/A'
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index c4283d74..e8342425 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -22,7 +22,7 @@ from kombu.exceptions import StdChannelError
from . import virtual
-DEFAULT_HOST = "127.0.0.1"
+DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 27017
__author__ = """\
@@ -51,33 +51,33 @@ class Channel(virtual.Channel):
if queue in self._fanout_queues:
msg = self._queue_cursors[queue].next()
self._queue_readcounts[queue] += 1
- return loads(msg["payload"])
+ return loads(msg['payload'])
else:
- msg = self.client.command("findandmodify", "messages",
- query={"queue": queue},
- sort={"_id": pymongo.ASCENDING}, remove=True)
+ msg = self.client.command('findandmodify', 'messages',
+ query={'queue': queue},
+ sort={'_id': pymongo.ASCENDING}, remove=True)
except errors.OperationFailure, exc:
- if "No matching object found" in exc.args[0]:
+ if 'No matching object found' in exc.args[0]:
raise Empty()
raise
except StopIteration:
raise Empty()
# as of mongo 2.0 empty results won't raise an error
- if msg["value"] is None:
+ if msg['value'] is None:
raise Empty()
- return loads(msg["value"]["payload"])
+ return loads(msg['value']['payload'])
def _size(self, queue):
if queue in self._fanout_queues:
return (self._queue_cursors[queue].count() -
self._queue_readcounts[queue])
- return self.client.messages.find({"queue": queue}).count()
+ return self.client.messages.find({'queue': queue}).count()
def _put(self, queue, message, **kwargs):
- self.client.messages.insert({"payload": dumps(message),
- "queue": queue})
+ self.client.messages.insert({'payload': dumps(message),
+ 'queue': queue})
def _purge(self, queue):
size = self._size(queue)
@@ -86,7 +86,7 @@ class Channel(virtual.Channel):
cursor.rewind()
self._queue_cursors[queue] = cursor.skip(cursor.count())
else:
- self.client.messages.remove({"queue": queue})
+ self.client.messages.remove({'queue': queue})
return size
def close(self):
@@ -107,18 +107,18 @@ class Channel(virtual.Channel):
if not conninfo.hostname:
conninfo.hostname = DEFAULT_HOST
- for part in conninfo.hostname.split("/"):
+ for part in conninfo.hostname.split('/'):
if not hostname:
- hostname = "mongodb://" + part
+ hostname = 'mongodb://' + part
continue
dbname = part
- if "?" in part:
+ if '?' in part:
# In case someone is passing options
# to the mongodb connection. Right now
# it is not permitted by kombu
- dbname, options = part.split("?")
- hostname += "/?" + options
+ dbname, options = part.split('?')
+ hostname += '/?' + options
# At this point we expect the hostname to be something like
# (considering replica set form too):
@@ -126,14 +126,14 @@ class Channel(virtual.Channel):
# mongodb://[username:password@]host1[:port1][,host2[:port2],
# ...[,hostN[:portN]]][/[?options]]
mongoconn = Connection(host=hostname)
- version = mongoconn.server_info()["version"]
- if tuple(map(int, version.split(".")[:2])) < (1, 3):
+ version = mongoconn.server_info()['version']
+ if tuple(map(int, version.split('.')[:2])) < (1, 3):
raise NotImplementedError(
- "Kombu requires MongoDB version 1.3+, but connected to %s" % (
+ 'Kombu requires MongoDB version 1.3+, but connected to %s' % (
version, ))
- if not dbname or dbname == "/":
- dbname = "kombu_default"
+ if not dbname or dbname == '/':
+ dbname = 'kombu_default'
database = getattr(mongoconn, dbname)
@@ -143,56 +143,56 @@ class Channel(virtual.Channel):
self.db = database
col = database.messages
- col.ensure_index([("queue", 1)])
+ col.ensure_index([('queue', 1)])
- if "messages.broadcast" not in database.collection_names():
+ if 'messages.broadcast' not in database.collection_names():
capsize = conninfo.transport_options.get(
- "capped_queue_size") or 100000
- database.create_collection("messages.broadcast", size=capsize,
+ 'capped_queue_size') or 100000
+ database.create_collection('messages.broadcast', size=capsize,
capped=True)
- self.bcast = getattr(database, "messages.broadcast")
- self.bcast.ensure_index([("queue", 1)])
+ self.bcast = getattr(database, 'messages.broadcast')
+ self.bcast.ensure_index([('queue', 1)])
- self.routing = getattr(database, "messages.routing")
- self.routing.ensure_index([("queue", 1), ("exchange", 1)])
+ self.routing = getattr(database, 'messages.routing')
+ self.routing.ensure_index([('queue', 1), ('exchange', 1)])
return database
#TODO: Store a more complete exchange metatable in the routing collection
def get_table(self, exchange):
"""Get table of bindings for ``exchange``."""
brokerRoutes = self.client.messages.routing.find({
- "exchange": exchange})
+ 'exchange': exchange})
- localRoutes = self.state.exchanges[exchange]["table"]
+ localRoutes = self.state.exchanges[exchange]['table']
for route in brokerRoutes:
- localRoutes.append((route["routing_key"],
- route["pattern"],
- route["queue"]))
+ localRoutes.append((route['routing_key'],
+ route['pattern'],
+ route['queue']))
return set(localRoutes)
def _put_fanout(self, exchange, message, **kwargs):
"""Deliver fanout message."""
- self.client.messages.broadcast.insert({"payload": dumps(message),
- "queue": exchange})
+ self.client.messages.broadcast.insert({'payload': dumps(message),
+ 'queue': exchange})
def _queue_bind(self, exchange, routing_key, pattern, queue):
- if self.typeof(exchange).type == "fanout":
- cursor = self.bcast.find(query={"queue": exchange},
- sort=[("$natural", 1)], tailable=True)
+ if self.typeof(exchange).type == 'fanout':
+ cursor = self.bcast.find(query={'queue': exchange},
+ sort=[('$natural', 1)], tailable=True)
# Fast forward the cursor past old events
self._queue_cursors[queue] = cursor.skip(cursor.count())
self._queue_readcounts[queue] = cursor.count()
self._fanout_queues[queue] = exchange
- meta = {"exchange": exchange,
- "queue": queue,
- "routing_key": routing_key,
- "pattern": pattern}
+ meta = {'exchange': exchange,
+ 'queue': queue,
+ 'routing_key': routing_key,
+ 'pattern': pattern}
self.client.messages.routing.update(meta, meta, upsert=True)
def queue_delete(self, queue, **kwargs):
- self.routing.remove({"queue": queue})
+ self.routing.remove({'queue': queue})
super(Channel, self).queue_delete(queue, **kwargs)
if queue in self._fanout_queues:
self._queue_cursors[queue].close()
@@ -215,8 +215,8 @@ class Transport(virtual.Transport):
channel_errors = (StdChannelError,
errors.ConnectionFailure,
errors.OperationFailure, )
- driver_type = "mongodb"
- driver_name = "pymongo"
+ driver_type = 'mongodb'
+ driver_name = 'pymongo'
def driver_version(self):
return pymongo.version
diff --git a/kombu/transport/pika.py b/kombu/transport/pika.py
index 441848c5..a8ba69e5 100644
--- a/kombu/transport/pika.py
+++ b/kombu/transport/pika.py
@@ -22,7 +22,7 @@ from pika import channel # must be here to raise import error
try:
from pika import asyncore_adapter
except ImportError:
- raise VersionMismatch("Kombu only works with pika version 0.5.2")
+ raise VersionMismatch('Kombu only works with pika version 0.5.2')
from pika import blocking_adapter
from pika import connection
from pika import exceptions
@@ -32,11 +32,11 @@ from pika.spec import Basic, BasicProperties
DEFAULT_PORT = 5672
-BASIC_PROPERTIES = ("content_type", "content_encoding",
- "headers", "delivery_mode", "priority",
- "correlation_id", "reply_to", "expiration",
- "message_id", "timestamp", "type", "user_id",
- "app_id", "cluster_id")
+BASIC_PROPERTIES = ('content_type', 'content_encoding',
+ 'headers', 'delivery_mode', 'priority',
+ 'correlation_id', 'reply_to', 'expiration',
+ 'message_id', 'timestamp', 'type', 'user_id',
+ 'app_id', 'cluster_id')
class Message(base.Message):
@@ -46,14 +46,14 @@ class Message(base.Message):
propdict = dict(zip(BASIC_PROPERTIES,
attrgetter(*BASIC_PROPERTIES)(props)))
- kwargs.update({"body": body,
- "delivery_tag": method.delivery_tag,
- "content_type": props.content_type,
- "content_encoding": props.content_encoding,
- "headers": props.headers,
- "properties": propdict,
- "delivery_info": dict(
- consumer_tag=getattr(method, "consumer_tag", None),
+ kwargs.update({'body': body,
+ 'delivery_tag': method.delivery_tag,
+ 'content_type': props.content_type,
+ 'content_encoding': props.content_encoding,
+ 'headers': props.headers,
+ 'properties': propdict,
+ 'delivery_info': dict(
+ consumer_tag=getattr(method, 'consumer_tag', None),
routing_key=method.routing_key,
delivery_tag=method.delivery_tag,
redelivered=method.redelivered,
@@ -129,8 +129,8 @@ class Channel(channel.Channel, base.StdChannel):
def close(self):
super(Channel, self).close()
- if getattr(self, "handler", None):
- if getattr(self.handler, "connection", None):
+ if getattr(self, 'handler', None):
+ if getattr(self.handler, 'connection', None):
self.handler.connection.channels.pop(
self.handler.channel_number, None)
self.handler.connection = None
@@ -181,7 +181,7 @@ class AsyncoreConnection(asyncore_adapter.AsyncoreConnection):
current_events = self._event_counter
self.drain_events(timeout=timeout)
if timeout and self._event_counter <= current_events:
- raise socket.timeout("timed out")
+ raise socket.timeout('timed out')
def on_data_available(self, buf):
self._event_counter += 1
@@ -212,12 +212,12 @@ class SyncTransport(base.Transport):
exceptions.DuplicateConsumerTag,
exceptions.UnknownConsumerTag,
exceptions.ProtocolSyntaxError)
- driver_type = "amqp"
- driver_name = "pika"
+ driver_type = 'amqp'
+ driver_name = 'pika'
def __init__(self, client, **kwargs):
self.client = client
- self.default_port = kwargs.get("default_port", self.default_port)
+ self.default_port = kwargs.get('default_port', self.default_port)
def driver_version(self):
import pika
@@ -249,8 +249,8 @@ class SyncTransport(base.Transport):
@property
def default_connection_params(self):
- return {"hostname": "localhost", "port": self.default_port,
- "userid": "guest", "password": "guest"}
+ return {'hostname': 'localhost', 'port': self.default_port,
+ 'userid': 'guest', 'password': 'guest'}
class AsyncoreTransport(SyncTransport):
diff --git a/kombu/transport/pika2.py b/kombu/transport/pika2.py
index 864ea300..bf1ab721 100644
--- a/kombu/transport/pika2.py
+++ b/kombu/transport/pika2.py
@@ -25,11 +25,11 @@ from pika.adapters import blocking_connection as blocking
from pika import exceptions
DEFAULT_PORT = 5672
-BASIC_PROPERTIES = ("content_type", "content_encoding",
- "headers", "delivery_mode", "priority",
- "correlation_id", "reply_to", "expiration",
- "message_id", "timestamp", "type", "user_id",
- "app_id", "cluster_id")
+BASIC_PROPERTIES = ('content_type', 'content_encoding',
+ 'headers', 'delivery_mode', 'priority',
+ 'correlation_id', 'reply_to', 'expiration',
+ 'message_id', 'timestamp', 'type', 'user_id',
+ 'app_id', 'cluster_id')
class Message(base.Message):
@@ -39,14 +39,14 @@ class Message(base.Message):
propdict = dict(zip(BASIC_PROPERTIES,
attrgetter(*BASIC_PROPERTIES)(props)))
- kwargs.update({"body": body,
- "delivery_tag": method.delivery_tag,
- "content_type": props.content_type,
- "content_encoding": props.content_encoding,
- "headers": props.headers,
- "properties": propdict,
- "delivery_info": dict(
- consumer_tag=getattr(method, "consumer_tag", None),
+ kwargs.update({'body': body,
+ 'delivery_tag': method.delivery_tag,
+ 'content_type': props.content_type,
+ 'content_encoding': props.content_encoding,
+ 'headers': props.headers,
+ 'properties': propdict,
+ 'delivery_info': dict(
+ consumer_tag=getattr(method, 'consumer_tag', None),
routing_key=method.routing_key,
delivery_tag=method.delivery_tag,
redelivered=method.redelivered,
@@ -124,8 +124,8 @@ class Channel(blocking.BlockingChannel, base.StdChannel):
def close(self, *args):
super(Channel, self).close(*args)
self.connection = None
- if getattr(self, "handler", None):
- if getattr(self.handler, "connection", None):
+ if getattr(self, 'handler', None):
+ if getattr(self.handler, 'connection', None):
self.handler.connection.channels.pop(
self.handler.channel_number, None)
self.handler.connection = None
@@ -169,8 +169,8 @@ class Connection(blocking.BlockingConnection):
super(Connection, self).close(*args)
-AuthenticationError = getattr(exceptions, "AuthenticationError",
- getattr(exceptions, "LoginError"))
+AuthenticationError = getattr(exceptions, 'AuthenticationError',
+ getattr(exceptions, 'LoginError'))
class Transport(base.Transport):
@@ -192,12 +192,12 @@ class Transport(base.Transport):
exceptions.DuplicateConsumerTag,
exceptions.UnknownConsumerTag,
exceptions.ProtocolSyntaxError)
- driver_type = "amqp"
- driver_name = "pika"
+ driver_type = 'amqp'
+ driver_name = 'pika'
def __init__(self, client, **kwargs):
self.client = client
- self.default_port = kwargs.get("default_port", self.default_port)
+ self.default_port = kwargs.get('default_port', self.default_port)
def driver_version(self):
return pika.__version__
@@ -231,5 +231,5 @@ class Transport(base.Transport):
@property
def default_connection_params(self):
- return {"hostname": "localhost", "port": self.default_port,
- "userid": "guest", "password": "guest"}
+ return {'hostname': 'localhost', 'port': self.default_port,
+ 'userid': 'guest', 'password': 'guest'}
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 6786f683..9125c55e 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -36,7 +36,7 @@ except ImportError:
from . import virtual
-logger = get_logger("kombu.transport.redis")
+logger = get_logger('kombu.transport.redis')
DEFAULT_PORT = 6379
DEFAULT_DB = 0
@@ -67,7 +67,7 @@ class QoS(virtual.QoS):
def append(self, message, delivery_tag):
delivery = message.delivery_info
- EX, RK = delivery["exchange"], delivery["routing_key"]
+ EX, RK = delivery['exchange'], delivery['routing_key']
self.client.pipeline() \
.zadd(self.unacked_index_key, delivery_tag, time()) \
.hset(self.unacked_key, delivery_tag,
@@ -173,7 +173,7 @@ class MultiChannelPoller(object):
def _register_BRPOP(self, channel):
"""enable BRPOP mode for channel."""
- ident = channel, channel.client, "BRPOP"
+ ident = channel, channel.client, 'BRPOP'
if channel.client.connection._sock is None or \
ident not in self._chan_to_sock:
channel._in_poll = False
@@ -186,7 +186,7 @@ class MultiChannelPoller(object):
"""enable LISTEN mode for channel."""
if channel.subclient.connection._sock is None:
channel._in_listen = False
- self._register(channel, channel.subclient, "LISTEN")
+ self._register(channel, channel.subclient, 'LISTEN')
if not channel._in_listen:
channel._subscribe() # send SUBSCRIBE
@@ -239,21 +239,21 @@ class Channel(virtual.Channel):
_client = None
_subclient = None
supports_fanout = True
- keyprefix_queue = "_kombu.binding.%s"
+ keyprefix_queue = '_kombu.binding.%s'
sep = '\x06\x16'
_in_poll = False
_in_listen = False
_fanout_queues = {}
- unacked_key = "unacked"
- unacked_index_key = "unacked_index"
+ unacked_key = 'unacked'
+ unacked_index_key = 'unacked_index'
visibility_timeout = 3600 # 1 hour
priority_steps = PRIORITY_STEPS
from_transport_options = (virtual.Channel.from_transport_options
- + ("unacked_key",
- "unacked_index_key",
- "visibility_timeout",
- "priority_steps"))
+ + ('unacked_key',
+ 'unacked_index_key',
+ 'visibility_timeout',
+ 'priority_steps'))
def __init__(self, *args, **kwargs):
super_ = super(Channel, self)
@@ -265,7 +265,7 @@ class Channel(virtual.Channel):
self.active_fanout_queues = set()
self.auto_delete_queues = set()
self._fanout_to_queue = {}
- self.handlers = {"BRPOP": self._brpop_read, "LISTEN": self._receive}
+ self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}
# Evaluate connection.
self.client.info()
@@ -278,13 +278,13 @@ class Channel(virtual.Channel):
def _do_restore_message(self, payload, exchange, routing_key):
try:
try:
- payload["headers"]["redelivered"] = True
+ payload['headers']['redelivered'] = True
except KeyError:
pass
for queue in self._lookup(exchange, routing_key):
self._avail_client.lpush(queue, dumps(payload))
except Exception:
- logger.critical("Could not restore message: %r", payload,
+ logger.critical('Could not restore message: %r', payload,
exc_info=True)
def _restore(self, message, payload=None):
@@ -335,14 +335,14 @@ class Channel(virtual.Channel):
self._in_listen = True
def _handle_message(self, client, r):
- if r[0] == "unsubscribe" and r[2] == 0:
+ if r[0] == 'unsubscribe' and r[2] == 0:
client.subscribed = False
- elif r[0] == "pmessage":
- return {"type": r[0], "pattern": r[1],
- "channel": r[2], "data": r[3]}
+ elif r[0] == 'pmessage':
+ return {'type': r[0], 'pattern': r[1],
+ 'channel': r[2], 'data': r[3]}
else:
- return {"type": r[0], "pattern": None,
- "channel": r[1], "data": r[2]}
+ return {'type': r[0], 'pattern': None,
+ 'channel': r[1], 'data': r[2]}
def _receive(self):
c = self.subclient
@@ -353,9 +353,9 @@ class Channel(virtual.Channel):
self._in_listen = False
if response is not None:
payload = self._handle_message(c, response)
- if payload["type"] == "message":
- return (loads(payload["data"]),
- self._fanout_to_queue[payload["channel"]])
+ if payload['type'] == 'message':
+ return (loads(payload['data']),
+ self._fanout_to_queue[payload['channel']])
raise Empty()
def _brpop_start(self, timeout=1):
@@ -364,14 +364,14 @@ class Channel(virtual.Channel):
return
keys = [self._q_for_pri(queue, pri) for pri in PRIORITY_STEPS
for queue in queues] + [timeout or 0]
- self.client.connection.send_command("BRPOP", *keys)
+ self.client.connection.send_command('BRPOP', *keys)
self._in_poll = True
def _brpop_read(self, **options):
try:
try:
dest__item = self.client.parse_response(self.client.connection,
- "BRPOP",
+ 'BRPOP',
**options)
except self.connection_errors:
# if there's a ConnectionError, disconnect so the next
@@ -419,7 +419,7 @@ class Channel(virtual.Channel):
"""Deliver message."""
try:
pri = max(min(int(
- message["properties"]["delivery_info"]["priority"]), 9), 0)
+ message['properties']['delivery_info']['priority']), 9), 0)
except (TypeError, ValueError, KeyError):
pri = 0
self._avail_client.lpush(self._q_for_pri(queue, pri), dumps(message))
@@ -433,20 +433,20 @@ class Channel(virtual.Channel):
self.auto_delete_queues.add(queue)
def _queue_bind(self, exchange, routing_key, pattern, queue):
- if self.typeof(exchange).type == "fanout":
+ if self.typeof(exchange).type == 'fanout':
# Mark exchange as fanout.
self._fanout_queues[queue] = exchange
self._avail_client.sadd(self.keyprefix_queue % (exchange, ),
- self.sep.join([routing_key or "",
- pattern or "",
- queue or ""]))
+ self.sep.join([routing_key or '',
+ pattern or '',
+ queue or '']))
def _delete(self, queue, exchange, routing_key, pattern, *args):
self.auto_delete_queues.discard(queue)
self._avail_client.srem(self.keyprefix_queue % (exchange, ),
- self.sep.join([routing_key or "",
- pattern or "",
- queue or ""]))
+ self.sep.join([routing_key or '',
+ pattern or '',
+ queue or '']))
cmds = self._avail_client.pipeline()
for pri in PRIORITY_STEPS:
cmds = cmds.delete(self._q_for_pri(queue, pri))
@@ -463,7 +463,7 @@ class Channel(virtual.Channel):
values = self.client.smembers(key)
if not values:
raise InconsistencyError(
- "Queue list empty or key does not exist: %r" % (
+ 'Queue list empty or key does not exist: %r' % (
self.keyprefix_queue % exchange))
return [tuple(val.split(self.sep)) for val in values]
@@ -486,7 +486,7 @@ class Channel(virtual.Channel):
self.queue_delete(queue)
# Close connections
- for attr in "client", "subclient":
+ for attr in 'client', 'subclient':
try:
self.__dict__[attr].connection.disconnect()
except (KeyError, AttributeError, self.ResponseError):
@@ -497,28 +497,28 @@ class Channel(virtual.Channel):
conninfo = self.connection.client
database = conninfo.virtual_host
if not isinstance(database, int):
- if not database or database == "/":
+ if not database or database == '/':
database = DEFAULT_DB
- elif database.startswith("/"):
+ elif database.startswith('/'):
database = database[1:]
try:
database = int(database)
except ValueError:
raise ValueError(
- "Database name must be int between 0 and limit - 1")
+ 'Database name must be int between 0 and limit - 1')
- return self.Client(host=conninfo.hostname or "127.0.0.1",
+ return self.Client(host=conninfo.hostname or '127.0.0.1',
port=conninfo.port or DEFAULT_PORT,
db=database,
password=conninfo.password)
def _get_client(self):
- version = getattr(redis, "__version__", (0, 0, 0))
- version = tuple(map(int, version.split(".")))
+ version = getattr(redis, '__version__', (0, 0, 0))
+ version = tuple(map(int, version.split('.')))
if version < (2, 4, 4):
raise VersionMismatch(
- "Redis transport requires redis-py versions 2.4.4 or later. "
- "You have %r" % (".".join(map(str_t, version)), ))
+ 'Redis transport requires redis-py versions 2.4.4 or later. '
+ 'You have %r' % ('.'.join(map(str_t, version)), ))
# KombuRedis maintains a connection attribute on it's instance and
# uses that when executing commands
@@ -565,7 +565,7 @@ class Channel(virtual.Channel):
client = self._create_client()
pubsub = client.pubsub()
pool = pubsub.connection_pool
- pubsub.connection = pool.get_connection("pubsub", pubsub.shard_hint)
+ pubsub.connection = pool.get_connection('pubsub', pubsub.shard_hint)
return pubsub
def _update_cycle(self):
@@ -599,8 +599,8 @@ class Transport(virtual.Transport):
polling_interval = None # disable sleep between unsuccessful polls.
default_port = DEFAULT_PORT
- driver_type = "redis"
- driver_name = "redis"
+ driver_type = 'redis'
+ driver_name = 'redis'
def __init__(self, *args, **kwargs):
super(Transport, self).__init__(*args, **kwargs)
@@ -639,7 +639,7 @@ class Transport(virtual.Transport):
"""Utility to import redis-py's exceptions at runtime."""
from redis import exceptions
# This exception suddenly changed name between redis-py versions
- if hasattr(exceptions, "InvalidData"):
+ if hasattr(exceptions, 'InvalidData'):
DataError = exceptions.InvalidData
else:
DataError = exceptions.DataError
diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py
index 921c8374..2c461c8a 100644
--- a/kombu/transport/sqlalchemy/__init__.py
+++ b/kombu/transport/sqlalchemy/__init__.py
@@ -14,7 +14,7 @@ from .models import Queue, Message, metadata
VERSION = (1, 1, 0)
-__version__ = ".".join(map(str, VERSION))
+__version__ = '.'.join(map(str, VERSION))
class Channel(virtual.Channel):
@@ -110,8 +110,8 @@ class Transport(virtual.Transport):
default_port = 0
connection_errors = ()
channel_errors = (StdChannelError, )
- driver_type = "sql"
- driver_name = "sqlalchemy"
+ driver_type = 'sql'
+ driver_name = 'sqlalchemy'
def driver_version(self):
import sqlalchemy
diff --git a/kombu/transport/sqlalchemy/models.py b/kombu/transport/sqlalchemy/models.py
index 8c4ca857..7437ea76 100644
--- a/kombu/transport/sqlalchemy/models.py
+++ b/kombu/transport/sqlalchemy/models.py
@@ -11,43 +11,43 @@ ModelBase = declarative_base(metadata=metadata)
class Queue(ModelBase):
- __tablename__ = "kombu_queue"
- __table_args__ = {"sqlite_autoincrement": True, 'mysql_engine': 'InnoDB'}
+ __tablename__ = 'kombu_queue'
+ __table_args__ = {'sqlite_autoincrement': True, 'mysql_engine': 'InnoDB'}
- id = Column(Integer, Sequence("queue_id_sequence"), primary_key=True,
+ id = Column(Integer, Sequence('queue_id_sequence'), primary_key=True,
autoincrement=True)
name = Column(String(200), unique=True)
- messages = relation("Message", backref="queue", lazy="noload")
+ messages = relation('Message', backref='queue', lazy='noload')
def __init__(self, name):
self.name = name
def __str__(self):
- return "<Queue(%s)>" % (self.name)
+ return '<Queue(%s)>' % (self.name)
class Message(ModelBase):
- __tablename__ = "kombu_message"
- __table_args__ = {"sqlite_autoincrement": True, 'mysql_engine': 'InnoDB'}
+ __tablename__ = 'kombu_message'
+ __table_args__ = {'sqlite_autoincrement': True, 'mysql_engine': 'InnoDB'}
- id = Column(Integer, Sequence("message_id_sequence"), primary_key=True,
+ id = Column(Integer, Sequence('message_id_sequence'), primary_key=True,
autoincrement=True)
visible = Column(Boolean, default=True, index=True)
sent_at = Column('timestamp', DateTime, nullable=True, index=True,
onupdate=datetime.datetime.now)
payload = Column(Text, nullable=False)
- queue_id = Column(Integer, ForeignKey("kombu_queue.id",
- name="FK_kombu_message_queue"))
+ queue_id = Column(Integer, ForeignKey('kombu_queue.id',
+ name='FK_kombu_message_queue'))
version = Column(SmallInteger, nullable=False, default=1)
- __mapper_args__ = {"version_id_col": version}
+ __mapper_args__ = {'version_id_col': version}
def __init__(self, payload, queue):
self.payload = payload
self.queue = queue
def __str__(self):
- return "<Message(%s, %s, %s, %s)>" % (self.visible,
+ return '<Message(%s, %s, %s, %s)>' % (self.visible,
self.sent_at,
self.payload,
self.queue_id)
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py
index 011dab6a..22d345af 100644
--- a/kombu/transport/virtual/__init__.py
+++ b/kombu/transport/virtual/__init__.py
@@ -181,19 +181,19 @@ class QoS(object):
if not self.restore_at_shutdown:
return
- elif not self.channel.do_restore or getattr(state, "restored", None):
+ elif not self.channel.do_restore or getattr(state, 'restored', None):
assert not state
return
try:
if state:
- say("Restoring %r unacknowledged message(s).",
+ say('Restoring %r unacknowledged message(s).',
len(self._delivered))
unrestored = self.restore_unacked()
if unrestored:
errors, messages = zip(*unrestored)
- say("UNABLE TO RESTORE %s MESSAGES: %s",
+ say('UNABLE TO RESTORE %s MESSAGES: %s',
len(errors), errors)
emergency_dump_state(messages)
finally:
@@ -204,32 +204,32 @@ class Message(base.Message):
def __init__(self, channel, payload, **kwargs):
self._raw = payload
- properties = payload["properties"]
- body = payload.get("body")
+ properties = payload['properties']
+ body = payload.get('body')
if body:
- body = channel.decode_body(body, properties.get("body_encoding"))
- fields = {"body": body,
- "delivery_tag": properties["delivery_tag"],
- "content_type": payload.get("content-type"),
- "content_encoding": payload.get("content-encoding"),
- "headers": payload.get("headers"),
- "properties": properties,
- "delivery_info": properties.get("delivery_info"),
- "postencode": "utf-8"}
+ body = channel.decode_body(body, properties.get('body_encoding'))
+ fields = {'body': body,
+ 'delivery_tag': properties['delivery_tag'],
+ 'content_type': payload.get('content-type'),
+ 'content_encoding': payload.get('content-encoding'),
+ 'headers': payload.get('headers'),
+ 'properties': properties,
+ 'delivery_info': properties.get('delivery_info'),
+ 'postencode': 'utf-8'}
super(Message, self).__init__(channel, **dict(kwargs, **fields))
def serializable(self):
props = self.properties
body, _ = self.channel.encode_body(self.body,
- props.get("body_encoding"))
+ props.get('body_encoding'))
headers = dict(self.headers)
# remove compression header
- headers.pop("compression", None)
- return {"body": body,
- "properties": props,
- "content-type": self.content_type,
- "content-encoding": self.content_encoding,
- "headers": self.headers}
+ headers.pop('compression', None)
+ return {'body': body,
+ 'properties': props,
+ 'content-type': self.content_type,
+ 'content-encoding': self.content_encoding,
+ 'headers': self.headers}
class AbstractChannel(object):
@@ -243,15 +243,15 @@ class AbstractChannel(object):
def _get(self, queue, timeout=None):
"""Get next message from `queue`."""
- raise NotImplementedError("Virtual channels must implement _get")
+ raise NotImplementedError('Virtual channels must implement _get')
def _put(self, queue, message):
"""Put `message` onto `queue`."""
- raise NotImplementedError("Virtual channels must implement _put")
+ raise NotImplementedError('Virtual channels must implement _put')
def _purge(self, queue):
"""Remove all messages from `queue`."""
- raise NotImplementedError("Virtual channels must implement _purge")
+ raise NotImplementedError('Virtual channels must implement _purge')
def _size(self, queue):
"""Return the number of messages in `queue` as an :class:`int`."""
@@ -313,21 +313,21 @@ class Channel(AbstractChannel, base.StdChannel):
supports_fanout = False
#: Binary <-> ASCII codecs.
- codecs = {"base64": Base64()}
+ codecs = {'base64': Base64()}
#: Default body encoding.
- #: NOTE: ``transport_options["body_encoding"]`` will override this value.
- body_encoding = "base64"
+ #: NOTE: ``transport_options['body_encoding']`` will override this value.
+ body_encoding = 'base64'
#: counter used to generate delivery tags for this channel.
_next_delivery_tag = count(1).next
#: Optional queue where messages with no route is delivered.
- #: Set by ``transport_options["deadletter_queue"]``.
+ #: Set by ``transport_options['deadletter_queue']``.
deadletter_queue = None
# List of options to transfer from :attr:`transport_options`.
- from_transport_options = ("body_encoding", "deadletter_queue")
+ from_transport_options = ('body_encoding', 'deadletter_queue')
def __init__(self, connection, **kwargs):
self.connection = connection
@@ -351,15 +351,15 @@ class Channel(AbstractChannel, base.StdChannel):
except KeyError:
pass
- def exchange_declare(self, exchange, type="direct", durable=False,
+ def exchange_declare(self, exchange, type='direct', durable=False,
auto_delete=False, arguments=None, nowait=False, passive=False):
"""Declare exchange."""
if passive:
if exchange not in self.state.exchanges:
- raise StdChannelError("404",
- u"NOT_FOUND - no exchange %r in vhost %r" % (
+ raise StdChannelError('404',
+ u'NOT_FOUND - no exchange %r in vhost %r' % (
exchange, self.connection.client.virtual_host or '/'),
- (50, 10), "Channel.exchange_declare")
+ (50, 10), 'Channel.exchange_declare')
return
try:
prev = self.state.exchanges[exchange]
@@ -367,17 +367,17 @@ class Channel(AbstractChannel, base.StdChannel):
durable, auto_delete,
arguments):
raise NotEquivalentError(
- "Cannot redeclare exchange %r in vhost %r with "
- "different type, durable or autodelete value" % (
+ 'Cannot redeclare exchange %r in vhost %r with '
+ 'different type, durable or autodelete value' % (
exchange,
- self.connection.client.virtual_host or "/"))
+ self.connection.client.virtual_host or '/'))
except KeyError:
self.state.exchanges[exchange] = {
- "type": type,
- "durable": durable,
- "auto_delete": auto_delete,
- "arguments": arguments or {},
- "table": [],
+ 'type': type,
+ 'durable': durable,
+ 'auto_delete': auto_delete,
+ 'arguments': arguments or {},
+ 'table': [],
}
def exchange_delete(self, exchange, if_unused=False, nowait=False):
@@ -389,10 +389,10 @@ class Channel(AbstractChannel, base.StdChannel):
def queue_declare(self, queue, passive=False, **kwargs):
"""Declare queue."""
if passive and not self._has_queue(queue, **kwargs):
- raise StdChannelError("404",
- u"NOT_FOUND - no queue %r in vhost %r" % (
+ raise StdChannelError('404',
+ u'NOT_FOUND - no queue %r in vhost %r' % (
queue, self.connection.client.virtual_host or '/'),
- (50, 10), "Channel.queue_declare")
+ (50, 10), 'Channel.queue_declare')
else:
self._new_queue(queue, **kwargs)
return queue, self._size(queue), 0
@@ -413,12 +413,12 @@ class Channel(AbstractChannel, base.StdChannel):
def after_reply_message_received(self, queue):
self.queue_delete(queue)
- def queue_bind(self, queue, exchange, routing_key="", arguments=None,
+ def queue_bind(self, queue, exchange, routing_key='', arguments=None,
**kwargs):
"""Bind `queue` to `exchange` with `routing key`."""
if queue in self.state.bindings:
return
- table = self.state.exchanges[exchange].setdefault("table", [])
+ table = self.state.exchanges[exchange].setdefault('table', [])
self.state.bindings[queue] = exchange, routing_key, arguments
meta = self.typeof(exchange).prepare_bind(queue,
exchange,
@@ -440,12 +440,12 @@ class Channel(AbstractChannel, base.StdChannel):
def basic_publish(self, message, exchange, routing_key, **kwargs):
"""Publish message."""
- props = message["properties"]
- message["body"], props["body_encoding"] = \
- self.encode_body(message["body"], self.body_encoding)
- props["delivery_info"]["exchange"] = exchange
- props["delivery_info"]["routing_key"] = routing_key
- props["delivery_tag"] = self._next_delivery_tag()
+ props = message['properties']
+ message['body'], props['body_encoding'] = \
+ self.encode_body(message['body'], self.body_encoding)
+ props['delivery_info']['exchange'] = exchange
+ props['delivery_info']['routing_key'] = routing_key
+ props['delivery_tag'] = self._next_delivery_tag()
self.typeof(exchange).deliver(message,
exchange, routing_key, **kwargs)
@@ -492,7 +492,7 @@ class Channel(AbstractChannel, base.StdChannel):
"""Recover unacked messages."""
if requeue:
return self.qos.restore_unacked()
- raise NotImplementedError("Does not support recover(requeue=False)")
+ raise NotImplementedError('Does not support recover(requeue=False)')
def basic_reject(self, delivery_tag, requeue=False):
"""Reject message."""
@@ -512,11 +512,11 @@ class Channel(AbstractChannel, base.StdChannel):
def get_table(self, exchange):
"""Get table of bindings for `exchange`."""
- return self.state.exchanges[exchange]["table"]
+ return self.state.exchanges[exchange]['table']
def typeof(self, exchange):
"""Get the exchange type instance for `exchange`."""
- type = self.state.exchanges[exchange]["type"]
+ type = self.state.exchanges[exchange]['type']
return self.exchange_types[type]
def _lookup(self, exchange, routing_key, default=None):
@@ -535,7 +535,7 @@ class Channel(AbstractChannel, base.StdChannel):
if not R and default is not None:
warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT % {
- "exchange": exchange, "routing_key": routing_key}))
+ 'exchange': exchange, 'routing_key': routing_key}))
self._new_queue(default)
R = [default]
return R
@@ -544,14 +544,14 @@ class Channel(AbstractChannel, base.StdChannel):
"""Redeliver message to its original destination."""
delivery_info = message.delivery_info
message = message.serializable()
- message["redelivered"] = True
- for queue in self._lookup(delivery_info["exchange"],
- delivery_info["routing_key"]):
+ message['redelivered'] = True
+ for queue in self._lookup(delivery_info['exchange'],
+ delivery_info['routing_key']):
self._put(queue, message)
def drain_events(self, timeout=None):
if self._consumers and self.qos.can_consume():
- if hasattr(self, "_get_many"):
+ if hasattr(self, '_get_many'):
return self._get_many(self._active_queues, timeout=timeout)
return self._poll(self.cycle, timeout=timeout)
raise Empty()
@@ -567,14 +567,14 @@ class Channel(AbstractChannel, base.StdChannel):
properties=None):
"""Prepare message data."""
properties = properties or {}
- info = properties.setdefault("delivery_info", {})
- info["priority"] = priority or 0
+ info = properties.setdefault('delivery_info', {})
+ info['priority'] = priority or 0
- return {"body": message_data,
- "content-encoding": content_encoding,
- "content-type": content_type,
- "headers": headers or {},
- "properties": properties or {}}
+ return {'body': message_data,
+ 'content-encoding': content_encoding,
+ 'content-type': content_type,
+ 'headers': headers or {},
+ 'properties': properties or {}}
def flow(self, active=True):
"""Enable/disable message flow.
@@ -583,7 +583,7 @@ class Channel(AbstractChannel, base.StdChannel):
is not implemented by the base virtual implementation.
"""
- raise NotImplementedError("virtual channels does not support flow.")
+ raise NotImplementedError('virtual channels does not support flow.')
def close(self):
"""Close channel, cancel all consumers, and requeue unacked
@@ -690,7 +690,7 @@ class Transport(base.Transport):
self._callbacks = {}
self.cycle = self.Cycle(self._drain_channel, self.channels, Empty)
self._next_channel_id = count(1).next
- polling_interval = client.transport_options.get("polling_interval")
+ polling_interval = client.transport_options.get('polling_interval')
if polling_interval is not None:
self.polling_interval = polling_interval
@@ -760,4 +760,4 @@ class Transport(base.Transport):
@property
def default_connection_params(self):
- return {"port": self.default_port, "hostname": "localhost"}
+ return {'port': self.default_port, 'hostname': 'localhost'}
diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py
index a178847a..fed253ba 100644
--- a/kombu/transport/virtual/exchange.py
+++ b/kombu/transport/virtual/exchange.py
@@ -31,7 +31,7 @@ class ExchangeType(object):
:returns: `default` if no queues matched.
"""
- raise NotImplementedError("subclass responsibility")
+ raise NotImplementedError('subclass responsibility')
def prepare_bind(self, queue, exchange, routing_key, arguments):
"""Returns tuple of `(routing_key, regex, queue)` to be stored
@@ -41,15 +41,15 @@ class ExchangeType(object):
def equivalent(self, prev, exchange, type, durable, auto_delete,
arguments):
"""Returns true if `prev` and `exchange` is equivalent."""
- return (type == prev["type"] and
- durable == prev["durable"] and
- auto_delete == prev["auto_delete"] and
- (arguments or {}) == (prev["arguments"] or {}))
+ return (type == prev['type'] and
+ durable == prev['durable'] and
+ auto_delete == prev['auto_delete'] and
+ (arguments or {}) == (prev['arguments'] or {}))
class DirectExchange(ExchangeType):
"""The `direct` exchange routes based on exact routing keys."""
- type = "direct"
+ type = 'direct'
def lookup(self, table, exchange, routing_key, default):
return [queue for rkey, _, queue in table
@@ -66,11 +66,11 @@ class TopicExchange(ExchangeType):
"""The `topic` exchange routes messages based on words separated by
dots, using wildcard characters ``*`` (any single word), and ``#``
(one or more words)."""
- type = "topic"
+ type = 'topic'
#: map of wildcard to regex conversions
- wildcards = {"*": r".*?[^\.]",
- "#": r".*?"}
+ wildcards = {'*': r'.*?[^\.]',
+ '#': r'.*?'}
#: compiled regex cache
_compiled = {}
@@ -92,8 +92,8 @@ class TopicExchange(ExchangeType):
def key_to_pattern(self, rkey):
"""Get the corresponding regex for any routing key."""
- return "^%s$" % ("\.".join(self.wildcards.get(word, word)
- for word in rkey.split(".")))
+ return '^%s$' % ('\.'.join(self.wildcards.get(word, word)
+ for word in rkey.split('.')))
def _match(self, pattern, string):
"""Same as :func:`re.match`, except the regex is compiled and cached,
@@ -116,7 +116,7 @@ class FanoutExchange(ExchangeType):
for an example implementation of these methods.
"""
- type = "fanout"
+ type = 'fanout'
def lookup(self, table, exchange, routing_key, default):
return [queue for _, _, queue in table]
@@ -127,6 +127,6 @@ class FanoutExchange(ExchangeType):
#: Map of standard exchange types and corresponding classes.
-STANDARD_EXCHANGE_TYPES = {"direct": DirectExchange,
- "topic": TopicExchange,
- "fanout": FanoutExchange}
+STANDARD_EXCHANGE_TYPES = {'direct': DirectExchange,
+ 'topic': TopicExchange,
+ 'fanout': FanoutExchange}
diff --git a/kombu/transport/virtual/scheduling.py b/kombu/transport/virtual/scheduling.py
index 13fcb469..9fc3f8f0 100644
--- a/kombu/transport/virtual/scheduling.py
+++ b/kombu/transport/virtual/scheduling.py
@@ -48,5 +48,5 @@ class FairCycle(object):
pass
def __repr__(self):
- return "<FairCycle: %r/%r %r>" % (self.pos, len(self.resources),
+ return '<FairCycle: %r/%r %r>' % (self.pos, len(self.resources),
self.resources, )
diff --git a/kombu/transport/zookeeper.py b/kombu/transport/zookeeper.py
index 028256b1..4dcc3ab5 100644
--- a/kombu/transport/zookeeper.py
+++ b/kombu/transport/zookeeper.py
@@ -50,7 +50,7 @@ from . import virtual
DEFAULT_PORT = 2181
-__author__ = "Mahendra M <mahendra.m@gmail.com>"
+__author__ = 'Mahendra M <mahendra.m@gmail.com>'
class Channel(virtual.Channel):
@@ -62,7 +62,7 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
try:
- priority = message["properties"]["delivery_info"]["priority"]
+ priority = message['properties']['delivery_info']['priority']
except KeyError:
priority = 0
@@ -164,9 +164,8 @@ class Transport(virtual.Transport):
kazoo.zkclient.NotEmptyException,
kazoo.zkclient.SessionExpiredException,
kazoo.zkclient.InvalidCallbackException)
- driver_type = "zookeeper"
- driver_name = "kazoo"
+ driver_type = 'zookeeper'
+ driver_name = 'kazoo'
def driver_version(self):
return kazoo.__version__
-
diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py
index 3f5d0701..8e230afe 100644
--- a/kombu/utils/__init__.py
+++ b/kombu/utils/__init__.py
@@ -23,10 +23,10 @@ try:
except:
ctypes = None # noqa
-__all__ = ["EqualityDict", "say", "uuid", "kwdict", "maybe_list",
- "fxrange", "fxrangemax", "retry_over_time",
- "emergency_dump_state", "cached_property",
- "reprkwargs", "reprcall", "nested"]
+__all__ = ['EqualityDict', 'say', 'uuid', 'kwdict', 'maybe_list',
+ 'fxrange', 'fxrangemax', 'retry_over_time',
+ 'emergency_dump_state', 'cached_property',
+ 'reprkwargs', 'reprcall', 'nested']
def eqhash(o):
@@ -52,7 +52,7 @@ class EqualityDict(dict):
def say(m, *s):
- sys.stderr.write(str(m) % s + "\n")
+ sys.stderr.write(str(m) % s + '\n')
def uuid4():
@@ -86,14 +86,14 @@ else:
see: http://bugs.python.org/issue4978.
"""
- return dict((key.encode("utf-8"), value)
+ return dict((key.encode('utf-8'), value)
for key, value in kwargs.items())
def maybe_list(v):
if v is None:
return []
- if hasattr(v, "__iter__"):
+ if hasattr(v, '__iter__'):
return v
return [v]
@@ -178,13 +178,13 @@ def emergency_dump_state(state, open_file=open, dump=None):
import pickle
dump = pickle.dump
persist = mktemp()
- say("EMERGENCY DUMP STATE TO FILE -> %s <-" % persist)
- fh = open_file(persist, "w")
+ say('EMERGENCY DUMP STATE TO FILE -> %s <-' % persist)
+ fh = open_file(persist, 'w')
try:
try:
dump(state, fh, protocol=0)
except Exception, exc:
- say("Cannot pickle state: %r. Fallback to pformat." % (exc, ))
+ say('Cannot pickle state: %r. Fallback to pformat.' % (exc, ))
fh.write(pformat(state))
finally:
fh.flush()
@@ -207,14 +207,14 @@ class cached_property(object):
@connection.setter # Prepares stored value
def connection(self, value):
if value is None:
- raise TypeError("Connection must be a connection")
+ raise TypeError('Connection must be a connection')
return value
@connection.deleter
def connection(self, value):
# Additional action to do at del(self.attr)
if value is not None:
- print("Connection %r deleted" % (value, ))
+ print('Connection %r deleted' % (value, ))
"""
@@ -260,13 +260,13 @@ class cached_property(object):
return self.__class__(self.__get, self.__set, fdel)
-def reprkwargs(kwargs, sep=', ', fmt="%s=%s"):
+def reprkwargs(kwargs, sep=', ', fmt='%s=%s'):
return sep.join(fmt % (k, _safe_repr(v)) for k, v in kwargs.iteritems())
def reprcall(name, args=(), kwargs=(), sep=', '):
- return "%s(%s%s%s)" % (name, sep.join(map(_safe_repr, args or ())),
- (args and kwargs) and sep or "",
+ return '%s(%s%s%s)' % (name, sep.join(map(_safe_repr, args or ())),
+ (args and kwargs) and sep or '',
reprkwargs(kwargs, sep))
diff --git a/kombu/utils/amq_manager.py b/kombu/utils/amq_manager.py
index b44ca721..0bb9ce4c 100644
--- a/kombu/utils/amq_manager.py
+++ b/kombu/utils/amq_manager.py
@@ -6,10 +6,10 @@ def get_manager(client, hostname=None, port=None, userid=None,
import pyrabbit
opt = client.transport_options.get
host = (hostname if hostname is not None
- else opt("manager_hostname", client.hostname))
- port = port if port is not None else opt("manager_port", 55672)
- return pyrabbit.Client("%s:%s" % (host, port),
+ else opt('manager_hostname', client.hostname))
+ port = port if port is not None else opt('manager_port', 55672)
+ return pyrabbit.Client('%s:%s' % (host, port),
userid if userid is not None
- else opt("manager_userid", client.userid),
+ else opt('manager_userid', client.userid),
password if password is not None
- else opt("manager_password", client.password))
+ else opt('manager_password', client.password))
diff --git a/kombu/utils/compat.py b/kombu/utils/compat.py
index 251bb67c..c1b3cec4 100644
--- a/kombu/utils/compat.py
+++ b/kombu/utils/compat.py
@@ -55,7 +55,7 @@ class CompatOrderedDict(dict, MutableMapping):
"""
if len(args) > 1:
- raise TypeError("expected at most 1 arguments, got %d" % (
+ raise TypeError('expected at most 1 arguments, got %d' % (
len(args)))
try:
self.__root
@@ -136,7 +136,7 @@ class CompatOrderedDict(dict, MutableMapping):
if isinstance(other, dict):
for key in other:
self[key] = other[key]
- elif hasattr(other, "keys"):
+ elif hasattr(other, 'keys'):
for key in other.keys():
self[key] = other[key]
else:
@@ -187,7 +187,7 @@ class CompatOrderedDict(dict, MutableMapping):
if not self:
raise KeyError('dictionary is empty')
if last:
- if sys.platform.startswith("java"):
+ if sys.platform.startswith('java'):
key = self.keys()[-1]
else:
key = reversed(self).next()
@@ -258,7 +258,7 @@ import platform as _platform
from stat import ST_DEV, ST_INO
-if _platform.system() == "Windows":
+if _platform.system() == 'Windows':
#since windows doesn't go with WatchedFileHandler use FileHandler instead
WatchedFileHandler = logging.FileHandler
else:
diff --git a/kombu/utils/debug.py b/kombu/utils/debug.py
index 5eb8a6a1..d8aba52c 100644
--- a/kombu/utils/debug.py
+++ b/kombu/utils/debug.py
@@ -16,11 +16,11 @@ from functools import wraps
from kombu.log import get_logger
-__all__ = ["setup_logging", "Logwrapped"]
+__all__ = ['setup_logging', 'Logwrapped']
-def setup_logging(loglevel=logging.DEBUG, loggers=["kombu.connection",
- "kombu.channel"]):
+def setup_logging(loglevel=logging.DEBUG, loggers=['kombu.connection',
+ 'kombu.channel']):
for logger in loggers:
l = get_logger(logger)
l.addHandler(logging.StreamHandler())
@@ -28,7 +28,7 @@ def setup_logging(loglevel=logging.DEBUG, loggers=["kombu.connection",
class Logwrapped(object):
- __ignore = ("__enter__", "__exit__")
+ __ignore = ('__enter__', '__exit__')
def __init__(self, instance, logger=None, ident=None):
self.instance = instance
@@ -43,18 +43,18 @@ class Logwrapped(object):
@wraps(meth)
def __wrapped(*args, **kwargs):
- info = ""
+ info = ''
if self.ident:
info += self.ident % vars(self.instance)
- info += "%s(" % (meth.__name__, )
+ info += '%s(' % (meth.__name__, )
if args:
- info += ", ".join(map(repr, args))
+ info += ', '.join(map(repr, args))
if kwargs:
if args:
- info += ", "
- info += ", ".join("%s=%r" % (key, value)
+ info += ', '
+ info += ', '.join('%s=%r' % (key, value)
for key, value in kwargs.iteritems())
- info += ")"
+ info += ')'
self.logger.debug(info)
return meth(*args, **kwargs)
diff --git a/kombu/utils/encoding.py b/kombu/utils/encoding.py
index f4edaf9e..5e779808 100644
--- a/kombu/utils/encoding.py
+++ b/kombu/utils/encoding.py
@@ -18,10 +18,10 @@ import traceback
is_py3k = sys.version_info >= (3, 0)
-if sys.platform.startswith("java"): # pragma: no cover
+if sys.platform.startswith('java'): # pragma: no cover
def default_encoding():
- return "utf-8"
+ return 'utf-8'
else:
def default_encoding(): # noqa
@@ -64,7 +64,7 @@ else:
return s
def from_utf8(s, *args, **kwargs): # noqa
- return s.encode("utf-8", *args, **kwargs)
+ return s.encode('utf-8', *args, **kwargs)
def default_encode(obj): # noqa
return unicode(obj, default_encoding())
@@ -74,33 +74,33 @@ else:
ensure_bytes = str_to_bytes
-def safe_str(s, errors="replace"):
+def safe_str(s, errors='replace'):
s = bytes_to_str(s)
if not isinstance(s, basestring):
return safe_repr(s, errors)
return _safe_str(s, errors)
-def _safe_str(s, errors="replace"):
+def _safe_str(s, errors='replace'):
if is_py3k: # pragma: no cover
if isinstance(s, str):
return s
try:
return str(s)
except Exception, exc:
- return "<Unrepresentable %r: %r %r>" % (
- type(s), exc, "\n".join(traceback.format_stack()))
+ return '<Unrepresentable %r: %r %r>' % (
+ type(s), exc, '\n'.join(traceback.format_stack()))
encoding = default_encoding()
try:
if isinstance(s, unicode):
return s.encode(encoding, errors)
return unicode(s, encoding, errors)
except Exception, exc:
- return "<Unrepresentable %r: %r %r>" % (
- type(s), exc, "\n".join(traceback.format_stack()))
+ return '<Unrepresentable %r: %r %r>' % (
+ type(s), exc, '\n'.join(traceback.format_stack()))
-def safe_repr(o, errors="replace"):
+def safe_repr(o, errors='replace'):
try:
return repr(o)
except Exception:
diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py
index 3d487068..e04e13c9 100644
--- a/kombu/utils/eventio.py
+++ b/kombu/utils/eventio.py
@@ -48,7 +48,7 @@ except ImportError:
from kombu.syn import detect_environment
-__all__ = ["poll"]
+__all__ = ['poll']
READ = POLL_READ = 0x001
WRITE = POLL_WRITE = 0x004
@@ -229,7 +229,7 @@ class _select(Poller):
def _get_poller():
- if detect_environment() in ("eventlet", "gevent"):
+ if detect_environment() != 'default':
# greenlet
return _select
elif epoll:
diff --git a/kombu/utils/finalize.py b/kombu/utils/finalize.py
index 9e0c1e6e..b8b13506 100644
--- a/kombu/utils/finalize.py
+++ b/kombu/utils/finalize.py
@@ -16,7 +16,7 @@ import weakref
from itertools import count
-__all__ = ["Finalize"]
+__all__ = ['Finalize']
class Finalize(object):
@@ -72,7 +72,7 @@ class Finalize(object):
try:
obj = self._weakref()
except (AttributeError, TypeError):
- return "<Finalize: (dead)>"
+ return '<Finalize: (dead)>'
if obj is None:
return
diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py
index 3c894dbd..34ff0657 100644
--- a/kombu/utils/functional.py
+++ b/kombu/utils/functional.py
@@ -40,8 +40,8 @@ class promise(object):
return self
def __reduce__(self):
- return (self.__class__, (self._fun, ), {"_args": self._args,
- "_kwargs": self._kwargs})
+ return (self.__class__, (self._fun, ), {'_args': self._args,
+ '_kwargs': self._kwargs})
def maybe_promise(value):
diff --git a/kombu/utils/limits.py b/kombu/utils/limits.py
index c97e76eb..60c6b201 100644
--- a/kombu/utils/limits.py
+++ b/kombu/utils/limits.py
@@ -10,7 +10,7 @@ Token bucket implementation for rate limiting.
"""
import time
-__all__ = ["TokenBucket"]
+__all__ = ['TokenBucket']
class TokenBucket(object):
diff --git a/kombu/utils/url.py b/kombu/utils/url.py
index eaee888f..af703381 100644
--- a/kombu/utils/url.py
+++ b/kombu/utils/url.py
@@ -12,7 +12,7 @@ def _parse_url(url):
scheme = urlparse(url).scheme
schemeless = url[len(scheme) + 3:]
# parse with HTTP URL semantics
- parts = urlparse("http://" + schemeless)
+ parts = urlparse('http://' + schemeless)
# The first pymongo.Connection() argument (host) can be
# a mongodb connection URI. If this is the case, don't