diff options
author | Ask Solem <ask@celeryproject.org> | 2012-06-15 19:32:40 +0200 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2012-06-15 19:32:40 +0200 |
commit | 5e7a32440f803321a58453efb3ea0fde0d4497b2 (patch) | |
tree | 3fe4f3e58b85b18105141d799779f30d043ecd82 /kombu | |
parent | 8d5652600e9afc76c803958638821a12018b8803 (diff) | |
download | kombu-5e7a32440f803321a58453efb3ea0fde0d4497b2.tar.gz |
Use single quotes
Diffstat (limited to 'kombu')
77 files changed, 1673 insertions, 1674 deletions
diff --git a/kombu/__init__.py b/kombu/__init__.py index 7aefc76f..8d29715c 100644 --- a/kombu/__init__.py +++ b/kombu/__init__.py @@ -2,11 +2,11 @@ from __future__ import absolute_import VERSION = (2, 2, 1) -__version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:]) -__author__ = "Ask Solem" -__contact__ = "ask@celeryproject.org" -__homepage__ = "http://kombu.readthedocs.org" -__docformat__ = "restructuredtext en" +__version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:]) +__author__ = 'Ask Solem' +__contact__ = 'ask@celeryproject.org' +__homepage__ = 'http://kombu.readthedocs.org' +__docformat__ = 'restructuredtext en' # -eof meta- @@ -16,20 +16,20 @@ import sys if sys.version_info < (2, 5): # pragma: no cover if sys.version_info >= (2, 4): raise Exception( - "Python 2.4 is not supported by this version. " - "Please use Kombu versions 1.x.") + 'Python 2.4 is not supported by this version. ' + 'Please use Kombu versions 1.x.') else: - raise Exception("Kombu requires Python versions 2.5 or later.") + raise Exception('Kombu requires Python versions 2.5 or later.') # Lazy loading. # - See werkzeug/__init__.py for the rationale behind this. from types import ModuleType all_by_module = { - "kombu.connection": ["BrokerConnection", "Connection"], - "kombu.entity": ["Exchange", "Queue"], - "kombu.messaging": ["Consumer", "Producer"], - "kombu.pools": ["connections", "producers"], + 'kombu.connection': ['BrokerConnection', 'Connection'], + 'kombu.entity': ['Exchange', 'Queue'], + 'kombu.messaging': ['Consumer', 'Producer'], + 'kombu.pools': ['connections', 'producers'], } object_origins = {} @@ -50,36 +50,36 @@ class module(ModuleType): def __dir__(self): result = list(new_module.__all__) - result.extend(("__file__", "__path__", "__doc__", "__all__", - "__docformat__", "__name__", "__path__", "VERSION", - "__package__", "__version__", "__author__", - "__contact__", "__homepage__", "__docformat__")) + result.extend(('__file__', '__path__', '__doc__', '__all__', + '__docformat__', '__name__', '__path__', 'VERSION', + '__package__', '__version__', '__author__', + '__contact__', '__homepage__', '__docformat__')) return result # 2.5 does not define __package__ try: package = __package__ except NameError: - package = "kombu" + package = 'kombu' # keep a reference to this module so that it's not garbage collected old_module = sys.modules[__name__] new_module = sys.modules[__name__] = module(__name__) new_module.__dict__.update({ - "__file__": __file__, - "__path__": __path__, - "__doc__": __doc__, - "__all__": tuple(object_origins), - "__version__": __version__, - "__author__": __author__, - "__contact__": __contact__, - "__homepage__": __homepage__, - "__docformat__": __docformat__, - "__package__": package, - "VERSION": VERSION}) + '__file__': __file__, + '__path__': __path__, + '__doc__': __doc__, + '__all__': tuple(object_origins), + '__version__': __version__, + '__author__': __author__, + '__contact__': __contact__, + '__homepage__': __homepage__, + '__docformat__': __docformat__, + '__package__': package, + 'VERSION': VERSION}) -if os.environ.get("KOMBU_LOG_DEBUG"): - os.environ.update(KOMBU_LOG_CHANNEL="1", KOMBU_LOG_CONNECTION="1") +if os.environ.get('KOMBU_LOG_DEBUG'): + os.environ.update(KOMBU_LOG_CHANNEL='1', KOMBU_LOG_CONNECTION='1') from .utils import debug debug.setup_logging() diff --git a/kombu/abstract.py b/kombu/abstract.py index 80a2d5bb..daa4077f 100644 --- a/kombu/abstract.py +++ b/kombu/abstract.py @@ -15,7 +15,7 @@ from copy import copy from .connection import maybe_channel from .exceptions import NotBoundError -__all__ = ["Object", "MaybeChannelBound"] +__all__ = ['Object', 'MaybeChannelBound'] def unpickle_dict(cls, kwargs): @@ -96,11 +96,11 @@ class MaybeChannelBound(Object): """Callback called when the class is bound.""" pass - def __repr__(self, item=""): + def __repr__(self, item=''): if self.is_bound: - return "<bound %s of %s>" % (item or self.__class__.__name__, + return '<bound %s of %s>' % (item or self.__class__.__name__, self.channel) - return "<unbound %s>" % (item, ) + return '<unbound %s>' % (item, ) @property def is_bound(self): diff --git a/kombu/clocks.py b/kombu/clocks.py index dbcae198..a45bc767 100644 --- a/kombu/clocks.py +++ b/kombu/clocks.py @@ -13,7 +13,7 @@ from __future__ import with_statement from threading import Lock -__all__ = ["LamportClock"] +__all__ = ['LamportClock'] class LamportClock(object): diff --git a/kombu/common.py b/kombu/common.py index b06002f9..87ebb200 100644 --- a/kombu/common.py +++ b/kombu/common.py @@ -23,11 +23,11 @@ from .log import Log from .messaging import Consumer as _Consumer from .utils import uuid -__all__ = ["Broadcast", "maybe_declare", "uuid", - "itermessages", "send_reply", "isend_reply", - "collect_replies", "insured", "ipublish"] +__all__ = ['Broadcast', 'maybe_declare', 'uuid', + 'itermessages', 'send_reply', 'isend_reply', + 'collect_replies', 'insured', 'ipublish'] -insured_logger = Log("kombu.insurance") +insured_logger = Log('kombu.insurance') class Broadcast(Queue): @@ -47,10 +47,10 @@ class Broadcast(Queue): def __init__(self, name=None, queue=None, **kwargs): return super(Broadcast, self).__init__( - name=queue or "bcast.%s" % (uuid(), ), - **dict({"alias": name, - "auto_delete": True, - "exchange": Exchange(name, type="fanout"), + name=queue or 'bcast.%s' % (uuid(), ), + **dict({'alias': name, + 'auto_delete': True, + 'exchange': Exchange(name, type='fanout'), }, **kwargs)) @@ -150,9 +150,9 @@ def send_reply(exchange, req, msg, producer=None, **props): serializer = serialization.registry.type_to_name[content_type] maybe_declare(exchange, producer.channel) producer.publish(msg, exchange=exchange, - **dict({"routing_key": req.properties["reply_to"], - "correlation_id": req.properties.get("correlation_id"), - "serializer": serializer}, + **dict({'routing_key': req.properties['reply_to'], + 'correlation_id': req.properties.get('correlation_id'), + 'serializer': serializer}, **props)) @@ -162,7 +162,7 @@ def isend_reply(pool, exchange, req, msg, props, **retry_policy): def collect_replies(conn, channel, queue, *args, **kwargs): - no_ack = kwargs.setdefault("no_ack", True) + no_ack = kwargs.setdefault('no_ack', True) received = False for body, message in itermessages(conn, channel, queue, *args, **kwargs): if not no_ack: @@ -175,7 +175,7 @@ def collect_replies(conn, channel, queue, *args, **kwargs): def _ensure_errback(exc, interval): insured_logger.error( - "Connection error: %r. Retry in %ss\n" % (exc, interval), + 'Connection error: %r. Retry in %ss\n' % (exc, interval), exc_info=True) diff --git a/kombu/compat.py b/kombu/compat.py index 96fa026a..88874aad 100644 --- a/kombu/compat.py +++ b/kombu/compat.py @@ -17,7 +17,7 @@ from itertools import count from . import messaging from .entity import Exchange, Queue -__all__ = ["Publisher", "Consumer"] +__all__ = ['Publisher', 'Consumer'] def _iterconsume(connection, consumer, no_ack=False, limit=None): @@ -29,9 +29,9 @@ def _iterconsume(connection, consumer, no_ack=False, limit=None): class Publisher(messaging.Producer): - exchange = "" - exchange_type = "direct" - routing_key = "" + exchange = '' + exchange_type = 'direct' + routing_key = '' durable = True auto_delete = False _closed = False @@ -78,14 +78,14 @@ class Publisher(messaging.Producer): class Consumer(messaging.Consumer): - queue = "" - exchange = "" - routing_key = "" - exchange_type = "direct" + queue = '' + exchange = '' + routing_key = '' + exchange_type = 'direct' durable = True exclusive = False auto_delete = False - exchange_type = "direct" + exchange_type = 'direct' _closed = False def __init__(self, connection, queue=None, exchange=None, @@ -146,12 +146,12 @@ class Consumer(messaging.Consumer): return message def process_next(self): - raise NotImplementedError("Use fetch(enable_callbacks=True)") + raise NotImplementedError('Use fetch(enable_callbacks=True)') def discard_all(self, filterfunc=None): if filterfunc is not None: raise NotImplementedError( - "discard_all does not implement filters") + 'discard_all does not implement filters') return self.purge() def iterconsume(self, limit=None, no_ack=None): diff --git a/kombu/compression.py b/kombu/compression.py index dcf583d8..5c88fee4 100644 --- a/kombu/compression.py +++ b/kombu/compression.py @@ -18,8 +18,8 @@ _aliases = {} _encoders = {} _decoders = {} -__all__ = ["register", "encoders", "get_encoder", - "get_decoder", "compress", "decompress"] +__all__ = ['register', 'encoders', 'get_encoder', + 'get_decoder', 'compress', 'decompress'] def register(encoder, decoder, content_type, aliases=[]): @@ -75,7 +75,7 @@ def decompress(body, content_type): register(zlib.compress, zlib.decompress, - "application/x-gzip", aliases=["gzip", "zlib"]) + 'application/x-gzip', aliases=['gzip', 'zlib']) try: import bz2 except ImportError: @@ -83,4 +83,4 @@ except ImportError: else: register(bz2.compress, bz2.decompress, - "application/x-bz2", aliases=["bzip2", "bzip"]) + 'application/x-bz2', aliases=['bzip2', 'bzip']) diff --git a/kombu/connection.py b/kombu/connection.py index 5395ed2e..8f4cc765 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -29,12 +29,12 @@ from .utils import cached_property, retry_over_time from .utils.compat import OrderedDict, LifoQueue as _LifoQueue from .utils.url import parse_url -_LOG_CONNECTION = os.environ.get("KOMBU_LOG_CONNECTION", False) -_LOG_CHANNEL = os.environ.get("KOMBU_LOG_CHANNEL", False) +_LOG_CONNECTION = os.environ.get('KOMBU_LOG_CONNECTION', False) +_LOG_CHANNEL = os.environ.get('KOMBU_LOG_CHANNEL', False) -__all__ = ["parse_url", "BrokerConnection", "Resource", - "ConnectionPool", "ChannelPool"] -URI_PASSTHROUGH = frozenset(["sqla", "sqlalchemy"]) +__all__ = ['parse_url', 'BrokerConnection', 'Resource', + 'ConnectionPool', 'ChannelPool'] +URI_PASSTHROUGH = frozenset(['sqla', 'sqlalchemy']) logger = get_logger(__name__) @@ -73,7 +73,7 @@ class BrokerConnection(object): """ port = None - virtual_host = "/" + virtual_host = '/' connect_timeout = 5 _closed = None @@ -91,29 +91,29 @@ class BrokerConnection(object): #: after a call to :meth:`drain_nowait`. more_to_read = False - def __init__(self, hostname="localhost", userid=None, + def __init__(self, hostname='localhost', userid=None, password=None, virtual_host=None, port=None, insist=False, ssl=False, transport=None, connect_timeout=5, transport_options=None, login_method=None, uri_prefix=None, **kwargs): # have to spell the args out, just to get nice docstrings :( - params = {"hostname": hostname, "userid": userid, - "password": password, "virtual_host": virtual_host, - "port": port, "insist": insist, "ssl": ssl, - "transport": transport, "connect_timeout": connect_timeout, - "login_method": login_method} - if hostname and "://" in hostname \ + params = {'hostname': hostname, 'userid': userid, + 'password': password, 'virtual_host': virtual_host, + 'port': port, 'insist': insist, 'ssl': ssl, + 'transport': transport, 'connect_timeout': connect_timeout, + 'login_method': login_method} + if hostname and '://' in hostname \ and transport not in URI_PASSTHROUGH: - if '+' in hostname[:hostname.index("://")]: + if '+' in hostname[:hostname.index('://')]: # e.g. sqla+mysql://root:masterkey@localhost/ - params["transport"], params["hostname"] = hostname.split('+') - self.uri_prefix = params["transport"] + params['transport'], params['hostname'] = hostname.split('+') + self.uri_prefix = params['transport'] else: params.update(parse_url(hostname)) self._init_params(**params) # backend_cls argument will be removed shortly. - self.transport_cls = self.transport_cls or kwargs.get("backend_cls") + self.transport_cls = self.transport_cls or kwargs.get('backend_cls') if transport_options is None: transport_options = {} @@ -140,9 +140,9 @@ class BrokerConnection(object): self.ssl = ssl self.transport_cls = transport - def _debug(self, msg, ident="[Kombu connection:0x%(id)x] ", **kwargs): + def _debug(self, msg, ident='[Kombu connection:0x%(id)x] ', **kwargs): if self._logger: # pragma: no cover - logger.debug((ident + unicode(msg)) % {"id": id(self)}, + logger.debug((ident + unicode(msg)) % {'id': id(self)}, **kwargs) def connect(self): @@ -152,12 +152,12 @@ class BrokerConnection(object): def channel(self): """Request a new channel.""" - self._debug("create channel") + self._debug('create channel') chan = self.transport.create_channel(self.connection) if _LOG_CHANNEL: # pragma: no cover from .utils.debug import Logwrapped - return Logwrapped(chan, "kombu.channel", - "[Kombu channel:%(channel_id)s] ") + return Logwrapped(chan, 'kombu.channel', + '[Kombu channel:%(channel_id)s] ') return chan def drain_events(self, **kwargs): @@ -208,7 +208,7 @@ class BrokerConnection(object): if self._transport: self._transport.client = None self._transport = None - self._debug("closed") + self._debug('closed') self._closed = True def release(self): @@ -298,7 +298,7 @@ class BrokerConnection(object): try: return fun(*args, **kwargs) except self.connection_errors + self.channel_errors, exc: - self._debug("ensure got exception: %r" % (exc, ), + self._debug('ensure got exception: %r' % (exc, ), exc_info=True) if got_connection: raise @@ -360,7 +360,7 @@ class BrokerConnection(object): def __call__(self, *args, **kwargs): if channels[0] is None: self.revive(create_channel()) - kwargs["channel"] = channels[0] + kwargs['channel'] = channels[0] return fun(*args, **kwargs), channels[0] revive = Revival() @@ -383,24 +383,24 @@ class BrokerConnection(object): return self.__class__(**dict(self._info(), **kwargs)) def _info(self): - transport_cls = self.transport_cls or "amqp" - transport_cls = {AMQP_ALIAS: "amqp"}.get(transport_cls, transport_cls) + transport_cls = self.transport_cls or 'amqp' + transport_cls = {AMQP_ALIAS: 'amqp'}.get(transport_cls, transport_cls) D = self.transport.default_connection_params hostname = self.hostname if self.uri_prefix: - hostname = "%s+%s" % (self.uri_prefix, hostname) - info = (("hostname", hostname or D.get("hostname")), - ("userid", self.userid or D.get("userid")), - ("password", self.password or D.get("password")), - ("virtual_host", self.virtual_host or D.get("virtual_host")), - ("port", self.port or D.get("port")), - ("insist", self.insist), - ("ssl", self.ssl), - ("transport", transport_cls), - ("connect_timeout", self.connect_timeout), - ("transport_options", self.transport_options), - ("login_method", self.login_method or D.get("login_method")), - ("uri_prefix", self.uri_prefix)) + hostname = '%s+%s' % (self.uri_prefix, hostname) + info = (('hostname', hostname or D.get('hostname')), + ('userid', self.userid or D.get('userid')), + ('password', self.password or D.get('password')), + ('virtual_host', self.virtual_host or D.get('virtual_host')), + ('port', self.port or D.get('port')), + ('insist', self.insist), + ('ssl', self.ssl), + ('transport', transport_cls), + ('connect_timeout', self.connect_timeout), + ('transport_options', self.transport_options), + ('login_method', self.login_method or D.get('login_method')), + ('uri_prefix', self.uri_prefix)) return info def info(self): @@ -408,36 +408,36 @@ class BrokerConnection(object): return OrderedDict(self._info()) def __eqhash__(self): - return hash("%s|%s|%s|%s|%s|%s" % ( + return hash('%s|%s|%s|%s|%s|%s' % ( self.transport_cls, self.hostname, self.userid, self.password, self.virtual_host, self.port)) def as_uri(self, include_password=False): if self.transport_cls in URI_PASSTHROUGH: - return self.transport_cls + '+' + (self.hostname or "localhost") - quoteS = partial(quote, safe="") # strict quote + return self.transport_cls + '+' + (self.hostname or 'localhost') + quoteS = partial(quote, safe='') # strict quote fields = self.info() - port = fields["port"] - userid = fields["userid"] - password = fields["password"] - transport = fields["transport"] - url = "%s://" % transport + port = fields['port'] + userid = fields['userid'] + password = fields['password'] + transport = fields['transport'] + url = '%s://' % transport if userid: url += quoteS(userid) if include_password and password: url += ':' + quoteS(password) url += '@' - url += quoteS(fields["hostname"]) + url += quoteS(fields['hostname']) # If the transport equals 'mongodb' the # hostname contains a full mongodb connection # URI. Let pymongo retreive the port from there. - if port and transport != "mongodb": + if port and transport != 'mongodb': url += ':' + str(port) - url += '/' + quote(fields["virtual_host"]) + url += '/' + quote(fields['virtual_host']) if self.uri_prefix: - return "%s+%s" % (self.uri_prefix, url) + return '%s+%s' % (self.uri_prefix, url) return url def Pool(self, limit=None, preload=None): @@ -545,14 +545,14 @@ class BrokerConnection(object): exchange_opts, **kwargs) def _establish_connection(self): - self._debug("establishing connection...") + self._debug('establishing connection...') conn = self.transport.establish_connection() - self._debug("connection established: %r" % (conn, )) + self._debug('connection established: %r' % (conn, )) return conn def __repr__(self): """``x.__repr__() <==> repr(x)``""" - return "<BrokerConnection: %s at 0x%x>" % (self.as_uri(), id(self)) + return '<BrokerConnection: %s at 0x%x>' % (self.as_uri(), id(self)) def __copy__(self): """``x.__copy__() <==> copy(x)``""" @@ -602,7 +602,7 @@ class BrokerConnection(object): @property def host(self): """The host as a host name/port pair separated by colon.""" - return ":".join([self.hostname, str(self.port)]) + return ':'.join([self.hostname, str(self.port)]) @property def transport(self): @@ -633,7 +633,7 @@ class BrokerConnection(object): @property def is_evented(self): - return getattr(self.transport, "on_poll_start", None) + return getattr(self.transport, 'on_poll_start', None) Connection = BrokerConnection @@ -649,7 +649,7 @@ class Resource(object): self.setup() def setup(self): - raise NotImplementedError("subclass responsibility") + raise NotImplementedError('subclass responsibility') def _add_when_empty(self): if self.limit and len(self._dirty) >= self.limit: @@ -740,7 +740,7 @@ class Resource(object): except AttributeError: # Issue #78 pass - mutex = getattr(resource, "mutex", None) + mutex = getattr(resource, 'mutex', None) if mutex: mutex.acquire() try: @@ -757,7 +757,7 @@ class Resource(object): if mutex: mutex.release() - if os.environ.get("KOMBU_DEBUG_POOL"): # pragma: no cover + if os.environ.get('KOMBU_DEBUG_POOL'): # pragma: no cover _orig_acquire = acquire _orig_release = release @@ -766,20 +766,20 @@ class Resource(object): def acquire(self, *args, **kwargs): # noqa import traceback id = self._next_resource_id = self._next_resource_id + 1 - print("+%s ACQUIRE %s" % (id, self.__class__.__name__, )) + print('+%s ACQUIRE %s' % (id, self.__class__.__name__, )) r = self._orig_acquire(*args, **kwargs) r._resource_id = id - print("-%s ACQUIRE %s" % (id, self.__class__.__name__, )) - if not hasattr(r, "acquired_by"): + print('-%s ACQUIRE %s' % (id, self.__class__.__name__, )) + if not hasattr(r, 'acquired_by'): r.acquired_by = [] r.acquired_by.append(traceback.format_stack()) return r def release(self, resource): # noqa id = resource._resource_id - print("+%s RELEASE %s" % (id, self.__class__.__name__, )) + print('+%s RELEASE %s' % (id, self.__class__.__name__, )) r = self._orig_release(resource) - print("-%s RELEASE %s" % (id, self.__class__.__name__, )) + print('-%s RELEASE %s' % (id, self.__class__.__name__, )) self._next_resource_id -= 1 return r @@ -796,7 +796,7 @@ class ConnectionPool(Resource): return self.connection.clone() def release_resource(self, resource): - resource._debug("released") + resource._debug('released') def close_resource(self, resource): resource._close() @@ -819,7 +819,7 @@ class ConnectionPool(Resource): def prepare(self, resource): if callable(resource): resource = resource() - resource._debug("acquired") + resource._debug('acquired') return resource diff --git a/kombu/entity.py b/kombu/entity.py index 4cfb08dd..c63ee1c4 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -14,10 +14,10 @@ from .abstract import MaybeChannelBound TRANSIENT_DELIVERY_MODE = 1 PERSISTENT_DELIVERY_MODE = 2 -DELIVERY_MODES = {"transient": TRANSIENT_DELIVERY_MODE, - "persistent": PERSISTENT_DELIVERY_MODE} +DELIVERY_MODES = {'transient': TRANSIENT_DELIVERY_MODE, + 'persistent': PERSISTENT_DELIVERY_MODE} -__all__ = ["Exchange", "Queue"] +__all__ = ['Exchange', 'Queue'] class Exchange(MaybeChannelBound): @@ -118,27 +118,27 @@ class Exchange(MaybeChannelBound): TRANSIENT_DELIVERY_MODE = TRANSIENT_DELIVERY_MODE PERSISTENT_DELIVERY_MODE = PERSISTENT_DELIVERY_MODE - name = "" - type = "direct" + name = '' + type = 'direct' durable = True auto_delete = False delivery_mode = PERSISTENT_DELIVERY_MODE - attrs = (("name", None), - ("type", None), - ("arguments", None), - ("durable", bool), - ("auto_delete", bool), - ("delivery_mode", lambda m: DELIVERY_MODES.get(m) or m)) + attrs = (('name', None), + ('type', None), + ('arguments', None), + ('durable', bool), + ('auto_delete', bool), + ('delivery_mode', lambda m: DELIVERY_MODES.get(m) or m)) - def __init__(self, name="", type="", channel=None, **kwargs): + def __init__(self, name='', type='', channel=None, **kwargs): super(Exchange, self).__init__(**kwargs) self.name = name or self.name self.type = type or self.type self.maybe_bind(channel) def __hash__(self): - return hash("E|%s" % (self.name, )) + return hash('E|%s' % (self.name, )) def declare(self, nowait=False, passive=False): """Declare the exchange. @@ -188,7 +188,7 @@ class Exchange(MaybeChannelBound): """ properties = {} if properties is None else properties dm = delivery_mode or self.delivery_mode - properties["delivery_mode"] = \ + properties['delivery_mode'] = \ DELIVERY_MODES[dm] if (dm != 2 and dm != 1) else dm return self.channel.prepare_message(body, properties=properties, @@ -239,7 +239,7 @@ class Exchange(MaybeChannelBound): return False def __repr__(self): - return super(Exchange, self).__repr__("Exchange %s(%s)" % (self.name, + return super(Exchange, self).__repr__('Exchange %s(%s)' % (self.name, self.type)) @property @@ -341,27 +341,27 @@ class Queue(MaybeChannelBound): generated queue names. """ - name = "" - exchange = Exchange("") - routing_key = "" + name = '' + exchange = Exchange('') + routing_key = '' durable = True exclusive = False auto_delete = False no_ack = False - attrs = (("name", None), - ("exchange", None), - ("routing_key", None), - ("queue_arguments", None), - ("binding_arguments", None), - ("durable", bool), - ("exclusive", bool), - ("auto_delete", bool), - ("no_ack", None), - ("alias", None)) - - def __init__(self, name="", exchange=None, routing_key="", channel=None, + attrs = (('name', None), + ('exchange', None), + ('routing_key', None), + ('queue_arguments', None), + ('binding_arguments', None), + ('durable', bool), + ('exclusive', bool), + ('auto_delete', bool), + ('no_ack', None), + ('alias', None)) + + def __init__(self, name='', exchange=None, routing_key='', channel=None, **kwargs): super(Queue, self).__init__(**kwargs) self.name = name or self.name @@ -373,7 +373,7 @@ class Queue(MaybeChannelBound): self.maybe_bind(channel) def __hash__(self): - return hash("Q|%s" % (self.name, )) + return hash('Q|%s' % (self.name, )) def when_bound(self): if self.exchange: @@ -437,7 +437,7 @@ class Queue(MaybeChannelBound): no_ack = self.no_ack if no_ack is None else no_ack message = self.channel.basic_get(queue=self.name, no_ack=no_ack) if message is not None: - m2p = getattr(self.channel, "message_to_python", None) + m2p = getattr(self.channel, 'message_to_python', None) if m2p: message = m2p(message) return message @@ -518,7 +518,7 @@ class Queue(MaybeChannelBound): def __repr__(self): return super(Queue, self).__repr__( - "Queue %s -> %s -> %s" % (self.name, + 'Queue %s -> %s -> %s' % (self.name, self.exchange, self.routing_key)) @@ -528,32 +528,32 @@ class Queue(MaybeChannelBound): @classmethod def from_dict(self, queue, **options): - binding_key = options.get("binding_key") or options.get("routing_key") + binding_key = options.get('binding_key') or options.get('routing_key') - e_durable = options.get("exchange_durable") + e_durable = options.get('exchange_durable') if e_durable is None: - e_durable = options.get("durable") + e_durable = options.get('durable') - e_auto_delete = options.get("exchange_auto_delete") + e_auto_delete = options.get('exchange_auto_delete') if e_auto_delete is None: - e_auto_delete = options.get("auto_delete") + e_auto_delete = options.get('auto_delete') - q_durable = options.get("queue_durable") + q_durable = options.get('queue_durable') if q_durable is None: - q_durable = options.get("durable") + q_durable = options.get('durable') - q_auto_delete = options.get("queue_auto_delete") + q_auto_delete = options.get('queue_auto_delete') if q_auto_delete is None: - q_auto_delete = options.get("auto_delete") + q_auto_delete = options.get('auto_delete') - e_arguments = options.get("exchange_arguments") - q_arguments = options.get("queue_arguments") - b_arguments = options.get("binding_arguments") + e_arguments = options.get('exchange_arguments') + q_arguments = options.get('queue_arguments') + b_arguments = options.get('binding_arguments') - exchange = Exchange(options.get("exchange"), - type=options.get("exchange_type"), - delivery_mode=options.get("delivery_mode"), - routing_key=options.get("routing_key"), + exchange = Exchange(options.get('exchange'), + type=options.get('exchange_type'), + delivery_mode=options.get('delivery_mode'), + routing_key=options.get('routing_key'), durable=e_durable, auto_delete=e_auto_delete, arguments=e_arguments) @@ -561,8 +561,8 @@ class Queue(MaybeChannelBound): exchange=exchange, routing_key=binding_key, durable=q_durable, - exclusive=options.get("exclusive"), + exclusive=options.get('exclusive'), auto_delete=q_auto_delete, - no_ack=options.get("no_ack"), + no_ack=options.get('no_ack'), queue_arguments=q_arguments, binding_arguments=b_arguments) diff --git a/kombu/exceptions.py b/kombu/exceptions.py index 43eb409c..887d2bcb 100644 --- a/kombu/exceptions.py +++ b/kombu/exceptions.py @@ -12,10 +12,10 @@ from __future__ import absolute_import import socket -__all__ = ["NotBoundError", "MessageStateError", "TimeoutError", - "LimitExceeded", "ConnectionLimitExceeded", - "ChannelLimitExceeded", "StdChannelError", "VersionMismatch", - "SerializerNotInstalled"] +__all__ = ['NotBoundError', 'MessageStateError', 'TimeoutError', + 'LimitExceeded', 'ConnectionLimitExceeded', + 'ChannelLimitExceeded', 'StdChannelError', 'VersionMismatch', + 'SerializerNotInstalled'] TimeoutError = socket.timeout diff --git a/kombu/log.py b/kombu/log.py index 5b5cedee..76655942 100644 --- a/kombu/log.py +++ b/kombu/log.py @@ -9,12 +9,12 @@ from .utils.compat import WatchedFileHandler from .utils.encoding import safe_repr, safe_str from .utils.functional import maybe_promise -__all__ = ["LogMixin", "LOG_LEVELS", "get_loglevel", "setup_logging"] +__all__ = ['LogMixin', 'LOG_LEVELS', 'get_loglevel', 'setup_logging'] LOG_LEVELS = dict(logging._levelNames) -LOG_LEVELS["FATAL"] = logging.FATAL -LOG_LEVELS[logging.FATAL] = "FATAL" -DISABLE_TRACEBACKS = os.environ.get("DISABLE_TRACEBACKS") +LOG_LEVELS['FATAL'] = logging.FATAL +LOG_LEVELS[logging.FATAL] = 'FATAL' +DISABLE_TRACEBACKS = os.environ.get('DISABLE_TRACEBACKS') class NullHandler(logging.Handler): @@ -48,7 +48,7 @@ def naive_format_parts(fmt): for i, e in enumerate(l[1:]): if not e or not l[i - 1]: yield - elif e[0] in ["r", "s"]: + elif e[0] in ['r', 's']: yield e[0] @@ -80,13 +80,13 @@ class LogMixin(object): return self._error(logging.CRITICAL, *args, **kwargs) def _error(self, severity, *args, **kwargs): - kwargs.setdefault("exc_info", True) + kwargs.setdefault('exc_info', True) if DISABLE_TRACEBACKS: - kwargs.pop("exc_info", None) + kwargs.pop('exc_info', None) return self.log(severity, *args, **kwargs) def annotate(self, text): - return "%s - %s" % (self.logger_name, text) + return '%s - %s' % (self.logger_name, text) def log(self, severity, *args, **kwargs): if self.logger.isEnabledFor(severity): @@ -98,7 +98,7 @@ class LogMixin(object): *list(safeify_format(args[0], *expand)), **kwargs) else: return self.logger.log(severity, - self.annotate(" ".join(map(safe_str, args))), + self.annotate(' '.join(map(safe_str, args))), **kwargs) def get_logger(self): @@ -139,10 +139,10 @@ class Log(LogMixin): def setup_logging(loglevel=None, logfile=None): logger = logging.getLogger() - loglevel = get_loglevel(loglevel or "ERROR") + loglevel = get_loglevel(loglevel or 'ERROR') logfile = logfile if logfile else sys.__stderr__ if not logger.handlers: - if hasattr(logfile, "write"): + if hasattr(logfile, 'write'): handler = logging.StreamHandler(logfile) else: handler = WatchedFileHandler(logfile) diff --git a/kombu/messaging.py b/kombu/messaging.py index 63d03cf2..46fb0060 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -18,7 +18,7 @@ from .compression import compress from .serialization import encode from .utils import maybe_list -__all__ = ["Exchange", "Queue", "Producer", "Consumer"] +__all__ = ['Exchange', 'Queue', 'Producer', 'Consumer'] class Producer(object): @@ -46,7 +46,7 @@ class Producer(object): exchange = None # Default routing key. - routing_key = "" + routing_key = '' #: Default serializer to use. Default is JSON. serializer = None @@ -68,7 +68,7 @@ class Producer(object): self.channel = channel self.exchange = exchange or self.exchange if self.exchange is None: - self.exchange = Exchange("") + self.exchange = Exchange('') self.routing_key = routing_key or self.routing_key self.serializer = serializer or self.serializer self.compression = compression or self.compression @@ -171,7 +171,7 @@ class Producer(object): if self.auto_declare: self.declare() if self.on_return: - self.channel.events["basic_return"].append(self.on_return) + self.channel.events['basic_return'].append(self.on_return) def __enter__(self): return self @@ -206,7 +206,7 @@ class Producer(object): content_encoding = 'binary' if compression: - body, headers["compression"] = compress(body, compression) + body, headers['compression'] = compress(body, compression) return body, content_type, content_encoding @@ -446,7 +446,7 @@ class Consumer(object): """ callbacks = self.callbacks if not callbacks: - raise NotImplementedError("Consumer does not have any callback") + raise NotImplementedError('Consumer does not have any callback') [callback(body, message) for callback in callbacks] def _basic_consume(self, queue, consumer_tag=None, @@ -466,7 +466,7 @@ class Consumer(object): def _receive_callback(self, message): channel = self.channel try: - m2p = getattr(channel, "message_to_python", None) + m2p = getattr(channel, 'message_to_python', None) if m2p: message = m2p(message) decoded = message.decode() @@ -478,7 +478,7 @@ class Consumer(object): self.receive(decoded, message) def __repr__(self): - return "<Consumer: %s>" % (self.queues, ) + return '<Consumer: %s>' % (self.queues, ) @property def connection(self): diff --git a/kombu/mixins.py b/kombu/mixins.py index 5df8b5e7..25168b03 100644 --- a/kombu/mixins.py +++ b/kombu/mixins.py @@ -23,7 +23,7 @@ from .utils import cached_property, nested from .utils.encoding import safe_repr from .utils.limits import TokenBucket -__all__ = ["ConsumerMixin"] +__all__ = ['ConsumerMixin'] class ConsumerMixin(LogMixin): @@ -45,7 +45,7 @@ class ConsumerMixin(LogMixin): class Worker(ConsumerMixin): - task_queue = Queue("tasks", Exchange("tasks"), "tasks")) + task_queue = Queue('tasks', Exchange('tasks'), 'tasks')) def __init__(self, connection): self.connection = None @@ -55,7 +55,7 @@ class ConsumerMixin(LogMixin): callback=[self.on_task])] def on_task(self, body, message): - print("Got task: %r" % (body, )) + print('Got task: %r' % (body, )) message.ack() **Additional handler methods**: @@ -129,7 +129,7 @@ class ConsumerMixin(LogMixin): should_stop = False def get_consumers(self, Consumer, channel): - raise NotImplementedError("Subclass responsibility") + raise NotImplementedError('Subclass responsibility') def on_connection_revived(self): pass @@ -151,8 +151,8 @@ class ConsumerMixin(LogMixin): message.ack() def on_connection_error(self, exc, interval): - self.error("Broker connection error: %r. " - "Trying again in %s seconds.", exc, interval) + self.error('Broker connection error: %r. ' + 'Trying again in %s seconds.', exc, interval) @contextmanager def extra_context(self, connection, channel): @@ -165,8 +165,8 @@ class ConsumerMixin(LogMixin): for _ in self.consume(limit=None): pass except self.connection.connection_errors: - self.error("Connection to broker lost. " - "Trying to re-establish the connection...") + self.error('Connection to broker lost. ' + 'Trying to re-establish the connection...') def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs): elapsed = 0 @@ -189,7 +189,7 @@ class ConsumerMixin(LogMixin): else: yield elapsed = 0 - self.debug("consume exiting") + self.debug('consume exiting') def maybe_conn_error(self, fun): """Applies function but ignores any connection or channel @@ -212,15 +212,15 @@ class ConsumerMixin(LogMixin): def Consumer(self): with self.establish_connection() as conn: self.on_connection_revived() - self.info("Connected to %s", conn.as_uri()) + self.info('Connected to %s', conn.as_uri()) channel = conn.default_channel cls = partial(Consumer, channel, on_decode_error=self.on_decode_error) with self._consume_from(*self.get_consumers(cls, channel)) as c: yield conn, channel, c - self.debug("Consumers cancelled") + self.debug('Consumers cancelled') self.on_consume_end(conn, channel) - self.debug("Connection closed") + self.debug('Connection closed') def _consume_from(self, *consumers): return nested(*consumers) diff --git a/kombu/pidbox.py b/kombu/pidbox.py index edc39af1..faf21d3b 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -20,7 +20,7 @@ from .entity import Exchange, Queue from .messaging import Consumer, Producer from .utils import kwdict, uuid -__all__ = ["Node", "Mailbox"] +__all__ = ['Node', 'Mailbox'] class Node(object): @@ -51,7 +51,7 @@ class Node(object): self.handlers = handlers def Consumer(self, channel=None, **options): - options.setdefault("no_ack", True) + options.setdefault('no_ack', True) return Consumer(channel or self.channel, [self.mailbox.get_queue(self.hostname)], **options) @@ -69,10 +69,10 @@ class Node(object): def dispatch_from_message(self, message): message = dict(message) - method = message["method"] - destination = message.get("destination") - reply_to = message.get("reply_to") - arguments = message.get("arguments") + method = message['method'] + destination = message.get('destination') + reply_to = message.get('reply_to') + arguments = message.get('arguments') if not destination or self.hostname in destination: return self.dispatch(method, arguments, reply_to) @@ -84,12 +84,12 @@ class Node(object): except SystemExit: raise except Exception, exc: - reply = {"error": repr(exc)} + reply = {'error': repr(exc)} if reply_to: self.reply({self.hostname: reply}, - exchange=reply_to["exchange"], - routing_key=reply_to["routing_key"]) + exchange=reply_to['exchange'], + routing_key=reply_to['routing_key']) return reply def handle(self, method, arguments={}): @@ -111,8 +111,8 @@ class Node(object): class Mailbox(object): node_cls = Node - exchange_fmt = "%s.pidbox" - reply_exchange_fmt = "reply.%s.pidbox" + exchange_fmt = '%s.pidbox' + reply_exchange_fmt = 'reply.%s.pidbox' #: Name of application. namespace = None @@ -121,7 +121,7 @@ class Mailbox(object): connection = None #: Exchange type (usually direct, or fanout for broadcast). - type = "direct" + type = 'direct' #: mailbox exchange (init by constructor). exchange = None @@ -129,7 +129,7 @@ class Mailbox(object): #: exchange to send replies to. reply_exchange = None - def __init__(self, namespace, type="direct", connection=None): + def __init__(self, namespace, type='direct', connection=None): self.namespace = namespace self.connection = connection self.type = type @@ -166,22 +166,22 @@ class Mailbox(object): channel=channel) def get_reply_queue(self, ticket): - return Queue("%s.%s" % (ticket, self.reply_exchange.name), + return Queue('%s.%s' % (ticket, self.reply_exchange.name), exchange=self.reply_exchange, routing_key=ticket, durable=False, auto_delete=True) def get_queue(self, hostname): - return Queue("%s.%s.pidbox" % (hostname, self.namespace), + return Queue('%s.%s.pidbox' % (hostname, self.namespace), exchange=self.exchange, durable=False, auto_delete=True) def _publish_reply(self, reply, exchange, routing_key, channel=None): chan = channel or self.connection.default_channel - exchange = Exchange(exchange, exchange_type="direct", - delivery_mode="transient", + exchange = Exchange(exchange, exchange_type='direct', + delivery_mode='transient', durable=False) producer = Producer(chan, exchange=exchange, auto_declare=True) @@ -189,12 +189,12 @@ class Mailbox(object): def _publish(self, type, arguments, destination=None, reply_ticket=None, channel=None): - message = {"method": type, - "arguments": arguments, - "destination": destination} + message = {'method': type, + 'arguments': arguments, + 'destination': destination} if reply_ticket: - message["reply_to"] = {"exchange": self.reply_exchange.name, - "routing_key": reply_ticket} + message['reply_to'] = {'exchange': self.reply_exchange.name, + 'routing_key': reply_ticket} chan = channel or self.connection.default_channel producer = Producer(chan, exchange=self.exchange) producer.publish(message) @@ -203,7 +203,7 @@ class Mailbox(object): reply=False, timeout=1, limit=None, callback=None, channel=None): if destination is not None and \ not isinstance(destination, (list, tuple)): - raise ValueError("destination must be a list/tuple not %s" % ( + raise ValueError('destination must be a list/tuple not %s' % ( type(destination))) arguments = arguments or {} @@ -253,10 +253,10 @@ class Mailbox(object): return Exchange(self.exchange_fmt % namespace, type=type, durable=False, - delivery_mode="transient") + delivery_mode='transient') def _get_reply_exchange(self, namespace): return Exchange(self.reply_exchange_fmt % namespace, - type="direct", + type='direct', durable=False, - delivery_mode="transient") + delivery_mode='transient') diff --git a/kombu/pools.py b/kombu/pools.py index 4f8bcbb9..4d4b7e99 100644 --- a/kombu/pools.py +++ b/kombu/pools.py @@ -18,13 +18,13 @@ from .connection import Resource from .messaging import Producer from .utils import EqualityDict -__all__ = ["ProducerPool", "PoolGroup", "register_group", - "connections", "producers", "get_limit", "set_limit", "reset"] +__all__ = ['ProducerPool', 'PoolGroup', 'register_group', + 'connections', 'producers', 'get_limit', 'set_limit', 'reset'] _limit = [200] _used = [False] _groups = [] use_global_limit = object() -disable_limit_protection = os.environ.get("KOMBU_DISABLE_LIMIT_PROTECTION") +disable_limit_protection = os.environ.get('KOMBU_DISABLE_LIMIT_PROTECTION') class ProducerPool(Resource): @@ -32,7 +32,7 @@ class ProducerPool(Resource): def __init__(self, connections, *args, **kwargs): self.connections = connections - self.Producer = kwargs.pop("Producer", None) or self.Producer + self.Producer = kwargs.pop('Producer', None) or self.Producer super(ProducerPool, self).__init__(*args, **kwargs) def _acquire_connection(self): @@ -70,7 +70,7 @@ class PoolGroup(EqualityDict): self.limit = limit def create(self, resource, limit): - raise NotImplementedError("PoolGroups must define ``create``") + raise NotImplementedError('PoolGroups must define ``create``') def __missing__(self, resource): limit = self.limit diff --git a/kombu/serialization.py b/kombu/serialization.py index 3070c854..f0f89966 100644 --- a/kombu/serialization.py +++ b/kombu/serialization.py @@ -22,11 +22,11 @@ except ImportError: # pragma: no cover from .exceptions import SerializerNotInstalled from .utils.encoding import bytes_to_str, str_to_bytes, bytes_t -__all__ = ["pickle", "encode", "decode", - "register", "unregister"] -SKIP_DECODE = frozenset(["binary", "ascii-8bit"]) +__all__ = ['pickle', 'encode', 'decode', + 'register', 'unregister'] +SKIP_DECODE = frozenset(['binary', 'ascii-8bit']) -if sys.platform.startswith("java"): # pragma: no cover +if sys.platform.startswith('java'): # pragma: no cover def _decode(t, coding): return codecs.getdecoder(coding)(t)[0] @@ -99,7 +99,7 @@ class SerializerRegistry(object): self.type_to_name.pop(content_type, None) except KeyError: raise SerializerNotInstalled( - "No encoder/decoder installed for %s" % name) + 'No encoder/decoder installed for %s' % name) def _set_default_serializer(self, name): """ @@ -117,14 +117,14 @@ class SerializerRegistry(object): self._default_encode) = self._encoders[name] except KeyError: raise SerializerNotInstalled( - "No encoder installed for %s" % name) + 'No encoder installed for %s' % name) def encode(self, data, serializer=None): - if serializer == "raw": + if serializer == 'raw': return raw_encode(data) if serializer and not self._encoders.get(serializer): raise SerializerNotInstalled( - "No encoder installed for %s" % serializer) + 'No encoder installed for %s' % serializer) # If a raw string was sent, assume binary encoding # (it's likely either ASCII or a raw binary file, and a character @@ -132,12 +132,12 @@ class SerializerRegistry(object): if not serializer and isinstance(data, bytes_t): # In Python 3+, this would be "bytes"; allow binary data to be # sent as a message without getting encoder errors - return "application/data", "binary", data + return 'application/data', 'binary', data # For Unicode objects, force it into a string if not serializer and isinstance(data, unicode): - payload = data.encode("utf-8") - return "text/plain", "utf-8", payload + payload = data.encode('utf-8') + return 'text/plain', 'utf-8', payload if serializer: content_type, content_encoding, encoder = \ @@ -153,7 +153,7 @@ class SerializerRegistry(object): def decode(self, data, content_type, content_encoding, force=False): if content_type in self._disabled_content_types and not force: raise SerializerNotInstalled( - "Content-type %r has been disabled." % (content_type, )) + 'Content-type %r has been disabled.' % (content_type, )) content_type = content_type or 'application/data' content_encoding = (content_encoding or 'utf-8').lower() @@ -230,7 +230,7 @@ decode = registry.decode """ .. function:: register(name, encoder, decoder, content_type, - content_encoding="utf-8"): + content_encoding='utf-8'): Register a new encoder/decoder. :param name: A convenience name for the serialization method. @@ -252,7 +252,7 @@ decode = registry.decode the `decoder` method will be returning. Will usually be utf-8`, `us-ascii`, or `binary`. - """ +""" register = registry.register @@ -262,7 +262,7 @@ register = registry.register :param name: Registered serialization method name. - """ +""" unregister = registry.unregister @@ -306,7 +306,7 @@ def register_yaml(): """In case a client receives a yaml message, but yaml isn't installed.""" raise SerializerNotInstalled( - "No decoder installed for YAML. Install the PyYAML library") + 'No decoder installed for YAML. Install the PyYAML library') registry.register('yaml', None, not_available, 'application/x-yaml') @@ -340,8 +340,8 @@ def register_msgpack(): """In case a client receives a msgpack message, but yaml isn't installed.""" raise SerializerNotInstalled( - "No decoder installed for msgpack. " - "Install the msgpack library") + 'No decoder installed for msgpack. ' + 'Install the msgpack library') registry.register('msgpack', None, not_available, 'application/x-msgpack') diff --git a/kombu/simple.py b/kombu/simple.py index 7d101ea5..3b5b6ed6 100644 --- a/kombu/simple.py +++ b/kombu/simple.py @@ -20,7 +20,7 @@ from . import entity from . import messaging from .connection import maybe_channel -__all__ = ["SimpleQueue", "SimpleBuffer"] +__all__ = ['SimpleQueue', 'SimpleBuffer'] class SimpleBase(object): @@ -113,7 +113,7 @@ class SimpleQueue(SimpleBase): if no_ack is None: no_ack = self.no_ack if not isinstance(queue, entity.Queue): - exchange = entity.Exchange(name, "direct", **exchange_opts) + exchange = entity.Exchange(name, 'direct', **exchange_opts) queue = entity.Queue(name, exchange, name, **queue_opts) else: name = queue.name @@ -132,5 +132,5 @@ class SimpleBuffer(SimpleQueue): queue_opts = dict(durable=False, auto_delete=True) exchange_opts = dict(durable=False, - delivery_mode="transient", + delivery_mode='transient', auto_delete=True) diff --git a/kombu/syn.py b/kombu/syn.py index 2ec281b9..7983f41c 100644 --- a/kombu/syn.py +++ b/kombu/syn.py @@ -10,7 +10,7 @@ from __future__ import absolute_import import sys -__all__ = ["detect_environment"] +__all__ = ['detect_environment'] _environment = None @@ -25,28 +25,28 @@ def select_blocking_method(type): def _detect_environment(): ## -eventlet- - if "eventlet" in sys.modules: + if 'eventlet' in sys.modules: try: from eventlet.patcher import is_monkey_patched as is_eventlet import socket if is_eventlet(socket): - return "eventlet" + return 'eventlet' except ImportError: pass # -gevent- - if "gevent" in sys.modules: + if 'gevent' in sys.modules: try: from gevent import socket as _gsocket import socket if socket.socket is _gsocket.socket: - return "gevent" + return 'gevent' except ImportError: pass - return "default" + return 'default' def detect_environment(): diff --git a/kombu/tests/__init__.py b/kombu/tests/__init__.py index ca31361a..730d9404 100644 --- a/kombu/tests/__init__.py +++ b/kombu/tests/__init__.py @@ -9,31 +9,31 @@ from kombu.exceptions import VersionMismatch # avoid json implementation inconsistencies. try: import json # noqa - anyjson.force_implementation("json") + anyjson.force_implementation('json') except ImportError: - anyjson.force_implementation("simplejson") + anyjson.force_implementation('simplejson') def find_distribution_modules(name=__name__, file=__file__): - current_dist_depth = len(name.split(".")) - 1 + current_dist_depth = len(name.split('.')) - 1 current_dist = os.path.join(os.path.dirname(file), *([os.pardir] * current_dist_depth)) abs = os.path.abspath(current_dist) dist_name = os.path.basename(abs) for dirpath, dirnames, filenames in os.walk(abs): - package = (dist_name + dirpath[len(abs):]).replace("/", ".") - if "__init__.py" in filenames: + package = (dist_name + dirpath[len(abs):]).replace('/', '.') + if '__init__.py' in filenames: yield package for filename in filenames: - if filename.endswith(".py") and filename != "__init__.py": - yield ".".join([package, filename])[:-3] + if filename.endswith('.py') and filename != '__init__.py': + yield '.'.join([package, filename])[:-3] def import_all_modules(name=__name__, file=__file__, skip=[]): for module in find_distribution_modules(name, file): if module not in skip: - print("preimporting %r for coverage..." % (module, )) + print('preimporting %r for coverage...' % (module, )) try: __import__(module) except (ImportError, VersionMismatch, AttributeError): @@ -41,8 +41,8 @@ def import_all_modules(name=__name__, file=__file__, skip=[]): def is_in_coverage(): - return (os.environ.get("COVER_ALL_MODULES") or - "--with-coverage3" in sys.argv) + return (os.environ.get('COVER_ALL_MODULES') or + '--with-coverage3' in sys.argv) def setup_django_env(): @@ -52,12 +52,12 @@ def setup_django_env(): return if not settings.configured: - settings.configure(DATABASES={"default": { - "ENGINE": "django.db.backends.sqlite3", - "NAME": ":memory:"}}, - DATABASE_ENGINE="sqlite3", - DATABASE_NAME=":memory:", - INSTALLED_APPS=("kombu.transport.django", )) + settings.configure(DATABASES={'default': { + 'ENGINE': 'django.db.backends.sqlite3', + 'NAME': ':memory:'}}, + DATABASE_ENGINE='sqlite3', + DATABASE_NAME=':memory:', + INSTALLED_APPS=('kombu.transport.django', )) def setup(): diff --git a/kombu/tests/compat.py b/kombu/tests/compat.py index 4cfe6b79..391b7f1a 100644 --- a/kombu/tests/compat.py +++ b/kombu/tests/compat.py @@ -7,8 +7,8 @@ class WarningMessage(object): """Holds the result of a single showwarning() call.""" - _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file", - "line") + _WARNING_DETAILS = ('message', 'category', 'filename', 'lineno', 'file', + 'line') def __init__(self, message, category, filename, lineno, file=None, line=None): @@ -19,8 +19,8 @@ class WarningMessage(object): self._category_name = category and category.__name__ or None def __str__(self): - return ("{message : %r, category : %r, filename : %r, lineno : %s, " - "line : %r}" % (self.message, self._category_name, + return ('{message : %r, category : %r, filename : %r, lineno : %s, ' + 'line : %r}' % (self.message, self._category_name, self.filename, self.lineno, self.line)) @@ -50,21 +50,21 @@ class catch_warnings(object): """ self._record = record - self._module = module is None and sys.modules["warnings"] or module + self._module = module is None and sys.modules['warnings'] or module self._entered = False def __repr__(self): args = [] if self._record: - args.append("record=True") + args.append('record=True') if self._module is not sys.modules['warnings']: - args.append("module=%r" % self._module) + args.append('module=%r' % self._module) name = type(self).__name__ - return "%s(%s)" % (name, ", ".join(args)) + return '%s(%s)' % (name, ', '.join(args)) def __enter__(self): if self._entered: - raise RuntimeError("Cannot enter %r twice" % self) + raise RuntimeError('Cannot enter %r twice' % self) self._entered = True self._filters = self._module.filters self._module.filters = self._filters[:] @@ -82,6 +82,6 @@ class catch_warnings(object): def __exit__(self, *exc_info): if not self._entered: - raise RuntimeError("Cannot exit %r without entering first" % self) + raise RuntimeError('Cannot exit %r without entering first' % self) self._module.filters = self._filters self._module.showwarning = self._showwarning diff --git a/kombu/tests/mocks.py b/kombu/tests/mocks.py index 8d81bc90..dfe172f0 100644 --- a/kombu/tests/mocks.py +++ b/kombu/tests/mocks.py @@ -10,7 +10,7 @@ from kombu.transport import base class Message(base.Message): def __init__(self, *args, **kwargs): - self.throw_decode_error = kwargs.get("throw_decode_error", False) + self.throw_decode_error = kwargs.get('throw_decode_error', False) super(Message, self).__init__(*args, **kwargs) def decode(self): @@ -28,7 +28,7 @@ class Channel(base.StdChannel): self.called = [] self.deliveries = count(1).next self.to_deliver = [] - self.events = {"basic_return": []} + self.events = {'basic_return': []} def _called(self, name): self.called.append(name) @@ -37,11 +37,11 @@ class Channel(base.StdChannel): return key in self.called def exchange_declare(self, *args, **kwargs): - self._called("exchange_declare") + self._called('exchange_declare') def prepare_message(self, message_data, properties={}, priority=0, content_type=None, content_encoding=None, headers=None): - self._called("prepare_message") + self._called('prepare_message') return dict(body=message_data, headers=headers, properties=properties, @@ -49,69 +49,69 @@ class Channel(base.StdChannel): content_type=content_type, content_encoding=content_encoding) - def basic_publish(self, message, exchange="", routing_key="", + def basic_publish(self, message, exchange='', routing_key='', mandatory=False, immediate=False, **kwargs): - self._called("basic_publish") + self._called('basic_publish') return message, exchange, routing_key def exchange_delete(self, *args, **kwargs): - self._called("exchange_delete") + self._called('exchange_delete') def queue_declare(self, *args, **kwargs): - self._called("queue_declare") + self._called('queue_declare') def queue_bind(self, *args, **kwargs): - self._called("queue_bind") + self._called('queue_bind') def queue_unbind(self, *args, **kwargs): - self._called("queue_unbind") + self._called('queue_unbind') def queue_delete(self, queue, if_unused=False, if_empty=False, **kwargs): - self._called("queue_delete") + self._called('queue_delete') def basic_get(self, *args, **kwargs): - self._called("basic_get") + self._called('basic_get') try: return self.to_deliver.pop() except IndexError: pass def queue_purge(self, *args, **kwargs): - self._called("queue_purge") + self._called('queue_purge') def basic_consume(self, *args, **kwargs): - self._called("basic_consume") + self._called('basic_consume') def basic_cancel(self, *args, **kwargs): - self._called("basic_cancel") + self._called('basic_cancel') def basic_ack(self, *args, **kwargs): - self._called("basic_ack") + self._called('basic_ack') def basic_recover(self, requeue=False): - self._called("basic_recover") + self._called('basic_recover') def close(self): - self._called("close") + self._called('close') def message_to_python(self, message, *args, **kwargs): - self._called("message_to_python") + self._called('message_to_python') return Message(self, body=anyjson.dumps(message), delivery_tag=self.deliveries(), throw_decode_error=self.throw_decode_error, - content_type="application/json", content_encoding="utf-8") + content_type='application/json', content_encoding='utf-8') def flow(self, active): - self._called("flow") + self._called('flow') def basic_reject(self, delivery_tag, requeue=False): if requeue: - return self._called("basic_reject:requeue") - return self._called("basic_reject") + return self._called('basic_reject:requeue') + return self._called('basic_reject') def basic_qos(self, prefetch_size=0, prefetch_count=0, apply_global=False): - self._called("basic_qos") + self._called('basic_qos') class Connection(object): @@ -133,7 +133,7 @@ class Transport(base.Transport): return connection.channel() def drain_events(self, connection, **kwargs): - return "event" + return 'event' def close_connection(self, connection): connection.connected = False diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py index 452c26a8..01937885 100644 --- a/kombu/tests/test_common.py +++ b/kombu/tests/test_common.py @@ -16,16 +16,16 @@ from .utils import ContextMock, Mock, MockPool class test_Broadcast(TestCase): def test_arguments(self): - q = Broadcast(name="test_Broadcast") - self.assertTrue(q.name.startswith("bcast.")) - self.assertEqual(q.alias, "test_Broadcast") + q = Broadcast(name='test_Broadcast') + self.assertTrue(q.name.startswith('bcast.')) + self.assertEqual(q.alias, 'test_Broadcast') self.assertTrue(q.auto_delete) - self.assertEqual(q.exchange.name, "test_Broadcast") - self.assertEqual(q.exchange.type, "fanout") + self.assertEqual(q.exchange.name, 'test_Broadcast') + self.assertEqual(q.exchange.type, 'fanout') - q = Broadcast("test_Broadcast", "explicit_queue_name") - self.assertEqual(q.name, "explicit_queue_name") - self.assertEqual(q.exchange.name, "test_Broadcast") + q = Broadcast('test_Broadcast', 'explicit_queue_name') + self.assertEqual(q.name, 'explicit_queue_name') + self.assertEqual(q.exchange.name, 'test_Broadcast') class test_maybe_declare(TestCase): @@ -69,26 +69,26 @@ class test_replies(TestCase): def test_send_reply(self): req = Mock() - req.content_type = "application/json" - req.properties = {"reply_to": "hello", - "correlation_id": "world"} + req.content_type = 'application/json' + req.properties = {'reply_to': 'hello', + 'correlation_id': 'world'} exchange = Mock() exchange.is_bound = True producer = Mock() producer.channel.connection.client.declared_entities = set() - send_reply(exchange, req, {"hello": "world"}, producer) + send_reply(exchange, req, {'hello': 'world'}, producer) self.assertTrue(producer.publish.call_count) args = producer.publish.call_args - self.assertDictEqual(args[0][0], {"hello": "world"}) - self.assertDictEqual(args[1], {"exchange": exchange, - "routing_key": "hello", - "correlation_id": "world", - "serializer": "json"}) + self.assertDictEqual(args[0][0], {'hello': 'world'}) + self.assertDictEqual(args[1], {'exchange': exchange, + 'routing_key': 'hello', + 'correlation_id': 'world', + 'serializer': 'json'}) exchange.declare.assert_called_with() - @patch("kombu.common.ipublish") + @patch('kombu.common.ipublish') def test_isend_reply(self, ipublish): pool, exchange, req, msg, props = (Mock(), Mock(), Mock(), Mock(), Mock()) @@ -97,7 +97,7 @@ class test_replies(TestCase): ipublish.assert_called_with(pool, send_reply, (exchange, req, msg), props) - @patch("kombu.common.itermessages") + @patch('kombu.common.itermessages') def test_collect_replies_with_ack(self, itermessages): conn, channel, queue = Mock(), Mock(), Mock() body, message = Mock(), Mock() @@ -113,7 +113,7 @@ class test_replies(TestCase): channel.after_reply_message_received.assert_called_with(queue.name) - @patch("kombu.common.itermessages") + @patch('kombu.common.itermessages') def test_collect_replies_no_ack(self, itermessages): conn, channel, queue = Mock(), Mock(), Mock() body, message = Mock(), Mock() @@ -124,7 +124,7 @@ class test_replies(TestCase): itermessages.assert_called_with(conn, channel, queue, no_ack=True) self.assertFalse(message.ack.called) - @patch("kombu.common.itermessages") + @patch('kombu.common.itermessages') def test_collect_replies_no_replies(self, itermessages): conn, channel, queue = Mock(), Mock(), Mock() itermessages.return_value = [] @@ -137,9 +137,9 @@ class test_replies(TestCase): class test_insured(TestCase): - @patch("kombu.common.insured_logger") + @patch('kombu.common.insured_logger') def test_ensure_errback(self, insured_logger): - common._ensure_errback("foo", 30) + common._ensure_errback('foo', 30) self.assertTrue(insured_logger.error.called) def test_revive_connection(self): @@ -158,7 +158,7 @@ class test_insured(TestCase): common.revive_producer(Mock(), channel, None) - def get_insured_mocks(self, insured_returns=("works", "ignored")): + def get_insured_mocks(self, insured_returns=('works', 'ignored')): conn = ContextMock() pool = MockPool(conn) fun = Mock() @@ -169,28 +169,28 @@ class test_insured(TestCase): def test_insured(self): conn, pool, fun, insured = self.get_insured_mocks() - ret = common.insured(pool, fun, (2, 2), {"foo": "bar"}) - self.assertEqual(ret, "works") + ret = common.insured(pool, fun, (2, 2), {'foo': 'bar'}) + self.assertEqual(ret, 'works') conn.ensure_connection.assert_called_with( errback=common._ensure_errback) self.assertTrue(insured.called) i_args, i_kwargs = insured.call_args self.assertTupleEqual(i_args, (2, 2)) - self.assertDictEqual(i_kwargs, {"foo": "bar", - "connection": conn}) + self.assertDictEqual(i_kwargs, {'foo': 'bar', + 'connection': conn}) self.assertTrue(conn.autoretry.called) ar_args, ar_kwargs = conn.autoretry.call_args self.assertTupleEqual(ar_args, (fun, conn.default_channel)) - self.assertTrue(ar_kwargs.get("on_revive")) - self.assertTrue(ar_kwargs.get("errback")) + self.assertTrue(ar_kwargs.get('on_revive')) + self.assertTrue(ar_kwargs.get('errback')) def test_insured_custom_errback(self): conn, pool, fun, insured = self.get_insured_mocks() custom_errback = Mock() - common.insured(pool, fun, (2, 2), {"foo": "bar"}, + common.insured(pool, fun, (2, 2), {'foo': 'bar'}, errback=custom_errback) conn.ensure_connection.assert_called_with(errback=custom_errback) @@ -206,26 +206,26 @@ class test_insured(TestCase): def test_ipublish(self): producer, pool, fun, ensure_returns = self.get_ipublish_args() - ensure_returns.return_value = "works" + ensure_returns.return_value = 'works' - ret = common.ipublish(pool, fun, (2, 2), {"foo": "bar"}) - self.assertEqual(ret, "works") + ret = common.ipublish(pool, fun, (2, 2), {'foo': 'bar'}) + self.assertEqual(ret, 'works') self.assertTrue(producer.connection.ensure.called) e_args, e_kwargs = producer.connection.ensure.call_args self.assertTupleEqual(e_args, (producer, fun)) - self.assertTrue(e_kwargs.get("on_revive")) - self.assertEqual(e_kwargs.get("errback"), common._ensure_errback) + self.assertTrue(e_kwargs.get('on_revive')) + self.assertEqual(e_kwargs.get('errback'), common._ensure_errback) - ensure_returns.assert_called_with(2, 2, foo="bar", producer=producer) + ensure_returns.assert_called_with(2, 2, foo='bar', producer=producer) def test_ipublish_with_custom_errback(self): producer, pool, fun, _ = self.get_ipublish_args() errback = Mock() - common.ipublish(pool, fun, (2, 2), {"foo": "bar"}, errback=errback) + common.ipublish(pool, fun, (2, 2), {'foo': 'bar'}, errback=errback) _, e_kwargs = producer.connection.ensure.call_args - self.assertEqual(e_kwargs.get("errback"), errback) + self.assertEqual(e_kwargs.get('errback'), errback) class MockConsumer(object): @@ -254,17 +254,17 @@ class test_itermessages(TestCase): raise socket.timeout() for consumer in MockConsumer.consumers: for callback in consumer.callbacks: - callback("body", "message") + callback('body', 'message') def test_default(self): conn = self.MockConnection() channel = Mock() channel.connection.client = conn - it = common.itermessages(conn, channel, "q", limit=1, + it = common.itermessages(conn, channel, 'q', limit=1, Consumer=MockConsumer) ret = it.next() - self.assertTupleEqual(ret, ("body", "message")) + self.assertTupleEqual(ret, ('body', 'message')) with self.assertRaises(StopIteration): it.next() @@ -274,19 +274,19 @@ class test_itermessages(TestCase): conn.should_raise_timeout = True channel = Mock() channel.connection.client = conn - it = common.itermessages(conn, channel, "q", limit=1, + it = common.itermessages(conn, channel, 'q', limit=1, Consumer=MockConsumer) with self.assertRaises(StopIteration): it.next() - @patch("kombu.common.deque") + @patch('kombu.common.deque') def test_when_raises_IndexError(self, deque): deque_instance = deque.return_value = Mock() deque_instance.popleft.side_effect = IndexError() conn = self.MockConnection() channel = Mock() - it = common.itermessages(conn, channel, "q", limit=1, + it = common.itermessages(conn, channel, 'q', limit=1, Consumer=MockConsumer) with self.assertRaises(StopIteration): diff --git a/kombu/tests/test_compat.py b/kombu/tests/test_compat.py index 588ac8be..36a60d39 100644 --- a/kombu/tests/test_compat.py +++ b/kombu/tests/test_compat.py @@ -38,44 +38,44 @@ class test_misc(TestCase): self.assertEqual(list(it2), [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]) def test_Queue_from_dict(self): - defs = {"binding_key": "foo.#", - "exchange": "fooex", - "exchange_type": "topic", - "durable": True, - "auto_delete": False} - - q1 = Queue.from_dict("foo", **dict(defs)) - self.assertEqual(q1.name, "foo") - self.assertEqual(q1.routing_key, "foo.#") - self.assertEqual(q1.exchange.name, "fooex") - self.assertEqual(q1.exchange.type, "topic") + defs = {'binding_key': 'foo.#', + 'exchange': 'fooex', + 'exchange_type': 'topic', + 'durable': True, + 'auto_delete': False} + + q1 = Queue.from_dict('foo', **dict(defs)) + self.assertEqual(q1.name, 'foo') + self.assertEqual(q1.routing_key, 'foo.#') + self.assertEqual(q1.exchange.name, 'fooex') + self.assertEqual(q1.exchange.type, 'topic') self.assertTrue(q1.durable) self.assertTrue(q1.exchange.durable) self.assertFalse(q1.auto_delete) self.assertFalse(q1.exchange.auto_delete) - q2 = Queue.from_dict("foo", **dict(defs, + q2 = Queue.from_dict('foo', **dict(defs, exchange_durable=False)) self.assertTrue(q2.durable) self.assertFalse(q2.exchange.durable) - q3 = Queue.from_dict("foo", **dict(defs, + q3 = Queue.from_dict('foo', **dict(defs, exchange_auto_delete=True)) self.assertFalse(q3.auto_delete) self.assertTrue(q3.exchange.auto_delete) - q4 = Queue.from_dict("foo", **dict(defs, + q4 = Queue.from_dict('foo', **dict(defs, queue_durable=False)) self.assertFalse(q4.durable) self.assertTrue(q4.exchange.durable) - q5 = Queue.from_dict("foo", **dict(defs, + q5 = Queue.from_dict('foo', **dict(defs, queue_auto_delete=True)) self.assertTrue(q5.auto_delete) self.assertFalse(q5.exchange.auto_delete) - self.assertEqual(Queue.from_dict("foo", **dict(defs)), - Queue.from_dict("foo", **dict(defs))) + self.assertEqual(Queue.from_dict('foo', **dict(defs)), + Queue.from_dict('foo', **dict(defs))) class test_Publisher(TestCase): @@ -85,44 +85,44 @@ class test_Publisher(TestCase): def test_constructor(self): pub = compat.Publisher(self.connection, - exchange="test_Publisher_constructor", - routing_key="rkey") + exchange='test_Publisher_constructor', + routing_key='rkey') self.assertIsInstance(pub.backend, Channel) - self.assertEqual(pub.exchange.name, "test_Publisher_constructor") + self.assertEqual(pub.exchange.name, 'test_Publisher_constructor') self.assertTrue(pub.exchange.durable) self.assertFalse(pub.exchange.auto_delete) - self.assertEqual(pub.exchange.type, "direct") + self.assertEqual(pub.exchange.type, 'direct') pub2 = compat.Publisher(self.connection, - exchange="test_Publisher_constructor2", - routing_key="rkey", + exchange='test_Publisher_constructor2', + routing_key='rkey', auto_delete=True, durable=False) self.assertTrue(pub2.exchange.auto_delete) self.assertFalse(pub2.exchange.durable) - explicit = Exchange("test_Publisher_constructor_explicit", - type="topic") + explicit = Exchange('test_Publisher_constructor_explicit', + type='topic') pub3 = compat.Publisher(self.connection, exchange=explicit) self.assertEqual(pub3.exchange, explicit) compat.Publisher(self.connection, - exchange="test_Publisher_constructor3", + exchange='test_Publisher_constructor3', channel=self.connection.default_channel) def test_send(self): pub = compat.Publisher(self.connection, - exchange="test_Publisher_send", - routing_key="rkey") - pub.send({"foo": "bar"}) - self.assertIn("basic_publish", pub.backend) + exchange='test_Publisher_send', + routing_key='rkey') + pub.send({'foo': 'bar'}) + self.assertIn('basic_publish', pub.backend) pub.close() def test__enter__exit__(self): pub = compat.Publisher(self.connection, - exchange="test_Publisher_send", - routing_key="rkey") + exchange='test_Publisher_send', + routing_key='rkey') x = pub.__enter__() self.assertIs(x, pub) x.__exit__() @@ -134,15 +134,15 @@ class test_Consumer(TestCase): def setUp(self): self.connection = Connection(transport=Transport) - @patch("kombu.compat._iterconsume") - def test_iterconsume_calls__iterconsume(self, it, n="test_iterconsume"): + @patch('kombu.compat._iterconsume') + def test_iterconsume_calls__iterconsume(self, it, n='test_iterconsume'): c = compat.Consumer(self.connection, queue=n, exchange=n) c.iterconsume(limit=10, no_ack=True) it.assert_called_with(c.connection, c, True, 10) - def test_constructor(self, n="test_Consumer_constructor"): + def test_constructor(self, n='test_Consumer_constructor'): c = compat.Consumer(self.connection, queue=n, exchange=n, - routing_key="rkey") + routing_key='rkey') self.assertIsInstance(c.backend, Channel) q = c.queues[0] self.assertTrue(q.durable) @@ -152,9 +152,9 @@ class test_Consumer(TestCase): self.assertEqual(q.name, n) self.assertEqual(q.exchange.name, n) - c2 = compat.Consumer(self.connection, queue=n + "2", - exchange=n + "2", - routing_key="rkey", durable=False, + c2 = compat.Consumer(self.connection, queue=n + '2', + exchange=n + '2', + routing_key='rkey', durable=False, auto_delete=True, exclusive=True) q2 = c2.queues[0] self.assertFalse(q2.durable) @@ -162,78 +162,78 @@ class test_Consumer(TestCase): self.assertTrue(q2.auto_delete) self.assertTrue(q2.exchange.auto_delete) - def test__enter__exit__(self, n="test__enter__exit__"): + def test__enter__exit__(self, n='test__enter__exit__'): c = compat.Consumer(self.connection, queue=n, exchange=n, - routing_key="rkey") + routing_key='rkey') x = c.__enter__() self.assertIs(x, c) x.__exit__() self.assertTrue(c._closed) - def test_revive(self, n="test_revive"): + def test_revive(self, n='test_revive'): c = compat.Consumer(self.connection, queue=n, exchange=n) with self.connection.channel() as c2: c.revive(c2) self.assertIs(c.backend, c2) - def test__iter__(self, n="test__iter__"): + def test__iter__(self, n='test__iter__'): c = compat.Consumer(self.connection, queue=n, exchange=n) c.iterqueue = Mock() c.__iter__() c.iterqueue.assert_called_with(infinite=True) - def test_iter(self, n="test_iterqueue"): + def test_iter(self, n='test_iterqueue'): c = compat.Consumer(self.connection, queue=n, exchange=n, - routing_key="rkey") + routing_key='rkey') c.close() - def test_process_next(self, n="test_process_next"): + def test_process_next(self, n='test_process_next'): c = compat.Consumer(self.connection, queue=n, exchange=n, - routing_key="rkey") + routing_key='rkey') with self.assertRaises(NotImplementedError): c.process_next() c.close() - def test_iterconsume(self, n="test_iterconsume"): + def test_iterconsume(self, n='test_iterconsume'): c = compat.Consumer(self.connection, queue=n, exchange=n, - routing_key="rkey") + routing_key='rkey') c.close() - def test_discard_all(self, n="test_discard_all"): + def test_discard_all(self, n='test_discard_all'): c = compat.Consumer(self.connection, queue=n, exchange=n, - routing_key="rkey") + routing_key='rkey') c.discard_all() - self.assertIn("queue_purge", c.backend) + self.assertIn('queue_purge', c.backend) - def test_fetch(self, n="test_fetch"): + def test_fetch(self, n='test_fetch'): c = compat.Consumer(self.connection, queue=n, exchange=n, - routing_key="rkey") + routing_key='rkey') self.assertIsNone(c.fetch()) self.assertIsNone(c.fetch(no_ack=True)) - self.assertIn("basic_get", c.backend) + self.assertIn('basic_get', c.backend) callback_called = [False] def receive(payload, message): callback_called[0] = True - c.backend.to_deliver.append("42") - self.assertEqual(c.fetch().payload, "42") - c.backend.to_deliver.append("46") + c.backend.to_deliver.append('42') + self.assertEqual(c.fetch().payload, '42') + c.backend.to_deliver.append('46') c.register_callback(receive) - self.assertEqual(c.fetch(enable_callbacks=True).payload, "46") + self.assertEqual(c.fetch(enable_callbacks=True).payload, '46') self.assertTrue(callback_called[0]) - def test_discard_all_filterfunc_not_supported(self, n="xjf21j21"): + def test_discard_all_filterfunc_not_supported(self, n='xjf21j21'): c = compat.Consumer(self.connection, queue=n, exchange=n, - routing_key="rkey") + routing_key='rkey') with self.assertRaises(NotImplementedError): c.discard_all(filterfunc=lambda x: x) c.close() - def test_wait(self, n="test_wait"): + def test_wait(self, n='test_wait'): class C(compat.Consumer): @@ -242,11 +242,11 @@ class test_Consumer(TestCase): yield i c = C(self.connection, queue=n, exchange=n, - routing_key="rkey") + routing_key='rkey') self.assertEqual(c.wait(10), range(10)) c.close() - def test_iterqueue(self, n="test_iterqueue"): + def test_iterqueue(self, n='test_iterqueue'): i = [0] class C(compat.Consumer): @@ -257,7 +257,7 @@ class test_Consumer(TestCase): return z c = C(self.connection, queue=n, exchange=n, - routing_key="rkey") + routing_key='rkey') self.assertEqual(list(c.iterqueue(limit=10)), range(10)) c.close() @@ -267,14 +267,14 @@ class test_ConsumerSet(TestCase): def setUp(self): self.connection = Connection(transport=Transport) - @patch("kombu.compat._iterconsume") - def test_iterconsume(self, _iterconsume, n="test_iterconsume"): + @patch('kombu.compat._iterconsume') + def test_iterconsume(self, _iterconsume, n='test_iterconsume'): c = compat.Consumer(self.connection, queue=n, exchange=n) cs = compat.ConsumerSet(self.connection, consumers=[c]) cs.iterconsume(limit=10, no_ack=True) _iterconsume.assert_called_with(c.connection, cs, True, 10) - def test_revive(self, n="test_revive"): + def test_revive(self, n='test_revive'): c = compat.Consumer(self.connection, queue=n, exchange=n) cs = compat.ConsumerSet(self.connection, consumers=[c]) @@ -282,11 +282,11 @@ class test_ConsumerSet(TestCase): cs.revive(c2) self.assertIs(cs.backend, c2) - def test_constructor(self, prefix="0daf8h21"): - dcon = {"%s.xyx" % prefix: {"exchange": "%s.xyx" % prefix, - "routing_key": "xyx"}, - "%s.xyz" % prefix: {"exchange": "%s.xyz" % prefix, - "routing_key": "xyz"}} + def test_constructor(self, prefix='0daf8h21'): + dcon = {'%s.xyx' % prefix: {'exchange': '%s.xyx' % prefix, + 'routing_key': 'xyx'}, + '%s.xyz' % prefix: {'exchange': '%s.xyz' % prefix, + 'routing_key': 'xyz'}} consumers = [compat.Consumer(self.connection, queue=prefix + str(i), exchange=prefix + str(i)) for i in range(3)] @@ -297,25 +297,25 @@ class test_ConsumerSet(TestCase): self.assertEqual(len(c2.queues), 2) c.add_consumer(compat.Consumer(self.connection, - queue=prefix + "xaxxxa", - exchange=prefix + "xaxxxa")) + queue=prefix + 'xaxxxa', + exchange=prefix + 'xaxxxa')) self.assertEqual(len(c.queues), 4) for cq in c.queues: self.assertIs(cq.channel, c.channel) - c2.add_consumer_from_dict({"%s.xxx" % prefix: { - "exchange": "%s.xxx" % prefix, - "routing_key": "xxx"}}) + c2.add_consumer_from_dict({'%s.xxx' % prefix: { + 'exchange': '%s.xxx' % prefix, + 'routing_key': 'xxx'}}) self.assertEqual(len(c2.queues), 3) for c2q in c2.queues: self.assertIs(c2q.channel, c2.channel) c.discard_all() - self.assertEqual(c.channel.called.count("queue_purge"), 4) + self.assertEqual(c.channel.called.count('queue_purge'), 4) c.consume() c.close() c2.close() - self.assertIn("basic_cancel", c.channel) - self.assertIn("close", c.channel) - self.assertIn("close", c2.channel) + self.assertIn('basic_cancel', c.channel) + self.assertIn('close', c.channel) + self.assertIn('close', c2.channel) diff --git a/kombu/tests/test_compression.py b/kombu/tests/test_compression.py index 97fcd17d..2df3b388 100644 --- a/kombu/tests/test_compression.py +++ b/kombu/tests/test_compression.py @@ -20,34 +20,34 @@ class test_compression(TestCase): else: self.has_bzip2 = True - @mask_modules("bz2") + @mask_modules('bz2') def test_no_bz2(self): - c = sys.modules.pop("kombu.compression") + c = sys.modules.pop('kombu.compression') try: import kombu.compression - self.assertFalse(hasattr(kombu.compression, "bz2")) + self.assertFalse(hasattr(kombu.compression, 'bz2')) finally: if c is not None: - sys.modules["kombu.compression"] = c + sys.modules['kombu.compression'] = c def test_encoders(self): encoders = compression.encoders() - self.assertIn("application/x-gzip", encoders) + self.assertIn('application/x-gzip', encoders) if self.has_bzip2: - self.assertIn("application/x-bz2", encoders) + self.assertIn('application/x-bz2', encoders) def test_compress__decompress__zlib(self): - text = "The Quick Brown Fox Jumps Over The Lazy Dog" - c, ctype = compression.compress(text, "zlib") + text = 'The Quick Brown Fox Jumps Over The Lazy Dog' + c, ctype = compression.compress(text, 'zlib') self.assertNotEqual(text, c) d = compression.decompress(c, ctype) self.assertEqual(d, text) def test_compress__decompress__bzip2(self): if not self.has_bzip2: - raise SkipTest("bzip2 not available") - text = "The Brown Quick Fox Over The Lazy Dog Jumps" - c, ctype = compression.compress(text, "bzip2") + raise SkipTest('bzip2 not available') + text = 'The Brown Quick Fox Over The Lazy Dog Jumps' + c, ctype = compression.compress(text, 'bzip2') self.assertNotEqual(text, c) d = compression.decompress(c, ctype) self.assertEqual(d, text) diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py index 8a4719b2..eb464855 100644 --- a/kombu/tests/test_connection.py +++ b/kombu/tests/test_connection.py @@ -16,15 +16,15 @@ from .utils import Mock, skip_if_not_module class test_connection_utils(TestCase): def setUp(self): - self.url = "amqp://user:pass@localhost:5672/my/vhost" - self.nopass = "amqp://user@localhost:5672/my/vhost" + self.url = 'amqp://user:pass@localhost:5672/my/vhost' + self.nopass = 'amqp://user@localhost:5672/my/vhost' self.expected = { - "transport": "amqp", - "userid": "user", - "password": "pass", - "hostname": "localhost", - "port": 5672, - "virtual_host": "my/vhost", + 'transport': 'amqp', + 'userid': 'user', + 'password': 'pass', + 'hostname': 'localhost', + 'port': 5672, + 'virtual_host': 'my/vhost', } def test_parse_url(self): @@ -32,8 +32,8 @@ class test_connection_utils(TestCase): self.assertDictEqual(result, self.expected) def test_parse_url_mongodb(self): - result = parse_url("mongodb://example.com/") - self.assertEqual(result["hostname"], "example.com/") + result = parse_url('mongodb://example.com/') + self.assertEqual(result['hostname'], 'example.com/') def test_parse_generated_as_uri(self): conn = BrokerConnection(self.url) @@ -44,14 +44,14 @@ class test_connection_utils(TestCase): self.assertEqual(conn.as_uri(), self.nopass) self.assertEqual(conn.as_uri(include_password=True), self.url) - @skip_if_not_module("pymongo") + @skip_if_not_module('pymongo') def test_as_uri_when_mongodb(self): - x = BrokerConnection("mongodb://localhost") + x = BrokerConnection('mongodb://localhost') self.assertTrue(x.as_uri()) def test_bogus_scheme(self): with self.assertRaises(KeyError): - BrokerConnection("bogus://localhost:7421").transport + BrokerConnection('bogus://localhost:7421').transport def assert_info(self, conn, **fields): info = conn.info() @@ -63,79 +63,79 @@ class test_connection_utils(TestCase): C = BrokerConnection self.assert_info( - C("amqp://user:pass@host:10000/vhost"), - userid="user", password="pass", hostname="host", - port=10000, virtual_host="vhost") + C('amqp://user:pass@host:10000/vhost'), + userid='user', password='pass', hostname='host', + port=10000, virtual_host='vhost') self.assert_info( - C("amqp://user%61:%61pass@ho%61st:10000/v%2fhost"), - userid="usera", password="apass", - hostname="hoast", port=10000, - virtual_host="v/host") + C('amqp://user%61:%61pass@ho%61st:10000/v%2fhost'), + userid='usera', password='apass', + hostname='hoast', port=10000, + virtual_host='v/host') self.assert_info( - C("amqp://"), - userid="guest", password="guest", - hostname="localhost", port=5672, - virtual_host="/") + C('amqp://'), + userid='guest', password='guest', + hostname='localhost', port=5672, + virtual_host='/') self.assert_info( - C("amqp://:@/"), - userid="guest", password="guest", - hostname="localhost", port=5672, - virtual_host="/") + C('amqp://:@/'), + userid='guest', password='guest', + hostname='localhost', port=5672, + virtual_host='/') self.assert_info( - C("amqp://user@/"), - userid="user", password="guest", - hostname="localhost", port=5672, - virtual_host="/") + C('amqp://user@/'), + userid='user', password='guest', + hostname='localhost', port=5672, + virtual_host='/') self.assert_info( - C("amqp://user:pass@/"), - userid="user", password="pass", - hostname="localhost", port=5672, - virtual_host="/") + C('amqp://user:pass@/'), + userid='user', password='pass', + hostname='localhost', port=5672, + virtual_host='/') self.assert_info( - C("amqp://host"), - userid="guest", password="guest", - hostname="host", port=5672, - virtual_host="/") + C('amqp://host'), + userid='guest', password='guest', + hostname='host', port=5672, + virtual_host='/') self.assert_info( - C("amqp://:10000"), - userid="guest", password="guest", - hostname="localhost", port=10000, - virtual_host="/") + C('amqp://:10000'), + userid='guest', password='guest', + hostname='localhost', port=10000, + virtual_host='/') self.assert_info( - C("amqp:///vhost"), - userid="guest", password="guest", - hostname="localhost", port=5672, - virtual_host="vhost") + C('amqp:///vhost'), + userid='guest', password='guest', + hostname='localhost', port=5672, + virtual_host='vhost') self.assert_info( - C("amqp://host/"), - userid="guest", password="guest", - hostname="host", port=5672, - virtual_host="/") + C('amqp://host/'), + userid='guest', password='guest', + hostname='host', port=5672, + virtual_host='/') self.assert_info( - C("amqp://host/%2f"), - userid="guest", password="guest", - hostname="host", port=5672, - virtual_host="/") + C('amqp://host/%2f'), + userid='guest', password='guest', + hostname='host', port=5672, + virtual_host='/') def test_url_IPV6(self): C = BrokerConnection raise SkipTest("urllib can't parse ipv6 urls") self.assert_info( - C("amqp://[::1]"), - userid="guest", password="guest", - hostname="[::1]", port=5672, - virtual_host="/") + C('amqp://[::1]'), + userid='guest', password='guest', + hostname='[::1]', port=5672, + virtual_host='/') class test_Connection(TestCase): @@ -147,10 +147,10 @@ class test_Connection(TestCase): conn = self.conn conn.connect() self.assertTrue(conn.connection.connected) - self.assertEqual(conn.host, "localhost:5672") + self.assertEqual(conn.host, 'localhost:5672') channel = conn.channel() self.assertTrue(channel.open) - self.assertEqual(conn.drain_events(), "event") + self.assertEqual(conn.drain_events(), 'event') _connection = conn.connection conn.close() self.assertFalse(_connection.connected) @@ -175,7 +175,7 @@ class test_Connection(TestCase): connection_errors = (_CustomError, ) def close_connection(self, connection): - raise _CustomError("foo") + raise _CustomError('foo') conn = BrokerConnection(transport=MyTransport) conn.connect() @@ -196,7 +196,7 @@ class test_Connection(TestCase): def connection_errors(self): return (KeyError, ) - conn = Conn("memory://") + conn = Conn('memory://') conn._default_channel = Mock() conn._default_channel.close.side_effect = KeyError() @@ -216,17 +216,17 @@ class test_Connection(TestCase): def test_ensure_success(self): def publish(): - return "foobar" + return 'foobar' ensured = self.conn.ensure(None, publish) - self.assertEqual(ensured(), "foobar") + self.assertEqual(ensured(), 'foobar') def test_ensure_failure(self): class _CustomError(Exception): pass def publish(): - raise _CustomError("bar") + raise _CustomError('bar') ensured = self.conn.ensure(None, publish) with self.assertRaises(_CustomError): @@ -237,7 +237,7 @@ class test_Connection(TestCase): pass def publish(): - raise _ConnectionError("failed connection") + raise _ConnectionError('failed connection') self.conn.transport.connection_errors = (_ConnectionError,) ensured = self.conn.ensure(self.conn, publish) @@ -246,13 +246,13 @@ class test_Connection(TestCase): def test_autoretry(self): myfun = Mock() - myfun.__name__ = "test_autoretry" + myfun.__name__ = 'test_autoretry' self.conn.transport.connection_errors = (KeyError, ) def on_call(*args, **kwargs): myfun.side_effect = None - raise KeyError("foo") + raise KeyError('foo') myfun.side_effect = on_call insured = self.conn.autoretry(myfun) @@ -262,18 +262,18 @@ class test_Connection(TestCase): def test_SimpleQueue(self): conn = self.conn - q = conn.SimpleQueue("foo") + q = conn.SimpleQueue('foo') self.assertIs(q.channel, conn.default_channel) chan = conn.channel() - q2 = conn.SimpleQueue("foo", channel=chan) + q2 = conn.SimpleQueue('foo', channel=chan) self.assertIs(q2.channel, chan) def test_SimpleBuffer(self): conn = self.conn - q = conn.SimpleBuffer("foo") + q = conn.SimpleBuffer('foo') self.assertIs(q.channel, conn.default_channel) chan = conn.channel() - q2 = conn.SimpleBuffer("foo", channel=chan) + q2 = conn.SimpleBuffer('foo', channel=chan) self.assertIs(q2.channel, chan) def test_Producer(self): @@ -313,7 +313,7 @@ class test_Connection(TestCase): class test_Connection_with_transport_options(TestCase): - transport_options = {"pool_recycler": 3600, "echo": True} + transport_options = {'pool_recycler': 3600, 'echo': True} def setUp(self): self.conn = BrokerConnection(port=5672, transport=Transport, @@ -334,7 +334,7 @@ class ResourceCase(TestCase): abstract = True def create_resource(self, limit, preload): - raise NotImplementedError("subclass responsibility") + raise NotImplementedError('subclass responsibility') def assertState(self, P, avail, dirty): self.assertEqual(P._resource.qsize(), avail) @@ -400,7 +400,7 @@ class ResourceCase(TestCase): return P = self.create_resource(10, 10) cr = P.close_resource = Mock() - cr.side_effect = AttributeError("x") + cr.side_effect = AttributeError('x') P.acquire() self.assertTrue(P._dirty) @@ -450,7 +450,7 @@ class test_ConnectionPool(ResourceCase): def test_prepare_not_callable(self): P = self.create_resource(None, None) - conn = BrokerConnection("memory://") + conn = BrokerConnection('memory://') self.assertIs(P.prepare(conn), conn) def test_acquire_channel(self): @@ -472,7 +472,7 @@ class test_ChannelPool(ResourceCase): self.assertTrue(q[0].basic_consume) self.assertTrue(q[1].basic_consume) with self.assertRaises(AttributeError): - getattr(q[2], "basic_consume") + getattr(q[2], 'basic_consume') def test_setup_no_limit(self): P = self.create_resource(None, None) @@ -481,6 +481,6 @@ class test_ChannelPool(ResourceCase): def test_prepare_not_callable(self): P = self.create_resource(10, 0) - conn = BrokerConnection("memory://") + conn = BrokerConnection('memory://') chan = conn.default_channel self.assertIs(P.prepare(chan), chan) diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py index 2ad1df1e..95892edc 100644 --- a/kombu/tests/test_entities.py +++ b/kombu/tests/test_entities.py @@ -17,36 +17,36 @@ def get_conn(): class test_Exchange(TestCase): def test_bound(self): - exchange = Exchange("foo", "direct") + exchange = Exchange('foo', 'direct') self.assertFalse(exchange.is_bound) - self.assertIn("<unbound", repr(exchange)) + self.assertIn('<unbound', repr(exchange)) chan = get_conn().channel() bound = exchange.bind(chan) self.assertTrue(bound.is_bound) self.assertIs(bound.channel, chan) - self.assertIn("<bound", repr(bound)) + self.assertIn('<bound', repr(bound)) def test_hash(self): - self.assertEqual(hash(Exchange("a")), hash(Exchange("a"))) - self.assertNotEqual(hash(Exchange("a")), hash(Exchange("b"))) + self.assertEqual(hash(Exchange('a')), hash(Exchange('a'))) + self.assertNotEqual(hash(Exchange('a')), hash(Exchange('b'))) def test_can_cache_declaration(self): - self.assertTrue(Exchange("a", durable=True).can_cache_declaration) - self.assertFalse(Exchange("a", durable=False).can_cache_declaration) + self.assertTrue(Exchange('a', durable=True).can_cache_declaration) + self.assertFalse(Exchange('a', durable=False).can_cache_declaration) def test_eq(self): - e1 = Exchange("foo", "direct") - e2 = Exchange("foo", "direct") + e1 = Exchange('foo', 'direct') + e2 = Exchange('foo', 'direct') self.assertEqual(e1, e2) - e3 = Exchange("foo", "topic") + e3 = Exchange('foo', 'topic') self.assertNotEqual(e1, e3) self.assertFalse(e1.__eq__(True)) def test_revive(self): - exchange = Exchange("foo", "direct") + exchange = Exchange('foo', 'direct') conn = get_conn() chan = conn.channel() @@ -65,63 +65,63 @@ class test_Exchange(TestCase): self.assertIs(bound._channel, chan2) def test_assert_is_bound(self): - exchange = Exchange("foo", "direct") + exchange = Exchange('foo', 'direct') with self.assertRaises(NotBoundError): exchange.declare() conn = get_conn() chan = conn.channel() exchange.bind(chan).declare() - self.assertIn("exchange_declare", chan) + self.assertIn('exchange_declare', chan) def test_set_transient_delivery_mode(self): - exc = Exchange("foo", "direct", delivery_mode="transient") + exc = Exchange('foo', 'direct', delivery_mode='transient') self.assertEqual(exc.delivery_mode, Exchange.TRANSIENT_DELIVERY_MODE) def test_set_persistent_delivery_mode(self): - exc = Exchange("foo", "direct", delivery_mode="persistent") + exc = Exchange('foo', 'direct', delivery_mode='persistent') self.assertEqual(exc.delivery_mode, Exchange.PERSISTENT_DELIVERY_MODE) def test_bind_at_instantiation(self): - self.assertTrue(Exchange("foo", channel=get_conn().channel()).is_bound) + self.assertTrue(Exchange('foo', channel=get_conn().channel()).is_bound) def test_create_message(self): chan = get_conn().channel() - Exchange("foo", channel=chan).Message({"foo": "bar"}) - self.assertIn("prepare_message", chan) + Exchange('foo', channel=chan).Message({'foo': 'bar'}) + self.assertIn('prepare_message', chan) def test_publish(self): chan = get_conn().channel() - Exchange("foo", channel=chan).publish("the quick brown fox") - self.assertIn("basic_publish", chan) + Exchange('foo', channel=chan).publish('the quick brown fox') + self.assertIn('basic_publish', chan) def test_delete(self): chan = get_conn().channel() - Exchange("foo", channel=chan).delete() - self.assertIn("exchange_delete", chan) + Exchange('foo', channel=chan).delete() + self.assertIn('exchange_delete', chan) def test__repr__(self): - b = Exchange("foo", "topic") - self.assertIn("foo(topic)", repr(b)) - self.assertIn("Exchange", repr(b)) + b = Exchange('foo', 'topic') + self.assertIn('foo(topic)', repr(b)) + self.assertIn('Exchange', repr(b)) class test_Queue(TestCase): def setUp(self): - self.exchange = Exchange("foo", "direct") + self.exchange = Exchange('foo', 'direct') def test_hash(self): - self.assertEqual(hash(Queue("a")), hash(Queue("a"))) - self.assertNotEqual(hash(Queue("a")), hash(Queue("b"))) + self.assertEqual(hash(Queue('a')), hash(Queue('a'))) + self.assertNotEqual(hash(Queue('a')), hash(Queue('b'))) def test_when_bound_but_no_exchange(self): - q = Queue("a") + q = Queue('a') q.exchange = None self.assertIsNone(q.when_bound()) def test_declare_but_no_exchange(self): - q = Queue("a") + q = Queue('a') q.queue_declare = Mock() q.queue_bind = Mock() q.exchange = None @@ -131,29 +131,29 @@ class test_Queue(TestCase): q.queue_bind.assert_called_with(False) def test_can_cache_declaration(self): - self.assertTrue(Queue("a", durable=True).can_cache_declaration) - self.assertFalse(Queue("a", durable=False).can_cache_declaration) + self.assertTrue(Queue('a', durable=True).can_cache_declaration) + self.assertFalse(Queue('a', durable=False).can_cache_declaration) def test_eq(self): - q1 = Queue("xxx", Exchange("xxx", "direct"), "xxx") - q2 = Queue("xxx", Exchange("xxx", "direct"), "xxx") + q1 = Queue('xxx', Exchange('xxx', 'direct'), 'xxx') + q2 = Queue('xxx', Exchange('xxx', 'direct'), 'xxx') self.assertEqual(q1, q2) self.assertFalse(q1.__eq__(True)) - q3 = Queue("yyy", Exchange("xxx", "direct"), "xxx") + q3 = Queue('yyy', Exchange('xxx', 'direct'), 'xxx') self.assertNotEqual(q1, q3) def test_exclusive_implies_auto_delete(self): self.assertTrue( - Queue("foo", self.exchange, exclusive=True).auto_delete) + Queue('foo', self.exchange, exclusive=True).auto_delete) def test_binds_at_instantiation(self): - self.assertTrue(Queue("foo", self.exchange, + self.assertTrue(Queue('foo', self.exchange, channel=get_conn().channel()).is_bound) def test_also_binds_exchange(self): chan = get_conn().channel() - b = Queue("foo", self.exchange) + b = Queue('foo', self.exchange) self.assertFalse(b.is_bound) self.assertFalse(b.exchange.is_bound) b = b.bind(chan) @@ -164,49 +164,49 @@ class test_Queue(TestCase): def test_declare(self): chan = get_conn().channel() - b = Queue("foo", self.exchange, "foo", channel=chan) + b = Queue('foo', self.exchange, 'foo', channel=chan) self.assertTrue(b.is_bound) b.declare() - self.assertIn("exchange_declare", chan) - self.assertIn("queue_declare", chan) - self.assertIn("queue_bind", chan) + self.assertIn('exchange_declare', chan) + self.assertIn('queue_declare', chan) + self.assertIn('queue_bind', chan) def test_get(self): - b = Queue("foo", self.exchange, "foo", channel=get_conn().channel()) + b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel()) b.get() - self.assertIn("basic_get", b.channel) + self.assertIn('basic_get', b.channel) def test_purge(self): - b = Queue("foo", self.exchange, "foo", channel=get_conn().channel()) + b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel()) b.purge() - self.assertIn("queue_purge", b.channel) + self.assertIn('queue_purge', b.channel) def test_consume(self): - b = Queue("foo", self.exchange, "foo", channel=get_conn().channel()) - b.consume("fifafo", None) - self.assertIn("basic_consume", b.channel) + b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel()) + b.consume('fifafo', None) + self.assertIn('basic_consume', b.channel) def test_cancel(self): - b = Queue("foo", self.exchange, "foo", channel=get_conn().channel()) - b.cancel("fifafo") - self.assertIn("basic_cancel", b.channel) + b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel()) + b.cancel('fifafo') + self.assertIn('basic_cancel', b.channel) def test_delete(self): - b = Queue("foo", self.exchange, "foo", channel=get_conn().channel()) + b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel()) b.delete() - self.assertIn("queue_delete", b.channel) + self.assertIn('queue_delete', b.channel) def test_unbind(self): - b = Queue("foo", self.exchange, "foo", channel=get_conn().channel()) + b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel()) b.unbind() - self.assertIn("queue_unbind", b.channel) + self.assertIn('queue_unbind', b.channel) def test_as_dict(self): - q = Queue("foo", self.exchange, "rk") + q = Queue('foo', self.exchange, 'rk') d = q.as_dict(recurse=True) - self.assertEqual(d["exchange"]["name"], self.exchange.name) + self.assertEqual(d['exchange']['name'], self.exchange.name) def test__repr__(self): - b = Queue("foo", self.exchange, "foo") - self.assertIn("foo", repr(b)) - self.assertIn("Queue", repr(b)) + b = Queue('foo', self.exchange, 'foo') + self.assertIn('foo', repr(b)) + self.assertIn('Queue', repr(b)) diff --git a/kombu/tests/test_log.py b/kombu/tests/test_log.py index 5ca7d1ed..62dd1b35 100644 --- a/kombu/tests/test_log.py +++ b/kombu/tests/test_log.py @@ -15,119 +15,119 @@ class test_NullHandler(TestCase): def test_emit(self): h = log.NullHandler() - h.emit("record") + h.emit('record') class test_get_logger(TestCase): def test_when_string(self): - l = log.get_logger("foo") + l = log.get_logger('foo') - self.assertIs(l, logging.getLogger("foo")) + self.assertIs(l, logging.getLogger('foo')) h1 = l.handlers[0] self.assertIsInstance(h1, log.NullHandler) def test_when_logger(self): - l = log.get_logger(logging.getLogger("foo")) + l = log.get_logger(logging.getLogger('foo')) h1 = l.handlers[0] self.assertIsInstance(h1, log.NullHandler) def test_with_custom_handler(self): - l = logging.getLogger("bar") + l = logging.getLogger('bar') handler = log.NullHandler() l.addHandler(handler) - l = log.get_logger("bar") + l = log.get_logger('bar') self.assertIs(l.handlers[0], handler) def test_anon_logger(self): - l = log.anon_logger("test_anon_logger") + l = log.anon_logger('test_anon_logger') self.assertIsInstance(l.handlers[0], log.NullHandler) def test_get_loglevel(self): - self.assertEqual(log.get_loglevel("DEBUG"), logging.DEBUG) - self.assertEqual(log.get_loglevel("ERROR"), logging.ERROR) + self.assertEqual(log.get_loglevel('DEBUG'), logging.DEBUG) + self.assertEqual(log.get_loglevel('ERROR'), logging.ERROR) self.assertEqual(log.get_loglevel(logging.INFO), logging.INFO) class test_safe_format(TestCase): def test_formatting(self): - fmt = "The %r jumped over the %s" - args = ["frog", "elephant"] + fmt = 'The %r jumped over the %s' + args = ['frog', 'elephant'] res = list(log.safeify_format(fmt, *args)) - self.assertListEqual(res, ["'frog'", "elephant"]) + self.assertListEqual(res, ["'frog'", 'elephant']) class test_LogMixin(TestCase): def setUp(self): - self.log = log.Log("Log", Mock()) + self.log = log.Log('Log', Mock()) self.logger = self.log.logger def test_debug(self): - self.log.debug("debug") - self.logger.log.assert_called_with(logging.DEBUG, "Log - debug") + self.log.debug('debug') + self.logger.log.assert_called_with(logging.DEBUG, 'Log - debug') def test_info(self): - self.log.info("info") - self.logger.log.assert_called_with(logging.INFO, "Log - info") + self.log.info('info') + self.logger.log.assert_called_with(logging.INFO, 'Log - info') def test_warning(self): - self.log.warn("warning") - self.logger.log.assert_called_with(logging.WARN, "Log - warning") + self.log.warn('warning') + self.logger.log.assert_called_with(logging.WARN, 'Log - warning') def test_error(self): - self.log.error("error", exc_info="exc") - self.logger.log.assert_called_with(logging.ERROR, "Log - error", - exc_info="exc") + self.log.error('error', exc_info='exc') + self.logger.log.assert_called_with(logging.ERROR, 'Log - error', + exc_info='exc') def test_critical(self): - self.log.critical("crit", exc_info="exc") - self.logger.log.assert_called_with(logging.CRITICAL, "Log - crit", - exc_info="exc") + self.log.critical('crit', exc_info='exc') + self.logger.log.assert_called_with(logging.CRITICAL, 'Log - crit', + exc_info='exc') def test_error_when_DISABLE_TRACEBACKS(self): log.DISABLE_TRACEBACKS = True try: - self.log.error("error") - self.logger.log.assert_called_with(logging.ERROR, "Log - error") + self.log.error('error') + self.logger.log.assert_called_with(logging.ERROR, 'Log - error') finally: log.DISABLE_TRACEBACKS = False def test_get_loglevel(self): - self.assertEqual(self.log.get_loglevel("DEBUG"), logging.DEBUG) - self.assertEqual(self.log.get_loglevel("ERROR"), logging.ERROR) + self.assertEqual(self.log.get_loglevel('DEBUG'), logging.DEBUG) + self.assertEqual(self.log.get_loglevel('ERROR'), logging.ERROR) self.assertEqual(self.log.get_loglevel(logging.INFO), logging.INFO) def test_is_enabled_for(self): self.logger.isEnabledFor.return_value = True - self.assertTrue(self.log.is_enabled_for("DEBUG")) + self.assertTrue(self.log.is_enabled_for('DEBUG')) self.logger.isEnabledFor.assert_called_with(logging.DEBUG) def test_LogMixin_get_logger(self): self.assertIs(log.LogMixin().get_logger(), - logging.getLogger("LogMixin")) + logging.getLogger('LogMixin')) def test_Log_get_logger(self): - self.assertIs(log.Log("test_Log").get_logger(), - logging.getLogger("test_Log")) + self.assertIs(log.Log('test_Log').get_logger(), + logging.getLogger('test_Log')) def test_log_when_not_enabled(self): self.logger.isEnabledFor.return_value = False - self.log.debug("debug") + self.log.debug('debug') self.assertFalse(self.logger.log.called) def test_log_with_format(self): - self.log.debug("Host %r removed", "example.com") + self.log.debug('Host %r removed', 'example.com') self.logger.log.assert_called_with(logging.DEBUG, - "Log - Host %s removed", "'example.com'") + 'Log - Host %s removed', "'example.com'") class test_setup_logging(TestCase): - @patch("logging.getLogger") + @patch('logging.getLogger') def test_set_up_default_values(self, getLogger): logger = logging.getLogger.return_value = Mock() logger.handlers = [] @@ -140,18 +140,18 @@ class test_setup_logging(TestCase): self.assertIsInstance(handler, logging.StreamHandler) self.assertIs(handler.stream, sys.__stderr__) - @patch("logging.getLogger") - @patch("kombu.log.WatchedFileHandler") + @patch('logging.getLogger') + @patch('kombu.log.WatchedFileHandler') def test_setup_custom_values(self, getLogger, WatchedFileHandler): logger = logging.getLogger.return_value = Mock() logger.handlers = [] - log.setup_logging(loglevel=logging.DEBUG, logfile="/var/logfile") + log.setup_logging(loglevel=logging.DEBUG, logfile='/var/logfile') logger.setLevel.assert_called_with(logging.DEBUG) self.assertTrue(logger.addHandler.called) self.assertTrue(WatchedFileHandler.called) - @patch("logging.getLogger") + @patch('logging.getLogger') def test_logger_already_setup(self, getLogger): logger = logging.getLogger.return_value = Mock() logger.handlers = [Mock()] diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index 1c74d74f..b5385d71 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -18,20 +18,20 @@ from .utils import Mock class test_Producer(TestCase): def setUp(self): - self.exchange = Exchange("foo", "direct") + self.exchange = Exchange('foo', 'direct') self.connection = BrokerConnection(transport=Transport) self.connection.connect() self.assertTrue(self.connection.connection.connected) self.assertFalse(self.exchange.is_bound) - @patch("kombu.common.maybe_declare") + @patch('kombu.common.maybe_declare') def test_maybe_declare(self, maybe_declare): p = self.connection.Producer() - q = Queue("foo") + q = Queue('foo') p.maybe_declare(q) maybe_declare.assert_called_with(q, p.channel, False) - @patch("kombu.common.maybe_declare") + @patch('kombu.common.maybe_declare') def test_maybe_declare_when_entity_false(self, maybe_declare): p = self.connection.Producer() p.maybe_declare(None) @@ -41,92 +41,92 @@ class test_Producer(TestCase): channel = self.connection.channel() p = Producer(channel, self.exchange, auto_declare=True) self.assertIsNot(p.exchange, self.exchange, - "creates Exchange clone at bind") + 'creates Exchange clone at bind') self.assertTrue(p.exchange.is_bound) - self.assertIn("exchange_declare", channel, - "auto_declare declares exchange") + self.assertIn('exchange_declare', channel, + 'auto_declare declares exchange') def test_manual_declare(self): channel = self.connection.channel() p = Producer(channel, self.exchange, auto_declare=False) self.assertTrue(p.exchange.is_bound) - self.assertNotIn("exchange_declare", channel, - "auto_declare=False does not declare exchange") + self.assertNotIn('exchange_declare', channel, + 'auto_declare=False does not declare exchange') p.declare() - self.assertIn("exchange_declare", channel, - "p.declare() declares exchange") + self.assertIn('exchange_declare', channel, + 'p.declare() declares exchange') def test_prepare(self): - message = {u"the quick brown fox": u"jumps over the lazy dog"} + message = {u'the quick brown fox': u'jumps over the lazy dog'} channel = self.connection.channel() - p = Producer(channel, self.exchange, serializer="json") + p = Producer(channel, self.exchange, serializer='json') m, ctype, cencoding = p._prepare(message, headers={}) self.assertDictEqual(message, anyjson.loads(m)) - self.assertEqual(ctype, "application/json") - self.assertEqual(cencoding, "utf-8") + self.assertEqual(ctype, 'application/json') + self.assertEqual(cencoding, 'utf-8') def test_prepare_compression(self): - message = {u"the quick brown fox": u"jumps over the lazy dog"} + message = {u'the quick brown fox': u'jumps over the lazy dog'} channel = self.connection.channel() - p = Producer(channel, self.exchange, serializer="json") + p = Producer(channel, self.exchange, serializer='json') headers = {} - m, ctype, cencoding = p._prepare(message, compression="zlib", + m, ctype, cencoding = p._prepare(message, compression='zlib', headers=headers) - self.assertEqual(ctype, "application/json") - self.assertEqual(cencoding, "utf-8") - self.assertEqual(headers["compression"], "application/x-gzip") + self.assertEqual(ctype, 'application/json') + self.assertEqual(cencoding, 'utf-8') + self.assertEqual(headers['compression'], 'application/x-gzip') import zlib self.assertEqual(anyjson.loads( - zlib.decompress(m).decode("utf-8")), message) + zlib.decompress(m).decode('utf-8')), message) def test_prepare_custom_content_type(self): - message = "the quick brown fox".encode("utf-8") + message = 'the quick brown fox'.encode('utf-8') channel = self.connection.channel() - p = Producer(channel, self.exchange, serializer="json") - m, ctype, cencoding = p._prepare(message, content_type="custom") + p = Producer(channel, self.exchange, serializer='json') + m, ctype, cencoding = p._prepare(message, content_type='custom') self.assertEqual(m, message) - self.assertEqual(ctype, "custom") - self.assertEqual(cencoding, "binary") - m, ctype, cencoding = p._prepare(message, content_type="custom", - content_encoding="alien") + self.assertEqual(ctype, 'custom') + self.assertEqual(cencoding, 'binary') + m, ctype, cencoding = p._prepare(message, content_type='custom', + content_encoding='alien') self.assertEqual(m, message) - self.assertEqual(ctype, "custom") - self.assertEqual(cencoding, "alien") + self.assertEqual(ctype, 'custom') + self.assertEqual(cencoding, 'alien') def test_prepare_is_already_unicode(self): - message = u"the quick brown fox" + message = u'the quick brown fox' channel = self.connection.channel() - p = Producer(channel, self.exchange, serializer="json") - m, ctype, cencoding = p._prepare(message, content_type="text/plain") - self.assertEqual(m, message.encode("utf-8")) - self.assertEqual(ctype, "text/plain") - self.assertEqual(cencoding, "utf-8") - m, ctype, cencoding = p._prepare(message, content_type="text/plain", - content_encoding="utf-8") - self.assertEqual(m, message.encode("utf-8")) - self.assertEqual(ctype, "text/plain") - self.assertEqual(cencoding, "utf-8") + p = Producer(channel, self.exchange, serializer='json') + m, ctype, cencoding = p._prepare(message, content_type='text/plain') + self.assertEqual(m, message.encode('utf-8')) + self.assertEqual(ctype, 'text/plain') + self.assertEqual(cencoding, 'utf-8') + m, ctype, cencoding = p._prepare(message, content_type='text/plain', + content_encoding='utf-8') + self.assertEqual(m, message.encode('utf-8')) + self.assertEqual(ctype, 'text/plain') + self.assertEqual(cencoding, 'utf-8') def test_publish_with_Exchange_instance(self): p = self.connection.Producer() p.exchange.publish = Mock() - p.publish("hello", exchange=Exchange("foo")) - self.assertEqual(p.exchange.publish.call_args[0][4], "foo") + p.publish('hello', exchange=Exchange('foo')) + self.assertEqual(p.exchange.publish.call_args[0][4], 'foo') def test_publish_retry_with_declare(self): p = self.connection.Producer() p.maybe_declare = Mock() ensure = p.connection.ensure = Mock() - ex = Exchange("foo") - p.publish("hello", exchange=ex, declare=[ex], retry=True, - retry_policy={"step": 4}) + ex = Exchange('foo') + p.publish('hello', exchange=ex, declare=[ex], retry=True, + retry_policy={'step': 4}) p.maybe_declare.assert_called_with(ex, True, step=4) ensure.assert_called_with(p, p.exchange.publish, step=4) def test_revive_when_channel_is_connection(self): p = self.connection.Producer() p.exchange = Mock() - new_conn = BrokerConnection("memory://") + new_conn = BrokerConnection('memory://') defchan = new_conn.default_channel p.revive(new_conn) @@ -148,20 +148,20 @@ class test_Producer(TestCase): def test_publish(self): channel = self.connection.channel() - p = Producer(channel, self.exchange, serializer="json") - message = {u"the quick brown fox": u"jumps over the lazy dog"} - ret = p.publish(message, routing_key="process") - self.assertIn("prepare_message", channel) - self.assertIn("basic_publish", channel) + p = Producer(channel, self.exchange, serializer='json') + message = {u'the quick brown fox': u'jumps over the lazy dog'} + ret = p.publish(message, routing_key='process') + self.assertIn('prepare_message', channel) + self.assertIn('basic_publish', channel) m, exc, rkey = ret - self.assertDictEqual(message, anyjson.loads(m["body"])) - self.assertDictContainsSubset({"content_type": "application/json", - "content_encoding": "utf-8", - "priority": 0}, m) - self.assertDictContainsSubset({"delivery_mode": 2}, m["properties"]) + self.assertDictEqual(message, anyjson.loads(m['body'])) + self.assertDictContainsSubset({'content_type': 'application/json', + 'content_encoding': 'utf-8', + 'priority': 0}, m) + self.assertDictContainsSubset({'delivery_mode': 2}, m['properties']) self.assertEqual(exc, p.exchange.name) - self.assertEqual(rkey, "process") + self.assertEqual(rkey, 'process') def test_no_exchange(self): chan = self.connection.channel() @@ -183,7 +183,7 @@ class test_Producer(TestCase): pass p = Producer(chan, on_return=on_return) - self.assertTrue(on_return in chan.events["basic_return"]) + self.assertTrue(on_return in chan.events['basic_return']) self.assertTrue(p.on_return) @@ -193,11 +193,11 @@ class test_Consumer(TestCase): self.connection = BrokerConnection(transport=Transport) self.connection.connect() self.assertTrue(self.connection.connection.connected) - self.exchange = Exchange("foo", "direct") + self.exchange = Exchange('foo', 'direct') def test_set_no_ack(self): channel = self.connection.channel() - queue = Queue("qname", self.exchange, "rkey") + queue = Queue('qname', self.exchange, 'rkey') consumer = Consumer(channel, queue, auto_declare=True, no_ack=True) self.assertTrue(consumer.no_ack) @@ -224,30 +224,30 @@ class test_Consumer(TestCase): def test_consuming_from(self): consumer = self.connection.Consumer() - consumer.queues[:] = [Queue("a"), Queue("b")] - self.assertFalse(consumer.consuming_from(Queue("c"))) - self.assertFalse(consumer.consuming_from("c")) - self.assertTrue(consumer.consuming_from(Queue("a"))) - self.assertTrue(consumer.consuming_from(Queue("b"))) - self.assertTrue(consumer.consuming_from("b")) + consumer.queues[:] = [Queue('a'), Queue('b')] + self.assertFalse(consumer.consuming_from(Queue('c'))) + self.assertFalse(consumer.consuming_from('c')) + self.assertTrue(consumer.consuming_from(Queue('a'))) + self.assertTrue(consumer.consuming_from(Queue('b'))) + self.assertTrue(consumer.consuming_from('b')) def test_receive_callback_without_m2p(self): channel = self.connection.channel() c = channel.Consumer() - m2p = getattr(channel, "message_to_python") + m2p = getattr(channel, 'message_to_python') channel.message_to_python = None try: message = Mock() - message.decode.return_value = "Hello" + message.decode.return_value = 'Hello' recv = c.receive = Mock() c._receive_callback(message) - recv.assert_called_with("Hello", message) + recv.assert_called_with('Hello', message) finally: channel.message_to_python = m2p def test_set_callbacks(self): channel = self.connection.channel() - queue = Queue("qname", self.exchange, "rkey") + queue = Queue('qname', self.exchange, 'rkey') callbacks = [lambda x, y: x, lambda x, y: x] consumer = Consumer(channel, queue, auto_declare=True, @@ -256,7 +256,7 @@ class test_Consumer(TestCase): def test_auto_declare(self): channel = self.connection.channel() - queue = Queue("qname", self.exchange, "rkey") + queue = Queue('qname', self.exchange, 'rkey') consumer = Consumer(channel, queue, auto_declare=True) consumer.consume() consumer.consume() # twice is a noop @@ -265,12 +265,12 @@ class test_Consumer(TestCase): self.assertTrue(consumer.queues[0].exchange.is_bound) self.assertIsNot(consumer.queues[0].exchange, self.exchange) - for meth in ("exchange_declare", - "queue_declare", - "queue_bind", - "basic_consume"): + for meth in ('exchange_declare', + 'queue_declare', + 'queue_bind', + 'basic_consume'): self.assertIn(meth, channel) - self.assertEqual(channel.called.count("basic_consume"), 1) + self.assertEqual(channel.called.count('basic_consume'), 1) self.assertTrue(consumer._active_tags) consumer.cancel_by_queue(queue.name) @@ -279,93 +279,93 @@ class test_Consumer(TestCase): def test_manual_declare(self): channel = self.connection.channel() - queue = Queue("qname", self.exchange, "rkey") + queue = Queue('qname', self.exchange, 'rkey') consumer = Consumer(channel, queue, auto_declare=False) self.assertIsNot(consumer.queues[0], queue) self.assertTrue(consumer.queues[0].is_bound) self.assertTrue(consumer.queues[0].exchange.is_bound) self.assertIsNot(consumer.queues[0].exchange, self.exchange) - for meth in ("exchange_declare", - "queue_declare", - "basic_consume"): + for meth in ('exchange_declare', + 'queue_declare', + 'basic_consume'): self.assertNotIn(meth, channel) consumer.declare() - for meth in ("exchange_declare", - "queue_declare", - "queue_bind"): + for meth in ('exchange_declare', + 'queue_declare', + 'queue_bind'): self.assertIn(meth, channel) - self.assertNotIn("basic_consume", channel) + self.assertNotIn('basic_consume', channel) consumer.consume() - self.assertIn("basic_consume", channel) + self.assertIn('basic_consume', channel) def test_consume__cancel(self): channel = self.connection.channel() - queue = Queue("qname", self.exchange, "rkey") + queue = Queue('qname', self.exchange, 'rkey') consumer = Consumer(channel, queue, auto_declare=True) consumer.consume() consumer.cancel() - self.assertIn("basic_cancel", channel) + self.assertIn('basic_cancel', channel) self.assertFalse(consumer._active_tags) def test___enter____exit__(self): channel = self.connection.channel() - queue = Queue("qname", self.exchange, "rkey") + queue = Queue('qname', self.exchange, 'rkey') consumer = Consumer(channel, queue, auto_declare=True) context = consumer.__enter__() self.assertIs(context, consumer) self.assertTrue(consumer._active_tags) res = consumer.__exit__(None, None, None) self.assertFalse(res) - self.assertIn("basic_cancel", channel) + self.assertIn('basic_cancel', channel) self.assertFalse(consumer._active_tags) def test_flow(self): channel = self.connection.channel() - queue = Queue("qname", self.exchange, "rkey") + queue = Queue('qname', self.exchange, 'rkey') consumer = Consumer(channel, queue, auto_declare=True) consumer.flow(False) - self.assertIn("flow", channel) + self.assertIn('flow', channel) def test_qos(self): channel = self.connection.channel() - queue = Queue("qname", self.exchange, "rkey") + queue = Queue('qname', self.exchange, 'rkey') consumer = Consumer(channel, queue, auto_declare=True) consumer.qos(30, 10, False) - self.assertIn("basic_qos", channel) + self.assertIn('basic_qos', channel) def test_purge(self): channel = self.connection.channel() - b1 = Queue("qname1", self.exchange, "rkey") - b2 = Queue("qname2", self.exchange, "rkey") - b3 = Queue("qname3", self.exchange, "rkey") - b4 = Queue("qname4", self.exchange, "rkey") + b1 = Queue('qname1', self.exchange, 'rkey') + b2 = Queue('qname2', self.exchange, 'rkey') + b3 = Queue('qname3', self.exchange, 'rkey') + b4 = Queue('qname4', self.exchange, 'rkey') consumer = Consumer(channel, [b1, b2, b3, b4], auto_declare=True) consumer.purge() - self.assertEqual(channel.called.count("queue_purge"), 4) + self.assertEqual(channel.called.count('queue_purge'), 4) def test_multiple_queues(self): channel = self.connection.channel() - b1 = Queue("qname1", self.exchange, "rkey") - b2 = Queue("qname2", self.exchange, "rkey") - b3 = Queue("qname3", self.exchange, "rkey") - b4 = Queue("qname4", self.exchange, "rkey") + b1 = Queue('qname1', self.exchange, 'rkey') + b2 = Queue('qname2', self.exchange, 'rkey') + b3 = Queue('qname3', self.exchange, 'rkey') + b4 = Queue('qname4', self.exchange, 'rkey') consumer = Consumer(channel, [b1, b2, b3, b4]) consumer.consume() - self.assertEqual(channel.called.count("exchange_declare"), 4) - self.assertEqual(channel.called.count("queue_declare"), 4) - self.assertEqual(channel.called.count("queue_bind"), 4) - self.assertEqual(channel.called.count("basic_consume"), 4) + self.assertEqual(channel.called.count('exchange_declare'), 4) + self.assertEqual(channel.called.count('queue_declare'), 4) + self.assertEqual(channel.called.count('queue_bind'), 4) + self.assertEqual(channel.called.count('basic_consume'), 4) self.assertEqual(len(consumer._active_tags), 4) consumer.cancel() - self.assertEqual(channel.called.count("basic_cancel"), 4) + self.assertEqual(channel.called.count('basic_cancel'), 4) self.assertFalse(len(consumer._active_tags)) def test_receive_callback(self): channel = self.connection.channel() - b1 = Queue("qname1", self.exchange, "rkey") + b1 = Queue('qname1', self.exchange, 'rkey') consumer = Consumer(channel, [b1]) received = [] @@ -375,15 +375,15 @@ class test_Consumer(TestCase): message.payload # trigger cache consumer.register_callback(callback) - consumer._receive_callback({u"foo": u"bar"}) + consumer._receive_callback({u'foo': u'bar'}) - self.assertIn("basic_ack", channel) - self.assertIn("message_to_python", channel) - self.assertEqual(received[0], {u"foo": u"bar"}) + self.assertIn('basic_ack', channel) + self.assertIn('message_to_python', channel) + self.assertEqual(received[0], {u'foo': u'bar'}) def test_basic_ack_twice(self): channel = self.connection.channel() - b1 = Queue("qname1", self.exchange, "rkey") + b1 = Queue('qname1', self.exchange, 'rkey') consumer = Consumer(channel, [b1]) def callback(message_data, message): @@ -392,23 +392,23 @@ class test_Consumer(TestCase): consumer.register_callback(callback) with self.assertRaises(MessageStateError): - consumer._receive_callback({"foo": "bar"}) + consumer._receive_callback({'foo': 'bar'}) def test_basic_reject(self): channel = self.connection.channel() - b1 = Queue("qname1", self.exchange, "rkey") + b1 = Queue('qname1', self.exchange, 'rkey') consumer = Consumer(channel, [b1]) def callback(message_data, message): message.reject() consumer.register_callback(callback) - consumer._receive_callback({"foo": "bar"}) - self.assertIn("basic_reject", channel) + consumer._receive_callback({'foo': 'bar'}) + self.assertIn('basic_reject', channel) def test_basic_reject_twice(self): channel = self.connection.channel() - b1 = Queue("qname1", self.exchange, "rkey") + b1 = Queue('qname1', self.exchange, 'rkey') consumer = Consumer(channel, [b1]) def callback(message_data, message): @@ -417,24 +417,24 @@ class test_Consumer(TestCase): consumer.register_callback(callback) with self.assertRaises(MessageStateError): - consumer._receive_callback({"foo": "bar"}) - self.assertIn("basic_reject", channel) + consumer._receive_callback({'foo': 'bar'}) + self.assertIn('basic_reject', channel) def test_basic_reject__requeue(self): channel = self.connection.channel() - b1 = Queue("qname1", self.exchange, "rkey") + b1 = Queue('qname1', self.exchange, 'rkey') consumer = Consumer(channel, [b1]) def callback(message_data, message): message.requeue() consumer.register_callback(callback) - consumer._receive_callback({"foo": "bar"}) - self.assertIn("basic_reject:requeue", channel) + consumer._receive_callback({'foo': 'bar'}) + self.assertIn('basic_reject:requeue', channel) def test_basic_reject__requeue_twice(self): channel = self.connection.channel() - b1 = Queue("qname1", self.exchange, "rkey") + b1 = Queue('qname1', self.exchange, 'rkey') consumer = Consumer(channel, [b1]) def callback(message_data, message): @@ -443,28 +443,28 @@ class test_Consumer(TestCase): consumer.register_callback(callback) with self.assertRaises(MessageStateError): - consumer._receive_callback({"foo": "bar"}) - self.assertIn("basic_reject:requeue", channel) + consumer._receive_callback({'foo': 'bar'}) + self.assertIn('basic_reject:requeue', channel) def test_receive_without_callbacks_raises(self): channel = self.connection.channel() - b1 = Queue("qname1", self.exchange, "rkey") + b1 = Queue('qname1', self.exchange, 'rkey') consumer = Consumer(channel, [b1]) with self.assertRaises(NotImplementedError): consumer.receive(1, 2) def test_decode_error(self): channel = self.connection.channel() - b1 = Queue("qname1", self.exchange, "rkey") + b1 = Queue('qname1', self.exchange, 'rkey') consumer = Consumer(channel, [b1]) consumer.channel.throw_decode_error = True with self.assertRaises(ValueError): - consumer._receive_callback({"foo": "bar"}) + consumer._receive_callback({'foo': 'bar'}) def test_on_decode_error_callback(self): channel = self.connection.channel() - b1 = Queue("qname1", self.exchange, "rkey") + b1 = Queue('qname1', self.exchange, 'rkey') thrown = [] def on_decode_error(msg, exc): @@ -472,23 +472,23 @@ class test_Consumer(TestCase): consumer = Consumer(channel, [b1], on_decode_error=on_decode_error) consumer.channel.throw_decode_error = True - consumer._receive_callback({"foo": "bar"}) + consumer._receive_callback({'foo': 'bar'}) self.assertTrue(thrown) m, exc = thrown[0] - self.assertEqual(anyjson.loads(m), {"foo": "bar"}) + self.assertEqual(anyjson.loads(m), {'foo': 'bar'}) self.assertIsInstance(exc, ValueError) def test_recover(self): channel = self.connection.channel() - b1 = Queue("qname1", self.exchange, "rkey") + b1 = Queue('qname1', self.exchange, 'rkey') consumer = Consumer(channel, [b1]) consumer.recover() - self.assertIn("basic_recover", channel) + self.assertIn('basic_recover', channel) def test_revive(self): channel = self.connection.channel() - b1 = Queue("qname1", self.exchange, "rkey") + b1 = Queue('qname1', self.exchange, 'rkey') consumer = Consumer(channel, [b1]) channel2 = self.connection.channel() consumer.revive(channel2) @@ -498,7 +498,7 @@ class test_Consumer(TestCase): def test__repr__(self): channel = self.connection.channel() - b1 = Queue("qname1", self.exchange, "rkey") + b1 = Queue('qname1', self.exchange, 'rkey') self.assertTrue(repr(Consumer(channel, [b1]))) def test_connection_property_handles_AttributeError(self): diff --git a/kombu/tests/test_pidbox.py b/kombu/tests/test_pidbox.py index 28ac977a..72d974ea 100644 --- a/kombu/tests/test_pidbox.py +++ b/kombu/tests/test_pidbox.py @@ -14,32 +14,32 @@ from .utils import Mock class test_Mailbox(TestCase): def _handler(self, state): - return self.stats["var"] + return self.stats['var'] def setUp(self): class Mailbox(pidbox.Mailbox): def _collect(self, *args, **kwargs): - return "COLLECTED" + return 'COLLECTED' - self.mailbox = Mailbox("test_pidbox") - self.connection = BrokerConnection(transport="memory") - self.state = {"var": 1} - self.handlers = {"mymethod": self._handler} + self.mailbox = Mailbox('test_pidbox') + self.connection = BrokerConnection(transport='memory') + self.state = {'var': 1} + self.handlers = {'mymethod': self._handler} self.bound = self.mailbox(self.connection) self.default_chan = self.connection.channel() - self.node = self.bound.Node("test_pidbox", state=self.state, + self.node = self.bound.Node('test_pidbox', state=self.state, handlers=self.handlers, channel=self.default_chan) def test_reply__collect(self): - mailbox = pidbox.Mailbox("test_reply__collect")(self.connection) + mailbox = pidbox.Mailbox('test_reply__collect')(self.connection) exchange = mailbox.reply_exchange.name ticket = uuid() mailbox.get_reply_queue(ticket)(self.connection.channel()).declare() - mailbox._publish_reply({"foo": "bar"}, exchange, ticket) + mailbox._publish_reply({'foo': 'bar'}, exchange, ticket) _callback_called = [False] def callback(body): @@ -48,14 +48,14 @@ class test_Mailbox(TestCase): channel = self.connection.channel() reply = mailbox._collect(ticket, limit=1, callback=callback, channel=channel) - self.assertEqual(reply, [{"foo": "bar"}]) + self.assertEqual(reply, [{'foo': 'bar'}]) self.assertTrue(_callback_called[0]) ticket = uuid() mailbox.get_reply_queue(ticket)(self.connection.channel()).declare() - mailbox._publish_reply({"biz": "boz"}, exchange, ticket) + mailbox._publish_reply({'biz': 'boz'}, exchange, ticket) reply = mailbox._collect(ticket, limit=1, channel=channel) - self.assertEqual(reply, [{"biz": "boz"}]) + self.assertEqual(reply, [{'biz': 'boz'}]) de = mailbox.connection.drain_events = Mock() de.side_effect = socket.timeout @@ -77,7 +77,7 @@ class test_Mailbox(TestCase): self.assertTrue(self.handlers) # No initial handlers - node2 = self.bound.Node("test_pidbox2", state=self.state) + node2 = self.bound.Node('test_pidbox2', state=self.state) self.assertDictEqual(node2.handlers, {}) def test_Node_consumer(self): @@ -91,26 +91,26 @@ class test_Mailbox(TestCase): self.assertFalse(consumer2.no_ack) def test_handler(self): - node = self.bound.Node("test_handler", state=self.state) + node = self.bound.Node('test_handler', state=self.state) @node.handler def my_handler_name(state): return 42 - self.assertIn("my_handler_name", node.handlers) + self.assertIn('my_handler_name', node.handlers) def test_dispatch(self): - node = self.bound.Node("test_dispatch", state=self.state) + node = self.bound.Node('test_dispatch', state=self.state) @node.handler def my_handler_name(state, x=None, y=None): return x + y - self.assertEqual(node.dispatch("my_handler_name", - arguments={"x": 10, "y": 10}), 20) + self.assertEqual(node.dispatch('my_handler_name', + arguments={'x': 10, 'y': 10}), 20) def test_dispatch_raising_SystemExit(self): - node = self.bound.Node("test_dispatch_raising_SystemExit", + node = self.bound.Node('test_dispatch_raising_SystemExit', state=self.state) @node.handler @@ -118,18 +118,18 @@ class test_Mailbox(TestCase): raise SystemExit with self.assertRaises(SystemExit): - node.dispatch("my_handler_name") + node.dispatch('my_handler_name') def test_dispatch_raising(self): - node = self.bound.Node("test_dispatch_raising", state=self.state) + node = self.bound.Node('test_dispatch_raising', state=self.state) @node.handler def my_handler_name(state): - raise KeyError("foo") + raise KeyError('foo') - res = node.dispatch("my_handler_name") - self.assertIn("error", res) - self.assertIn("KeyError", res["error"]) + res = node.dispatch('my_handler_name') + self.assertIn('error', res) + self.assertIn('KeyError', res['error']) def test_dispatch_replies(self): _replied = [False] @@ -137,16 +137,16 @@ class test_Mailbox(TestCase): def reply(data, **options): _replied[0] = True - node = self.bound.Node("test_dispatch", state=self.state) + node = self.bound.Node('test_dispatch', state=self.state) node.reply = reply @node.handler def my_handler_name(state, x=None, y=None): return x + y - node.dispatch("my_handler_name", - arguments={"x": 10, "y": 10}, - reply_to={"exchange": "foo", "routing_key": "bar"}) + node.dispatch('my_handler_name', + arguments={'x': 10, 'y': 10}, + reply_to={'exchange': 'foo', 'routing_key': 'bar'}) self.assertTrue(_replied[0]) def test_reply(self): @@ -157,34 +157,34 @@ class test_Mailbox(TestCase): mailbox = self.mailbox(self.connection) mailbox._publish_reply = publish_reply - node = mailbox.Node("test_reply") + node = mailbox.Node('test_reply') @node.handler def my_handler_name(state): return 42 - node.dispatch("my_handler_name", - reply_to={"exchange": "exchange", - "routing_key": "rkey"}) + node.dispatch('my_handler_name', + reply_to={'exchange': 'exchange', + 'routing_key': 'rkey'}) data, exchange, routing_key = _replied[0] - self.assertEqual(data, {"test_reply": 42}) - self.assertEqual(exchange, "exchange") - self.assertEqual(routing_key, "rkey") + self.assertEqual(data, {'test_reply': 42}) + self.assertEqual(exchange, 'exchange') + self.assertEqual(routing_key, 'rkey') def test_handle_message(self): - node = self.bound.Node("test_dispatch_from_message") + node = self.bound.Node('test_dispatch_from_message') @node.handler def my_handler_name(state, x=None, y=None): return x * y - body = {"method": "my_handler_name", - "arguments": {"x": 64, "y": 64}} + body = {'method': 'my_handler_name', + 'arguments': {'x': 64, 'y': 64}} self.assertEqual(node.handle_message(body, None), 64 * 64) # message not for me should not be processed. - body["destination"] = ["some_other_node"] + body['destination'] = ['some_other_node'] self.assertIsNone(node.handle_message(body, None)) def test_listen(self): @@ -194,27 +194,27 @@ class test_Mailbox(TestCase): self.assertEqual(consumer.channel, self.default_chan) def test_cast(self): - self.bound.cast(["somenode"], "mymethod") + self.bound.cast(['somenode'], 'mymethod') consumer = self.node.Consumer() self.assertIsCast(self.get_next(consumer)) def test_abcast(self): - self.bound.abcast("mymethod") + self.bound.abcast('mymethod') consumer = self.node.Consumer() self.assertIsCast(self.get_next(consumer)) def test_call_destination_must_be_sequence(self): with self.assertRaises(ValueError): - self.bound.call("some_node", "mymethod") + self.bound.call('some_node', 'mymethod') def test_call(self): - self.assertEqual(self.bound.call(["some_node"], "mymethod"), - "COLLECTED") + self.assertEqual(self.bound.call(['some_node'], 'mymethod'), + 'COLLECTED') consumer = self.node.Consumer() self.assertIsCall(self.get_next(consumer)) def test_multi_call(self): - self.assertEqual(self.bound.multi_call("mymethod"), "COLLECTED") + self.assertEqual(self.bound.multi_call('mymethod'), 'COLLECTED') consumer = self.node.Consumer() self.assertIsCall(self.get_next(consumer)) @@ -224,8 +224,8 @@ class test_Mailbox(TestCase): return m.payload def assertIsCast(self, message): - self.assertTrue(message["method"]) + self.assertTrue(message['method']) def assertIsCall(self, message): - self.assertTrue(message["method"]) - self.assertTrue(message["reply_to"]) + self.assertTrue(message['method']) + self.assertTrue(message['reply_to']) diff --git a/kombu/tests/test_pools.py b/kombu/tests/test_pools.py index f1a9b44d..ce11f584 100644 --- a/kombu/tests/test_pools.py +++ b/kombu/tests/test_pools.py @@ -100,24 +100,24 @@ class test_PoolGroup(TestCase): def test_getitem_using_global_limit(self): pools._used[0] = False g = self.MyGroup(limit=pools.use_global_limit) - res = g["foo"] - self.assertTupleEqual(res, ("foo", pools.get_limit())) + res = g['foo'] + self.assertTupleEqual(res, ('foo', pools.get_limit())) self.assertTrue(pools._used[0]) def test_getitem_using_custom_limit(self): pools._used[0] = True g = self.MyGroup(limit=102456) - res = g["foo"] - self.assertTupleEqual(res, ("foo", 102456)) + res = g['foo'] + self.assertTupleEqual(res, ('foo', 102456)) def test_delitem(self): g = self.MyGroup() - g["foo"] - del(g["foo"]) - self.assertNotIn("foo", g) + g['foo'] + del(g['foo']) + self.assertNotIn('foo', g) def test_Connections(self): - conn = Connection("memory://") + conn = Connection('memory://') p = pools.connections[conn] self.assertTrue(p) self.assertIsInstance(p, ConnectionPool) @@ -125,7 +125,7 @@ class test_PoolGroup(TestCase): self.assertEqual(p.limit, pools.get_limit()) def test_Producers(self): - conn = Connection("memory://") + conn = Connection('memory://') p = pools.producers[conn] self.assertTrue(p) self.assertIsInstance(p, pools.ProducerPool) @@ -134,7 +134,7 @@ class test_PoolGroup(TestCase): self.assertEqual(p.limit, pools.get_limit()) def test_all_groups(self): - conn = Connection("memory://") + conn = Connection('memory://') pools.connections[conn] self.assertTrue(list(pools._all_pools())) @@ -148,7 +148,7 @@ class test_PoolGroup(TestCase): def clear(self): self.clear_called = True - p1 = pools.connections["foo"] = Mock() + p1 = pools.connections['foo'] = Mock() g1 = MyGroup() pools._groups.append(g1) @@ -156,7 +156,7 @@ class test_PoolGroup(TestCase): p1.force_close_all.assert_called_with() self.assertTrue(g1.clear_called) - p1 = pools.connections["foo"] = Mock() + p1 = pools.connections['foo'] = Mock() p1.force_close_all.side_effect = KeyError() pools.reset() @@ -166,7 +166,7 @@ class test_PoolGroup(TestCase): limit = pools.get_limit() self.assertEqual(limit, 34576) - pools.connections[Connection("memory://")] + pools.connections[Connection('memory://')] pools.set_limit(limit + 1) self.assertEqual(pools.get_limit(), limit + 1) limit = pools.get_limit() @@ -181,8 +181,8 @@ class test_PoolGroup(TestCase): class test_fun_PoolGroup(TestCase): def test_connections_behavior(self): - c1u = "memory://localhost:123" - c2u = "memory://localhost:124" + c1u = 'memory://localhost:123' + c2u = 'memory://localhost:124' c1 = Connection(c1u) c2 = Connection(c2u) c3 = Connection(c1u) diff --git a/kombu/tests/test_serialization.py b/kombu/tests/test_serialization.py index b5ad35b1..c5e0eccd 100644 --- a/kombu/tests/test_serialization.py +++ b/kombu/tests/test_serialization.py @@ -22,19 +22,19 @@ latin_string_as_utf8 = latin_string.encode('utf-8') # For serialization tests -py_data = {"string": "The quick brown fox jumps over the lazy dog", - "int": 10, - "float": 3.14159265, - "unicode": u"Thé quick brown fox jumps over thé lazy dog", - "list": ["george", "jerry", "elaine", "cosmo"], +py_data = {'string': 'The quick brown fox jumps over the lazy dog', + 'int': 10, + 'float': 3.14159265, + 'unicode': u'Thé quick brown fox jumps over thé lazy dog', + 'list': ['george', 'jerry', 'elaine', 'cosmo'], } # JSON serialization tests -json_data = ('{"int": 10, "float": 3.1415926500000002, ' +json_data = ('''{"int": 10, "float": 3.1415926500000002, ' '"list": ["george", "jerry", "elaine", "cosmo"], ' '"string": "The quick brown fox jumps over the lazy ' 'dog", "unicode": "Th\\u00e9 quick brown fox jumps over ' - 'th\\u00e9 lazy dog"}') + 'th\\u00e9 lazy dog"}''') # Pickle serialization tests pickle_data = pickle.dumps(py_data) @@ -49,9 +49,9 @@ yaml_data = ('float: 3.1415926500000002\nint: 10\n' msgpack_py_data = dict(py_data) # msgpack only supports tuples -msgpack_py_data["list"] = tuple(msgpack_py_data["list"]) +msgpack_py_data['list'] = tuple(msgpack_py_data['list']) # Unicode chars are lost in transmit :( -msgpack_py_data["unicode"] = 'Th quick brown fox jumps over th lazy dog' +msgpack_py_data['unicode'] = 'Th quick brown fox jumps over th lazy dog' msgpack_data = ('\x85\xa3int\n\xa5float\xcb@\t!\xfbS\xc8\xd4\xf1\xa4list' '\x94\xa6george\xa5jerry\xa6elaine\xa5cosmo\xa6string\xda' '\x00+The quick brown fox jumps over the lazy dog\xa7unicode' @@ -59,11 +59,11 @@ msgpack_data = ('\x85\xa3int\n\xa5float\xcb@\t!\xfbS\xc8\xd4\xf1\xa4list' def say(m): - sys.stderr.write("%s\n" % (m, )) + sys.stderr.write('%s\n' % (m, )) -registry.register('testS', lambda s: s, lambda s: "decoded", - "application/testS", "utf-8") +registry.register('testS', lambda s: s, lambda s: 'decoded', + 'application/testS', 'utf-8') class test_Serialization(TestCase): @@ -71,32 +71,32 @@ class test_Serialization(TestCase): def test_disable(self): disabled = registry._disabled_content_types try: - registry.disable("testS") - self.assertIn("application/testS", disabled) + registry.disable('testS') + self.assertIn('application/testS', disabled) disabled.clear() - registry.disable("application/testS") - self.assertIn("application/testS", disabled) + registry.disable('application/testS') + self.assertIn('application/testS', disabled) finally: disabled.clear() def test_decode_when_disabled(self): disabled = registry._disabled_content_types try: - registry.disable("testS") + registry.disable('testS') with self.assertRaises(SerializerNotInstalled): - registry.decode("xxd", "application/testS", "utf-8", + registry.decode('xxd', 'application/testS', 'utf-8', force=False) - ret = registry.decode("xxd", "application/testS", "utf-8", + ret = registry.decode('xxd', 'application/testS', 'utf-8', force=True) - self.assertEqual(ret, "decoded") + self.assertEqual(ret, 'decoded') finally: disabled.clear() def test_decode_when_data_is_None(self): - registry.decode(None, "application/testS", "utf-8") + registry.decode(None, 'application/testS', 'utf-8') def test_content_type_decoding(self): self.assertEqual(unicode_string, @@ -123,13 +123,13 @@ class test_Serialization(TestCase): content_encoding='binary')) def test_content_type_encoding(self): - # Using the "raw" serializer + # Using the 'raw' serializer self.assertEqual(unicode_string_as_utf8, registry.encode( - unicode_string, serializer="raw")[-1]) + unicode_string, serializer='raw')[-1]) self.assertEqual(latin_string_as_utf8, registry.encode( - latin_string, serializer="raw")[-1]) + latin_string, serializer='raw')[-1]) # And again w/o a specific serializer to check the # code where we force unicode objects into a string. self.assertEqual(unicode_string_as_utf8, @@ -146,7 +146,7 @@ class test_Serialization(TestCase): def test_json_encode(self): self.assertEqual(registry.decode( - registry.encode(py_data, serializer="json")[-1], + registry.encode(py_data, serializer='json')[-1], content_type='application/json', content_encoding='utf-8'), registry.decode( @@ -167,7 +167,7 @@ class test_Serialization(TestCase): def test_msgpack_encode(self): register_msgpack() self.assertEqual(registry.decode( - registry.encode(msgpack_py_data, serializer="msgpack")[-1], + registry.encode(msgpack_py_data, serializer='msgpack')[-1], content_type='application/x-msgpack', content_encoding='binary'), registry.decode( @@ -188,7 +188,7 @@ class test_Serialization(TestCase): def test_yaml_encode(self): register_yaml() self.assertEqual(registry.decode( - registry.encode(py_data, serializer="yaml")[-1], + registry.encode(py_data, serializer='yaml')[-1], content_type='application/x-yaml', content_encoding='utf-8'), registry.decode( @@ -206,41 +206,41 @@ class test_Serialization(TestCase): def test_pickle_encode(self): self.assertEqual(pickle_data, registry.encode(py_data, - serializer="pickle")[-1]) + serializer='pickle')[-1]) def test_register(self): register(None, None, None, None) def test_unregister(self): with self.assertRaises(SerializerNotInstalled): - unregister("nonexisting") - registry.encode("foo", serializer="pickle") - unregister("pickle") + unregister('nonexisting') + registry.encode('foo', serializer='pickle') + unregister('pickle') with self.assertRaises(SerializerNotInstalled): - registry.encode("foo", serializer="pickle") + registry.encode('foo', serializer='pickle') register_pickle() def test_set_default_serializer_missing(self): with self.assertRaises(SerializerNotInstalled): - registry._set_default_serializer("nonexisting") + registry._set_default_serializer('nonexisting') def test_encode_missing(self): with self.assertRaises(SerializerNotInstalled): - registry.encode("foo", serializer="nonexisting") + registry.encode('foo', serializer='nonexisting') def test_raw_encode(self): - self.assertTupleEqual(raw_encode("foo".encode("utf-8")), - ("application/data", "binary", - "foo".encode("utf-8"))) + self.assertTupleEqual(raw_encode('foo'.encode('utf-8')), + ('application/data', 'binary', + 'foo'.encode('utf-8'))) - @mask_modules("yaml") + @mask_modules('yaml') def test_register_yaml__no_yaml(self): register_yaml() with self.assertRaises(SerializerNotInstalled): - decode("foo", "application/x-yaml", "utf-8") + decode('foo', 'application/x-yaml', 'utf-8') - @mask_modules("msgpack") + @mask_modules('msgpack') def test_register_msgpack__no_msgpack(self): register_msgpack() with self.assertRaises(SerializerNotInstalled): - decode("foo", "application/x-msgpack", "utf-8") + decode('foo', 'application/x-msgpack', 'utf-8') diff --git a/kombu/tests/test_simple.py b/kombu/tests/test_simple.py index 130708bf..7580e650 100644 --- a/kombu/tests/test_simple.py +++ b/kombu/tests/test_simple.py @@ -17,7 +17,7 @@ class SimpleBase(TestCase): if not isinstance(q, Queue): q = self.__class__.__name__ if name: - q = "%s.%s" % (q, name) + q = '%s.%s' % (q, name) return self._Queue(q, *args, **kwargs) def _Queue(self, *args, **kwargs): @@ -25,7 +25,7 @@ class SimpleBase(TestCase): def setUp(self): if not self.abstract: - self.connection = BrokerConnection(transport="memory") + self.connection = BrokerConnection(transport='memory') self.q = self.Queue(None, no_ack=True) def tearDown(self): @@ -36,42 +36,42 @@ class SimpleBase(TestCase): def test_produce__consume(self): if self.abstract: return - q = self.Queue("test_produce__consume", no_ack=True) + q = self.Queue('test_produce__consume', no_ack=True) - q.put({"hello": "Simple"}) + q.put({'hello': 'Simple'}) - self.assertEqual(q.get(timeout=1).payload, {"hello": "Simple"}) + self.assertEqual(q.get(timeout=1).payload, {'hello': 'Simple'}) with self.assertRaises(Empty): q.get(timeout=0.1) def test_produce__basic_get(self): if self.abstract: return - q = self.Queue("test_produce__basic_get", no_ack=True) - q.put({"hello": "SimpleSync"}) - self.assertEqual(q.get_nowait().payload, {"hello": "SimpleSync"}) + q = self.Queue('test_produce__basic_get', no_ack=True) + q.put({'hello': 'SimpleSync'}) + self.assertEqual(q.get_nowait().payload, {'hello': 'SimpleSync'}) with self.assertRaises(Empty): q.get_nowait() - q.put({"hello": "SimpleSync"}) - self.assertEqual(q.get(block=False).payload, {"hello": "SimpleSync"}) + q.put({'hello': 'SimpleSync'}) + self.assertEqual(q.get(block=False).payload, {'hello': 'SimpleSync'}) with self.assertRaises(Empty): q.get(block=False) def test_clear(self): if self.abstract: return - q = self.Queue("test_clear", no_ack=True) + q = self.Queue('test_clear', no_ack=True) for i in range(10): - q.put({"hello": "SimplePurge%d" % (i, )}) + q.put({'hello': 'SimplePurge%d' % (i, )}) self.assertEqual(q.clear(), 10) def test_enter_exit(self): if self.abstract: return - q = self.Queue("test_enter_exit") + q = self.Queue('test_enter_exit') q.close = Mock() self.assertIs(q.__enter__(), q) @@ -81,10 +81,10 @@ class SimpleBase(TestCase): def test_qsize(self): if self.abstract: return - q = self.Queue("test_clear", no_ack=True) + q = self.Queue('test_clear', no_ack=True) for i in range(10): - q.put({"hello": "SimplePurge%d" % (i, )}) + q.put({'hello': 'SimplePurge%d' % (i, )}) self.assertEqual(q.qsize(), 10) self.assertEqual(len(q), 10) @@ -93,17 +93,17 @@ class SimpleBase(TestCase): if self.abstract: return channel = self.connection.channel() - q = self.Queue("test_autoclose", no_ack=True, channel=channel) + q = self.Queue('test_autoclose', no_ack=True, channel=channel) q.close() def test_custom_Queue(self): if self.abstract: return n = self.__class__.__name__ - exchange = Exchange("%s-test.custom.Queue" % (n, )) - queue = Queue("%s-test.custom.Queue" % (n, ), + exchange = Exchange('%s-test.custom.Queue' % (n, )) + queue = Queue('%s-test.custom.Queue' % (n, ), exchange, - "my.routing.key") + 'my.routing.key') q = self.Queue(queue) self.assertEqual(q.consumer.queues[0], queue) @@ -112,7 +112,7 @@ class SimpleBase(TestCase): def test_bool(self): if self.abstract: return - q = self.Queue("test_nonzero") + q = self.Queue('test_nonzero') self.assertTrue(q) @@ -123,7 +123,7 @@ class test_SimpleQueue(SimpleBase): return self.connection.SimpleQueue(*args, **kwargs) def test_is_ack(self): - q = self.Queue("test_is_no_ack") + q = self.Queue('test_is_no_ack') self.assertFalse(q.no_ack) @@ -134,5 +134,5 @@ class test_SimpleBuffer(SimpleBase): return self.connection.SimpleBuffer(*args, **kwargs) def test_is_no_ack(self): - q = self.Queue("test_is_no_ack") + q = self.Queue('test_is_no_ack') self.assertTrue(q.no_ack) diff --git a/kombu/tests/test_utils.py b/kombu/tests/test_utils.py index e9d5ac1c..9bf7f5d8 100644 --- a/kombu/tests/test_utils.py +++ b/kombu/tests/test_utils.py @@ -50,11 +50,11 @@ class test_utils(TestCase): [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]) def test_reprkwargs(self): - self.assertTrue(utils.reprkwargs({"foo": "bar", 1: 2, u"k": "v"})) + self.assertTrue(utils.reprkwargs({'foo': 'bar', 1: 2, u'k': 'v'})) def test_reprcall(self): - self.assertTrue(utils.reprcall("add", - (2, 2), {"copy": True})) + self.assertTrue(utils.reprcall('add', + (2, 2), {'copy': True})) class test_UUID(TestCase): @@ -71,9 +71,9 @@ class test_UUID(TestCase): @skip_if_module('__pypy__') def test_uuid_without_ctypes(self): - old_utils = sys.modules.pop("kombu.utils") + old_utils = sys.modules.pop('kombu.utils') - @mask_modules("ctypes") + @mask_modules('ctypes') def with_ctypes_masked(): from kombu.utils import ctypes, uuid @@ -85,7 +85,7 @@ class test_UUID(TestCase): try: with_ctypes_masked() finally: - sys.modules["celery.utils"] = old_utils + sys.modules['celery.utils'] = old_utils class test_Misc(TestCase): @@ -95,8 +95,8 @@ class test_Misc(TestCase): def f(**kwargs): return kwargs - kw = {u"foo": "foo", - u"bar": "bar"} + kw = {u'foo': 'foo', + u'bar': 'bar'} self.assertTrue(f(**utils.kwdict(kw))) @@ -118,8 +118,8 @@ class test_emergency_dump_state(TestCase): def test_dump(self, stdout, stderr): fh = MyBytesIO() - utils.emergency_dump_state({"foo": "bar"}, open_file=lambda n, m: fh) - self.assertDictEqual(pickle.loads(fh.getvalue()), {"foo": "bar"}) + utils.emergency_dump_state({'foo': 'bar'}, open_file=lambda n, m: fh) + self.assertDictEqual(pickle.loads(fh.getvalue()), {'foo': 'bar'}) self.assertTrue(stderr.getvalue()) self.assertFalse(stdout.getvalue()) @@ -128,9 +128,9 @@ class test_emergency_dump_state(TestCase): fh = MyStringIO() def raise_something(*args, **kwargs): - raise KeyError("foo") + raise KeyError('foo') - utils.emergency_dump_state({"foo": "bar"}, open_file=lambda n, m: fh, + utils.emergency_dump_state({'foo': 'bar'}, open_file=lambda n, m: fh, dump=raise_something) self.assertIn("'foo': 'bar'", fh.getvalue()) self.assertTrue(stderr.getvalue()) @@ -213,7 +213,7 @@ class test_cached_property(TestCase): def foo(self, value): self.xx = 10 - desc = X.__dict__["foo"] + desc = X.__dict__['foo'] self.assertIs(X.foo, desc) self.assertIs(desc.__get__(None), desc) diff --git a/kombu/tests/transport/test_amqplib.py b/kombu/tests/transport/test_amqplib.py index 226e5b31..803af460 100644 --- a/kombu/tests/transport/test_amqplib.py +++ b/kombu/tests/transport/test_amqplib.py @@ -39,8 +39,8 @@ class test_Channel(TestCase): self.assertFalse(self.channel.no_ack_consumers) def test_prepare_message(self): - x = self.channel.prepare_message("foobar", 10, - "application/data", "utf-8", + x = self.channel.prepare_message('foobar', 10, + 'application/data', 'utf-8', properties={}) self.assertTrue(x) @@ -56,22 +56,22 @@ class test_Channel(TestCase): self.assertIsNone(self.channel.connection) def test_basic_consume_registers_ack_status(self): - self.channel.wait_returns = "my-consumer-tag" - self.channel.basic_consume("foo", no_ack=True) - self.assertIn("my-consumer-tag", self.channel.no_ack_consumers) + self.channel.wait_returns = 'my-consumer-tag' + self.channel.basic_consume('foo', no_ack=True) + self.assertIn('my-consumer-tag', self.channel.no_ack_consumers) - self.channel.wait_returns = "other-consumer-tag" - self.channel.basic_consume("bar", no_ack=False) - self.assertNotIn("other-consumer-tag", self.channel.no_ack_consumers) + self.channel.wait_returns = 'other-consumer-tag' + self.channel.basic_consume('bar', no_ack=False) + self.assertNotIn('other-consumer-tag', self.channel.no_ack_consumers) - self.channel.basic_cancel("my-consumer-tag") - self.assertNotIn("my-consumer-tag", self.channel.no_ack_consumers) + self.channel.basic_cancel('my-consumer-tag') + self.assertNotIn('my-consumer-tag', self.channel.no_ack_consumers) class test_Transport(TestCase): def setUp(self): - self.connection = BrokerConnection("amqplib://") + self.connection = BrokerConnection('amqplib://') self.transport = self.connection.transport def test_create_channel(self): @@ -92,13 +92,13 @@ class test_Transport(TestCase): vars(self).update(kwargs) self.transport.Connection = Conn - self.transport.client.hostname = "localhost" + self.transport.client.hostname = 'localhost' conn1 = self.transport.establish_connection() - self.assertEqual(conn1.host, "127.0.0.1:5672") + self.assertEqual(conn1.host, '127.0.0.1:5672') - self.transport.client.hostname = "example.com" + self.transport.client.hostname = 'example.com' conn2 = self.transport.establish_connection() - self.assertEqual(conn2.host, "example.com:5672") + self.assertEqual(conn2.host, 'example.com:5672') def test_close_connection(self): connection = Mock() @@ -116,15 +116,15 @@ class test_Transport(TestCase): connection.channels = {1: 1, 2: 2} self.assertTrue(self.transport.verify_connection(connection)) - @mask_modules("ssl") + @mask_modules('ssl') def test_import_no_ssl(self): - pm = sys.modules.pop("kombu.transport.amqplib") + pm = sys.modules.pop('kombu.transport.amqplib') try: from kombu.transport.amqplib import SSLError - self.assertEqual(SSLError.__module__, "kombu.transport.amqplib") + self.assertEqual(SSLError.__module__, 'kombu.transport.amqplib') finally: if pm is not None: - sys.modules["kombu.transport.amqplib"] = pm + sys.modules['kombu.transport.amqplib'] = pm class test_amqplib(TestCase): @@ -135,8 +135,8 @@ class test_amqplib(TestCase): Connection = MockConnection c = BrokerConnection(port=None, transport=Transport).connect() - self.assertEqual(c["host"], - "127.0.0.1:%s" % (Transport.default_port, )) + self.assertEqual(c['host'], + '127.0.0.1:%s' % (Transport.default_port, )) def test_custom_port(self): @@ -144,4 +144,4 @@ class test_amqplib(TestCase): Connection = MockConnection c = BrokerConnection(port=1337, transport=Transport).connect() - self.assertEqual(c["host"], "127.0.0.1:1337") + self.assertEqual(c['host'], '127.0.0.1:1337') diff --git a/kombu/tests/transport/test_base.py b/kombu/tests/transport/test_base.py index d8822b18..691e132c 100644 --- a/kombu/tests/transport/test_base.py +++ b/kombu/tests/transport/test_base.py @@ -11,13 +11,13 @@ from kombu.tests.utils import Mock class test_StdChannel(TestCase): def setUp(self): - self.conn = BrokerConnection("memory://") + self.conn = BrokerConnection('memory://') self.channel = self.conn.channel() self.channel.queues.clear() self.conn.connection.state.clear() def test_Consumer(self): - q = Queue("foo") + q = Queue('foo') print(self.channel.queues) cons = self.channel.Consumer(q) self.assertIsInstance(cons, Consumer) @@ -34,27 +34,27 @@ class test_StdChannel(TestCase): def test_interface_after_reply_message_received(self): self.assertIsNone(StdChannel().after_reply_message_received( - Queue("foo"))) + Queue('foo'))) class test_Message(TestCase): def setUp(self): - self.conn = BrokerConnection("memory://") + self.conn = BrokerConnection('memory://') self.channel = self.conn.channel() self.message = Message(self.channel, delivery_tag=313) def test_ack_respects_no_ack_consumers(self): - self.channel.no_ack_consumers = set(["abc"]) - self.message.delivery_info["consumer_tag"] = "abc" + self.channel.no_ack_consumers = set(['abc']) + self.message.delivery_info['consumer_tag'] = 'abc' ack = self.channel.basic_ack = Mock() self.message.ack() - self.assertNotEqual(self.message._state, "ACK") + self.assertNotEqual(self.message._state, 'ACK') self.assertFalse(ack.called) def test_ack_missing_consumer_tag(self): - self.channel.no_ack_consumers = set(["abc"]) + self.channel.no_ack_consumers = set(['abc']) self.message.delivery_info = {} ack = self.channel.basic_ack = Mock() @@ -63,7 +63,7 @@ class test_Message(TestCase): def test_ack_not_no_ack(self): self.channel.no_ack_consumers = set() - self.message.delivery_info["consumer_tag"] = "abc" + self.message.delivery_info['consumer_tag'] = 'abc' ack = self.channel.basic_ack = Mock() self.message.ack() @@ -76,7 +76,7 @@ class test_Message(TestCase): def test_ack_log_error_when_error(self): ack = self.message.ack = Mock() - ack.side_effect = KeyError("foo") + ack.side_effect = KeyError('foo') logger = Mock() self.message.ack_log_error(logger, KeyError) ack.assert_called_with() diff --git a/kombu/tests/transport/test_memory.py b/kombu/tests/transport/test_memory.py index 1942a611..2865d6a4 100644 --- a/kombu/tests/transport/test_memory.py +++ b/kombu/tests/transport/test_memory.py @@ -13,14 +13,14 @@ from kombu.tests.utils import TestCase class test_MemoryTransport(TestCase): def setUp(self): - self.c = BrokerConnection(transport="memory") - self.e = Exchange("test_transport_memory") - self.q = Queue("test_transport_memory", + self.c = BrokerConnection(transport='memory') + self.e = Exchange('test_transport_memory') + self.q = Queue('test_transport_memory', exchange=self.e, - routing_key="test_transport_memory") - self.q2 = Queue("test_transport_memory2", + routing_key='test_transport_memory') + self.q2 = Queue('test_transport_memory2', exchange=self.e, - routing_key="test_transport_memory2") + routing_key='test_transport_memory2') def test_produce_consume_noack(self): channel = self.c.channel() @@ -28,7 +28,7 @@ class test_MemoryTransport(TestCase): consumer = Consumer(channel, self.q, no_ack=True) for i in range(10): - producer.publish({"foo": i}, routing_key="test_transport_memory") + producer.publish({'foo': i}, routing_key='test_transport_memory') _received = [] @@ -53,9 +53,9 @@ class test_MemoryTransport(TestCase): self.q2(channel).declare() for i in range(10): - producer.publish({"foo": i}, routing_key="test_transport_memory") + producer.publish({'foo': i}, routing_key='test_transport_memory') for i in range(10): - producer.publish({"foo": i}, routing_key="test_transport_memory2") + producer.publish({'foo': i}, routing_key='test_transport_memory2') _received1 = [] _received2 = [] @@ -82,15 +82,15 @@ class test_MemoryTransport(TestCase): self.assertEqual(len(_received1) + len(_received2), 20) # compression - producer.publish({"compressed": True}, - routing_key="test_transport_memory", - compression="zlib") + producer.publish({'compressed': True}, + routing_key='test_transport_memory', + compression='zlib') m = self.q(channel).get() - self.assertDictEqual(m.payload, {"compressed": True}) + self.assertDictEqual(m.payload, {'compressed': True}) # queue.delete for i in range(10): - producer.publish({"foo": i}, routing_key="test_transport_memory") + producer.publish({'foo': i}, routing_key='test_transport_memory') self.assertTrue(self.q(channel).get()) self.q(channel).delete() self.q(channel).declare() @@ -98,7 +98,7 @@ class test_MemoryTransport(TestCase): # queue.purge for i in range(10): - producer.publish({"foo": i}, routing_key="test_transport_memory2") + producer.publish({'foo': i}, routing_key='test_transport_memory2') self.assertTrue(self.q2(channel).get()) self.q2(channel).purge() self.assertIsNone(self.q2(channel).get()) @@ -122,7 +122,7 @@ class test_MemoryTransport(TestCase): class Cycle(object): def get(self, timeout=None): - return ("foo", "foo"), c1 + return ('foo', 'foo'), c1 self.c.transport.cycle = Cycle() with self.assertRaises(KeyError): @@ -132,6 +132,6 @@ class test_MemoryTransport(TestCase): chan = self.c.channel() chan.queues.clear() - x = chan._queue_for("foo") + x = chan._queue_for('foo') self.assertTrue(x) - self.assertIs(chan._queue_for("foo"), x) + self.assertIs(chan._queue_for('foo'), x) diff --git a/kombu/tests/transport/test_mongodb.py b/kombu/tests/transport/test_mongodb.py index 43def853..1d4145be 100644 --- a/kombu/tests/transport/test_mongodb.py +++ b/kombu/tests/transport/test_mongodb.py @@ -15,52 +15,52 @@ class MockConnection(dict): class test_mongodb(TestCase): - @skip_if_not_module("pymongo") + @skip_if_not_module('pymongo') def test_url_parser(self): from kombu.transport import mongodb from pymongo.errors import ConfigurationError raise SkipTest( - "Test is functional: it actually connects to mongod") + 'Test is functional: it actually connects to mongod') class Transport(mongodb.Transport): Connection = MockConnection - url = "mongodb://" + url = 'mongodb://' c = BrokerConnection(url, transport=Transport).connect() client = c.channels[0].client - self.assertEquals(client.name, "kombu_default") - self.assertEquals(client.connection.host, "127.0.0.1") + self.assertEquals(client.name, 'kombu_default') + self.assertEquals(client.connection.host, '127.0.0.1') - url = "mongodb://localhost" + url = 'mongodb://localhost' c = BrokerConnection(url, transport=Transport).connect() client = c.channels[0].client - self.assertEquals(client.name, "kombu_default") + self.assertEquals(client.name, 'kombu_default') - url = "mongodb://localhost/dbname" + url = 'mongodb://localhost/dbname' c = BrokerConnection(url, transport=Transport).connect() client = c.channels[0].client - self.assertEquals(client.name, "dbname") + self.assertEquals(client.name, 'dbname') - url = "mongodb://localhost,example.org:29017/dbname" + url = 'mongodb://localhost,example.org:29017/dbname' c = BrokerConnection(url, transport=Transport).connect() client = c.channels[0].client nodes = client.connection.nodes self.assertEquals(len(nodes), 2) - self.assertTrue(("example.org", 29017) in nodes) - self.assertEquals(client.name, "dbname") + self.assertTrue(('example.org', 29017) in nodes) + self.assertEquals(client.name, 'dbname') # Passing options breaks kombu's _init_params method - # url = "mongodb://localhost,localhost2:29017/dbname?safe=true" + # url = 'mongodb://localhost,localhost2:29017/dbname?safe=true' # c = BrokerConnection(url, transport=Transport).connect() # client = c.channels[0].client - url = "mongodb://localhost:27017,localhost2:29017/dbname" + url = 'mongodb://localhost:27017,localhost2:29017/dbname' c = BrokerConnection(url, transport=Transport).connect() client = c.channels[0].client - url = "mongodb://username:password@localhost/dbname" + url = 'mongodb://username:password@localhost/dbname' c = BrokerConnection(url, transport=Transport).connect() # Assuming there's no user 'username' with password 'password' # configured in mongodb diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py index c14c8a9a..3d9fb361 100644 --- a/kombu/tests/transport/test_redis.py +++ b/kombu/tests/transport/test_redis.py @@ -52,7 +52,7 @@ class Client(object): self.connection = self._sconnection(self) def bgsave(self): - self._called.append("BGSAVE") + self._called.append('BGSAVE') if self.bgsave_raises_ResponseError: raise ResponseError() @@ -95,7 +95,7 @@ class Client(object): cmd, queues = self.connection._sock.data.pop() assert cmd == type self.connection._sock.data = [] - if type == "BRPOP": + if type == 'BRPOP': item = self.brpop(queues, 0.001) if item: return item @@ -156,7 +156,7 @@ class Client(object): self._sock.data.append((cmd, args)) def info(self): - return {"foo": 1} + return {'foo': 1} def pubsub(self, *args, **kwargs): connection = self.connection @@ -221,18 +221,18 @@ class test_Channel(TestCase): self.channel = self.connection.channel() def test_basic_consume_when_fanout_queue(self): - self.channel.exchange_declare(exchange="txconfan", type="fanout") - self.channel.queue_declare(queue="txconfanq") - self.channel.queue_bind(queue="txconfanq", exchange="txconfan") + self.channel.exchange_declare(exchange='txconfan', type='fanout') + self.channel.queue_declare(queue='txconfanq') + self.channel.queue_bind(queue='txconfanq', exchange='txconfan') - self.assertIn("txconfanq", self.channel._fanout_queues) - self.channel.basic_consume("txconfanq", False, None, 1) - self.assertIn("txconfanq", self.channel.active_fanout_queues) - self.assertEqual(self.channel._fanout_to_queue.get("txconfan"), - "txconfanq") + self.assertIn('txconfanq', self.channel._fanout_queues) + self.channel.basic_consume('txconfanq', False, None, 1) + self.assertIn('txconfanq', self.channel.active_fanout_queues) + self.assertEqual(self.channel._fanout_to_queue.get('txconfan'), + 'txconfanq') def test_basic_cancel_unknown_delivery_tag(self): - self.assertIsNone(self.channel.basic_cancel("txaseqwewq")) + self.assertIsNone(self.channel.basic_cancel('txaseqwewq')) def test_subscribe_no_queues(self): self.channel.subclient = Mock() @@ -243,14 +243,14 @@ class test_Channel(TestCase): def test_subscribe(self): self.channel.subclient = Mock() - self.channel.active_fanout_queues.add("a") - self.channel.active_fanout_queues.add("b") - self.channel._fanout_queues.update(a="a", b="b") + self.channel.active_fanout_queues.add('a') + self.channel.active_fanout_queues.add('b') + self.channel._fanout_queues.update(a='a', b='b') self.channel._subscribe() self.assertTrue(self.channel.subclient.subscribe.called) s_args, _ = self.channel.subclient.subscribe.call_args - self.assertItemsEqual(s_args[0], ["a", "b"]) + self.assertItemsEqual(s_args[0], ['a', 'b']) self.channel.subclient.connection._sock = None self.channel._subscribe() @@ -259,43 +259,43 @@ class test_Channel(TestCase): def test_handle_unsubscribe_message(self): s = self.channel.subclient s.subscribed = True - self.channel._handle_message(s, ["unsubscribe", "a", 0]) + self.channel._handle_message(s, ['unsubscribe', 'a', 0]) self.assertFalse(s.subscribed) def test_handle_pmessage_message(self): self.assertDictEqual(self.channel._handle_message( self.channel.subclient, - ["pmessage", "pattern", "channel", "data"]), - {"type": "pmessage", - "pattern": "pattern", - "channel": "channel", - "data": "data"}) + ['pmessage', 'pattern', 'channel', 'data']), + {'type': 'pmessage', + 'pattern': 'pattern', + 'channel': 'channel', + 'data': 'data'}) def test_handle_message(self): self.assertDictEqual(self.channel._handle_message( self.channel.subclient, - ["type", "channel", "data"]), - {"type": "type", - "pattern": None, - "channel": "channel", - "data": "data"}) + ['type', 'channel', 'data']), + {'type': 'type', + 'pattern': None, + 'channel': 'channel', + 'data': 'data'}) def test_brpop_start_but_no_queues(self): self.assertIsNone(self.channel._brpop_start()) def test_receive(self): s = self.channel.subclient = Mock() - self.channel._fanout_to_queue["a"] = "b" - s.parse_response.return_value = ["message", "a", - dumps({"hello": "world"})] + self.channel._fanout_to_queue['a'] = 'b' + s.parse_response.return_value = ['message', 'a', + dumps({'hello': 'world'})] payload, queue = self.channel._receive() - self.assertDictEqual(payload, {"hello": "world"}) - self.assertEqual(queue, "b") + self.assertDictEqual(payload, {'hello': 'world'}) + self.assertEqual(queue, 'b') def test_receive_raises(self): self.channel._in_listen = True s = self.channel.subclient = Mock() - s.parse_response.side_effect = KeyError("foo") + s.parse_response.side_effect = KeyError('foo') with self.assertRaises(redis.Empty): self.channel._receive() @@ -310,14 +310,14 @@ class test_Channel(TestCase): def test_receive_different_message_Type(self): s = self.channel.subclient = Mock() - s.parse_response.return_value = ["pmessage", "/foo/", 0, "data"] + s.parse_response.return_value = ['pmessage', '/foo/', 0, 'data'] with self.assertRaises(redis.Empty): self.channel._receive() def test_brpop_read_raises(self): c = self.channel.client = Mock() - c.parse_response.side_effect = KeyError("foo") + c.parse_response.side_effect = KeyError('foo') with self.assertRaises(redis.Empty): self.channel._brpop_read() @@ -334,20 +334,20 @@ class test_Channel(TestCase): def test_poll_error(self): c = self.channel.client = Mock() c.parse_response = Mock() - self.channel._poll_error("BRPOP") + self.channel._poll_error('BRPOP') - c.parse_response.assert_called_with("BRPOP") + c.parse_response.assert_called_with('BRPOP') - c.parse_response.side_effect = KeyError("foo") - self.assertIsNone(self.channel._poll_error("BRPOP")) + c.parse_response.side_effect = KeyError('foo') + self.assertIsNone(self.channel._poll_error('BRPOP')) def test_put_fanout(self): self.channel._in_poll = False c = self.channel.client = Mock() - body = {"hello": "world"} - self.channel._put_fanout("exchange", body) - c.publish.assert_called_with("exchange", dumps(body)) + body = {'hello': 'world'} + self.channel._put_fanout('exchange', body) + c.publish.assert_called_with('exchange', dumps(body)) def test_delete(self): x = self.channel @@ -355,20 +355,20 @@ class test_Channel(TestCase): delete = x.client.delete = Mock() srem = x.client.srem = Mock() - x._delete("queue", "exchange", "routing_key", None) - delete.assert_has_call("queue") - srem.assert_has_call(x.keyprefix_queue % ("exchange", ), - x.sep.join(["routing_key", "", "queue"])) + x._delete('queue', 'exchange', 'routing_key', None) + delete.assert_has_call('queue') + srem.assert_has_call(x.keyprefix_queue % ('exchange', ), + x.sep.join(['routing_key', '', 'queue'])) def test_has_queue(self): self.channel._in_poll = False exists = self.channel.client.exists = Mock() exists.return_value = True - self.assertTrue(self.channel._has_queue("foo")) - exists.assert_has_call("foo") + self.assertTrue(self.channel._has_queue('foo')) + exists.assert_has_call('foo') exists.return_value = False - self.assertFalse(self.channel._has_queue("foo")) + self.assertFalse(self.channel._has_queue('foo')) def test_close_when_closed(self): self.channel.closed = True @@ -382,27 +382,27 @@ class test_Channel(TestCase): c.connection.disconnect.assert_called_with() def test_invalid_database_raises_ValueError(self): - self.channel.connection.client.virtual_host = "xfeqwewkfk" + self.channel.connection.client.virtual_host = 'xfeqwewkfk' with self.assertRaises(ValueError): self.channel._create_client() - @skip_if_not_module("redis") + @skip_if_not_module('redis') def test_get_client(self): import redis as R KombuRedis = redis.Channel._get_client(self.channel) self.assertTrue(KombuRedis) - Rv = getattr(R, "__version__") + Rv = getattr(R, '__version__') try: - R.__version__ = "2.4.0" + R.__version__ = '2.4.0' with self.assertRaises(VersionMismatch): redis.Channel._get_client(self.channel) finally: if Rv is not None: R.__version__ = Rv - @skip_if_not_module("redis") + @skip_if_not_module('redis') def test_get_response_error(self): from redis.exceptions import ResponseError self.assertIs(redis.Channel._get_response_error(self.channel), @@ -421,19 +421,19 @@ class test_Channel(TestCase): self.assertTrue(self.channel._avail_client) cc.assert_called_with() - @skip_if_not_module("redis") + @skip_if_not_module('redis') def test_transport_get_errors(self): self.assertTrue(redis.Transport._get_errors(self.connection.transport)) - @skip_if_not_module("redis") + @skip_if_not_module('redis') def test_transport_get_errors_when_InvalidData_used(self): from redis import exceptions class ID(Exception): pass - DataError = getattr(exceptions, "DataError", None) - InvalidData = getattr(exceptions, "InvalidData", None) + DataError = getattr(exceptions, 'DataError', None) + InvalidData = getattr(exceptions, 'InvalidData', None) exceptions.InvalidData = ID exceptions.DataError = None try: @@ -462,28 +462,28 @@ class test_Channel(TestCase): # which raises a channel error so that the consumer/publisher # can recover by redeclaring the required entities. with self.assertRaises(InconsistencyError): - self.channel.get_table("celery") + self.channel.get_table('celery') class test_Redis(TestCase): def setUp(self): self.connection = BrokerConnection(transport=Transport) - self.exchange = Exchange("test_Redis", type="direct") - self.queue = Queue("test_Redis", self.exchange, "test_Redis") + self.exchange = Exchange('test_Redis', type='direct') + self.queue = Queue('test_Redis', self.exchange, 'test_Redis') def tearDown(self): self.connection.close() def test_publish__get(self): channel = self.connection.channel() - producer = Producer(channel, self.exchange, routing_key="test_Redis") + producer = Producer(channel, self.exchange, routing_key='test_Redis') self.queue(channel).declare() - producer.publish({"hello": "world"}) + producer.publish({'hello': 'world'}) self.assertDictEqual(self.queue(channel).get().payload, - {"hello": "world"}) + {'hello': 'world'}) self.assertIsNone(self.queue(channel).get()) self.assertIsNone(self.queue(channel).get()) self.assertIsNone(self.queue(channel).get()) @@ -491,10 +491,10 @@ class test_Redis(TestCase): def test_publish__consume(self): connection = BrokerConnection(transport=Transport) channel = connection.channel() - producer = Producer(channel, self.exchange, routing_key="test_Redis") + producer = Producer(channel, self.exchange, routing_key='test_Redis') consumer = Consumer(channel, self.queue) - producer.publish({"hello2": "world2"}) + producer.publish({'hello2': 'world2'}) _received = [] def callback(message_data, message): @@ -515,13 +515,13 @@ class test_Redis(TestCase): def test_purge(self): channel = self.connection.channel() - producer = Producer(channel, self.exchange, routing_key="test_Redis") + producer = Producer(channel, self.exchange, routing_key='test_Redis') self.queue(channel).declare() for i in range(10): - producer.publish({"hello": "world-%s" % (i, )}) + producer.publish({'hello': 'world-%s' % (i, )}) - self.assertEqual(channel._size("test_Redis"), 10) + self.assertEqual(channel._size('test_Redis'), 10) self.assertEqual(self.queue(channel).purge(), 10) channel.close() @@ -530,16 +530,16 @@ class test_Redis(TestCase): transport=Transport).channel() self.assertEqual(c1.client.db, 1) - c2 = BrokerConnection(virtual_host="1", + c2 = BrokerConnection(virtual_host='1', transport=Transport).channel() self.assertEqual(c2.client.db, 1) - c3 = BrokerConnection(virtual_host="/1", + c3 = BrokerConnection(virtual_host='/1', transport=Transport).channel() self.assertEqual(c3.client.db, 1) with self.assertRaises(Exception): - BrokerConnection(virtual_host="/foo", + BrokerConnection(virtual_host='/foo', transport=Transport).channel() def test_db_port(self): @@ -574,7 +574,7 @@ class test_Redis(TestCase): def test_get__Empty(self): channel = self.connection.channel() with self.assertRaises(Empty): - channel._get("does-not-exist") + channel._get('does-not-exist') channel.close() def test_get_client(self): @@ -610,7 +610,7 @@ def _redis_modules(): class ResponseError(Exception): pass - exceptions = types.ModuleType("redis.exceptions") + exceptions = types.ModuleType('redis.exceptions') exceptions.ConnectionError = ConnectionError exceptions.AuthenticationError = AuthenticationError exceptions.InvalidData = InvalidData @@ -620,7 +620,7 @@ def _redis_modules(): class Redis(object): pass - myredis = types.ModuleType("redis") + myredis = types.ModuleType('redis') myredis.exceptions = exceptions myredis.Redis = Redis @@ -703,7 +703,7 @@ class test_MultiChannelPoller(TestCase): self.assertEqual(p._register.call_count, 1) channel.client.connection._sock = Mock() - p._chan_to_sock[(channel, channel.client, "BRPOP")] = True + p._chan_to_sock[(channel, channel.client, 'BRPOP')] = True channel._in_poll = True p._register_BRPOP(channel) self.assertEqual(channel._brpop_start.call_count, 1) @@ -717,7 +717,7 @@ class test_MultiChannelPoller(TestCase): p._register = Mock() p._register_LISTEN(channel) - p._register.assert_called_with(channel, channel.subclient, "LISTEN") + p._register.assert_called_with(channel, channel.subclient, 'LISTEN') self.assertEqual(p._register.call_count, 1) self.assertEqual(channel._subscribe.call_count, 1) @@ -753,7 +753,7 @@ class test_MultiChannelPoller(TestCase): p.get() def test_get_brpop_qos_allow(self): - p, channel = self.create_get(queues=["a_queue"]) + p, channel = self.create_get(queues=['a_queue']) channel.qos.can_consume.return_value = True with self.assertRaises(redis.Empty): @@ -762,7 +762,7 @@ class test_MultiChannelPoller(TestCase): p._register_BRPOP.assert_called_with(channel) def test_get_brpop_qos_disallow(self): - p, channel = self.create_get(queues=["a_queue"]) + p, channel = self.create_get(queues=['a_queue']) channel.qos.can_consume.return_value = False with self.assertRaises(redis.Empty): @@ -771,7 +771,7 @@ class test_MultiChannelPoller(TestCase): self.assertFalse(p._register_BRPOP.called) def test_get_listen(self): - p, channel = self.create_get(fanouts=["f_queue"]) + p, channel = self.create_get(fanouts=['f_queue']) with self.assertRaises(redis.Empty): p.get() @@ -780,19 +780,19 @@ class test_MultiChannelPoller(TestCase): def test_get_receives_ERR(self): p, channel = self.create_get(events=[(1, eventio.ERR)]) - p._fd_to_chan[1] = (channel, "BRPOP") + p._fd_to_chan[1] = (channel, 'BRPOP') with self.assertRaises(redis.Empty): p.get() - channel._poll_error.assert_called_with("BRPOP") + channel._poll_error.assert_called_with('BRPOP') def test_get_receives_multiple(self): p, channel = self.create_get(events=[(1, eventio.ERR), (1, eventio.ERR)]) - p._fd_to_chan[1] = (channel, "BRPOP") + p._fd_to_chan[1] = (channel, 'BRPOP') with self.assertRaises(redis.Empty): p.get() - channel._poll_error.assert_called_with("BRPOP") + channel._poll_error.assert_called_with('BRPOP') diff --git a/kombu/tests/transport/test_sqlalchemy.py b/kombu/tests/transport/test_sqlalchemy.py index b827bc88..8fb8e6dd 100644 --- a/kombu/tests/transport/test_sqlalchemy.py +++ b/kombu/tests/transport/test_sqlalchemy.py @@ -14,15 +14,15 @@ class test_sqlalchemy(TestCase): try: import sqlalchemy # noqa except ImportError: - raise SkipTest("sqlalchemy not installed") - with patch("kombu.transport.sqlalchemy.Channel._open"): - url = "sqlalchemy+sqlite://celerydb.sqlite" + raise SkipTest('sqlalchemy not installed') + with patch('kombu.transport.sqlalchemy.Channel._open'): + url = 'sqlalchemy+sqlite://celerydb.sqlite' BrokerConnection(url).connect() - url = "sqla+sqlite://celerydb.sqlite" + url = 'sqla+sqlite://celerydb.sqlite' BrokerConnection(url).connect() # Should prevent regression fixed by f187ccd - url = "sqlb+sqlite://celerydb.sqlite" + url = 'sqlb+sqlite://celerydb.sqlite' with self.assertRaises(KeyError): BrokerConnection(url).connect() diff --git a/kombu/tests/transport/test_transport.py b/kombu/tests/transport/test_transport.py index cc3eeea7..151f4b56 100644 --- a/kombu/tests/transport/test_transport.py +++ b/kombu/tests/transport/test_transport.py @@ -12,19 +12,19 @@ class test_transport(TestCase): def test_resolve_transport__no_class_name(self): with self.assertRaises(KeyError): - transport.resolve_transport("nonexistant") + transport.resolve_transport('nonexistant') def test_resolve_transport_when_callable(self): self.assertTupleEqual(transport.resolve_transport( - lambda: "kombu.transport.memory.Transport"), - ("kombu.transport.memory", "Transport")) + lambda: 'kombu.transport.memory.Transport'), + ('kombu.transport.memory', 'Transport')) class test_transport_gettoq(TestCase): - @patch("warnings.warn") + @patch('warnings.warn') def test_compat(self, warn): - x = transport._ghettoq("Redis", "redis", "redis") + x = transport._ghettoq('Redis', 'redis', 'redis') - self.assertEqual(x(), "kombu.transport.redis.Transport") + self.assertEqual(x(), 'kombu.transport.redis.Transport') self.assertTrue(warn.called) diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index 301d71e7..d4a5fe3f 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -16,20 +16,20 @@ from kombu.tests.utils import Mock, redirect_stdouts def client(**kwargs): - return BrokerConnection(transport="kombu.transport.virtual.Transport", + return BrokerConnection(transport='kombu.transport.virtual.Transport', **kwargs) def memory_client(): - return BrokerConnection(transport="memory") + return BrokerConnection(transport='memory') class test_BrokerState(TestCase): def test_constructor(self): s = virtual.BrokerState() - self.assertTrue(hasattr(s, "exchanges")) - self.assertTrue(hasattr(s, "bindings")) + self.assertTrue(hasattr(s, 'exchanges')) + self.assertTrue(hasattr(s, 'bindings')) t = virtual.BrokerState(exchanges=16, bindings=32) self.assertEqual(t.exchanges, 16) @@ -95,66 +95,66 @@ class test_QoS(TestCase): self.assertFalse(stdout.getvalue()) def test_get(self): - self.q._delivered["foo"] = 1 - self.assertEqual(self.q.get("foo"), 1) + self.q._delivered['foo'] = 1 + self.assertEqual(self.q.get('foo'), 1) class test_Message(TestCase): def test_create(self): c = client().channel() - data = c.prepare_message("the quick brown fox...") - tag = data["properties"]["delivery_tag"] = uuid() + data = c.prepare_message('the quick brown fox...') + tag = data['properties']['delivery_tag'] = uuid() message = c.message_to_python(data) self.assertIsInstance(message, virtual.Message) self.assertIs(message, c.message_to_python(message)) self.assertEqual(message.body, - "the quick brown fox...".encode("utf-8")) + 'the quick brown fox...'.encode('utf-8')) self.assertTrue(message.delivery_tag, tag) def test_create_no_body(self): virtual.Message(Mock(), { - "body": None, - "properties": {"delivery_tag": 1}}) + 'body': None, + 'properties': {'delivery_tag': 1}}) def test_serializable(self): c = client().channel() - data = c.prepare_message("the quick brown fox...") - tag = data["properties"]["delivery_tag"] = uuid() + data = c.prepare_message('the quick brown fox...') + tag = data['properties']['delivery_tag'] = uuid() message = c.message_to_python(data) dict_ = message.serializable() - self.assertEqual(dict_["body"], - "the quick brown fox...".encode("utf-8")) - self.assertEqual(dict_["properties"]["delivery_tag"], tag) + self.assertEqual(dict_['body'], + 'the quick brown fox...'.encode('utf-8')) + self.assertEqual(dict_['properties']['delivery_tag'], tag) class test_AbstractChannel(TestCase): def test_get(self): with self.assertRaises(NotImplementedError): - virtual.AbstractChannel()._get("queue") + virtual.AbstractChannel()._get('queue') def test_put(self): with self.assertRaises(NotImplementedError): - virtual.AbstractChannel()._put("queue", "m") + virtual.AbstractChannel()._put('queue', 'm') def test_size(self): - self.assertEqual(virtual.AbstractChannel()._size("queue"), 0) + self.assertEqual(virtual.AbstractChannel()._size('queue'), 0) def test_purge(self): with self.assertRaises(NotImplementedError): - virtual.AbstractChannel()._purge("queue") + virtual.AbstractChannel()._purge('queue') def test_delete(self): with self.assertRaises(NotImplementedError): - virtual.AbstractChannel()._delete("queue") + virtual.AbstractChannel()._delete('queue') def test_new_queue(self): - self.assertIsNone(virtual.AbstractChannel()._new_queue("queue")) + self.assertIsNone(virtual.AbstractChannel()._new_queue('queue')) def test_has_queue(self): - self.assertTrue(virtual.AbstractChannel()._has_queue("queue")) + self.assertTrue(virtual.AbstractChannel()._has_queue('queue')) def test_poll(self): @@ -181,20 +181,20 @@ class test_Channel(TestCase): def test_exchange_declare(self): c = self.channel - c.exchange_declare("test_exchange_declare", "direct", + c.exchange_declare('test_exchange_declare', 'direct', durable=True, auto_delete=True) - self.assertIn("test_exchange_declare", c.state.exchanges) + self.assertIn('test_exchange_declare', c.state.exchanges) # can declare again with same values - c.exchange_declare("test_exchange_declare", "direct", + c.exchange_declare('test_exchange_declare', 'direct', durable=True, auto_delete=True) - self.assertIn("test_exchange_declare", c.state.exchanges) + self.assertIn('test_exchange_declare', c.state.exchanges) # using different values raises NotEquivalentError with self.assertRaises(virtual.NotEquivalentError): - c.exchange_declare("test_exchange_declare", "direct", + c.exchange_declare('test_exchange_declare', 'direct', durable=False, auto_delete=True) - def test_exchange_delete(self, ex="test_exchange_delete"): + def test_exchange_delete(self, ex='test_exchange_delete'): class PurgeChannel(virtual.Channel): purged = [] @@ -204,13 +204,13 @@ class test_Channel(TestCase): c = PurgeChannel(self.channel.connection) - c.exchange_declare(ex, "direct", durable=True, auto_delete=True) + c.exchange_declare(ex, 'direct', durable=True, auto_delete=True) self.assertIn(ex, c.state.exchanges) self.assertNotIn(ex, c.state.bindings) # no bindings yet c.exchange_delete(ex) self.assertNotIn(ex, c.state.exchanges) - c.exchange_declare(ex, "direct", durable=True, auto_delete=True) + c.exchange_declare(ex, 'direct', durable=True, auto_delete=True) c.queue_declare(ex) c.queue_bind(ex, ex, ex) self.assertTrue(c.state.bindings[ex]) @@ -218,7 +218,7 @@ class test_Channel(TestCase): self.assertNotIn(ex, c.state.bindings) self.assertIn(ex, c.purged) - def test_queue_delete__if_empty(self, n="test_queue_delete__if_empty"): + def test_queue_delete__if_empty(self, n='test_queue_delete__if_empty'): class PurgeChannel(virtual.Channel): purged = [] size = 30 @@ -244,7 +244,7 @@ class test_Channel(TestCase): self.assertNotIn(n, c.state.bindings) self.assertIn(n, c.purged) - def test_queue_purge(self, n="test_queue_purge"): + def test_queue_purge(self, n='test_queue_purge'): class PurgeChannel(virtual.Channel): purged = [] @@ -260,35 +260,35 @@ class test_Channel(TestCase): self.assertIn(n, c.purged) def test_basic_publish__get__consume__restore(self, - n="test_basic_publish"): + n='test_basic_publish'): c = memory_client().channel() c.exchange_declare(n) c.queue_declare(n) c.queue_bind(n, n, n) - c.queue_declare(n + "2") - c.queue_bind(n + "2", n, n) + c.queue_declare(n + '2') + c.queue_bind(n + '2', n, n) - m = c.prepare_message("nthex quick brown fox...") + m = c.prepare_message('nthex quick brown fox...') c.basic_publish(m, n, n) r1 = c.message_to_python(c.basic_get(n)) self.assertTrue(r1) self.assertEqual(r1.body, - "nthex quick brown fox...".encode("utf-8")) + 'nthex quick brown fox...'.encode('utf-8')) self.assertIsNone(c.basic_get(n)) consumer_tag = uuid() - c.basic_consume(n + "2", False, consumer_tag=consumer_tag, + c.basic_consume(n + '2', False, consumer_tag=consumer_tag, callback=lambda *a: None) - self.assertIn(n + "2", c._active_queues) + self.assertIn(n + '2', c._active_queues) r2, _ = c.drain_events() r2 = c.message_to_python(r2) self.assertEqual(r2.body, - "nthex quick brown fox...".encode("utf-8")) - self.assertEqual(r2.delivery_info["exchange"], n) - self.assertEqual(r2.delivery_info["routing_key"], n) + 'nthex quick brown fox...'.encode('utf-8')) + self.assertEqual(r2.delivery_info['exchange'], n) + self.assertEqual(r2.delivery_info['routing_key'], n) with self.assertRaises(virtual.Empty): c.drain_events() c.basic_cancel(consumer_tag) @@ -296,7 +296,7 @@ class test_Channel(TestCase): c._restore(r2) r3 = c.message_to_python(c.basic_get(n)) self.assertTrue(r3) - self.assertEqual(r3.body, "nthex quick brown fox...".encode("utf-8")) + self.assertEqual(r3.body, 'nthex quick brown fox...'.encode('utf-8')) self.assertIsNone(c.basic_get(n)) def test_basic_ack(self): @@ -308,7 +308,7 @@ class test_Channel(TestCase): self.was_acked = True self.channel._qos = MockQoS(self.channel) - self.channel.basic_ack("foo") + self.channel.basic_ack('foo') self.assertTrue(self.channel._qos.was_acked) def test_basic_recover__requeue(self): @@ -336,8 +336,8 @@ class test_Channel(TestCase): self.assertEqual(errors[0][1], 1) self.assertFalse(q._delivered) - @patch("kombu.transport.virtual.emergency_dump_state") - @patch("kombu.transport.virtual.say") + @patch('kombu.transport.virtual.emergency_dump_state') + @patch('kombu.transport.virtual.say') def test_restore_unacked_once_when_unrestored(self, say, emergency_dump_state): q = self.channel.qos @@ -373,20 +373,20 @@ class test_Channel(TestCase): self.was_rejected = True self.channel._qos = MockQoS(self.channel) - self.channel.basic_reject("foo") + self.channel.basic_reject('foo') self.assertTrue(self.channel._qos.was_rejected) def test_basic_qos(self): self.channel.basic_qos(prefetch_count=128) self.assertEqual(self.channel._qos.prefetch_count, 128) - def test_lookup__undeliverable(self, n="test_lookup__undeliverable"): + def test_lookup__undeliverable(self, n='test_lookup__undeliverable'): warnings.resetwarnings() with catch_warnings(record=True) as log: - self.assertListEqual(self.channel._lookup(n, n, "ae.undeliver"), - ["ae.undeliver"]) + self.assertListEqual(self.channel._lookup(n, n, 'ae.undeliver'), + ['ae.undeliver']) self.assertTrue(log) - self.assertIn("could not be delivered", log[0].message.args[0]) + self.assertIn('could not be delivered', log[0].message.args[0]) def test_context(self): x = self.channel.__enter__() @@ -418,44 +418,44 @@ class test_Channel(TestCase): c._get_many.assert_called_with(c._active_queues, timeout=10.0) def test_get_exchanges(self): - self.channel.exchange_declare(exchange="foo") + self.channel.exchange_declare(exchange='foo') self.assertTrue(self.channel.get_exchanges()) def test_basic_cancel_not_in_active_queues(self): c = self.channel - c._consumers.add("x") - c._tag_to_queue["x"] = "foo" + c._consumers.add('x') + c._tag_to_queue['x'] = 'foo' c._active_queues = Mock() c._active_queues.remove.side_effect = ValueError() - c.basic_cancel("x") - c._active_queues.remove.assert_called_with("foo") + c.basic_cancel('x') + c._active_queues.remove.assert_called_with('foo') def test_basic_cancel_unknown_ctag(self): - self.assertIsNone(self.channel.basic_cancel("unknown-tag")) + self.assertIsNone(self.channel.basic_cancel('unknown-tag')) def test_list_bindings(self): c = self.channel - c.exchange_declare(exchange="foo") - c.queue_declare(queue="q") - c.queue_bind(queue="q", exchange="foo", routing_key="rk") + c.exchange_declare(exchange='foo') + c.queue_declare(queue='q') + c.queue_bind(queue='q', exchange='foo', routing_key='rk') - self.assertIn(("q", "foo", "rk"), list(c.list_bindings())) + self.assertIn(('q', 'foo', 'rk'), list(c.list_bindings())) def test_after_reply_message_received(self): c = self.channel c.queue_delete = Mock() - c.after_reply_message_received("foo") - c.queue_delete.assert_called_with("foo") + c.after_reply_message_received('foo') + c.queue_delete.assert_called_with('foo') def test_queue_delete_unknown_queue(self): - self.assertIsNone(self.channel.queue_delete("xiwjqjwel")) + self.assertIsNone(self.channel.queue_delete('xiwjqjwel')) def test_queue_declare_passive(self): has_queue = self.channel._has_queue = Mock() has_queue.return_value = False with self.assertRaises(StdChannelError): - self.channel.queue_declare(queue="21wisdjwqe", passive=True) + self.channel.queue_declare(queue='21wisdjwqe', passive=True) class test_Transport(TestCase): diff --git a/kombu/tests/transport/virtual/test_exchange.py b/kombu/tests/transport/virtual/test_exchange.py index d33b9efc..50498021 100644 --- a/kombu/tests/transport/virtual/test_exchange.py +++ b/kombu/tests/transport/virtual/test_exchange.py @@ -20,54 +20,54 @@ class ExchangeCase(TestCase): class test_Direct(ExchangeCase): type = exchange.DirectExchange - table = [("rFoo", None, "qFoo"), - ("rFoo", None, "qFox"), - ("rBar", None, "qBar"), - ("rBaz", None, "qBaz")] + table = [('rFoo', None, 'qFoo'), + ('rFoo', None, 'qFox'), + ('rBar', None, 'qBar'), + ('rBaz', None, 'qBaz')] def test_lookup(self): self.assertListEqual(self.e.lookup( - self.table, "eFoo", "rFoo", None), - ["qFoo", "qFox"]) + self.table, 'eFoo', 'rFoo', None), + ['qFoo', 'qFox']) self.assertListEqual(self.e.lookup( - self.table, "eMoz", "rMoz", "DEFAULT"), + self.table, 'eMoz', 'rMoz', 'DEFAULT'), []) self.assertListEqual(self.e.lookup( - self.table, "eBar", "rBar", None), - ["qBar"]) + self.table, 'eBar', 'rBar', None), + ['qBar']) class test_Fanout(ExchangeCase): type = exchange.FanoutExchange - table = [(None, None, "qFoo"), - (None, None, "qFox"), - (None, None, "qBar")] + table = [(None, None, 'qFoo'), + (None, None, 'qFox'), + (None, None, 'qBar')] def test_lookup(self): self.assertListEqual(self.e.lookup( - self.table, "eFoo", "rFoo", None), - ["qFoo", "qFox", "qBar"]) + self.table, 'eFoo', 'rFoo', None), + ['qFoo', 'qFox', 'qBar']) def test_deliver_when_fanout_supported(self): self.e.channel = Mock() self.e.channel.supports_fanout = True message = Mock() - self.e.deliver(message, "exchange", None) - self.e.channel._put_fanout.assert_called_with("exchange", message) + self.e.deliver(message, 'exchange', None) + self.e.channel._put_fanout.assert_called_with('exchange', message) def test_deliver_when_fanout_unsupported(self): self.e.channel = Mock() self.e.channel.supports_fanout = False - self.e.deliver(Mock(), "exchange", None) + self.e.deliver(Mock(), 'exchange', None) self.assertFalse(self.e.channel._put_fanout.called) class test_Topic(ExchangeCase): type = exchange.TopicExchange - table = [("stock.#", None, "rFoo"), - ("stock.us.*", None, "rBar")] + table = [('stock.#', None, 'rFoo'), + ('stock.us.*', None, 'rBar')] def setUp(self): super(test_Topic, self).setUp() @@ -75,32 +75,32 @@ class test_Topic(ExchangeCase): for rkey, _, queue in self.table] def test_prepare_bind(self): - x = self.e.prepare_bind("qFoo", "eFoo", "stock.#", {}) - self.assertTupleEqual(x, ("stock.#", r'^stock\..*?$', "qFoo")) + x = self.e.prepare_bind('qFoo', 'eFoo', 'stock.#', {}) + self.assertTupleEqual(x, ('stock.#', r'^stock\..*?$', 'qFoo')) def test_lookup(self): self.assertListEqual(self.e.lookup( - self.table, "eFoo", "stock.us.nasdaq", None), - ["rFoo", "rBar"]) + self.table, 'eFoo', 'stock.us.nasdaq', None), + ['rFoo', 'rBar']) self.assertTrue(self.e._compiled) self.assertListEqual(self.e.lookup( - self.table, "eFoo", "stock.europe.OSE", None), - ["rFoo"]) + self.table, 'eFoo', 'stock.europe.OSE', None), + ['rFoo']) self.assertListEqual(self.e.lookup( - self.table, "eFoo", "stockxeuropexOSE", None), + self.table, 'eFoo', 'stockxeuropexOSE', None), []) self.assertListEqual(self.e.lookup( - self.table, "eFoo", "candy.schleckpulver.snap_crackle", None), + self.table, 'eFoo', 'candy.schleckpulver.snap_crackle', None), []) def test_deliver(self): self.e.channel = Mock() - self.e.channel._lookup.return_value = ("a", "b") + self.e.channel._lookup.return_value = ('a', 'b') message = Mock() - self.e.deliver(message, "exchange", "rkey") + self.e.deliver(message, 'exchange', 'rkey') - expected = [(("a", message), {}), - (("b", message), {})] + expected = [(('a', message), {}), + (('b', message), {})] self.assertListEqual(self.e.channel._put.call_args_list, expected) @@ -109,32 +109,32 @@ class test_ExchangeType(ExchangeCase): def test_lookup(self): with self.assertRaises(NotImplementedError): - self.e.lookup([], "eFoo", "rFoo", None) + self.e.lookup([], 'eFoo', 'rFoo', None) def test_prepare_bind(self): - self.assertTupleEqual(self.e.prepare_bind("qFoo", "eFoo", "rFoo", {}), - ("rFoo", None, "qFoo")) + self.assertTupleEqual(self.e.prepare_bind('qFoo', 'eFoo', 'rFoo', {}), + ('rFoo', None, 'qFoo')) def test_equivalent(self): - e1 = dict(type="direct", + e1 = dict(type='direct', durable=True, auto_delete=True, arguments={}) self.assertTrue( - self.e.equivalent(e1, "eFoo", "direct", True, True, {})) + self.e.equivalent(e1, 'eFoo', 'direct', True, True, {})) self.assertFalse( - self.e.equivalent(e1, "eFoo", "topic", True, True, {})) + self.e.equivalent(e1, 'eFoo', 'topic', True, True, {})) self.assertFalse( - self.e.equivalent(e1, "eFoo", "direct", False, True, {})) + self.e.equivalent(e1, 'eFoo', 'direct', False, True, {})) self.assertFalse( - self.e.equivalent(e1, "eFoo", "direct", True, False, {})) + self.e.equivalent(e1, 'eFoo', 'direct', True, False, {})) self.assertFalse( - self.e.equivalent(e1, "eFoo", "direct", True, True, { - "expires": 3000})) - e2 = dict(e1, arguments={"expires": 3000}) + self.e.equivalent(e1, 'eFoo', 'direct', True, True, { + 'expires': 3000})) + e2 = dict(e1, arguments={'expires': 3000}) self.assertTrue( - self.e.equivalent(e2, "eFoo", "direct", True, True, { - "expires": 3000})) + self.e.equivalent(e2, 'eFoo', 'direct', True, True, { + 'expires': 3000})) self.assertFalse( - self.e.equivalent(e2, "eFoo", "direct", True, True, { - "expires": 6000})) + self.e.equivalent(e2, 'eFoo', 'direct', True, True, { + 'expires': 6000})) diff --git a/kombu/tests/transport/virtual/test_scheduling.py b/kombu/tests/transport/virtual/test_scheduling.py index 6cae66f1..afbcd061 100644 --- a/kombu/tests/transport/virtual/test_scheduling.py +++ b/kombu/tests/transport/virtual/test_scheduling.py @@ -20,12 +20,12 @@ def consume(fun, n): class test_FairCycle(TestCase): def test_cycle(self): - resources = ["a", "b", "c", "d", "e"] + resources = ['a', 'b', 'c', 'd', 'e'] def echo(r, timeout=None): return r - # cycle should be ["a", "b", "c", "d", "e", ... repeat] + # cycle should be ['a', 'b', 'c', 'd', 'e', ... repeat] cycle = FairCycle(echo, resources, MyEmpty) for i in range(len(resources)): self.assertEqual(cycle.get(), (resources[i], @@ -35,21 +35,21 @@ class test_FairCycle(TestCase): resources[i])) def test_cycle_breaks(self): - resources = ["a", "b", "c", "d", "e"] + resources = ['a', 'b', 'c', 'd', 'e'] def echo(r): - if r == "c": + if r == 'c': raise MyEmpty(r) return r cycle = FairCycle(echo, resources, MyEmpty) self.assertEqual(consume(cycle.get, len(resources)), - [("a", "a"), ("b", "b"), ("d", "d"), - ("e", "e"), ("a", "a")]) + [('a', 'a'), ('b', 'b'), ('d', 'd'), + ('e', 'e'), ('a', 'a')]) self.assertEqual(consume(cycle.get, len(resources)), - [("b", "b"), ("d", "d"), ("e", "e"), - ("a", "a"), ("b", "b")]) - cycle2 = FairCycle(echo, ["c", "c"], MyEmpty) + [('b', 'b'), ('d', 'd'), ('e', 'e'), + ('a', 'a'), ('b', 'b')]) + cycle2 = FairCycle(echo, ['c', 'c'], MyEmpty) with self.assertRaises(MyEmpty): consume(cycle2.get, 3) diff --git a/kombu/tests/utilities/test_encoding.py b/kombu/tests/utilities/test_encoding.py index 336ed037..eb19d1ad 100644 --- a/kombu/tests/utilities/test_encoding.py +++ b/kombu/tests/utilities/test_encoding.py @@ -15,24 +15,24 @@ from kombu.tests.utils import TestCase @contextmanager def clean_encoding(): - old_encoding = sys.modules.pop("kombu.utils.encoding", None) + old_encoding = sys.modules.pop('kombu.utils.encoding', None) import kombu.utils.encoding yield kombu.utils.encoding if old_encoding: - sys.modules["kombu.utils.encoding"] = old_encoding + sys.modules['kombu.utils.encoding'] = old_encoding class test_default_encoding(TestCase): - @patch("sys.getfilesystemencoding") + @patch('sys.getfilesystemencoding') def test_default(self, getfilesystemencoding): - getfilesystemencoding.return_value = "ascii" + getfilesystemencoding.return_value = 'ascii' with clean_encoding() as encoding: enc = encoding.default_encoding() - if sys.platform.startswith("java"): - self.assertEqual(enc, "utf-8") + if sys.platform.startswith('java'): + self.assertEqual(enc, 'utf-8') else: - self.assertEqual(enc, "ascii") + self.assertEqual(enc, 'ascii') getfilesystemencoding.assert_called_with() @@ -40,32 +40,32 @@ class test_encoding_utils(TestCase): def setUp(self): if sys.version_info >= (3, 0): - raise SkipTest("not relevant on py3k") + raise SkipTest('not relevant on py3k') def test_str_to_bytes(self): with clean_encoding() as e: - self.assertIsInstance(e.str_to_bytes(u"foobar"), str) - self.assertIsInstance(e.str_to_bytes("foobar"), str) + self.assertIsInstance(e.str_to_bytes(u'foobar'), str) + self.assertIsInstance(e.str_to_bytes('foobar'), str) def test_from_utf8(self): with clean_encoding() as e: - self.assertIsInstance(e.from_utf8(u"foobar"), str) + self.assertIsInstance(e.from_utf8(u'foobar'), str) def test_default_encode(self): with clean_encoding() as e: - self.assertTrue(e.default_encode("foo")) + self.assertTrue(e.default_encode('foo')) class test_safe_str(TestCase): def test_when_str(self): - self.assertEqual(safe_str("foo"), "foo") + self.assertEqual(safe_str('foo'), 'foo') def test_when_unicode(self): - self.assertIsInstance(safe_str(u"foo"), str) + self.assertIsInstance(safe_str(u'foo'), str) def test_when_containing_high_chars(self): - s = u"The quiæk fåx jømps øver the lazy dåg" + s = u'The quiæk fåx jømps øver the lazy dåg' res = safe_str(s) self.assertIsInstance(res, str) @@ -78,6 +78,6 @@ class test_safe_str(TestCase): class O(object): def __repr__(self): - raise KeyError("foo") + raise KeyError('foo') - self.assertIn("<Unrepresentable", safe_str(O())) + self.assertIn('<Unrepresentable', safe_str(O())) diff --git a/kombu/tests/utilities/test_functional.py b/kombu/tests/utilities/test_functional.py index 841a9bda..c84a6854 100644 --- a/kombu/tests/utilities/test_functional.py +++ b/kombu/tests/utilities/test_functional.py @@ -14,11 +14,11 @@ def double(x): class test_promise(TestCase): def test__str__(self): - self.assertEqual(str(promise(lambda: "the quick brown fox")), - "the quick brown fox") + self.assertEqual(str(promise(lambda: 'the quick brown fox')), + 'the quick brown fox') def test__repr__(self): - self.assertEqual(repr(promise(lambda: "fi fa fo")), + self.assertEqual(repr(promise(lambda: 'fi fa fo')), "'fi fa fo'") def test_evaluate(self): diff --git a/kombu/tests/utils.py b/kombu/tests/utils.py index 2a98d3d0..5f743957 100644 --- a/kombu/tests/utils.py +++ b/kombu/tests/utils.py @@ -21,14 +21,14 @@ except AttributeError: class TestCase(unittest.TestCase): - if not hasattr(unittest.TestCase, "assertItemsEqual"): + if not hasattr(unittest.TestCase, 'assertItemsEqual'): assertItemsEqual = unittest.TestCase.assertSameElements class Mock(mock.Mock): def __init__(self, *args, **kwargs): - attrs = kwargs.pop("attrs", None) or {} + attrs = kwargs.pop('attrs', None) or {} super(Mock, self).__init__(*args, **kwargs) for attr_name, attr_value in attrs.items(): setattr(self, attr_name, attr_value) @@ -94,12 +94,12 @@ def mask_modules(*modnames): For example: - >>> @missing_modules("sys"): + >>> @missing_modules('sys'): >>> def foo(): ... try: ... import sys ... except ImportError: - ... print("sys not found") + ... print('sys not found') sys not found >>> import sys @@ -116,7 +116,7 @@ def mask_modules(*modnames): def myimp(name, *args, **kwargs): if name in modnames: - raise ImportError("No module named %s" % name) + raise ImportError('No module named %s' % name) else: return realimport(name, *args, **kwargs) @@ -137,7 +137,7 @@ def skip_if_environ(env_var_name): @wraps(fun) def _skips_if_environ(*args, **kwargs): if os.environ.get(env_var_name): - raise SkipTest("SKIP %s: %s set\n" % ( + raise SkipTest('SKIP %s: %s set\n' % ( fun.__name__, env_var_name)) return fun(*args, **kwargs) @@ -152,7 +152,7 @@ def skip_if_module(module): def _skip_if_module(*args, **kwargs): try: __import__(module) - raise SkipTest("SKIP %s: %s available\n" % ( + raise SkipTest('SKIP %s: %s available\n' % ( fun.__name__, module)) except ImportError: pass @@ -168,7 +168,7 @@ def skip_if_not_module(module): try: __import__(module) except ImportError: - raise SkipTest("SKIP %s: %s available\n" % ( + raise SkipTest('SKIP %s: %s available\n' % ( fun.__name__, module)) return fun(*args, **kwargs) return _skip_if_not_module @@ -176,4 +176,4 @@ def skip_if_not_module(module): def skip_if_quick(fun): - return skip_if_environ("QUICKTEST")(fun) + return skip_if_environ('QUICKTEST')(fun) diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index d4174ccd..05f8e477 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -61,7 +61,7 @@ class Table(Domain): """ item = self.get_queue(queue) if item: - return item, item["id"] + return item, item['id'] id = uuid() return self.new_item(id), id @@ -69,9 +69,9 @@ class Table(Domain): if queue not in self._already_bound: binding, id = self.create_binding(queue) binding.update(exchange=exchange, - routing_key=routing_key or "", - pattern=pattern or "", - queue=queue or "", + routing_key=routing_key or '', + pattern=pattern or '', + queue=queue or '', id=id) binding.save() self._already_bound.add(queue) @@ -86,7 +86,7 @@ class Table(Domain): def exchange_delete(self, exchange): """Delete all routes for `exchange`.""" for item in self.routes_for(exchange): - self.delete_item(item["id"]) + self.delete_item(item['id']) def get_item(self, item_name): """Uses `consistent_read` by default.""" @@ -109,7 +109,7 @@ class Table(Domain): return item def get_exchanges(self): - return list(set(i["exchange"] for i in self.select())) + return list(set(i['exchange'] for i in self.select())) def _get_queue_item(self, queue): return self._try_first("""WHERE queue = '%s' limit 1""" % queue) @@ -117,14 +117,14 @@ class Table(Domain): def _get_queue_id(self, queue): item = self._get_queue_item(queue) if item: - return item["id"] + return item['id'] class Channel(virtual.Channel): Table = Table - default_region = "us-east-1" - domain_format = "kombu%(vhost)s" + default_region = 'us-east-1' + domain_format = 'kombu%(vhost)s' _sdb = None _sqs = None _queue_cache = {} @@ -184,7 +184,7 @@ class Channel(virtual.Channel): """ if self.supports_fanout: - return [(r["routing_key"], r["pattern"], r["queue"]) + return [(r['routing_key'], r['pattern'], r['queue']) for r in self.table.routes_for(exchange)] return super(Channel, self).get_table(exchange) @@ -222,7 +222,7 @@ class Channel(virtual.Channel): def _put_fanout(self, exchange, message, **kwargs): """Deliver fanout message to all queues in ``exchange``.""" for route in self.table.routes_for(exchange): - self._put(route["queue"], message, **kwargs) + self._put(route['queue'], message, **kwargs) def _get(self, queue): """Try to retrieve a single message off ``queue``.""" @@ -234,19 +234,19 @@ class Channel(virtual.Channel): if queue in self._noack_queues: q.delete_message(m) else: - payload["properties"]["delivery_info"].update({ - "sqs_message": m, "sqs_queue": q, }) + payload['properties']['delivery_info'].update({ + 'sqs_message': m, 'sqs_queue': q, }) return payload raise Empty() def basic_ack(self, delivery_tag): delivery_info = self.qos.get(delivery_tag).delivery_info try: - queue = delivery_info["sqs_queue"] + queue = delivery_info['sqs_queue'] except KeyError: pass else: - queue.delete_message(delivery_info["sqs_message"]) + queue.delete_message(delivery_info['sqs_message']) super(Channel, self).basic_ack(delivery_tag) def _size(self, queue): @@ -308,8 +308,8 @@ class Channel(virtual.Channel): @property def table(self): name = self.entity_name(self.domain_format % { - "vhost": self.conninfo.virtual_host}) - d = self.sdb.get_object("CreateDomain", {"DomainName": name}, + 'vhost': self.conninfo.virtual_host}) + d = self.sdb.get_object('CreateDomain', {'DomainName': name}, self.Table) d.name = name return d @@ -324,19 +324,19 @@ class Channel(virtual.Channel): @cached_property def visibility_timeout(self): - return self.transport_options.get("visibility_timeout") + return self.transport_options.get('visibility_timeout') @cached_property def queue_name_prefix(self): - return self.transport_options.get("queue_name_prefix", '') + return self.transport_options.get('queue_name_prefix', '') @cached_property def supports_fanout(self): - return self.transport_options.get("sdb_persistence", False) + return self.transport_options.get('sdb_persistence', False) @cached_property def region(self): - return self.transport_options.get("region") or self.default_region + return self.transport_options.get('region') or self.default_region class Transport(virtual.Transport): diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 04fd1dcd..7605cd83 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -14,15 +14,15 @@ import sys from kombu.syn import detect_environment -DEFAULT_TRANSPORT = "amqp" +DEFAULT_TRANSPORT = 'amqp' -AMQP_TRANSPORT = "kombu.transport.amqplib.Transport" -AMQP_ALIAS = "librabbitmq" -if detect_environment() == "default": +AMQP_TRANSPORT = 'kombu.transport.amqplib.Transport' +AMQP_ALIAS = 'librabbitmq' +if detect_environment() == 'default': try: import librabbitmq # noqa - AMQP_TRANSPORT = "kombu.transport.librabbitmq.Transport" # noqa - AMQP_ALIAS = "amqp" # noqa + AMQP_TRANSPORT = 'kombu.transport.librabbitmq.Transport' # noqa + AMQP_ALIAS = 'amqp' # noqa except ImportError: pass @@ -33,8 +33,8 @@ def _ghettoq(name, new, alias=None): def __inner(): import warnings _new = callable(xxx) and xxx() or xxx - gtransport = "ghettoq.taproot.%s" % name - ktransport = "kombu.transport.%s.Transport" % _new + gtransport = 'ghettoq.taproot.%s' % name + ktransport = 'kombu.transport.%s.Transport' % _new this = alias or name warnings.warn(""" Ghettoq does not work with Kombu, but there is now a built-in version @@ -48,27 +48,27 @@ def _ghettoq(name, new, alias=None): TRANSPORT_ALIASES = { - "amqp": AMQP_TRANSPORT, - "amqplib": "kombu.transport.amqplib.Transport", - "librabbitmq": "kombu.transport.librabbitmq.Transport", - "pika": "kombu.transport.pika2.Transport", - "oldpika": "kombu.transport.pika.SyncTransport", - "memory": "kombu.transport.memory.Transport", - "redis": "kombu.transport.redis.Transport", - "SQS": "kombu.transport.SQS.Transport", - "sqs": "kombu.transport.SQS.Transport", - "beanstalk": "kombu.transport.beanstalk.Transport", - "mongodb": "kombu.transport.mongodb.Transport", - "couchdb": "kombu.transport.couchdb.Transport", - "zookeeper": "kombu.transport.zookeeper.Transport", - "django": "kombu.transport.django.Transport", - "sqlalchemy": "kombu.transport.sqlalchemy.Transport", - "sqla": "kombu.transport.sqlalchemy.Transport", - "ghettoq.taproot.Redis": _ghettoq("Redis", "redis", "redis"), - "ghettoq.taproot.Database": _ghettoq("Database", "django", "django"), - "ghettoq.taproot.MongoDB": _ghettoq("MongoDB", "mongodb"), - "ghettoq.taproot.Beanstalk": _ghettoq("Beanstalk", "beanstalk"), - "ghettoq.taproot.CouchDB": _ghettoq("CouchDB", "couchdb"), + 'amqp': AMQP_TRANSPORT, + 'amqplib': 'kombu.transport.amqplib.Transport', + 'librabbitmq': 'kombu.transport.librabbitmq.Transport', + 'pika': 'kombu.transport.pika2.Transport', + 'oldpika': 'kombu.transport.pika.SyncTransport', + 'memory': 'kombu.transport.memory.Transport', + 'redis': 'kombu.transport.redis.Transport', + 'SQS': 'kombu.transport.SQS.Transport', + 'sqs': 'kombu.transport.SQS.Transport', + 'beanstalk': 'kombu.transport.beanstalk.Transport', + 'mongodb': 'kombu.transport.mongodb.Transport', + 'couchdb': 'kombu.transport.couchdb.Transport', + 'zookeeper': 'kombu.transport.zookeeper.Transport', + 'django': 'kombu.transport.django.Transport', + 'sqlalchemy': 'kombu.transport.sqlalchemy.Transport', + 'sqla': 'kombu.transport.sqlalchemy.Transport', + 'ghettoq.taproot.Redis': _ghettoq('Redis', 'redis', 'redis'), + 'ghettoq.taproot.Database': _ghettoq('Database', 'django', 'django'), + 'ghettoq.taproot.MongoDB': _ghettoq('MongoDB', 'mongodb'), + 'ghettoq.taproot.Beanstalk': _ghettoq('Beanstalk', 'beanstalk'), + 'ghettoq.taproot.CouchDB': _ghettoq('CouchDB', 'couchdb'), } _transport_cache = {} @@ -78,9 +78,9 @@ def resolve_transport(transport=None): transport = TRANSPORT_ALIASES.get(transport, transport) if callable(transport): transport = transport() - transport_module_name, _, transport_cls_name = transport.rpartition(".") + transport_module_name, _, transport_cls_name = transport.rpartition('.') if not transport_module_name: - raise KeyError("No such transport: %s" % (transport, )) + raise KeyError('No such transport: %s' % (transport, )) return transport_module_name, transport_cls_name diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py index 3ba6d1b7..f87269af 100644 --- a/kombu/transport/amqplib.py +++ b/kombu/transport/amqplib.py @@ -31,12 +31,12 @@ from kombu.utils.amq_manager import get_manager from . import base DEFAULT_PORT = 5672 -HAS_MSG_PEEK = hasattr(socket, "MSG_PEEK") +HAS_MSG_PEEK = hasattr(socket, 'MSG_PEEK') # amqplib's handshake mistakenly identifies as protocol version 1191, # this breaks in RabbitMQ tip, which no longer falls back to # 0-8 for unknown ids. -transport.AMQP_PROTOCOL_HEADER = str_to_bytes("AMQP\x01\x01\x08\x00") +transport.AMQP_PROTOCOL_HEADER = str_to_bytes('AMQP\x01\x01\x08\x00') # - fixes warnings when socket is not connected. @@ -80,8 +80,8 @@ class Connection(amqp.Connection): # pragma: no cover routing_key = args.read_shortstr() exc = AMQPChannelException(reply_code, reply_text, (50, 60)) - if channel.events["basic_return"]: - for callback in channel.events["basic_return"]: + if channel.events['basic_return']: + for callback in channel.events['basic_return']: callback(exc, exchange, routing_key, msg) else: raise exc @@ -129,7 +129,7 @@ class Connection(amqp.Connection): # pragma: no cover return self.method_reader.read_method() except SSLError, exc: # http://bugs.python.org/issue10272 - if "timed out" in str(exc): + if 'timed out' in str(exc): raise socket.timeout() raise finally: @@ -186,17 +186,17 @@ class Message(base.Message): super(Message, self).__init__(channel, body=msg.body, delivery_tag=msg.delivery_tag, - content_type=props.get("content_type"), - content_encoding=props.get("content_encoding"), + content_type=props.get('content_type'), + content_encoding=props.get('content_encoding'), delivery_info=msg.delivery_info, properties=msg.properties, - headers=props.get("application_headers") or {}, + headers=props.get('application_headers') or {}, **kwargs) class Channel(_Channel, base.StdChannel): Message = Message - events = {"basic_return": []} + events = {'basic_return': []} def __init__(self, *args, **kwargs): self.no_ack_consumers = set() @@ -224,7 +224,7 @@ class Channel(_Channel, base.StdChannel): def basic_consume(self, *args, **kwargs): consumer_tag = super(Channel, self).basic_consume(*args, **kwargs) - if kwargs["no_ack"]: + if kwargs['no_ack']: self.no_ack_consumers.add(consumer_tag) return consumer_tag @@ -267,8 +267,8 @@ class Transport(base.Transport): for name, default_value in self.default_connection_params.items(): if not getattr(conninfo, name, None): setattr(conninfo, name, default_value) - if conninfo.hostname == "localhost": - conninfo.hostname = "127.0.0.1" + if conninfo.hostname == 'localhost': + conninfo.hostname = '127.0.0.1' conn = self.Connection(host=conninfo.host, userid=conninfo.userid, password=conninfo.password, @@ -314,9 +314,9 @@ class Transport(base.Transport): @property def default_connection_params(self): - return {"userid": "guest", "password": "guest", - "port": self.default_port, - "hostname": "localhost", "login_method": "AMQPLAIN"} + return {'userid': 'guest', 'password': 'guest', + 'port': self.default_port, + 'hostname': 'localhost', 'login_method': 'AMQPLAIN'} def get_manager(self, *args, **kwargs): return get_manager(self.client, *args, **kwargs) diff --git a/kombu/transport/base.py b/kombu/transport/base.py index 970a1819..8ae44419 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -15,7 +15,7 @@ from kombu.exceptions import MessageStateError from kombu.serialization import decode from kombu.utils import cached_property -ACKNOWLEDGED_STATES = frozenset(["ACK", "REJECTED", "REQUEUED"]) +ACKNOWLEDGED_STATES = frozenset(['ACK', 'REJECTED', 'REQUEUED']) class StdChannel(object): @@ -30,7 +30,7 @@ class StdChannel(object): return Producer(self, *args, **kwargs) def get_bindings(self): - raise NotImplementedError("%r does not implement list_bindings" % ( + raise NotImplementedError('%r does not implement list_bindings' % ( self.__class__, )) def after_reply_message_received(self, queue): @@ -47,12 +47,12 @@ class StdChannel(object): class Message(object): """Base class for received messages.""" - __slots__ = ("_state", "channel", "delivery_tag", - "content_type", "content_encoding", - "delivery_info", "headers", - "properties", "body", - "_decoded_cache", - "MessageStateError", "__dict__") + __slots__ = ('_state', 'channel', 'delivery_tag', + 'content_type', 'content_encoding', + 'delivery_info', 'headers', + 'properties', 'body', + '_decoded_cache', + 'MessageStateError', '__dict__') MessageStateError = MessageStateError def __init__(self, channel, body=None, delivery_tag=None, @@ -67,10 +67,10 @@ class Message(object): self.headers = headers or {} self.properties = properties or {} self._decoded_cache = None - self._state = "RECEIVED" + self._state = 'RECEIVED' try: - body = decompress(body, self.headers["compression"]) + body = decompress(body, self.headers['compression']) except KeyError: pass if postencode and isinstance(body, unicode): @@ -87,7 +87,7 @@ class Message(object): """ if self.channel.no_ack_consumers is not None: try: - consumer_tag = self.delivery_info["consumer_tag"] + consumer_tag = self.delivery_info['consumer_tag'] except KeyError: pass else: @@ -95,9 +95,9 @@ class Message(object): return if self.acknowledged: raise self.MessageStateError( - "Message already acknowledged with state: %s" % self._state) + 'Message already acknowledged with state: %s' % self._state) self.channel.basic_ack(self.delivery_tag) - self._state = "ACK" + self._state = 'ACK' def ack_log_error(self, logger, errors): try: @@ -124,9 +124,9 @@ class Message(object): """ if self.acknowledged: raise self.MessageStateError( - "Message already acknowledged with state: %s" % self._state) + 'Message already acknowledged with state: %s' % self._state) self.channel.basic_reject(self.delivery_tag, requeue=False) - self._state = "REJECTED" + self._state = 'REJECTED' def requeue(self): """Reject this message and put it back on the queue. @@ -140,9 +140,9 @@ class Message(object): """ if self.acknowledged: raise self.MessageStateError( - "Message already acknowledged with state: %s" % self._state) + 'Message already acknowledged with state: %s' % self._state) self.channel.basic_reject(self.delivery_tag, requeue=True) - self._state = "REQUEUED" + self._state = 'REQUEUED' def decode(self): """Deserialize the message body, returning the original @@ -170,7 +170,7 @@ class Management(object): def get_bindings(self): raise NotImplementedError( - "Your transport does not implement list_bindings") + 'Your transport does not implement list_bindings') class Transport(object): @@ -196,31 +196,31 @@ class Transport(object): #: Type of driver, can be used to separate transports #: using the AMQP protocol (driver_type: 'amqp'), #: Redis (driver_type: 'redis'), etc... - driver_type = "N/A" + driver_type = 'N/A' - #: Name of driver library (e.g. "amqplib", "redis", "beanstalkc"). - driver_name = "N/A" + #: Name of driver library (e.g. 'amqplib', 'redis', 'beanstalkc'). + driver_name = 'N/A' def __init__(self, client, **kwargs): self.client = client def establish_connection(self): - raise NotImplementedError("Subclass responsibility") + raise NotImplementedError('Subclass responsibility') def close_connection(self, connection): - raise NotImplementedError("Subclass responsibility") + raise NotImplementedError('Subclass responsibility') def create_channel(self, connection): - raise NotImplementedError("Subclass responsibility") + raise NotImplementedError('Subclass responsibility') def close_channel(self, connection): - raise NotImplementedError("Subclass responsibility") + raise NotImplementedError('Subclass responsibility') def drain_events(self, connection, **kwargs): - raise NotImplementedError("Subclass responsibility") + raise NotImplementedError('Subclass responsibility') def driver_version(self): - return "N/A" + return 'N/A' def eventmap(self, connection): """Map of fd -> event handler for event based use. diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py index b748104c..6745bc1e 100644 --- a/kombu/transport/beanstalk.py +++ b/kombu/transport/beanstalk.py @@ -22,7 +22,7 @@ from . import virtual DEFAULT_PORT = 11300 -__author__ = "David Ziegler <david.ziegler@gmail.com>" +__author__ = 'David Ziegler <david.ziegler@gmail.com>' class Channel(virtual.Channel): @@ -33,7 +33,7 @@ class Channel(virtual.Channel): if job: try: item = loads(job.body) - dest = job.stats()["tube"] + dest = job.stats()['tube'] except Exception: job.bury() else: @@ -44,10 +44,10 @@ class Channel(virtual.Channel): def _put(self, queue, message, **kwargs): extra = {} - priority = message["properties"]["delivery_info"]["priority"] - ttr = message["properties"].get("ttr") + priority = message['properties']['delivery_info']['priority'] + ttr = message['properties'].get('ttr') if ttr is not None: - extra["ttr"] = ttr + extra['ttr'] = ttr self.client.use(queue) self.client.put(dumps(message), priority=priority, **extra) @@ -135,8 +135,8 @@ class Transport(virtual.Transport): IOError, beanstalkc.SocketError, beanstalkc.BeanstalkcException) - driver_type = "beanstalk" - driver_name = "beanstalkc" + driver_type = 'beanstalk' + driver_name = 'beanstalkc' def driver_version(self): return beanstalkc.__version__ diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py index 8a342444..d0c672b1 100644 --- a/kombu/transport/couchdb.py +++ b/kombu/transport/couchdb.py @@ -23,15 +23,15 @@ from kombu.utils import uuid4 from . import virtual DEFAULT_PORT = 5984 -DEFAULT_DATABASE = "kombu_default" +DEFAULT_DATABASE = 'kombu_default' -__author__ = "David Clymer <david@zettazebra.com>" +__author__ = 'David Clymer <david@zettazebra.com>' def create_message_view(db): from couchdb import design - view = design.ViewDefinition("kombu", "messages", """ + view = design.ViewDefinition('kombu', 'messages', """ function (doc) { if (doc.queue && doc.payload) emit(doc.queue, doc); @@ -58,7 +58,7 @@ class Channel(virtual.Channel): item = result.rows[0].value self.client.delete(item) - return loads(item["payload"]) + return loads(item['payload']) def _purge(self, queue): result = self._query(queue) @@ -72,8 +72,8 @@ class Channel(virtual.Channel): def _open(self): conninfo = self.connection.client dbname = conninfo.virtual_host - proto = conninfo.ssl and "https" or "http" - if not dbname or dbname == "/": + proto = conninfo.ssl and 'https' or 'http' + if not dbname or dbname == '/': dbname = DEFAULT_DATABASE port = conninfo.port or DEFAULT_PORT server = couchdb.Server('%s://%s:%s/' % (proto, @@ -94,7 +94,7 @@ class Channel(virtual.Channel): # if the message view is not yet set up, we'll need it now. create_message_view(self.client) self.view_created = True - return self.client.view("kombu/messages", key=queue, **kwargs) + return self.client.view('kombu/messages', key=queue, **kwargs) @property def client(self): @@ -118,8 +118,8 @@ class Transport(virtual.Transport): couchdb.PreconditionFailed, couchdb.ResourceConflict, couchdb.ResourceNotFound) - driver_type = "couchdb" - driver_name = "couchdb" + driver_type = 'couchdb' + driver_name = 'couchdb' def driver_version(self): return couchdb.__version__ diff --git a/kombu/transport/django/__init__.py b/kombu/transport/django/__init__.py index 88e0d4a5..b277432b 100644 --- a/kombu/transport/django/__init__.py +++ b/kombu/transport/django/__init__.py @@ -14,10 +14,10 @@ from kombu.exceptions import StdChannelError from .models import Queue VERSION = (1, 0, 0) -__version__ = ".".join(map(str, VERSION)) +__version__ = '.'.join(map(str, VERSION)) -POLLING_INTERVAL = getattr(settings, "KOMBU_POLLING_INTERVAL", - getattr(settings, "DJKOMBU_POLLING_INTERVAL", 5.0)) +POLLING_INTERVAL = getattr(settings, 'KOMBU_POLLING_INTERVAL', + getattr(settings, 'DJKOMBU_POLLING_INTERVAL', 5.0)) class Channel(virtual.Channel): @@ -31,7 +31,7 @@ class Channel(virtual.Channel): def basic_consume(self, queue, *args, **kwargs): qinfo = self.state.bindings[queue] exchange = qinfo[0] - if self.typeof(exchange).type == "fanout": + if self.typeof(exchange).type == 'fanout': return super(Channel, self).basic_consume(queue, *args, **kwargs) @@ -62,8 +62,8 @@ class Transport(virtual.Transport): channel_errors = (StdChannelError, errors.ObjectDoesNotExist, errors.MultipleObjectsReturned) - driver_type = "sql" - driver_name = "django" + driver_type = 'sql' + driver_name = 'django' def driver_version(self): import django diff --git a/kombu/transport/django/management/commands/clean_kombu_messages.py b/kombu/transport/django/management/commands/clean_kombu_messages.py index 05eb59c1..5facee96 100644 --- a/kombu/transport/django/management/commands/clean_kombu_messages.py +++ b/kombu/transport/django/management/commands/clean_kombu_messages.py @@ -17,6 +17,6 @@ class Command(BaseCommand): count = Message.objects.filter(visible=False).count() - print("Removing %s invisible %s... " % ( - count, pluralize("message", count))) + print('Removing %s invisible %s... ' % ( + count, pluralize('message', count))) Message.objects.cleanup() diff --git a/kombu/transport/django/managers.py b/kombu/transport/django/managers.py index ea96d5ce..162f6544 100644 --- a/kombu/transport/django/managers.py +++ b/kombu/transport/django/managers.py @@ -69,7 +69,7 @@ class MessageManager(models.Manager): def cleanup(self): cursor = self.connection_for_write().cursor() try: - cursor.execute("DELETE FROM %s WHERE visible=%%s" % ( + cursor.execute('DELETE FROM %s WHERE visible=%%s' % ( self.model._meta.db_table, ), (False, )) except: transaction.rollback_unless_managed() diff --git a/kombu/transport/django/models.py b/kombu/transport/django/models.py index ef6984f2..f6af9cb6 100644 --- a/kombu/transport/django/models.py +++ b/kombu/transport/django/models.py @@ -7,26 +7,26 @@ from .managers import QueueManager, MessageManager class Queue(models.Model): - name = models.CharField(_("name"), max_length=200, unique=True) + name = models.CharField(_('name'), max_length=200, unique=True) objects = QueueManager() class Meta: - db_table = "djkombu_queue" - verbose_name = _("queue") - verbose_name_plural = _("queues") + db_table = 'djkombu_queue' + verbose_name = _('queue') + verbose_name_plural = _('queues') class Message(models.Model): visible = models.BooleanField(default=True, db_index=True) sent_at = models.DateTimeField(null=True, blank=True, db_index=True, auto_now_add=True) - payload = models.TextField(_("payload"), null=False) - queue = models.ForeignKey(Queue, related_name="messages") + payload = models.TextField(_('payload'), null=False) + queue = models.ForeignKey(Queue, related_name='messages') objects = MessageManager() class Meta: - db_table = "djkombu_message" - verbose_name = _("message") - verbose_name_plural = _("messages") + db_table = 'djkombu_message' + verbose_name = _('message') + verbose_name_plural = _('messages') diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py index 48a2a3e6..e0d349b7 100644 --- a/kombu/transport/librabbitmq.py +++ b/kombu/transport/librabbitmq.py @@ -36,10 +36,10 @@ class Message(base.Message): body=body, delivery_info=info, properties=props, - delivery_tag=info["delivery_tag"], - content_type=props["content_type"], - content_encoding=props["content_encoding"], - headers=props.get("headers")) + delivery_tag=info['delivery_tag'], + content_type=props['content_type'], + content_encoding=props['content_encoding'], + headers=props.get('headers')) class Channel(amqp.Channel, base.StdChannel): @@ -50,10 +50,10 @@ class Channel(amqp.Channel, base.StdChannel): properties=None): """Encapsulate data into a AMQP message.""" properties = properties if properties is not None else {} - properties.update({"content_type": content_type, - "content_encoding": content_encoding, - "headers": headers, - "priority": priority}) + properties.update({'content_type': content_type, + 'content_encoding': content_encoding, + 'headers': headers, + 'priority': priority}) return body, properties @@ -71,14 +71,14 @@ class Transport(base.Transport): IOError, OSError) channel_errors = (StdChannelError, ChannelError, ) - driver_type = "amqp" - driver_name = "librabbitmq" + driver_type = 'amqp' + driver_name = 'librabbitmq' nb_keep_draining = True def __init__(self, client, **kwargs): self.client = client - self.default_port = kwargs.get("default_port") or self.default_port + self.default_port = kwargs.get('default_port') or self.default_port def driver_version(self): return amqp.__version__ @@ -125,6 +125,6 @@ class Transport(base.Transport): @property def default_connection_params(self): - return {"userid": "guest", "password": "guest", - "port": self.default_port, - "hostname": "localhost", "login_method": "AMQPLAIN"} + return {'userid': 'guest', 'password': 'guest', + 'port': self.default_port, + 'hostname': 'localhost', 'login_method': 'AMQPLAIN'} diff --git a/kombu/transport/memory.py b/kombu/transport/memory.py index 9c5f71f8..b3ed61cf 100644 --- a/kombu/transport/memory.py +++ b/kombu/transport/memory.py @@ -59,8 +59,8 @@ class Transport(virtual.Transport): #: memory backend state is global. state = virtual.BrokerState() - driver_type = "memory" - driver_name = "memory" + driver_type = 'memory' + driver_name = 'memory' def driver_version(self): - return "N/A" + return 'N/A' diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py index c4283d74..e8342425 100644 --- a/kombu/transport/mongodb.py +++ b/kombu/transport/mongodb.py @@ -22,7 +22,7 @@ from kombu.exceptions import StdChannelError from . import virtual -DEFAULT_HOST = "127.0.0.1" +DEFAULT_HOST = '127.0.0.1' DEFAULT_PORT = 27017 __author__ = """\ @@ -51,33 +51,33 @@ class Channel(virtual.Channel): if queue in self._fanout_queues: msg = self._queue_cursors[queue].next() self._queue_readcounts[queue] += 1 - return loads(msg["payload"]) + return loads(msg['payload']) else: - msg = self.client.command("findandmodify", "messages", - query={"queue": queue}, - sort={"_id": pymongo.ASCENDING}, remove=True) + msg = self.client.command('findandmodify', 'messages', + query={'queue': queue}, + sort={'_id': pymongo.ASCENDING}, remove=True) except errors.OperationFailure, exc: - if "No matching object found" in exc.args[0]: + if 'No matching object found' in exc.args[0]: raise Empty() raise except StopIteration: raise Empty() # as of mongo 2.0 empty results won't raise an error - if msg["value"] is None: + if msg['value'] is None: raise Empty() - return loads(msg["value"]["payload"]) + return loads(msg['value']['payload']) def _size(self, queue): if queue in self._fanout_queues: return (self._queue_cursors[queue].count() - self._queue_readcounts[queue]) - return self.client.messages.find({"queue": queue}).count() + return self.client.messages.find({'queue': queue}).count() def _put(self, queue, message, **kwargs): - self.client.messages.insert({"payload": dumps(message), - "queue": queue}) + self.client.messages.insert({'payload': dumps(message), + 'queue': queue}) def _purge(self, queue): size = self._size(queue) @@ -86,7 +86,7 @@ class Channel(virtual.Channel): cursor.rewind() self._queue_cursors[queue] = cursor.skip(cursor.count()) else: - self.client.messages.remove({"queue": queue}) + self.client.messages.remove({'queue': queue}) return size def close(self): @@ -107,18 +107,18 @@ class Channel(virtual.Channel): if not conninfo.hostname: conninfo.hostname = DEFAULT_HOST - for part in conninfo.hostname.split("/"): + for part in conninfo.hostname.split('/'): if not hostname: - hostname = "mongodb://" + part + hostname = 'mongodb://' + part continue dbname = part - if "?" in part: + if '?' in part: # In case someone is passing options # to the mongodb connection. Right now # it is not permitted by kombu - dbname, options = part.split("?") - hostname += "/?" + options + dbname, options = part.split('?') + hostname += '/?' + options # At this point we expect the hostname to be something like # (considering replica set form too): @@ -126,14 +126,14 @@ class Channel(virtual.Channel): # mongodb://[username:password@]host1[:port1][,host2[:port2], # ...[,hostN[:portN]]][/[?options]] mongoconn = Connection(host=hostname) - version = mongoconn.server_info()["version"] - if tuple(map(int, version.split(".")[:2])) < (1, 3): + version = mongoconn.server_info()['version'] + if tuple(map(int, version.split('.')[:2])) < (1, 3): raise NotImplementedError( - "Kombu requires MongoDB version 1.3+, but connected to %s" % ( + 'Kombu requires MongoDB version 1.3+, but connected to %s' % ( version, )) - if not dbname or dbname == "/": - dbname = "kombu_default" + if not dbname or dbname == '/': + dbname = 'kombu_default' database = getattr(mongoconn, dbname) @@ -143,56 +143,56 @@ class Channel(virtual.Channel): self.db = database col = database.messages - col.ensure_index([("queue", 1)]) + col.ensure_index([('queue', 1)]) - if "messages.broadcast" not in database.collection_names(): + if 'messages.broadcast' not in database.collection_names(): capsize = conninfo.transport_options.get( - "capped_queue_size") or 100000 - database.create_collection("messages.broadcast", size=capsize, + 'capped_queue_size') or 100000 + database.create_collection('messages.broadcast', size=capsize, capped=True) - self.bcast = getattr(database, "messages.broadcast") - self.bcast.ensure_index([("queue", 1)]) + self.bcast = getattr(database, 'messages.broadcast') + self.bcast.ensure_index([('queue', 1)]) - self.routing = getattr(database, "messages.routing") - self.routing.ensure_index([("queue", 1), ("exchange", 1)]) + self.routing = getattr(database, 'messages.routing') + self.routing.ensure_index([('queue', 1), ('exchange', 1)]) return database #TODO: Store a more complete exchange metatable in the routing collection def get_table(self, exchange): """Get table of bindings for ``exchange``.""" brokerRoutes = self.client.messages.routing.find({ - "exchange": exchange}) + 'exchange': exchange}) - localRoutes = self.state.exchanges[exchange]["table"] + localRoutes = self.state.exchanges[exchange]['table'] for route in brokerRoutes: - localRoutes.append((route["routing_key"], - route["pattern"], - route["queue"])) + localRoutes.append((route['routing_key'], + route['pattern'], + route['queue'])) return set(localRoutes) def _put_fanout(self, exchange, message, **kwargs): """Deliver fanout message.""" - self.client.messages.broadcast.insert({"payload": dumps(message), - "queue": exchange}) + self.client.messages.broadcast.insert({'payload': dumps(message), + 'queue': exchange}) def _queue_bind(self, exchange, routing_key, pattern, queue): - if self.typeof(exchange).type == "fanout": - cursor = self.bcast.find(query={"queue": exchange}, - sort=[("$natural", 1)], tailable=True) + if self.typeof(exchange).type == 'fanout': + cursor = self.bcast.find(query={'queue': exchange}, + sort=[('$natural', 1)], tailable=True) # Fast forward the cursor past old events self._queue_cursors[queue] = cursor.skip(cursor.count()) self._queue_readcounts[queue] = cursor.count() self._fanout_queues[queue] = exchange - meta = {"exchange": exchange, - "queue": queue, - "routing_key": routing_key, - "pattern": pattern} + meta = {'exchange': exchange, + 'queue': queue, + 'routing_key': routing_key, + 'pattern': pattern} self.client.messages.routing.update(meta, meta, upsert=True) def queue_delete(self, queue, **kwargs): - self.routing.remove({"queue": queue}) + self.routing.remove({'queue': queue}) super(Channel, self).queue_delete(queue, **kwargs) if queue in self._fanout_queues: self._queue_cursors[queue].close() @@ -215,8 +215,8 @@ class Transport(virtual.Transport): channel_errors = (StdChannelError, errors.ConnectionFailure, errors.OperationFailure, ) - driver_type = "mongodb" - driver_name = "pymongo" + driver_type = 'mongodb' + driver_name = 'pymongo' def driver_version(self): return pymongo.version diff --git a/kombu/transport/pika.py b/kombu/transport/pika.py index 441848c5..a8ba69e5 100644 --- a/kombu/transport/pika.py +++ b/kombu/transport/pika.py @@ -22,7 +22,7 @@ from pika import channel # must be here to raise import error try: from pika import asyncore_adapter except ImportError: - raise VersionMismatch("Kombu only works with pika version 0.5.2") + raise VersionMismatch('Kombu only works with pika version 0.5.2') from pika import blocking_adapter from pika import connection from pika import exceptions @@ -32,11 +32,11 @@ from pika.spec import Basic, BasicProperties DEFAULT_PORT = 5672 -BASIC_PROPERTIES = ("content_type", "content_encoding", - "headers", "delivery_mode", "priority", - "correlation_id", "reply_to", "expiration", - "message_id", "timestamp", "type", "user_id", - "app_id", "cluster_id") +BASIC_PROPERTIES = ('content_type', 'content_encoding', + 'headers', 'delivery_mode', 'priority', + 'correlation_id', 'reply_to', 'expiration', + 'message_id', 'timestamp', 'type', 'user_id', + 'app_id', 'cluster_id') class Message(base.Message): @@ -46,14 +46,14 @@ class Message(base.Message): propdict = dict(zip(BASIC_PROPERTIES, attrgetter(*BASIC_PROPERTIES)(props))) - kwargs.update({"body": body, - "delivery_tag": method.delivery_tag, - "content_type": props.content_type, - "content_encoding": props.content_encoding, - "headers": props.headers, - "properties": propdict, - "delivery_info": dict( - consumer_tag=getattr(method, "consumer_tag", None), + kwargs.update({'body': body, + 'delivery_tag': method.delivery_tag, + 'content_type': props.content_type, + 'content_encoding': props.content_encoding, + 'headers': props.headers, + 'properties': propdict, + 'delivery_info': dict( + consumer_tag=getattr(method, 'consumer_tag', None), routing_key=method.routing_key, delivery_tag=method.delivery_tag, redelivered=method.redelivered, @@ -129,8 +129,8 @@ class Channel(channel.Channel, base.StdChannel): def close(self): super(Channel, self).close() - if getattr(self, "handler", None): - if getattr(self.handler, "connection", None): + if getattr(self, 'handler', None): + if getattr(self.handler, 'connection', None): self.handler.connection.channels.pop( self.handler.channel_number, None) self.handler.connection = None @@ -181,7 +181,7 @@ class AsyncoreConnection(asyncore_adapter.AsyncoreConnection): current_events = self._event_counter self.drain_events(timeout=timeout) if timeout and self._event_counter <= current_events: - raise socket.timeout("timed out") + raise socket.timeout('timed out') def on_data_available(self, buf): self._event_counter += 1 @@ -212,12 +212,12 @@ class SyncTransport(base.Transport): exceptions.DuplicateConsumerTag, exceptions.UnknownConsumerTag, exceptions.ProtocolSyntaxError) - driver_type = "amqp" - driver_name = "pika" + driver_type = 'amqp' + driver_name = 'pika' def __init__(self, client, **kwargs): self.client = client - self.default_port = kwargs.get("default_port", self.default_port) + self.default_port = kwargs.get('default_port', self.default_port) def driver_version(self): import pika @@ -249,8 +249,8 @@ class SyncTransport(base.Transport): @property def default_connection_params(self): - return {"hostname": "localhost", "port": self.default_port, - "userid": "guest", "password": "guest"} + return {'hostname': 'localhost', 'port': self.default_port, + 'userid': 'guest', 'password': 'guest'} class AsyncoreTransport(SyncTransport): diff --git a/kombu/transport/pika2.py b/kombu/transport/pika2.py index 864ea300..bf1ab721 100644 --- a/kombu/transport/pika2.py +++ b/kombu/transport/pika2.py @@ -25,11 +25,11 @@ from pika.adapters import blocking_connection as blocking from pika import exceptions DEFAULT_PORT = 5672 -BASIC_PROPERTIES = ("content_type", "content_encoding", - "headers", "delivery_mode", "priority", - "correlation_id", "reply_to", "expiration", - "message_id", "timestamp", "type", "user_id", - "app_id", "cluster_id") +BASIC_PROPERTIES = ('content_type', 'content_encoding', + 'headers', 'delivery_mode', 'priority', + 'correlation_id', 'reply_to', 'expiration', + 'message_id', 'timestamp', 'type', 'user_id', + 'app_id', 'cluster_id') class Message(base.Message): @@ -39,14 +39,14 @@ class Message(base.Message): propdict = dict(zip(BASIC_PROPERTIES, attrgetter(*BASIC_PROPERTIES)(props))) - kwargs.update({"body": body, - "delivery_tag": method.delivery_tag, - "content_type": props.content_type, - "content_encoding": props.content_encoding, - "headers": props.headers, - "properties": propdict, - "delivery_info": dict( - consumer_tag=getattr(method, "consumer_tag", None), + kwargs.update({'body': body, + 'delivery_tag': method.delivery_tag, + 'content_type': props.content_type, + 'content_encoding': props.content_encoding, + 'headers': props.headers, + 'properties': propdict, + 'delivery_info': dict( + consumer_tag=getattr(method, 'consumer_tag', None), routing_key=method.routing_key, delivery_tag=method.delivery_tag, redelivered=method.redelivered, @@ -124,8 +124,8 @@ class Channel(blocking.BlockingChannel, base.StdChannel): def close(self, *args): super(Channel, self).close(*args) self.connection = None - if getattr(self, "handler", None): - if getattr(self.handler, "connection", None): + if getattr(self, 'handler', None): + if getattr(self.handler, 'connection', None): self.handler.connection.channels.pop( self.handler.channel_number, None) self.handler.connection = None @@ -169,8 +169,8 @@ class Connection(blocking.BlockingConnection): super(Connection, self).close(*args) -AuthenticationError = getattr(exceptions, "AuthenticationError", - getattr(exceptions, "LoginError")) +AuthenticationError = getattr(exceptions, 'AuthenticationError', + getattr(exceptions, 'LoginError')) class Transport(base.Transport): @@ -192,12 +192,12 @@ class Transport(base.Transport): exceptions.DuplicateConsumerTag, exceptions.UnknownConsumerTag, exceptions.ProtocolSyntaxError) - driver_type = "amqp" - driver_name = "pika" + driver_type = 'amqp' + driver_name = 'pika' def __init__(self, client, **kwargs): self.client = client - self.default_port = kwargs.get("default_port", self.default_port) + self.default_port = kwargs.get('default_port', self.default_port) def driver_version(self): return pika.__version__ @@ -231,5 +231,5 @@ class Transport(base.Transport): @property def default_connection_params(self): - return {"hostname": "localhost", "port": self.default_port, - "userid": "guest", "password": "guest"} + return {'hostname': 'localhost', 'port': self.default_port, + 'userid': 'guest', 'password': 'guest'} diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 6786f683..9125c55e 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -36,7 +36,7 @@ except ImportError: from . import virtual -logger = get_logger("kombu.transport.redis") +logger = get_logger('kombu.transport.redis') DEFAULT_PORT = 6379 DEFAULT_DB = 0 @@ -67,7 +67,7 @@ class QoS(virtual.QoS): def append(self, message, delivery_tag): delivery = message.delivery_info - EX, RK = delivery["exchange"], delivery["routing_key"] + EX, RK = delivery['exchange'], delivery['routing_key'] self.client.pipeline() \ .zadd(self.unacked_index_key, delivery_tag, time()) \ .hset(self.unacked_key, delivery_tag, @@ -173,7 +173,7 @@ class MultiChannelPoller(object): def _register_BRPOP(self, channel): """enable BRPOP mode for channel.""" - ident = channel, channel.client, "BRPOP" + ident = channel, channel.client, 'BRPOP' if channel.client.connection._sock is None or \ ident not in self._chan_to_sock: channel._in_poll = False @@ -186,7 +186,7 @@ class MultiChannelPoller(object): """enable LISTEN mode for channel.""" if channel.subclient.connection._sock is None: channel._in_listen = False - self._register(channel, channel.subclient, "LISTEN") + self._register(channel, channel.subclient, 'LISTEN') if not channel._in_listen: channel._subscribe() # send SUBSCRIBE @@ -239,21 +239,21 @@ class Channel(virtual.Channel): _client = None _subclient = None supports_fanout = True - keyprefix_queue = "_kombu.binding.%s" + keyprefix_queue = '_kombu.binding.%s' sep = '\x06\x16' _in_poll = False _in_listen = False _fanout_queues = {} - unacked_key = "unacked" - unacked_index_key = "unacked_index" + unacked_key = 'unacked' + unacked_index_key = 'unacked_index' visibility_timeout = 3600 # 1 hour priority_steps = PRIORITY_STEPS from_transport_options = (virtual.Channel.from_transport_options - + ("unacked_key", - "unacked_index_key", - "visibility_timeout", - "priority_steps")) + + ('unacked_key', + 'unacked_index_key', + 'visibility_timeout', + 'priority_steps')) def __init__(self, *args, **kwargs): super_ = super(Channel, self) @@ -265,7 +265,7 @@ class Channel(virtual.Channel): self.active_fanout_queues = set() self.auto_delete_queues = set() self._fanout_to_queue = {} - self.handlers = {"BRPOP": self._brpop_read, "LISTEN": self._receive} + self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive} # Evaluate connection. self.client.info() @@ -278,13 +278,13 @@ class Channel(virtual.Channel): def _do_restore_message(self, payload, exchange, routing_key): try: try: - payload["headers"]["redelivered"] = True + payload['headers']['redelivered'] = True except KeyError: pass for queue in self._lookup(exchange, routing_key): self._avail_client.lpush(queue, dumps(payload)) except Exception: - logger.critical("Could not restore message: %r", payload, + logger.critical('Could not restore message: %r', payload, exc_info=True) def _restore(self, message, payload=None): @@ -335,14 +335,14 @@ class Channel(virtual.Channel): self._in_listen = True def _handle_message(self, client, r): - if r[0] == "unsubscribe" and r[2] == 0: + if r[0] == 'unsubscribe' and r[2] == 0: client.subscribed = False - elif r[0] == "pmessage": - return {"type": r[0], "pattern": r[1], - "channel": r[2], "data": r[3]} + elif r[0] == 'pmessage': + return {'type': r[0], 'pattern': r[1], + 'channel': r[2], 'data': r[3]} else: - return {"type": r[0], "pattern": None, - "channel": r[1], "data": r[2]} + return {'type': r[0], 'pattern': None, + 'channel': r[1], 'data': r[2]} def _receive(self): c = self.subclient @@ -353,9 +353,9 @@ class Channel(virtual.Channel): self._in_listen = False if response is not None: payload = self._handle_message(c, response) - if payload["type"] == "message": - return (loads(payload["data"]), - self._fanout_to_queue[payload["channel"]]) + if payload['type'] == 'message': + return (loads(payload['data']), + self._fanout_to_queue[payload['channel']]) raise Empty() def _brpop_start(self, timeout=1): @@ -364,14 +364,14 @@ class Channel(virtual.Channel): return keys = [self._q_for_pri(queue, pri) for pri in PRIORITY_STEPS for queue in queues] + [timeout or 0] - self.client.connection.send_command("BRPOP", *keys) + self.client.connection.send_command('BRPOP', *keys) self._in_poll = True def _brpop_read(self, **options): try: try: dest__item = self.client.parse_response(self.client.connection, - "BRPOP", + 'BRPOP', **options) except self.connection_errors: # if there's a ConnectionError, disconnect so the next @@ -419,7 +419,7 @@ class Channel(virtual.Channel): """Deliver message.""" try: pri = max(min(int( - message["properties"]["delivery_info"]["priority"]), 9), 0) + message['properties']['delivery_info']['priority']), 9), 0) except (TypeError, ValueError, KeyError): pri = 0 self._avail_client.lpush(self._q_for_pri(queue, pri), dumps(message)) @@ -433,20 +433,20 @@ class Channel(virtual.Channel): self.auto_delete_queues.add(queue) def _queue_bind(self, exchange, routing_key, pattern, queue): - if self.typeof(exchange).type == "fanout": + if self.typeof(exchange).type == 'fanout': # Mark exchange as fanout. self._fanout_queues[queue] = exchange self._avail_client.sadd(self.keyprefix_queue % (exchange, ), - self.sep.join([routing_key or "", - pattern or "", - queue or ""])) + self.sep.join([routing_key or '', + pattern or '', + queue or ''])) def _delete(self, queue, exchange, routing_key, pattern, *args): self.auto_delete_queues.discard(queue) self._avail_client.srem(self.keyprefix_queue % (exchange, ), - self.sep.join([routing_key or "", - pattern or "", - queue or ""])) + self.sep.join([routing_key or '', + pattern or '', + queue or ''])) cmds = self._avail_client.pipeline() for pri in PRIORITY_STEPS: cmds = cmds.delete(self._q_for_pri(queue, pri)) @@ -463,7 +463,7 @@ class Channel(virtual.Channel): values = self.client.smembers(key) if not values: raise InconsistencyError( - "Queue list empty or key does not exist: %r" % ( + 'Queue list empty or key does not exist: %r' % ( self.keyprefix_queue % exchange)) return [tuple(val.split(self.sep)) for val in values] @@ -486,7 +486,7 @@ class Channel(virtual.Channel): self.queue_delete(queue) # Close connections - for attr in "client", "subclient": + for attr in 'client', 'subclient': try: self.__dict__[attr].connection.disconnect() except (KeyError, AttributeError, self.ResponseError): @@ -497,28 +497,28 @@ class Channel(virtual.Channel): conninfo = self.connection.client database = conninfo.virtual_host if not isinstance(database, int): - if not database or database == "/": + if not database or database == '/': database = DEFAULT_DB - elif database.startswith("/"): + elif database.startswith('/'): database = database[1:] try: database = int(database) except ValueError: raise ValueError( - "Database name must be int between 0 and limit - 1") + 'Database name must be int between 0 and limit - 1') - return self.Client(host=conninfo.hostname or "127.0.0.1", + return self.Client(host=conninfo.hostname or '127.0.0.1', port=conninfo.port or DEFAULT_PORT, db=database, password=conninfo.password) def _get_client(self): - version = getattr(redis, "__version__", (0, 0, 0)) - version = tuple(map(int, version.split("."))) + version = getattr(redis, '__version__', (0, 0, 0)) + version = tuple(map(int, version.split('.'))) if version < (2, 4, 4): raise VersionMismatch( - "Redis transport requires redis-py versions 2.4.4 or later. " - "You have %r" % (".".join(map(str_t, version)), )) + 'Redis transport requires redis-py versions 2.4.4 or later. ' + 'You have %r' % ('.'.join(map(str_t, version)), )) # KombuRedis maintains a connection attribute on it's instance and # uses that when executing commands @@ -565,7 +565,7 @@ class Channel(virtual.Channel): client = self._create_client() pubsub = client.pubsub() pool = pubsub.connection_pool - pubsub.connection = pool.get_connection("pubsub", pubsub.shard_hint) + pubsub.connection = pool.get_connection('pubsub', pubsub.shard_hint) return pubsub def _update_cycle(self): @@ -599,8 +599,8 @@ class Transport(virtual.Transport): polling_interval = None # disable sleep between unsuccessful polls. default_port = DEFAULT_PORT - driver_type = "redis" - driver_name = "redis" + driver_type = 'redis' + driver_name = 'redis' def __init__(self, *args, **kwargs): super(Transport, self).__init__(*args, **kwargs) @@ -639,7 +639,7 @@ class Transport(virtual.Transport): """Utility to import redis-py's exceptions at runtime.""" from redis import exceptions # This exception suddenly changed name between redis-py versions - if hasattr(exceptions, "InvalidData"): + if hasattr(exceptions, 'InvalidData'): DataError = exceptions.InvalidData else: DataError = exceptions.DataError diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py index 921c8374..2c461c8a 100644 --- a/kombu/transport/sqlalchemy/__init__.py +++ b/kombu/transport/sqlalchemy/__init__.py @@ -14,7 +14,7 @@ from .models import Queue, Message, metadata VERSION = (1, 1, 0) -__version__ = ".".join(map(str, VERSION)) +__version__ = '.'.join(map(str, VERSION)) class Channel(virtual.Channel): @@ -110,8 +110,8 @@ class Transport(virtual.Transport): default_port = 0 connection_errors = () channel_errors = (StdChannelError, ) - driver_type = "sql" - driver_name = "sqlalchemy" + driver_type = 'sql' + driver_name = 'sqlalchemy' def driver_version(self): import sqlalchemy diff --git a/kombu/transport/sqlalchemy/models.py b/kombu/transport/sqlalchemy/models.py index 8c4ca857..7437ea76 100644 --- a/kombu/transport/sqlalchemy/models.py +++ b/kombu/transport/sqlalchemy/models.py @@ -11,43 +11,43 @@ ModelBase = declarative_base(metadata=metadata) class Queue(ModelBase): - __tablename__ = "kombu_queue" - __table_args__ = {"sqlite_autoincrement": True, 'mysql_engine': 'InnoDB'} + __tablename__ = 'kombu_queue' + __table_args__ = {'sqlite_autoincrement': True, 'mysql_engine': 'InnoDB'} - id = Column(Integer, Sequence("queue_id_sequence"), primary_key=True, + id = Column(Integer, Sequence('queue_id_sequence'), primary_key=True, autoincrement=True) name = Column(String(200), unique=True) - messages = relation("Message", backref="queue", lazy="noload") + messages = relation('Message', backref='queue', lazy='noload') def __init__(self, name): self.name = name def __str__(self): - return "<Queue(%s)>" % (self.name) + return '<Queue(%s)>' % (self.name) class Message(ModelBase): - __tablename__ = "kombu_message" - __table_args__ = {"sqlite_autoincrement": True, 'mysql_engine': 'InnoDB'} + __tablename__ = 'kombu_message' + __table_args__ = {'sqlite_autoincrement': True, 'mysql_engine': 'InnoDB'} - id = Column(Integer, Sequence("message_id_sequence"), primary_key=True, + id = Column(Integer, Sequence('message_id_sequence'), primary_key=True, autoincrement=True) visible = Column(Boolean, default=True, index=True) sent_at = Column('timestamp', DateTime, nullable=True, index=True, onupdate=datetime.datetime.now) payload = Column(Text, nullable=False) - queue_id = Column(Integer, ForeignKey("kombu_queue.id", - name="FK_kombu_message_queue")) + queue_id = Column(Integer, ForeignKey('kombu_queue.id', + name='FK_kombu_message_queue')) version = Column(SmallInteger, nullable=False, default=1) - __mapper_args__ = {"version_id_col": version} + __mapper_args__ = {'version_id_col': version} def __init__(self, payload, queue): self.payload = payload self.queue = queue def __str__(self): - return "<Message(%s, %s, %s, %s)>" % (self.visible, + return '<Message(%s, %s, %s, %s)>' % (self.visible, self.sent_at, self.payload, self.queue_id) diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 011dab6a..22d345af 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -181,19 +181,19 @@ class QoS(object): if not self.restore_at_shutdown: return - elif not self.channel.do_restore or getattr(state, "restored", None): + elif not self.channel.do_restore or getattr(state, 'restored', None): assert not state return try: if state: - say("Restoring %r unacknowledged message(s).", + say('Restoring %r unacknowledged message(s).', len(self._delivered)) unrestored = self.restore_unacked() if unrestored: errors, messages = zip(*unrestored) - say("UNABLE TO RESTORE %s MESSAGES: %s", + say('UNABLE TO RESTORE %s MESSAGES: %s', len(errors), errors) emergency_dump_state(messages) finally: @@ -204,32 +204,32 @@ class Message(base.Message): def __init__(self, channel, payload, **kwargs): self._raw = payload - properties = payload["properties"] - body = payload.get("body") + properties = payload['properties'] + body = payload.get('body') if body: - body = channel.decode_body(body, properties.get("body_encoding")) - fields = {"body": body, - "delivery_tag": properties["delivery_tag"], - "content_type": payload.get("content-type"), - "content_encoding": payload.get("content-encoding"), - "headers": payload.get("headers"), - "properties": properties, - "delivery_info": properties.get("delivery_info"), - "postencode": "utf-8"} + body = channel.decode_body(body, properties.get('body_encoding')) + fields = {'body': body, + 'delivery_tag': properties['delivery_tag'], + 'content_type': payload.get('content-type'), + 'content_encoding': payload.get('content-encoding'), + 'headers': payload.get('headers'), + 'properties': properties, + 'delivery_info': properties.get('delivery_info'), + 'postencode': 'utf-8'} super(Message, self).__init__(channel, **dict(kwargs, **fields)) def serializable(self): props = self.properties body, _ = self.channel.encode_body(self.body, - props.get("body_encoding")) + props.get('body_encoding')) headers = dict(self.headers) # remove compression header - headers.pop("compression", None) - return {"body": body, - "properties": props, - "content-type": self.content_type, - "content-encoding": self.content_encoding, - "headers": self.headers} + headers.pop('compression', None) + return {'body': body, + 'properties': props, + 'content-type': self.content_type, + 'content-encoding': self.content_encoding, + 'headers': self.headers} class AbstractChannel(object): @@ -243,15 +243,15 @@ class AbstractChannel(object): def _get(self, queue, timeout=None): """Get next message from `queue`.""" - raise NotImplementedError("Virtual channels must implement _get") + raise NotImplementedError('Virtual channels must implement _get') def _put(self, queue, message): """Put `message` onto `queue`.""" - raise NotImplementedError("Virtual channels must implement _put") + raise NotImplementedError('Virtual channels must implement _put') def _purge(self, queue): """Remove all messages from `queue`.""" - raise NotImplementedError("Virtual channels must implement _purge") + raise NotImplementedError('Virtual channels must implement _purge') def _size(self, queue): """Return the number of messages in `queue` as an :class:`int`.""" @@ -313,21 +313,21 @@ class Channel(AbstractChannel, base.StdChannel): supports_fanout = False #: Binary <-> ASCII codecs. - codecs = {"base64": Base64()} + codecs = {'base64': Base64()} #: Default body encoding. - #: NOTE: ``transport_options["body_encoding"]`` will override this value. - body_encoding = "base64" + #: NOTE: ``transport_options['body_encoding']`` will override this value. + body_encoding = 'base64' #: counter used to generate delivery tags for this channel. _next_delivery_tag = count(1).next #: Optional queue where messages with no route is delivered. - #: Set by ``transport_options["deadletter_queue"]``. + #: Set by ``transport_options['deadletter_queue']``. deadletter_queue = None # List of options to transfer from :attr:`transport_options`. - from_transport_options = ("body_encoding", "deadletter_queue") + from_transport_options = ('body_encoding', 'deadletter_queue') def __init__(self, connection, **kwargs): self.connection = connection @@ -351,15 +351,15 @@ class Channel(AbstractChannel, base.StdChannel): except KeyError: pass - def exchange_declare(self, exchange, type="direct", durable=False, + def exchange_declare(self, exchange, type='direct', durable=False, auto_delete=False, arguments=None, nowait=False, passive=False): """Declare exchange.""" if passive: if exchange not in self.state.exchanges: - raise StdChannelError("404", - u"NOT_FOUND - no exchange %r in vhost %r" % ( + raise StdChannelError('404', + u'NOT_FOUND - no exchange %r in vhost %r' % ( exchange, self.connection.client.virtual_host or '/'), - (50, 10), "Channel.exchange_declare") + (50, 10), 'Channel.exchange_declare') return try: prev = self.state.exchanges[exchange] @@ -367,17 +367,17 @@ class Channel(AbstractChannel, base.StdChannel): durable, auto_delete, arguments): raise NotEquivalentError( - "Cannot redeclare exchange %r in vhost %r with " - "different type, durable or autodelete value" % ( + 'Cannot redeclare exchange %r in vhost %r with ' + 'different type, durable or autodelete value' % ( exchange, - self.connection.client.virtual_host or "/")) + self.connection.client.virtual_host or '/')) except KeyError: self.state.exchanges[exchange] = { - "type": type, - "durable": durable, - "auto_delete": auto_delete, - "arguments": arguments or {}, - "table": [], + 'type': type, + 'durable': durable, + 'auto_delete': auto_delete, + 'arguments': arguments or {}, + 'table': [], } def exchange_delete(self, exchange, if_unused=False, nowait=False): @@ -389,10 +389,10 @@ class Channel(AbstractChannel, base.StdChannel): def queue_declare(self, queue, passive=False, **kwargs): """Declare queue.""" if passive and not self._has_queue(queue, **kwargs): - raise StdChannelError("404", - u"NOT_FOUND - no queue %r in vhost %r" % ( + raise StdChannelError('404', + u'NOT_FOUND - no queue %r in vhost %r' % ( queue, self.connection.client.virtual_host or '/'), - (50, 10), "Channel.queue_declare") + (50, 10), 'Channel.queue_declare') else: self._new_queue(queue, **kwargs) return queue, self._size(queue), 0 @@ -413,12 +413,12 @@ class Channel(AbstractChannel, base.StdChannel): def after_reply_message_received(self, queue): self.queue_delete(queue) - def queue_bind(self, queue, exchange, routing_key="", arguments=None, + def queue_bind(self, queue, exchange, routing_key='', arguments=None, **kwargs): """Bind `queue` to `exchange` with `routing key`.""" if queue in self.state.bindings: return - table = self.state.exchanges[exchange].setdefault("table", []) + table = self.state.exchanges[exchange].setdefault('table', []) self.state.bindings[queue] = exchange, routing_key, arguments meta = self.typeof(exchange).prepare_bind(queue, exchange, @@ -440,12 +440,12 @@ class Channel(AbstractChannel, base.StdChannel): def basic_publish(self, message, exchange, routing_key, **kwargs): """Publish message.""" - props = message["properties"] - message["body"], props["body_encoding"] = \ - self.encode_body(message["body"], self.body_encoding) - props["delivery_info"]["exchange"] = exchange - props["delivery_info"]["routing_key"] = routing_key - props["delivery_tag"] = self._next_delivery_tag() + props = message['properties'] + message['body'], props['body_encoding'] = \ + self.encode_body(message['body'], self.body_encoding) + props['delivery_info']['exchange'] = exchange + props['delivery_info']['routing_key'] = routing_key + props['delivery_tag'] = self._next_delivery_tag() self.typeof(exchange).deliver(message, exchange, routing_key, **kwargs) @@ -492,7 +492,7 @@ class Channel(AbstractChannel, base.StdChannel): """Recover unacked messages.""" if requeue: return self.qos.restore_unacked() - raise NotImplementedError("Does not support recover(requeue=False)") + raise NotImplementedError('Does not support recover(requeue=False)') def basic_reject(self, delivery_tag, requeue=False): """Reject message.""" @@ -512,11 +512,11 @@ class Channel(AbstractChannel, base.StdChannel): def get_table(self, exchange): """Get table of bindings for `exchange`.""" - return self.state.exchanges[exchange]["table"] + return self.state.exchanges[exchange]['table'] def typeof(self, exchange): """Get the exchange type instance for `exchange`.""" - type = self.state.exchanges[exchange]["type"] + type = self.state.exchanges[exchange]['type'] return self.exchange_types[type] def _lookup(self, exchange, routing_key, default=None): @@ -535,7 +535,7 @@ class Channel(AbstractChannel, base.StdChannel): if not R and default is not None: warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT % { - "exchange": exchange, "routing_key": routing_key})) + 'exchange': exchange, 'routing_key': routing_key})) self._new_queue(default) R = [default] return R @@ -544,14 +544,14 @@ class Channel(AbstractChannel, base.StdChannel): """Redeliver message to its original destination.""" delivery_info = message.delivery_info message = message.serializable() - message["redelivered"] = True - for queue in self._lookup(delivery_info["exchange"], - delivery_info["routing_key"]): + message['redelivered'] = True + for queue in self._lookup(delivery_info['exchange'], + delivery_info['routing_key']): self._put(queue, message) def drain_events(self, timeout=None): if self._consumers and self.qos.can_consume(): - if hasattr(self, "_get_many"): + if hasattr(self, '_get_many'): return self._get_many(self._active_queues, timeout=timeout) return self._poll(self.cycle, timeout=timeout) raise Empty() @@ -567,14 +567,14 @@ class Channel(AbstractChannel, base.StdChannel): properties=None): """Prepare message data.""" properties = properties or {} - info = properties.setdefault("delivery_info", {}) - info["priority"] = priority or 0 + info = properties.setdefault('delivery_info', {}) + info['priority'] = priority or 0 - return {"body": message_data, - "content-encoding": content_encoding, - "content-type": content_type, - "headers": headers or {}, - "properties": properties or {}} + return {'body': message_data, + 'content-encoding': content_encoding, + 'content-type': content_type, + 'headers': headers or {}, + 'properties': properties or {}} def flow(self, active=True): """Enable/disable message flow. @@ -583,7 +583,7 @@ class Channel(AbstractChannel, base.StdChannel): is not implemented by the base virtual implementation. """ - raise NotImplementedError("virtual channels does not support flow.") + raise NotImplementedError('virtual channels does not support flow.') def close(self): """Close channel, cancel all consumers, and requeue unacked @@ -690,7 +690,7 @@ class Transport(base.Transport): self._callbacks = {} self.cycle = self.Cycle(self._drain_channel, self.channels, Empty) self._next_channel_id = count(1).next - polling_interval = client.transport_options.get("polling_interval") + polling_interval = client.transport_options.get('polling_interval') if polling_interval is not None: self.polling_interval = polling_interval @@ -760,4 +760,4 @@ class Transport(base.Transport): @property def default_connection_params(self): - return {"port": self.default_port, "hostname": "localhost"} + return {'port': self.default_port, 'hostname': 'localhost'} diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py index a178847a..fed253ba 100644 --- a/kombu/transport/virtual/exchange.py +++ b/kombu/transport/virtual/exchange.py @@ -31,7 +31,7 @@ class ExchangeType(object): :returns: `default` if no queues matched. """ - raise NotImplementedError("subclass responsibility") + raise NotImplementedError('subclass responsibility') def prepare_bind(self, queue, exchange, routing_key, arguments): """Returns tuple of `(routing_key, regex, queue)` to be stored @@ -41,15 +41,15 @@ class ExchangeType(object): def equivalent(self, prev, exchange, type, durable, auto_delete, arguments): """Returns true if `prev` and `exchange` is equivalent.""" - return (type == prev["type"] and - durable == prev["durable"] and - auto_delete == prev["auto_delete"] and - (arguments or {}) == (prev["arguments"] or {})) + return (type == prev['type'] and + durable == prev['durable'] and + auto_delete == prev['auto_delete'] and + (arguments or {}) == (prev['arguments'] or {})) class DirectExchange(ExchangeType): """The `direct` exchange routes based on exact routing keys.""" - type = "direct" + type = 'direct' def lookup(self, table, exchange, routing_key, default): return [queue for rkey, _, queue in table @@ -66,11 +66,11 @@ class TopicExchange(ExchangeType): """The `topic` exchange routes messages based on words separated by dots, using wildcard characters ``*`` (any single word), and ``#`` (one or more words).""" - type = "topic" + type = 'topic' #: map of wildcard to regex conversions - wildcards = {"*": r".*?[^\.]", - "#": r".*?"} + wildcards = {'*': r'.*?[^\.]', + '#': r'.*?'} #: compiled regex cache _compiled = {} @@ -92,8 +92,8 @@ class TopicExchange(ExchangeType): def key_to_pattern(self, rkey): """Get the corresponding regex for any routing key.""" - return "^%s$" % ("\.".join(self.wildcards.get(word, word) - for word in rkey.split("."))) + return '^%s$' % ('\.'.join(self.wildcards.get(word, word) + for word in rkey.split('.'))) def _match(self, pattern, string): """Same as :func:`re.match`, except the regex is compiled and cached, @@ -116,7 +116,7 @@ class FanoutExchange(ExchangeType): for an example implementation of these methods. """ - type = "fanout" + type = 'fanout' def lookup(self, table, exchange, routing_key, default): return [queue for _, _, queue in table] @@ -127,6 +127,6 @@ class FanoutExchange(ExchangeType): #: Map of standard exchange types and corresponding classes. -STANDARD_EXCHANGE_TYPES = {"direct": DirectExchange, - "topic": TopicExchange, - "fanout": FanoutExchange} +STANDARD_EXCHANGE_TYPES = {'direct': DirectExchange, + 'topic': TopicExchange, + 'fanout': FanoutExchange} diff --git a/kombu/transport/virtual/scheduling.py b/kombu/transport/virtual/scheduling.py index 13fcb469..9fc3f8f0 100644 --- a/kombu/transport/virtual/scheduling.py +++ b/kombu/transport/virtual/scheduling.py @@ -48,5 +48,5 @@ class FairCycle(object): pass def __repr__(self): - return "<FairCycle: %r/%r %r>" % (self.pos, len(self.resources), + return '<FairCycle: %r/%r %r>' % (self.pos, len(self.resources), self.resources, ) diff --git a/kombu/transport/zookeeper.py b/kombu/transport/zookeeper.py index 028256b1..4dcc3ab5 100644 --- a/kombu/transport/zookeeper.py +++ b/kombu/transport/zookeeper.py @@ -50,7 +50,7 @@ from . import virtual DEFAULT_PORT = 2181 -__author__ = "Mahendra M <mahendra.m@gmail.com>" +__author__ = 'Mahendra M <mahendra.m@gmail.com>' class Channel(virtual.Channel): @@ -62,7 +62,7 @@ class Channel(virtual.Channel): def _put(self, queue, message, **kwargs): try: - priority = message["properties"]["delivery_info"]["priority"] + priority = message['properties']['delivery_info']['priority'] except KeyError: priority = 0 @@ -164,9 +164,8 @@ class Transport(virtual.Transport): kazoo.zkclient.NotEmptyException, kazoo.zkclient.SessionExpiredException, kazoo.zkclient.InvalidCallbackException) - driver_type = "zookeeper" - driver_name = "kazoo" + driver_type = 'zookeeper' + driver_name = 'kazoo' def driver_version(self): return kazoo.__version__ - diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index 3f5d0701..8e230afe 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -23,10 +23,10 @@ try: except: ctypes = None # noqa -__all__ = ["EqualityDict", "say", "uuid", "kwdict", "maybe_list", - "fxrange", "fxrangemax", "retry_over_time", - "emergency_dump_state", "cached_property", - "reprkwargs", "reprcall", "nested"] +__all__ = ['EqualityDict', 'say', 'uuid', 'kwdict', 'maybe_list', + 'fxrange', 'fxrangemax', 'retry_over_time', + 'emergency_dump_state', 'cached_property', + 'reprkwargs', 'reprcall', 'nested'] def eqhash(o): @@ -52,7 +52,7 @@ class EqualityDict(dict): def say(m, *s): - sys.stderr.write(str(m) % s + "\n") + sys.stderr.write(str(m) % s + '\n') def uuid4(): @@ -86,14 +86,14 @@ else: see: http://bugs.python.org/issue4978. """ - return dict((key.encode("utf-8"), value) + return dict((key.encode('utf-8'), value) for key, value in kwargs.items()) def maybe_list(v): if v is None: return [] - if hasattr(v, "__iter__"): + if hasattr(v, '__iter__'): return v return [v] @@ -178,13 +178,13 @@ def emergency_dump_state(state, open_file=open, dump=None): import pickle dump = pickle.dump persist = mktemp() - say("EMERGENCY DUMP STATE TO FILE -> %s <-" % persist) - fh = open_file(persist, "w") + say('EMERGENCY DUMP STATE TO FILE -> %s <-' % persist) + fh = open_file(persist, 'w') try: try: dump(state, fh, protocol=0) except Exception, exc: - say("Cannot pickle state: %r. Fallback to pformat." % (exc, )) + say('Cannot pickle state: %r. Fallback to pformat.' % (exc, )) fh.write(pformat(state)) finally: fh.flush() @@ -207,14 +207,14 @@ class cached_property(object): @connection.setter # Prepares stored value def connection(self, value): if value is None: - raise TypeError("Connection must be a connection") + raise TypeError('Connection must be a connection') return value @connection.deleter def connection(self, value): # Additional action to do at del(self.attr) if value is not None: - print("Connection %r deleted" % (value, )) + print('Connection %r deleted' % (value, )) """ @@ -260,13 +260,13 @@ class cached_property(object): return self.__class__(self.__get, self.__set, fdel) -def reprkwargs(kwargs, sep=', ', fmt="%s=%s"): +def reprkwargs(kwargs, sep=', ', fmt='%s=%s'): return sep.join(fmt % (k, _safe_repr(v)) for k, v in kwargs.iteritems()) def reprcall(name, args=(), kwargs=(), sep=', '): - return "%s(%s%s%s)" % (name, sep.join(map(_safe_repr, args or ())), - (args and kwargs) and sep or "", + return '%s(%s%s%s)' % (name, sep.join(map(_safe_repr, args or ())), + (args and kwargs) and sep or '', reprkwargs(kwargs, sep)) diff --git a/kombu/utils/amq_manager.py b/kombu/utils/amq_manager.py index b44ca721..0bb9ce4c 100644 --- a/kombu/utils/amq_manager.py +++ b/kombu/utils/amq_manager.py @@ -6,10 +6,10 @@ def get_manager(client, hostname=None, port=None, userid=None, import pyrabbit opt = client.transport_options.get host = (hostname if hostname is not None - else opt("manager_hostname", client.hostname)) - port = port if port is not None else opt("manager_port", 55672) - return pyrabbit.Client("%s:%s" % (host, port), + else opt('manager_hostname', client.hostname)) + port = port if port is not None else opt('manager_port', 55672) + return pyrabbit.Client('%s:%s' % (host, port), userid if userid is not None - else opt("manager_userid", client.userid), + else opt('manager_userid', client.userid), password if password is not None - else opt("manager_password", client.password)) + else opt('manager_password', client.password)) diff --git a/kombu/utils/compat.py b/kombu/utils/compat.py index 251bb67c..c1b3cec4 100644 --- a/kombu/utils/compat.py +++ b/kombu/utils/compat.py @@ -55,7 +55,7 @@ class CompatOrderedDict(dict, MutableMapping): """ if len(args) > 1: - raise TypeError("expected at most 1 arguments, got %d" % ( + raise TypeError('expected at most 1 arguments, got %d' % ( len(args))) try: self.__root @@ -136,7 +136,7 @@ class CompatOrderedDict(dict, MutableMapping): if isinstance(other, dict): for key in other: self[key] = other[key] - elif hasattr(other, "keys"): + elif hasattr(other, 'keys'): for key in other.keys(): self[key] = other[key] else: @@ -187,7 +187,7 @@ class CompatOrderedDict(dict, MutableMapping): if not self: raise KeyError('dictionary is empty') if last: - if sys.platform.startswith("java"): + if sys.platform.startswith('java'): key = self.keys()[-1] else: key = reversed(self).next() @@ -258,7 +258,7 @@ import platform as _platform from stat import ST_DEV, ST_INO -if _platform.system() == "Windows": +if _platform.system() == 'Windows': #since windows doesn't go with WatchedFileHandler use FileHandler instead WatchedFileHandler = logging.FileHandler else: diff --git a/kombu/utils/debug.py b/kombu/utils/debug.py index 5eb8a6a1..d8aba52c 100644 --- a/kombu/utils/debug.py +++ b/kombu/utils/debug.py @@ -16,11 +16,11 @@ from functools import wraps from kombu.log import get_logger -__all__ = ["setup_logging", "Logwrapped"] +__all__ = ['setup_logging', 'Logwrapped'] -def setup_logging(loglevel=logging.DEBUG, loggers=["kombu.connection", - "kombu.channel"]): +def setup_logging(loglevel=logging.DEBUG, loggers=['kombu.connection', + 'kombu.channel']): for logger in loggers: l = get_logger(logger) l.addHandler(logging.StreamHandler()) @@ -28,7 +28,7 @@ def setup_logging(loglevel=logging.DEBUG, loggers=["kombu.connection", class Logwrapped(object): - __ignore = ("__enter__", "__exit__") + __ignore = ('__enter__', '__exit__') def __init__(self, instance, logger=None, ident=None): self.instance = instance @@ -43,18 +43,18 @@ class Logwrapped(object): @wraps(meth) def __wrapped(*args, **kwargs): - info = "" + info = '' if self.ident: info += self.ident % vars(self.instance) - info += "%s(" % (meth.__name__, ) + info += '%s(' % (meth.__name__, ) if args: - info += ", ".join(map(repr, args)) + info += ', '.join(map(repr, args)) if kwargs: if args: - info += ", " - info += ", ".join("%s=%r" % (key, value) + info += ', ' + info += ', '.join('%s=%r' % (key, value) for key, value in kwargs.iteritems()) - info += ")" + info += ')' self.logger.debug(info) return meth(*args, **kwargs) diff --git a/kombu/utils/encoding.py b/kombu/utils/encoding.py index f4edaf9e..5e779808 100644 --- a/kombu/utils/encoding.py +++ b/kombu/utils/encoding.py @@ -18,10 +18,10 @@ import traceback is_py3k = sys.version_info >= (3, 0) -if sys.platform.startswith("java"): # pragma: no cover +if sys.platform.startswith('java'): # pragma: no cover def default_encoding(): - return "utf-8" + return 'utf-8' else: def default_encoding(): # noqa @@ -64,7 +64,7 @@ else: return s def from_utf8(s, *args, **kwargs): # noqa - return s.encode("utf-8", *args, **kwargs) + return s.encode('utf-8', *args, **kwargs) def default_encode(obj): # noqa return unicode(obj, default_encoding()) @@ -74,33 +74,33 @@ else: ensure_bytes = str_to_bytes -def safe_str(s, errors="replace"): +def safe_str(s, errors='replace'): s = bytes_to_str(s) if not isinstance(s, basestring): return safe_repr(s, errors) return _safe_str(s, errors) -def _safe_str(s, errors="replace"): +def _safe_str(s, errors='replace'): if is_py3k: # pragma: no cover if isinstance(s, str): return s try: return str(s) except Exception, exc: - return "<Unrepresentable %r: %r %r>" % ( - type(s), exc, "\n".join(traceback.format_stack())) + return '<Unrepresentable %r: %r %r>' % ( + type(s), exc, '\n'.join(traceback.format_stack())) encoding = default_encoding() try: if isinstance(s, unicode): return s.encode(encoding, errors) return unicode(s, encoding, errors) except Exception, exc: - return "<Unrepresentable %r: %r %r>" % ( - type(s), exc, "\n".join(traceback.format_stack())) + return '<Unrepresentable %r: %r %r>' % ( + type(s), exc, '\n'.join(traceback.format_stack())) -def safe_repr(o, errors="replace"): +def safe_repr(o, errors='replace'): try: return repr(o) except Exception: diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py index 3d487068..e04e13c9 100644 --- a/kombu/utils/eventio.py +++ b/kombu/utils/eventio.py @@ -48,7 +48,7 @@ except ImportError: from kombu.syn import detect_environment -__all__ = ["poll"] +__all__ = ['poll'] READ = POLL_READ = 0x001 WRITE = POLL_WRITE = 0x004 @@ -229,7 +229,7 @@ class _select(Poller): def _get_poller(): - if detect_environment() in ("eventlet", "gevent"): + if detect_environment() != 'default': # greenlet return _select elif epoll: diff --git a/kombu/utils/finalize.py b/kombu/utils/finalize.py index 9e0c1e6e..b8b13506 100644 --- a/kombu/utils/finalize.py +++ b/kombu/utils/finalize.py @@ -16,7 +16,7 @@ import weakref from itertools import count -__all__ = ["Finalize"] +__all__ = ['Finalize'] class Finalize(object): @@ -72,7 +72,7 @@ class Finalize(object): try: obj = self._weakref() except (AttributeError, TypeError): - return "<Finalize: (dead)>" + return '<Finalize: (dead)>' if obj is None: return diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py index 3c894dbd..34ff0657 100644 --- a/kombu/utils/functional.py +++ b/kombu/utils/functional.py @@ -40,8 +40,8 @@ class promise(object): return self def __reduce__(self): - return (self.__class__, (self._fun, ), {"_args": self._args, - "_kwargs": self._kwargs}) + return (self.__class__, (self._fun, ), {'_args': self._args, + '_kwargs': self._kwargs}) def maybe_promise(value): diff --git a/kombu/utils/limits.py b/kombu/utils/limits.py index c97e76eb..60c6b201 100644 --- a/kombu/utils/limits.py +++ b/kombu/utils/limits.py @@ -10,7 +10,7 @@ Token bucket implementation for rate limiting. """ import time -__all__ = ["TokenBucket"] +__all__ = ['TokenBucket'] class TokenBucket(object): diff --git a/kombu/utils/url.py b/kombu/utils/url.py index eaee888f..af703381 100644 --- a/kombu/utils/url.py +++ b/kombu/utils/url.py @@ -12,7 +12,7 @@ def _parse_url(url): scheme = urlparse(url).scheme schemeless = url[len(scheme) + 3:] # parse with HTTP URL semantics - parts = urlparse("http://" + schemeless) + parts = urlparse('http://' + schemeless) # The first pymongo.Connection() argument (host) can be # a mongodb connection URI. If this is the case, don't |