summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2014-06-11 14:55:11 +0100
committerAsk Solem <ask@celeryproject.org>2014-06-11 14:55:11 +0100
commit6524bd293ab580eaa1ac239de38e99856d07c282 (patch)
treedf953960f43fed46472469473df82620c4786269
parente6a3e7eaf233937efe967faa56aab8e6c5b059dd (diff)
parent244b882e0c103b915e98de6ebb7d0cb026974515 (diff)
downloadkombu-curlclient.tar.gz
Merge branch 'master' into curlclientcurlclient
Conflicts: kombu/async/hub.py kombu/tests/case.py kombu/tests/transport/test_redis.py kombu/transport/SQS.py
-rw-r--r--.travis.yml2
-rw-r--r--AUTHORS4
-rw-r--r--Changelog48
-rw-r--r--LICENSE2
-rw-r--r--Makefile76
-rw-r--r--README.rst53
-rw-r--r--docs/_ext/applyxrefs.py2
-rw-r--r--docs/conf.py2
-rw-r--r--docs/reference/index.rst3
-rw-r--r--docs/reference/kombu.utils.compat.rst11
-rw-r--r--docs/userguide/connections.rst50
-rw-r--r--docs/userguide/serialization.rst11
-rw-r--r--examples/experimental/async_consume.py2
-rw-r--r--examples/simple_task_queue/worker.py4
-rw-r--r--funtests/tests/test_SLMQ.py4
-rw-r--r--funtests/tests/test_SQS.py4
-rw-r--r--funtests/transport.py8
-rw-r--r--kombu/__init__.py6
-rw-r--r--kombu/abstract.py6
-rw-r--r--kombu/async/hub.py66
-rw-r--r--kombu/async/timer.py3
-rw-r--r--kombu/common.py48
-rw-r--r--kombu/connection.py43
-rw-r--r--kombu/entity.py4
-rw-r--r--kombu/five.py195
-rw-r--r--kombu/messaging.py2
-rw-r--r--kombu/pidbox.py24
-rw-r--r--kombu/serialization.py14
-rw-r--r--kombu/syn.py4
-rw-r--r--kombu/tests/__init__.py8
-rw-r--r--kombu/tests/async/http/test_curl.py16
-rw-r--r--kombu/tests/case.py7
-rw-r--r--kombu/tests/mocks.py5
-rw-r--r--kombu/tests/test_common.py14
-rw-r--r--kombu/tests/test_connection.py6
-rw-r--r--kombu/tests/test_entities.py2
-rw-r--r--kombu/tests/test_messaging.py14
-rw-r--r--kombu/tests/test_mixins.py2
-rw-r--r--kombu/tests/test_serialization.py4
-rw-r--r--kombu/tests/test_syn.py9
-rw-r--r--kombu/tests/transport/test_base.py4
-rw-r--r--kombu/tests/transport/test_redis.py8
-rw-r--r--kombu/tests/transport/virtual/test_base.py45
-rw-r--r--kombu/tests/utils/test_utils.py20
-rw-r--r--kombu/transport/SLMQ.py8
-rw-r--r--kombu/transport/SQS.py13
-rw-r--r--kombu/transport/amqplib.py28
-rw-r--r--kombu/transport/base.py6
-rw-r--r--kombu/transport/beanstalk.py26
-rw-r--r--kombu/transport/couchdb.py36
-rw-r--r--kombu/transport/django/__init__.py5
-rw-r--r--kombu/transport/filesystem.py4
-rw-r--r--kombu/transport/mongodb.py17
-rw-r--r--kombu/transport/redis.py17
-rw-r--r--kombu/transport/sqlalchemy/__init__.py3
-rw-r--r--kombu/transport/virtual/__init__.py51
-rw-r--r--kombu/transport/zookeeper.py16
-rw-r--r--kombu/utils/__init__.py35
-rw-r--r--kombu/utils/compat.py60
-rw-r--r--kombu/utils/eventio.py19
-rw-r--r--kombu/utils/json.py46
-rw-r--r--kombu/utils/url.py51
-rw-r--r--pavement.py190
-rw-r--r--requirements/default.txt1
-rw-r--r--requirements/extras/librabbitmq.txt2
-rw-r--r--requirements/pkgutils.txt1
-rw-r--r--requirements/py26.txt2
-rw-r--r--setup.cfg5
-rw-r--r--setup.py7
-rw-r--r--tox.ini11
70 files changed, 645 insertions, 880 deletions
diff --git a/.travis.yml b/.travis.yml
index 6ebca0b0..1cfb86fe 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,8 +1,8 @@
language: python
python:
- - 2.6
- 2.7
- 3.3
+ - 3.4
- pypy
before_install:
- |
diff --git a/AUTHORS b/AUTHORS
index 1d1a3614..94d90dce 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -37,6 +37,7 @@ Dustin J. Mitchell <dustin@mozilla.com>
Ephemera <obliviscence+git@gmail.com>
Eric Reynolds <ereynolds@opendns.com>
Fabrice Rabaute <fabrice@expa.com>
+Felix Schwarz <felix.schwarz@oss.schwarz.eu>
Fernando Jorge Mota <f.j.mota13@gmail.com>
Flavio [FlaPer87] Percoco Premoli <flaper87@flaper87.org>
Florian Munz <surf@theflow.de>
@@ -61,6 +62,7 @@ Joseph Crosland <jcrosland@flumotion.com>
Keith Fitzgerald <ghostrocket@me.com>
Kevin McCarthy <me@kevinmccarthy.org>
Kevin McDonald <k3vinmcdonald@gmail.com>
+Latitia M. Haskins <lhaskins@jetsonsys.com>
Mahendra M <Mahendra_M@infosys.com>
Marcin Lulek (ergo) <info@webreactor.eu>
Mark Lavin <mlavin@caktusgroup.com>
@@ -81,7 +83,9 @@ Pierre Riteau <priteau@ci.uchicago.edu>
Rafael Duran Castaneda <rafadurancastaneda@gmail.com>
Rafal Malinowski <malinowski@red-sky.pl>
Ralf Nyren <ralf-github@nyren.net>
+Randy Barlow <rbarlow@redhat.com>
Rob Ottaway <robottaway@gmail.com>
+Roger Hu <rhu@hearsaycorp.com>
Rumyana Neykova <rumi.neykova@gmail.com>
Rune Halvorsen <runeh@opera.com>
Ryan Petrello <lists@ryanpetrello.com>
diff --git a/Changelog b/Changelog
index 37e58271..fdf55977 100644
--- a/Changelog
+++ b/Changelog
@@ -4,6 +4,54 @@
Change history
================
+.. _version-3.0.17:
+
+3.0.17
+======
+:release-date: 2014-06-02 06:00 P.M UTC
+:release-by: Ask Solem
+
+- ``kombu[librabbitmq]`` now depends on librabbitmq 1.5.2.
+
+- Async: Event loop now selectively removes file descriptors for the mode
+ it failed in, and keeps others (e.g read vs write).
+
+ Fix contributed by Roger Hu.
+
+- CouchDB: Now works without userid set.
+
+ Fix contributed by Latitia M. Haskins.
+
+- SQLAlchemy: Now supports recovery from connection errors.
+
+ Contributed by Felix Schwarz.
+
+- Redis: Restore at shutdown now works when ack emulation is disabled.
+
+- :func:`kombu.common.eventloop` accidentally swallowed socket errors.
+
+- Adds :func:`kombu.utils.url.sanitize_url`
+
+.. _version-3.0.16:
+
+3.0.16
+======
+:release-date: 2014-05-06 01:00 P.M UTC
+:release-by: Ask Solem
+
+- ``kombu[librabbitmq]`` now depends on librabbitmq 1.5.1.
+
+- Redis: Fixes ``TypeError`` problem in ``unregister`` (Issue #342).
+
+ Fix contributed by Tobias Schottdorf.
+
+- Tests: Some unit tests accidentally required the `redis-py` library.
+
+ Fix contributed by Randy Barlow.
+
+- librabbitmq: Would crash when using an older version of :mod:`librabbitmq`,
+ now emits warning instead.
+
.. _version-3.0.15:
3.0.15
diff --git a/LICENSE b/LICENSE
index a8e9a51f..2268b3b2 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2012-2013 GoPivotal, Inc. All rights reserved.
+Copyright (c) 2012-2014 GoPivotal, Inc. All rights reserved.
Copyright (c) 2009-2012, Ask Solem & contributors.
All rights reserved.
diff --git a/Makefile b/Makefile
new file mode 100644
index 00000000..741d4963
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,76 @@
+PYTHON=python
+SPHINX_DIR="docs/"
+SPHINX_BUILDDIR="${SPHINX_DIR}/.build"
+README="README.rst"
+README_SRC="docs/templates/readme.txt"
+CONTRIBUTING_SRC="docs/contributing.rst"
+SPHINX2RST="extra/release/sphinx-to-rst.py"
+
+SPHINX_HTMLDIR = "${SPHINX_BUILDDIR}/html"
+
+html:
+ (cd "$(SPHINX_DIR)"; make html)
+ mv "$(SPHINX_HTMLDIR)" Documentation
+
+docsclean:
+ -rm -rf "$(SPHINX_BUILDDIR)"
+
+htmlclean:
+ -rm -rf "$(SPHINX)"
+
+apicheck:
+ extra/release/doc4allmods kombu
+
+indexcheck:
+ extra/release/verify-reference-index.sh
+
+configcheck:
+ PYTHONPATH=. $(PYTHON) extra/release/verify_config_reference.py $(CONFIGREF_SRC)
+
+flakecheck:
+ flake8 kombu
+
+flakediag:
+ -$(MAKE) flakecheck
+
+flakepluscheck:
+ flakeplus kombu --2.6
+
+flakeplusdiag:
+ -$(MAKE) flakepluscheck
+
+flakes: flakediag flakeplusdiag
+
+readmeclean:
+ -rm -f $(README)
+
+readmecheck:
+ iconv -f ascii -t ascii $(README) >/dev/null
+
+$(README):
+ $(PYTHON) $(SPHINX2RST) $(README_SRC) --ascii > $@
+
+readme: readmeclean $(README) readmecheck
+
+test:
+ nosetests -xv kombu.tests
+
+cov:
+ nosetests -xv kombu.tests --with-coverage --cover-html --cover-branch
+
+removepyc:
+ -find . -type f -a \( -name "*.pyc" -o -name "*$$py.class" \) | xargs rm
+ -find . -type d -name "__pycache__" | xargs rm -r
+
+gitclean:
+ git clean -xdn
+
+gitcleanforce:
+ git clean -xdf
+
+bump_version:
+ $(PYTHON) extra/release/bump_version.py kombu/__init__.py README.rst
+
+distcheck: flakecheck apicheck indexcheck configcheck readmecheck test gitclean
+
+dist: readme docsclean gitcleanforce removepyc
diff --git a/README.rst b/README.rst
index f985576b..7e238f65 100644
--- a/README.rst
+++ b/README.rst
@@ -4,7 +4,7 @@
kombu - Messaging library for Python
========================================
-:Version: 3.0.15
+:Version: 3.1.0a1
`Kombu` is a messaging library for Python.
@@ -81,31 +81,31 @@ and the `Wikipedia article about AMQP`_.
Transport Comparison
====================
-+---------------+----------+------------+------------+---------------+
-| **Client** | **Type** | **Direct** | **Topic** | **Fanout** |
-+---------------+----------+------------+------------+---------------+
-| *amqp* | Native | Yes | Yes | Yes |
-+---------------+----------+------------+------------+---------------+
-| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) |
-+---------------+----------+------------+------------+---------------+
-| *mongodb* | Virtual | Yes | Yes | Yes |
-+---------------+----------+------------+------------+---------------+
-| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ |
-+---------------+----------+------------+------------+---------------+
-| *couchdb* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *in-memory* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *django* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
++---------------+----------+------------+------------+---------------+--------------+
+| **Client** | **Type** | **Direct** | **Topic** | **Fanout** | **Priority** |
++---------------+----------+------------+------------+---------------+--------------+
+| *amqp* | Native | Yes | Yes | Yes | Yes [#f3]_ |
++---------------+----------+------------+------------+---------------+--------------+
+| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) | Yes |
++---------------+----------+------------+------------+---------------+--------------+
+| *mongodb* | Virtual | Yes | Yes | Yes | Yes |
++---------------+----------+------------+------------+---------------+--------------+
+| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No | Yes |
++---------------+----------+------------+------------+---------------+--------------+
+| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *couchdb* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | Yes |
++---------------+----------+------------+------------+---------------+--------------+
+| *in-memory* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *django* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
.. [#f1] Declarations only kept in memory, so exchanges/queues
@@ -115,6 +115,7 @@ Transport Comparison
Disabled by default, but can be enabled by using the
``supports_fanout`` transport option.
+.. [#f3] AMQP Message priority support depends on broker implementation.
Documentation
-------------
diff --git a/docs/_ext/applyxrefs.py b/docs/_ext/applyxrefs.py
index 93222a33..8ad57e80 100644
--- a/docs/_ext/applyxrefs.py
+++ b/docs/_ext/applyxrefs.py
@@ -47,7 +47,6 @@ def has_target(fn):
if not readok:
return (True, None)
- #print fn, len(lines)
if len(lines) < 1:
print("Not touching empty file %s." % fn)
return (True, None)
@@ -69,7 +68,6 @@ def main(argv=None):
files.extend([(dirpath, f) for f in filenames])
files.sort()
files = [os.path.join(p, fn) for p, fn in files if fn.endswith('.txt')]
- #print files
for fn in files:
if fn in DONT_TOUCH:
diff --git a/docs/conf.py b/docs/conf.py
index cf28bb6c..a61b832f 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -29,7 +29,7 @@ master_doc = 'index'
# General information about the project.
project = 'Kombu'
-copyright = '2009-2013, Ask Solem'
+copyright = '2009-2014, Ask Solem'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
diff --git a/docs/reference/index.rst b/docs/reference/index.rst
index 6898a963..39e471cb 100644
--- a/docs/reference/index.rst
+++ b/docs/reference/index.rst
@@ -27,7 +27,7 @@
kombu.async.hub
kombu.async.semaphore
kombu.async.timer
- kombu.async..debug
+ kombu.async.debug
kombu.transport
kombu.transport.pyamqp
kombu.transport.librabbitmq
@@ -57,7 +57,6 @@
kombu.utils
kombu.utils.eventio
kombu.utils.limits
- kombu.utils.compat
kombu.utils.debug
kombu.utils.encoding
kombu.utils.functional
diff --git a/docs/reference/kombu.utils.compat.rst b/docs/reference/kombu.utils.compat.rst
deleted file mode 100644
index 3172ed38..00000000
--- a/docs/reference/kombu.utils.compat.rst
+++ /dev/null
@@ -1,11 +0,0 @@
-==========================================================
- Compat. utilities - kombu.utils.compat
-==========================================================
-
-.. contents::
- :local:
-.. currentmodule:: kombu.utils.compat
-
-.. automodule:: kombu.utils.compat
- :members:
- :undoc-members:
diff --git a/docs/userguide/connections.rst b/docs/userguide/connections.rst
index f97b4b79..38037a29 100644
--- a/docs/userguide/connections.rst
+++ b/docs/userguide/connections.rst
@@ -145,29 +145,31 @@ transport URL, or use ``amqp`` to have the fallback.
Transport Comparison
====================
-+---------------+----------+------------+------------+---------------+
-| **Client** | **Type** | **Direct** | **Topic** | **Fanout** |
-+---------------+----------+------------+------------+---------------+
-| *amqp* | Native | Yes | Yes | Yes |
-+---------------+----------+------------+------------+---------------+
-| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) |
-+---------------+----------+------------+------------+---------------+
-| *mongodb* | Virtual | Yes | Yes | Yes |
-+---------------+----------+------------+------------+---------------+
-| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ |
-+---------------+----------+------------+------------+---------------+
-| *couchdb* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *in-memory* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *django* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
-| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No |
-+---------------+----------+------------+------------+---------------+
++---------------+----------+------------+------------+---------------+--------------+
+| **Client** | **Type** | **Direct** | **Topic** | **Fanout** | **Priority** |
++---------------+----------+------------+------------+---------------+--------------+
+| *amqp* | Native | Yes | Yes | Yes | Yes [#f3]_ |
++---------------+----------+------------+------------+---------------+--------------+
+| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) | Yes |
++---------------+----------+------------+------------+---------------+--------------+
+| *mongodb* | Virtual | Yes | Yes | Yes | Yes |
++---------------+----------+------------+------------+---------------+--------------+
+| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No | Yes |
++---------------+----------+------------+------------+---------------+--------------+
+| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *couchdb* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | Yes |
++---------------+----------+------------+------------+---------------+--------------+
+| *in-memory* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *django* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
+| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No | No |
++---------------+----------+------------+------------+---------------+--------------+
.. [#f1] Declarations only kept in memory, so exchanges/queues
@@ -176,3 +178,5 @@ Transport Comparison
.. [#f2] Fanout supported via storing routing tables in SimpleDB.
Disabled by default, but can be enabled by using the
``supports_fanout`` transport option.
+
+.. [#f3] AMQP Message priority support depends on broker implementation.
diff --git a/docs/userguide/serialization.rst b/docs/userguide/serialization.rst
index 37169a88..e65d6408 100644
--- a/docs/userguide/serialization.rst
+++ b/docs/userguide/serialization.rst
@@ -173,12 +173,15 @@ supported by Kombu.
.. code-block:: python
import pickle
- from kombu.serialization import BytesIO, register
+ from io import BytesIO
+ from kombu import serialization
def loads(s):
return pickle.load(BytesIO(s))
- register('my_pickle', pickle.dumps, loads,
- content_type='application/x-pickle2',
- content_encoding='binary')
+ serialization.register(
+ 'my_pickle', pickle.dumps, loads,
+ content_type='application/x-pickle2',
+ content_encoding='binary',
+ )
diff --git a/examples/experimental/async_consume.py b/examples/experimental/async_consume.py
index 1127128a..1a15c7a9 100644
--- a/examples/experimental/async_consume.py
+++ b/examples/experimental/async_consume.py
@@ -2,12 +2,12 @@
from kombu import Connection, Exchange, Queue, Producer, Consumer
from kombu.async import Hub
-from threading import Event
hub = Hub()
exchange = Exchange('asynt')
queue = Queue('asynt', exchange, 'asynt')
+
def send_message(conn):
producer = Producer(conn)
producer.publish('hello world', exchange=exchange, routing_key='asynt')
diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py
index ded3aa73..6113d4ac 100644
--- a/examples/simple_task_queue/worker.py
+++ b/examples/simple_task_queue/worker.py
@@ -1,6 +1,6 @@
from kombu.mixins import ConsumerMixin
from kombu.log import get_logger
-from kombu.utils import kwdict, reprcall
+from kombu.utils import reprcall
from .queues import task_queues
@@ -23,7 +23,7 @@ class Worker(ConsumerMixin):
kwargs = body['kwargs']
logger.info('Got task: %s', reprcall(fun.__name__, args, kwargs))
try:
- fun(*args, **kwdict(kwargs))
+ fun(*args, **kwargs)
except Exception as exc:
logger.error('task raised exception: %r', exc)
message.ack()
diff --git a/funtests/tests/test_SLMQ.py b/funtests/tests/test_SLMQ.py
index d8fd47a1..0878d234 100644
--- a/funtests/tests/test_SLMQ.py
+++ b/funtests/tests/test_SLMQ.py
@@ -10,8 +10,8 @@ class test_SLMQ(transport.TransportCase):
event_loop_max = 100
message_size_limit = 4192
reliable_purge = False
- suppress_disorder_warning = True # does not guarantee FIFO order,
- # even in simple cases.
+ #: does not guarantee FIFO order, even in simple cases.
+ suppress_disorder_warning = True
def before_connect(self):
if "SLMQ_ACCOUNT" not in os.environ:
diff --git a/funtests/tests/test_SQS.py b/funtests/tests/test_SQS.py
index de08efb3..31f689b2 100644
--- a/funtests/tests/test_SQS.py
+++ b/funtests/tests/test_SQS.py
@@ -11,8 +11,8 @@ class test_SQS(transport.TransportCase):
event_loop_max = 100
message_size_limit = 4192 # SQS max body size / 2.
reliable_purge = False
- suppress_disorder_warning = True # does not guarantee FIFO order,
- # even in simple cases.
+ #: does not guarantee FIFO order, even in simple cases
+ suppress_disorder_warning = True
def before_connect(self):
try:
diff --git a/funtests/transport.py b/funtests/transport.py
index 18872336..93e2107c 100644
--- a/funtests/transport.py
+++ b/funtests/transport.py
@@ -14,7 +14,6 @@ from nose import SkipTest
from kombu import Connection
from kombu import Exchange, Queue
from kombu.five import range
-from kombu.tests.case import skip_if_quick
if sys.version_info >= (2, 5):
from hashlib import sha256 as _digest
@@ -22,10 +21,6 @@ else:
from sha import new as _digest # noqa
-def say(msg):
- print(msg, file=sys.stderr)
-
-
def _nobuf(x):
return [str(i) if isinstance(i, buffer) else i for i in x]
@@ -51,7 +46,7 @@ def consumeN(conn, consumer, n=1, timeout=30):
if seconds >= timeout:
raise socket.timeout(msg)
if seconds > 1:
- say(msg)
+ print(msg)
if len(messages) >= n:
break
@@ -167,7 +162,6 @@ class TransportCase(unittest.TestCase):
def _digest(self, data):
return _digest(data).hexdigest()
- @skip_if_quick
def test_produce__consume_large_messages(
self, bytes=1048576, n=10,
charset=string.punctuation + string.letters + string.digits):
diff --git a/kombu/__init__.py b/kombu/__init__.py
index f6b73012..da1b6b2c 100644
--- a/kombu/__init__.py
+++ b/kombu/__init__.py
@@ -7,7 +7,7 @@ version_info_t = namedtuple(
'version_info_t', ('major', 'minor', 'micro', 'releaselevel', 'serial'),
)
-VERSION = version_info_t(3, 0, 15, '', '')
+VERSION = version_info_t(3, 1, 0, 'a1', '')
__version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
__author__ = 'Ask Solem'
__contact__ = 'ask@celeryproject.org'
@@ -19,8 +19,8 @@ __docformat__ = 'restructuredtext en'
import os
import sys
-if sys.version_info < (2, 6): # pragma: no cover
- raise Exception('Kombu 3.1 requires Python versions 2.6 or later.')
+if sys.version_info < (2, 7): # pragma: no cover
+ raise Exception('Kombu 3.1 requires Python versions 2.7 or later.')
STATICA_HACK = True
globals()['kcah_acitats'[::-1].upper()] = False
diff --git a/kombu/abstract.py b/kombu/abstract.py
index 6dff8486..31bb547e 100644
--- a/kombu/abstract.py
+++ b/kombu/abstract.py
@@ -42,9 +42,9 @@ class Object(object):
if recurse and isinstance(obj, Object):
return obj.as_dict(recurse=True)
return type(obj) if type else obj
- return dict(
- (attr, f(getattr(self, attr), type)) for attr, type in self.attrs
- )
+ return {
+ attr: f(getattr(self, attr), type) for attr, type in self.attrs
+ }
def __reduce__(self):
return unpickle_dict, (self.__class__, self.as_dict())
diff --git a/kombu/async/hub.py b/kombu/async/hub.py
index 25c71cdb..aee8adef 100644
--- a/kombu/async/hub.py
+++ b/kombu/async/hub.py
@@ -10,17 +10,15 @@ from __future__ import absolute_import
import errno
-from collections import deque
from contextlib import contextmanager
from time import sleep
from types import GeneratorType as generator
-from amqp.promise import promise, Thenable
+from amqp.promise import Thenable, promise
from kombu.five import Empty, range
from kombu.log import get_logger
from kombu.utils import cached_property, fileno
-from kombu.utils.compat import get_errno
from kombu.utils.eventio import READ, WRITE, ERR, poll
from .timer import Timer
@@ -80,7 +78,7 @@ class Hub(object):
self.writers = {}
self.on_tick = set()
self.on_close = set()
- self._ready = deque()
+ self._ready = set()
self._running = False
self._loop = None
@@ -139,7 +137,7 @@ class Hub(object):
except (MemoryError, AssertionError):
raise
except OSError as exc:
- if get_errno(exc) == errno.ENOMEM:
+ if exc.errno == errno.ENOMEM:
raise
logger.error('Error in timer: %r', exc, exc_info=1)
except Exception as exc:
@@ -186,7 +184,7 @@ class Hub(object):
def call_soon(self, callback, *args):
if not isinstance(callback, Thenable):
callback = promise(callback, args)
- self._ready.append(callback)
+ self._ready.add(callback)
return callback
def call_later(self, delay, callback, *args):
@@ -273,53 +271,59 @@ class Hub(object):
tick_callback()
while todo:
- item = todo.popleft()
+ item = todo.pop()
if item:
item()
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
- #print('[[[HUB]]]: %s' % (self.repr_active(), ))
+ # print('[[[HUB]]]: %s' % (self.repr_active(), ))
if readers or writers:
to_consolidate = []
try:
events = poll(poll_timeout)
- #print('[EVENTS]: %s' % (self.nepr_events(events or []), ))
+ # print('[EVENTS]: %s' % (self.repr_events(events), ))
except ValueError: # Issue 882
raise StopIteration()
- for fileno, event in events or ():
- if fileno in consolidate and \
- writers.get(fileno) is None:
- to_consolidate.append(fileno)
+ for fd, event in events or ():
+ if fd in consolidate and \
+ writers.get(fd) is None:
+ to_consolidate.append(fd)
continue
cb = cbargs = None
- try:
- if event & READ:
- cb, cbargs = readers[fileno]
- elif event & WRITE:
- cb, cbargs = writers[fileno]
- elif event & ERR:
- try:
- cb, cbargs = (readers.get(fileno) or
- writers.get(fileno))
- except TypeError:
- pass
- except (KeyError, Empty):
- hub_remove(fileno)
- continue
+
+ if event & READ:
+ try:
+ cb, cbargs = readers[fd]
+ except KeyError:
+ self.remove_reader(fd)
+ continue
+ elif event & WRITE:
+ try:
+ cb, cbargs = writers[fd]
+ except KeyError:
+ self.remove_writer(fd)
+ continue
+ elif event & ERR:
+ try:
+ cb, cbargs = (readers.get(fd) or
+ writers.get(fd))
+ except TypeError:
+ pass
+
if cb is None:
continue
if isinstance(cb, generator):
try:
next(cb)
except OSError as exc:
- if get_errno(exc) != errno.EBADF:
+ if exc.errno != errno.EBADF:
raise
- hub_remove(fileno)
+ hub_remove(fd)
except StopIteration:
pass
except Exception:
- hub_remove(fileno)
+ hub_remove(fd)
raise
else:
try:
@@ -339,7 +343,7 @@ class Hub(object):
def repr_events(self, events):
from .debug import repr_events
- return repr_events(self, events)
+ return repr_events(self, events or [])
@cached_property
def scheduler(self):
diff --git a/kombu/async/timer.py b/kombu/async/timer.py
index 00f54123..09b6cf76 100644
--- a/kombu/async/timer.py
+++ b/kombu/async/timer.py
@@ -19,7 +19,6 @@ from weakref import proxy as weakrefproxy
from kombu.five import monotonic
from kombu.log import get_logger
-from kombu.utils.compat import timedelta_seconds
try:
from pytz import utc
@@ -41,7 +40,7 @@ def to_timestamp(d, default_timezone=utc):
if isinstance(d, datetime):
if d.tzinfo is None:
d = d.replace(tzinfo=default_timezone)
- return timedelta_seconds(d - EPOCH)
+ return max((d - EPOCH).total_seconds(), 0)
return d
diff --git a/kombu/common.py b/kombu/common.py
index cac03314..15a63185 100644
--- a/kombu/common.py
+++ b/kombu/common.py
@@ -22,7 +22,6 @@ from amqp import RecoverableConnectionError
from .entity import Exchange, Queue
from .five import range
from .log import get_logger
-from .messaging import Consumer as _Consumer
from .serialization import registry as serializers
from .utils import uuid
@@ -91,33 +90,40 @@ def declaration_cached(entity, channel):
def maybe_declare(entity, channel=None, retry=False, **retry_policy):
- if not entity.is_bound:
- assert channel
- entity = entity.bind(channel)
+ is_bound = entity.is_bound
+
+ if channel is None:
+ assert is_bound
+ channel = entity.channel
+
+ declared = ident = None
+ if channel.connection and entity.can_cache_declaration:
+ declared = channel.connection.client.declared_entities
+ ident = hash(entity)
+ if ident in declared:
+ return False
+
+ entity = entity if is_bound else entity.bind(channel)
if retry:
- return _imaybe_declare(entity, **retry_policy)
- return _maybe_declare(entity)
+ return _imaybe_declare(entity, declared, ident,
+ channel, **retry_policy)
+ return _maybe_declare(entity, declared, ident, channel)
-def _maybe_declare(entity):
- channel = entity.channel
+def _maybe_declare(entity, declared, ident, channel):
+ channel = channel or entity.channel
if not channel.connection:
raise RecoverableConnectionError('channel disconnected')
- if entity.can_cache_declaration:
- declared = channel.connection.client.declared_entities
- ident = hash(entity)
- if ident not in declared:
- entity.declare()
- declared.add(ident)
- return True
- return False
entity.declare()
+ if declared is not None and ident:
+ declared.add(ident)
return True
-def _imaybe_declare(entity, **retry_policy):
+def _imaybe_declare(entity, declared, ident, channel, **retry_policy):
return entity.channel.connection.client.ensure(
- entity, _maybe_declare, **retry_policy)(entity)
+ entity, _maybe_declare, **retry_policy)(
+ entity, declared, ident, channel)
def drain_consumer(consumer, limit=1, timeout=None, callbacks=None):
@@ -138,8 +144,8 @@ def drain_consumer(consumer, limit=1, timeout=None, callbacks=None):
def itermessages(conn, channel, queue, limit=1, timeout=None,
- Consumer=_Consumer, callbacks=None, **kwargs):
- return drain_consumer(Consumer(channel, queues=[queue], **kwargs),
+ callbacks=None, **kwargs):
+ return drain_consumer(conn.Consumer(channel, queues=[queue], **kwargs),
limit=limit, timeout=timeout, callbacks=callbacks)
@@ -181,8 +187,6 @@ def eventloop(conn, limit=None, timeout=None, ignore_timeouts=False):
except socket.timeout:
if timeout and not ignore_timeouts: # pragma: no cover
raise
- except socket.error: # pragma: no cover
- pass
def send_reply(exchange, req, msg,
diff --git a/kombu/connection.py b/kombu/connection.py
index 873b422b..54512852 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -10,14 +10,10 @@ from __future__ import absolute_import
import os
import socket
+from collections import OrderedDict
from contextlib import contextmanager
-from functools import partial
from itertools import count, cycle
from operator import itemgetter
-try:
- from urllib.parse import quote
-except ImportError: # Py2
- from urllib import quote # noqa
# jython breaks on relative import for .exceptions for some reason
# (Issue #112)
@@ -26,9 +22,8 @@ from .five import Empty, range, string_t, text_t, LifoQueue as _LifoQueue
from .log import get_logger
from .transport import get_transport_cls, supports_librabbitmq
from .utils import cached_property, retry_over_time, shufflecycle, HashedSeq
-from .utils.compat import OrderedDict
from .utils.functional import lazy
-from .utils.url import parse_url, urlparse
+from .utils.url import as_url, parse_url, quote, urlparse
__all__ = ['Connection', 'ConnectionPool', 'ChannelPool']
@@ -199,6 +194,7 @@ class Connection(object):
"""Switch connection parameters to use a new URL (does not
reconnect)"""
self.close()
+ self.declared_entities.clear()
self._closed = False
self._init_params(**dict(self._initial_params, **parse_url(url)))
@@ -569,36 +565,23 @@ class Connection(object):
self.password, self.virtual_host, self.port,
repr(self.transport_options))
- def as_uri(self, include_password=False, mask=''):
+ def as_uri(self, include_password=False, mask='**',
+ getfields=itemgetter('port', 'userid', 'password',
+ 'virtual_host', 'transport')):
"""Convert connection parameters to URL form."""
hostname = self.hostname or 'localhost'
if self.transport.can_parse_url:
if self.uri_prefix:
return '%s+%s' % (self.uri_prefix, hostname)
return self.hostname
- quoteS = partial(quote, safe='') # strict quote
fields = self.info()
- port, userid, password, transport = itemgetter(
- 'port', 'userid', 'password', 'transport'
- )(fields)
- url = '%s://' % transport
- if userid or password:
- if userid:
- url += quoteS(userid)
- if password:
- if include_password:
- url += ':' + quoteS(password)
- else:
- url += ':' + mask if mask else ''
- url += '@'
- url += quoteS(fields['hostname'])
- if port:
- url += ':%s' % (port, )
-
- url += '/' + quote(fields['virtual_host'])
- if self.uri_prefix:
- return '%s+%s' % (self.uri_prefix, url)
- return url
+ port, userid, password, vhost, transport = getfields(fields)
+ scheme = ('{0}+{1}'.format(self.uri_prefix, transport)
+ if self.uri_prefix else transport)
+ return as_url(
+ scheme, hostname, port, userid, password, quote(vhost),
+ sanitize=not include_password, mask=mask,
+ )
def Pool(self, limit=None, preload=None):
"""Pool of connections.
diff --git a/kombu/entity.py b/kombu/entity.py
index fda53bef..53777c68 100644
--- a/kombu/entity.py
+++ b/kombu/entity.py
@@ -288,7 +288,7 @@ class Exchange(MaybeChannelBound):
@property
def can_cache_declaration(self):
- return self.durable and not self.auto_delete
+ return True
class binding(object):
@@ -672,7 +672,7 @@ class Queue(MaybeChannelBound):
@property
def can_cache_declaration(self):
- return self.durable and not self.auto_delete
+ return True
@classmethod
def from_dict(self, queue, **options):
diff --git a/kombu/five.py b/kombu/five.py
index 33c77fe5..d83015f3 100644
--- a/kombu/five.py
+++ b/kombu/five.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
- celery.five
+ kombu.five
~~~~~~~~~~~
Compatibility implementations of features
@@ -10,194 +10,5 @@
"""
from __future__ import absolute_import
-############## py3k #########################################################
-import sys
-PY3 = sys.version_info[0] == 3
-
-try:
- reload = reload # noqa
-except NameError: # pragma: no cover
- from imp import reload # noqa
-
-try:
- from collections import UserList # noqa
-except ImportError: # pragma: no cover
- from UserList import UserList # noqa
-
-try:
- from collections import UserDict # noqa
-except ImportError: # pragma: no cover
- from UserDict import UserDict # noqa
-
-try:
- bytes_t = bytes
-except NameError: # pragma: no cover
- bytes_t = str # noqa
-
-############## time.monotonic ################################################
-
-if sys.version_info < (3, 3):
-
- import platform
- SYSTEM = platform.system()
-
- if SYSTEM == 'Darwin':
- import ctypes
- from ctypes.util import find_library
- libSystem = ctypes.CDLL('libSystem.dylib')
- CoreServices = ctypes.CDLL(find_library('CoreServices'),
- use_errno=True)
- mach_absolute_time = libSystem.mach_absolute_time
- mach_absolute_time.restype = ctypes.c_uint64
- absolute_to_nanoseconds = CoreServices.AbsoluteToNanoseconds
- absolute_to_nanoseconds.restype = ctypes.c_uint64
- absolute_to_nanoseconds.argtypes = [ctypes.c_uint64]
-
- def _monotonic():
- return absolute_to_nanoseconds(mach_absolute_time()) * 1e-9
-
- elif SYSTEM == 'Linux':
- # from stackoverflow:
- # questions/1205722/how-do-i-get-monotonic-time-durations-in-python
- import ctypes
- import os
-
- CLOCK_MONOTONIC = 1 # see <linux/time.h>
-
- class timespec(ctypes.Structure):
- _fields_ = [
- ('tv_sec', ctypes.c_long),
- ('tv_nsec', ctypes.c_long),
- ]
-
- librt = ctypes.CDLL('librt.so.1', use_errno=True)
- clock_gettime = librt.clock_gettime
- clock_gettime.argtypes = [
- ctypes.c_int, ctypes.POINTER(timespec),
- ]
-
- def _monotonic(): # noqa
- t = timespec()
- if clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(t)) != 0:
- errno_ = ctypes.get_errno()
- raise OSError(errno_, os.strerror(errno_))
- return t.tv_sec + t.tv_nsec * 1e-9
- else:
- from time import time as _monotonic
-try:
- from time import monotonic
-except ImportError:
- monotonic = _monotonic # noqa
-
-############## Py3 <-> Py2 ###################################################
-
-if PY3: # pragma: no cover
- import builtins
-
- from queue import Queue, Empty, Full, LifoQueue
- from itertools import zip_longest
- from io import StringIO, BytesIO
-
- map = map
- zip = zip
- string = str
- string_t = str
- long_t = int
- text_t = str
- range = range
- module_name_t = str
-
- open_fqdn = 'builtins.open'
-
- def items(d):
- return d.items()
-
- def keys(d):
- return d.keys()
-
- def values(d):
- return d.values()
-
- def nextfun(it):
- return it.__next__
-
- exec_ = getattr(builtins, 'exec')
-
- def reraise(tp, value, tb=None):
- if value.__traceback__ is not tb:
- raise value.with_traceback(tb)
- raise value
-
- class WhateverIO(StringIO):
-
- def write(self, data):
- if isinstance(data, bytes):
- data = data.encode()
- StringIO.write(self, data)
-
-else:
- import __builtin__ as builtins # noqa
- from Queue import Queue, Empty, Full, LifoQueue # noqa
- from itertools import ( # noqa
- imap as map,
- izip as zip,
- izip_longest as zip_longest,
- )
- try:
- from cStringIO import StringIO # noqa
- except ImportError: # pragma: no cover
- from StringIO import StringIO # noqa
-
- string = unicode # noqa
- string_t = basestring # noqa
- text_t = unicode
- long_t = long # noqa
- range = xrange
- module_name_t = str
-
- open_fqdn = '__builtin__.open'
-
- def items(d): # noqa
- return d.iteritems()
-
- def keys(d): # noqa
- return d.iterkeys()
-
- def values(d): # noqa
- return d.itervalues()
-
- def nextfun(it): # noqa
- return it.next
-
- def exec_(code, globs=None, locs=None): # pragma: no cover
- """Execute code in a namespace."""
- if globs is None:
- frame = sys._getframe(1)
- globs = frame.f_globals
- if locs is None:
- locs = frame.f_locals
- del frame
- elif locs is None:
- locs = globs
- exec("""exec code in globs, locs""")
-
- exec_("""def reraise(tp, value, tb=None): raise tp, value, tb""")
-
- BytesIO = WhateverIO = StringIO # noqa
-
-
-def with_metaclass(Type, skip_attrs=set(['__dict__', '__weakref__'])):
- """Class decorator to set metaclass.
-
- Works with both Python 3 and Python 3 and it does not add
- an extra class in the lookup order like ``six.with_metaclass`` does
- (that is -- it copies the original class instead of using inheritance).
-
- """
-
- def _clone_with_metaclass(Class):
- attrs = dict((key, value) for key, value in items(vars(Class))
- if key not in skip_attrs)
- return Type(Class.__name__, Class.__bases__, attrs)
-
- return _clone_with_metaclass
+from amqp.five import * # noqa
+from amqp.five import __all__ # noqa
diff --git a/kombu/messaging.py b/kombu/messaging.py
index 98d59d45..8b923950 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -11,6 +11,7 @@ import numbers
from itertools import count
+from .common import maybe_declare
from .compression import compress
from .connection import maybe_channel, is_connection
from .entity import Exchange, Queue, DELIVERY_MODES
@@ -107,7 +108,6 @@ class Producer(object):
"""Declare the exchange if it hasn't already been declared
during this session."""
if entity:
- from .common import maybe_declare
return maybe_declare(entity, self.channel, retry, **retry_policy)
def publish(self, body, routing_key=None, delivery_mode=None,
diff --git a/kombu/pidbox.py b/kombu/pidbox.py
index 5c70a382..4ffb32e1 100644
--- a/kombu/pidbox.py
+++ b/kombu/pidbox.py
@@ -22,7 +22,7 @@ from .common import maybe_declare, oid_from
from .exceptions import InconsistencyError
from .five import range
from .log import get_logger
-from .utils import cached_property, kwdict, uuid, reprcall
+from .utils import cached_property, uuid, reprcall
REPLY_QUEUE_EXPIRES = 10
@@ -102,7 +102,7 @@ class Node(object):
reprcall(method, (), kwargs=arguments), reply_to, ticket)
handle = reply_to and self.handle_call or self.handle_cast
try:
- reply = handle(method, kwdict(arguments))
+ reply = handle(method, arguments)
except SystemExit:
raise
except Exception as exc:
@@ -130,7 +130,7 @@ class Node(object):
if message:
self.adjust_clock(message.headers.get('clock') or 0)
if not destination or self.hostname in destination:
- return self.dispatch(**kwdict(body))
+ return self.dispatch(**body)
dispatch_from_message = handle_message
def reply(self, data, exchange, routing_key, ticket, **kwargs):
@@ -210,14 +210,16 @@ class Mailbox(object):
def get_reply_queue(self):
oid = self.oid
- return Queue('%s.%s' % (oid, self.reply_exchange.name),
- exchange=self.reply_exchange,
- routing_key=oid,
- durable=False,
- auto_delete=True,
- queue_arguments={
- 'x-expires': int(REPLY_QUEUE_EXPIRES * 1000),
- })
+ return Queue(
+ '%s.%s' % (oid, self.reply_exchange.name),
+ exchange=self.reply_exchange,
+ routing_key=oid,
+ durable=False,
+ auto_delete=True,
+ queue_arguments={
+ 'x-expires': int(REPLY_QUEUE_EXPIRES * 1000),
+ },
+ )
@cached_property
def reply_queue(self):
diff --git a/kombu/serialization.py b/kombu/serialization.py
index 47b8f91b..53b8cddf 100644
--- a/kombu/serialization.py
+++ b/kombu/serialization.py
@@ -19,11 +19,12 @@ except ImportError: # pragma: no cover
from collections import namedtuple
from contextlib import contextmanager
+from io import BytesIO
from .exceptions import (
ContentDisallowed, DecodeError, EncodeError, SerializerNotInstalled
)
-from .five import BytesIO, reraise, text_t
+from .five import reraise, text_t
from .utils import entrypoints
from .utils.encoding import str_to_bytes, bytes_t
@@ -307,14 +308,9 @@ def raw_encode(data):
def register_json():
"""Register a encoder/decoder for JSON serialization."""
- from anyjson import loads as json_loads, dumps as json_dumps
+ from kombu.utils import json as _json
- def _loads(obj):
- if isinstance(obj, bytes_t):
- obj = obj.decode()
- return json_loads(obj)
-
- registry.register('json', json_dumps, _loads,
+ registry.register('json', _json.dumps, _json.loads,
content_type='application/json',
content_encoding='utf-8')
@@ -451,5 +447,5 @@ for ep, args in entrypoints('kombu.serializers'): # pragma: no cover
def prepare_accept_content(l, name_to_type=registry.name_to_type):
if l is not None:
- return set(n if '/' in n else name_to_type[n] for n in l)
+ return {n if '/' in n else name_to_type[n] for n in l}
return l
diff --git a/kombu/syn.py b/kombu/syn.py
index 7f6e8099..01b4d479 100644
--- a/kombu/syn.py
+++ b/kombu/syn.py
@@ -21,7 +21,7 @@ def select_blocking_method(type):
def _detect_environment():
- ## -eventlet-
+ # ## -eventlet-
if 'eventlet' in sys.modules:
try:
from eventlet.patcher import is_monkey_patched as is_eventlet
@@ -32,7 +32,7 @@ def _detect_environment():
except ImportError:
pass
- # -gevent-
+ # ## -gevent-
if 'gevent' in sys.modules:
try:
from gevent import socket as _gsocket
diff --git a/kombu/tests/__init__.py b/kombu/tests/__init__.py
index fb9f21a2..e5ed4047 100644
--- a/kombu/tests/__init__.py
+++ b/kombu/tests/__init__.py
@@ -1,19 +1,11 @@
from __future__ import absolute_import
-import anyjson
import atexit
import os
import sys
from kombu.exceptions import VersionMismatch
-# avoid json implementation inconsistencies.
-try:
- import json # noqa
- anyjson.force_implementation('json')
-except ImportError:
- anyjson.force_implementation('simplejson')
-
def teardown():
# Workaround for multiprocessing bug where logging
diff --git a/kombu/tests/async/http/test_curl.py b/kombu/tests/async/http/test_curl.py
index 63fac040..d094c97a 100644
--- a/kombu/tests/async/http/test_curl.py
+++ b/kombu/tests/async/http/test_curl.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
-from kombu.async.http.curl import READ, WRITE, CurlClient, pycurl
+from kombu.async.http.curl import READ, WRITE, CurlClient
from kombu.tests.case import (
HubCase, Mock, call, patch, case_requires, set_module_symbol,
@@ -42,7 +42,7 @@ class test_CurlClient(HubCase):
])
def test_close(self):
- with patch('kombu.async.http.curl.pycurl') as _pycurl:
+ with patch('kombu.async.http.curl.pycurl'):
x = self.Client()
x._timeout_check_tref = Mock(name='timeout_check_tref')
x.close()
@@ -52,7 +52,7 @@ class test_CurlClient(HubCase):
x._multi.close.assert_called_with()
def test_add_request(self):
- with patch('kombu.async.http.curl.pycurl') as _pycurl:
+ with patch('kombu.async.http.curl.pycurl'):
x = self.Client()
x._process_queue = Mock(name='_process_queue')
x._set_timeout = Mock(name='_set_timeout')
@@ -132,13 +132,3 @@ class test_CurlClient(HubCase):
x._on_event.assert_called_with(fd, _pycurl.CSELECT_IN)
x.on_writable(fd, _pycurl=_pycurl)
x._on_event.assert_called_with(fd, _pycurl.CSELECT_OUT)
-
-
-
-
-
-
-
-
-
-
diff --git a/kombu/tests/case.py b/kombu/tests/case.py
index 7aed71f2..4f1d4e1f 100644
--- a/kombu/tests/case.py
+++ b/kombu/tests/case.py
@@ -7,12 +7,13 @@ import types
from contextlib import contextmanager
from functools import wraps
+from io import StringIO
import mock
from nose import SkipTest
-from kombu.five import builtins, string_t, StringIO
+from kombu.five import builtins, string_t
from kombu.utils.encoding import ensure_bytes
try:
@@ -251,10 +252,6 @@ def skip_if_not_module(module, import_errors=(ImportError, )):
return _wrap_test
-def skip_if_quick(fun):
- return skip_if_environ('QUICKTEST')(fun)
-
-
@contextmanager
def set_module_symbol(module, key, value):
module = importlib.import_module(module)
diff --git a/kombu/tests/mocks.py b/kombu/tests/mocks.py
index 836457eb..5af53b19 100644
--- a/kombu/tests/mocks.py
+++ b/kombu/tests/mocks.py
@@ -2,9 +2,8 @@ from __future__ import absolute_import
from itertools import count
-import anyjson
-
from kombu.transport import base
+from kombu.utils import json
class Message(base.Message):
@@ -104,7 +103,7 @@ class Channel(base.StdChannel):
def message_to_python(self, message, *args, **kwargs):
self._called('message_to_python')
- return Message(self, body=anyjson.dumps(message),
+ return Message(self, body=json.dumps(message),
delivery_tag=next(self.deliveries),
throw_decode_error=self.throw_decode_error,
content_type='application/json',
diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py
index 34406992..c4eebb71 100644
--- a/kombu/tests/test_common.py
+++ b/kombu/tests/test_common.py
@@ -105,6 +105,8 @@ class test_maybe_declare(Case):
def test_with_retry(self):
channel = Mock()
+ client = channel.connection.client = Mock()
+ client.declared_entities = set()
entity = Mock()
entity.can_cache_declaration = True
entity.is_bound = True
@@ -265,8 +267,8 @@ class test_itermessages(Case):
conn = self.MockConnection()
channel = Mock()
channel.connection.client = conn
- it = common.itermessages(conn, channel, 'q', limit=1,
- Consumer=MockConsumer)
+ conn.Consumer = MockConsumer
+ it = common.itermessages(conn, channel, 'q', limit=1)
ret = next(it)
self.assertTupleEqual(ret, ('body', 'message'))
@@ -279,8 +281,8 @@ class test_itermessages(Case):
conn.should_raise_timeout = True
channel = Mock()
channel.connection.client = conn
- it = common.itermessages(conn, channel, 'q', limit=1,
- Consumer=MockConsumer)
+ conn.Consumer = MockConsumer
+ it = common.itermessages(conn, channel, 'q', limit=1)
with self.assertRaises(StopIteration):
next(it)
@@ -291,8 +293,8 @@ class test_itermessages(Case):
deque_instance.popleft.side_effect = IndexError()
conn = self.MockConnection()
channel = Mock()
- it = common.itermessages(conn, channel, 'q', limit=1,
- Consumer=MockConsumer)
+ conn.Consumer = MockConsumer
+ it = common.itermessages(conn, channel, 'q', limit=1)
with self.assertRaises(StopIteration):
next(it)
diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py
index fdc78f02..0456bc27 100644
--- a/kombu/tests/test_connection.py
+++ b/kombu/tests/test_connection.py
@@ -17,7 +17,7 @@ class test_connection_utils(Case):
def setup(self):
self.url = 'amqp://user:pass@localhost:5672/my/vhost'
- self.nopass = 'amqp://user@localhost:5672/my/vhost'
+ self.nopass = 'amqp://user:**@localhost:5672/my/vhost'
self.expected = {
'transport': 'amqp',
'userid': 'user',
@@ -31,10 +31,6 @@ class test_connection_utils(Case):
result = parse_url(self.url)
self.assertDictEqual(result, self.expected)
- def test_parse_url_mongodb(self):
- result = parse_url('mongodb://example.com/')
- self.assertEqual(result['hostname'], 'example.com/')
-
def test_parse_generated_as_uri(self):
conn = Connection(self.url)
info = conn.info()
diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py
index 317614c6..729a8d3a 100644
--- a/kombu/tests/test_entities.py
+++ b/kombu/tests/test_entities.py
@@ -285,7 +285,7 @@ class test_Queue(Case):
def test_can_cache_declaration(self):
self.assertTrue(Queue('a', durable=True).can_cache_declaration)
- self.assertFalse(Queue('a', durable=False).can_cache_declaration)
+ self.assertTrue(Queue('a', durable=False).can_cache_declaration)
def test_eq(self):
q1 = Queue('xxx', Exchange('xxx', 'direct'), 'xxx')
diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py
index 4f6c8411..5de13fca 100644
--- a/kombu/tests/test_messaging.py
+++ b/kombu/tests/test_messaging.py
@@ -1,6 +1,5 @@
from __future__ import absolute_import, unicode_literals
-import anyjson
import pickle
from collections import defaultdict
@@ -8,6 +7,7 @@ from collections import defaultdict
from kombu import Connection, Consumer, Producer, Exchange, Queue
from kombu.exceptions import MessageStateError
from kombu.utils import ChannelPromise
+from kombu.utils import json
from .case import Case, Mock, patch
from .mocks import Transport
@@ -36,7 +36,7 @@ class test_Producer(Case):
p = Producer(None)
self.assertFalse(p._channel)
- @patch('kombu.common.maybe_declare')
+ @patch('kombu.messaging.maybe_declare')
def test_maybe_declare(self, maybe_declare):
p = self.connection.Producer()
q = Queue('foo')
@@ -73,7 +73,7 @@ class test_Producer(Case):
channel = self.connection.channel()
p = Producer(channel, self.exchange, serializer='json')
m, ctype, cencoding = p._prepare(message, headers={})
- self.assertDictEqual(message, anyjson.loads(m))
+ self.assertDictEqual(message, json.loads(m))
self.assertEqual(ctype, 'application/json')
self.assertEqual(cencoding, 'utf-8')
@@ -89,7 +89,7 @@ class test_Producer(Case):
self.assertEqual(headers['compression'], 'application/x-gzip')
import zlib
self.assertEqual(
- anyjson.loads(zlib.decompress(m).decode('utf-8')),
+ json.loads(zlib.decompress(m).decode('utf-8')),
message,
)
@@ -184,7 +184,7 @@ class test_Producer(Case):
self.assertIn('basic_publish', channel)
m, exc, rkey = ret
- self.assertDictEqual(message, anyjson.loads(m['body']))
+ self.assertDictEqual(message, json.loads(m['body']))
self.assertDictContainsSubset({'content_type': 'application/json',
'content_encoding': 'utf-8',
'priority': 0}, m)
@@ -230,7 +230,7 @@ class test_Consumer(Case):
b = Consumer(self.connection, accept=['json', 'pickle'])
self.assertSetEqual(
b.accept,
- set(['application/json', 'application/x-python-serialize']),
+ {'application/json', 'application/x-python-serialize'},
)
c = Consumer(self.connection, accept=b.accept)
self.assertSetEqual(b.accept, c.accept)
@@ -580,7 +580,7 @@ class test_Consumer(Case):
self.assertTrue(thrown)
m, exc = thrown[0]
- self.assertEqual(anyjson.loads(m), {'foo': 'bar'})
+ self.assertEqual(json.loads(m), {'foo': 'bar'})
self.assertIsInstance(exc, ValueError)
def test_recover(self):
diff --git a/kombu/tests/test_mixins.py b/kombu/tests/test_mixins.py
index 6f868b9e..27b270e9 100644
--- a/kombu/tests/test_mixins.py
+++ b/kombu/tests/test_mixins.py
@@ -90,7 +90,6 @@ class test_ConsumerMixin(Case):
def test_Consumer_context(self):
c, Acons, Bcons = self._context()
- _connref = _chanref = None
with c.Consumer() as (conn, channel, consumer):
self.assertIs(conn, c.connection)
@@ -104,7 +103,6 @@ class test_ConsumerMixin(Case):
self.assertIs(subcons.channel, conn.default_channel)
Acons.__enter__.assert_called_with()
Bcons.__enter__.assert_called_with()
- _connref, _chanref = conn, channel
c.on_consume_end.assert_called_with(conn, channel)
diff --git a/kombu/tests/test_serialization.py b/kombu/tests/test_serialization.py
index 23120711..c048f799 100644
--- a/kombu/tests/test_serialization.py
+++ b/kombu/tests/test_serialization.py
@@ -69,10 +69,6 @@ ApVGggcXVpY2sgYnJvd24gZm94IGp1bXBzIG92ZXIgdGggbGF6eSBkb2c=\
"""))
-def say(m):
- sys.stderr.write('%s\n' % (m, ))
-
-
registry.register('testS', lambda s: s, lambda s: 'decoded',
'application/testS', 'utf-8')
diff --git a/kombu/tests/test_syn.py b/kombu/tests/test_syn.py
index 551e5544..34e58035 100644
--- a/kombu/tests/test_syn.py
+++ b/kombu/tests/test_syn.py
@@ -38,9 +38,12 @@ class test_syn(Case):
def test_detect_environment_gevent(self):
with patch('gevent.socket', create=True) as m:
prev, socket.socket = socket.socket, m.socket
- self.assertTrue(sys.modules['gevent'])
- env = syn._detect_environment()
- self.assertEqual(env, 'gevent')
+ try:
+ self.assertTrue(sys.modules['gevent'])
+ env = syn._detect_environment()
+ self.assertEqual(env, 'gevent')
+ finally:
+ socket.socket = prev
def test_detect_environment_no_eventlet_or_gevent(self):
try:
diff --git a/kombu/tests/transport/test_base.py b/kombu/tests/transport/test_base.py
index ac90e51f..13ebd85d 100644
--- a/kombu/tests/transport/test_base.py
+++ b/kombu/tests/transport/test_base.py
@@ -52,7 +52,7 @@ class test_Message(Case):
m.ack()
def test_ack_respects_no_ack_consumers(self):
- self.channel.no_ack_consumers = set(['abc'])
+ self.channel.no_ack_consumers = {'abc'}
self.message.delivery_info['consumer_tag'] = 'abc'
ack = self.channel.basic_ack = Mock()
@@ -61,7 +61,7 @@ class test_Message(Case):
self.assertFalse(ack.called)
def test_ack_missing_consumer_tag(self):
- self.channel.no_ack_consumers = set(['abc'])
+ self.channel.no_ack_consumers = {'abc'}
self.message.delivery_info = {}
ack = self.channel.basic_ack = Mock()
diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py
index 285f6004..6b09910b 100644
--- a/kombu/tests/transport/test_redis.py
+++ b/kombu/tests/transport/test_redis.py
@@ -3,7 +3,6 @@ from __future__ import absolute_import
import socket
import types
-from anyjson import dumps, loads
from collections import defaultdict
from itertools import count
@@ -12,6 +11,7 @@ from kombu.exceptions import InconsistencyError, VersionMismatch
from kombu.five import Empty, Queue as _Queue
from kombu.transport import virtual
from kombu.utils import eventio # patch poll
+from kombu.utils.json import dumps, loads
from kombu.tests.case import (
Case, Mock, call, module_exists, skip_if_not_module, patch,
@@ -220,6 +220,7 @@ class Transport(redis.Transport):
class test_Channel(Case):
+ @skip_if_not_module('redis')
def setup(self):
self.connection = self.create_connection()
self.channel = self.connection.default_channel
@@ -616,10 +617,12 @@ class test_Channel(Case):
self.channel.connection.client.virtual_host = 'dwqeq'
self.channel._connparams()
+ @skip_if_not_module('redis')
def test_connparams_allows_slash_in_db(self):
self.channel.connection.client.virtual_host = '/123'
self.assertEqual(self.channel._connparams()['db'], 123)
+ @skip_if_not_module('redis')
def test_connparams_db_can_be_int(self):
self.channel.connection.client.virtual_host = 124
self.assertEqual(self.channel._connparams()['db'], 124)
@@ -630,6 +633,7 @@ class test_Channel(Case):
redis.Channel._new_queue(self.channel, 'elaine', auto_delete=True)
self.assertIn('elaine', self.channel.auto_delete_queues)
+ @skip_if_not_module('redis')
def test_connparams_regular_hostname(self):
self.channel.connection.client.hostname = 'george.vandelay.com'
self.assertEqual(
@@ -785,6 +789,7 @@ class test_Channel(Case):
class test_Redis(Case):
+ @skip_if_not_module('redis')
def setup(self):
self.connection = Connection(transport=Transport)
self.exchange = Exchange('test_Redis', type='direct')
@@ -941,6 +946,7 @@ def _redis_modules():
class test_MultiChannelPoller(Case):
+ @skip_if_not_module('redis')
def setup(self):
self.Poller = redis.MultiChannelPoller
diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py
index df17d918..33764341 100644
--- a/kombu/tests/transport/virtual/test_base.py
+++ b/kombu/tests/transport/virtual/test_base.py
@@ -1,5 +1,6 @@
from __future__ import absolute_import
+import sys
import warnings
from kombu import Connection
@@ -10,6 +11,9 @@ from kombu.compression import compress
from kombu.tests.case import Case, Mock, patch, redirect_stdouts
+PY3 = sys.version_info[0] == 3
+PRINT_FQDN = 'builtins.print' if PY3 else '__builtin__.print'
+
def client(**kwargs):
return Connection(transport='kombu.transport.virtual:Transport', **kwargs)
@@ -267,8 +271,8 @@ class test_Channel(Case):
c.exchange_declare(n)
c.queue_declare(n)
c.queue_bind(n, n, n)
- c.queue_bind(n, n, n) # tests code path that returns
- # if queue already bound.
+ # tests code path that returns if queue already bound.
+ c.queue_bind(n, n, n)
c.queue_delete(n, if_empty=True)
self.assertIn(n, c.state.bindings)
@@ -393,8 +397,8 @@ class test_Channel(Case):
self.assertFalse(q._delivered)
@patch('kombu.transport.virtual.emergency_dump_state')
- @patch('kombu.transport.virtual.say')
- def test_restore_unacked_once_when_unrestored(self, say,
+ @patch(PRINT_FQDN)
+ def test_restore_unacked_once_when_unrestored(self, print_,
emergency_dump_state):
q = self.channel.qos
q._flush = Mock()
@@ -413,7 +417,7 @@ class test_Channel(Case):
self.channel.do_restore = True
q.restore_unacked_once()
- self.assertTrue(say.called)
+ self.assertTrue(print_.called)
self.assertTrue(emergency_dump_state.called)
def test_basic_recover(self):
@@ -515,6 +519,37 @@ class test_Channel(Case):
with self.assertRaises(ChannelError):
self.channel.queue_declare(queue='21wisdjwqe', passive=True)
+ def test_get_message_priority(self):
+
+ def _message(priority):
+ return self.channel.prepare_message(
+ 'the message with priority', priority=priority,
+ )
+
+ self.assertEqual(
+ self.channel._get_message_priority(_message(5)), 5,
+ )
+ self.assertEqual(
+ self.channel._get_message_priority(
+ _message(self.channel.min_priority - 10),
+ ),
+ self.channel.min_priority,
+ )
+ self.assertEqual(
+ self.channel._get_message_priority(
+ _message(self.channel.max_priority + 10),
+ ),
+ self.channel.max_priority,
+ )
+ self.assertEqual(
+ self.channel._get_message_priority(_message('foobar')),
+ self.channel.default_priority,
+ )
+ self.assertEqual(
+ self.channel._get_message_priority(_message(2), reverse=True),
+ self.channel.max_priority - 2,
+ )
+
class test_Transport(Case):
diff --git a/kombu/tests/utils/test_utils.py b/kombu/tests/utils/test_utils.py
index e0077b41..f55067ab 100644
--- a/kombu/tests/utils/test_utils.py
+++ b/kombu/tests/utils/test_utils.py
@@ -5,11 +5,7 @@ import pickle
import sys
from functools import wraps
-
-if sys.version_info >= (3, 0):
- from io import StringIO, BytesIO
-else:
- from StringIO import StringIO, StringIO as BytesIO # noqa
+from io import StringIO, BytesIO
from kombu import version_info_t
from kombu import utils
@@ -102,18 +98,6 @@ class test_UUID(Case):
sys.modules['celery.utils'] = old_utils
-class test_Misc(Case):
-
- def test_kwdict(self):
-
- def f(**kwargs):
- return kwargs
-
- kw = {'foo': 'foo',
- 'bar': 'bar'}
- self.assertTrue(f(**utils.kwdict(kw)))
-
-
class MyStringIO(StringIO):
def close(self):
@@ -369,7 +353,7 @@ class test_shufflecycle(Case):
prev_repeat, utils.repeat = utils.repeat, Mock()
try:
utils.repeat.return_value = list(range(10))
- values = set(['A', 'B', 'C'])
+ values = {'A', 'B', 'C'}
cycle = utils.shufflecycle(values)
seen = set()
for i in range(10):
diff --git a/kombu/transport/SLMQ.py b/kombu/transport/SLMQ.py
index 449bc2fb..3ea573af 100644
--- a/kombu/transport/SLMQ.py
+++ b/kombu/transport/SLMQ.py
@@ -10,13 +10,12 @@ from __future__ import absolute_import
import socket
import string
-from anyjson import loads, dumps
-
import os
from kombu.five import Empty, text_t
from kombu.utils import cached_property # , uuid
from kombu.utils.encoding import bytes_to_str, safe_str
+from kombu.utils.json import loads, dumps
from . import virtual
@@ -27,8 +26,9 @@ except ImportError: # pragma: no cover
get_client = ResponseError = None # noqa
# dots are replaced by dash, all other punctuation replaced by underscore.
-CHARS_REPLACE_TABLE = dict(
- (ord(c), 0x5f) for c in string.punctuation if c not in '_')
+CHARS_REPLACE_TABLE = {
+ ord(c): 0x5f for c in string.punctuation if c not in '_'
+}
class Channel(virtual.Channel):
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index c4e2200f..dc1243b9 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -44,7 +44,6 @@ import socket
import string
from amqp.promise import transform, ensure_promise, promise
-from anyjson import loads, dumps
from kombu.async import get_event_loop
from kombu.async.aws import sqs as _asynsqs
@@ -56,6 +55,7 @@ from kombu.five import Empty, range, string_t, text_t
from kombu.log import get_logger
from kombu.utils import cached_property
from kombu.utils.encoding import bytes_to_str, safe_str
+from kombu.utils.json import loads, dumps
from kombu.transport.virtual import scheduling
from . import virtual
@@ -64,10 +64,14 @@ logger = get_logger(__name__)
# dots are replaced by dash, all other punctuation
# replaced by underscore.
-CHARS_REPLACE_TABLE = dict((ord(c), 0x5f)
- for c in string.punctuation if c not in '-_.')
+CHARS_REPLACE_TABLE = {
+ ord(c): 0x5f for c in string.punctuation if c not in '-_.'
+}
CHARS_REPLACE_TABLE[0x2e] = 0x2d # '.' -> '-'
+#: SQS bulk get supports a maximum of 10 messages at a time.
+SQS_MAX_MESSAGES = 10
+
def maybe_int(x):
try:
@@ -75,9 +79,6 @@ def maybe_int(x):
except ValueError:
return x
-#: SQS bulk get supports a maximum of 10 messages at a time.
-SQS_MAX_MESSAGES = 10
-
class Channel(virtual.Channel):
default_region = 'us-east-1'
diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py
index 08c088a4..feedee74 100644
--- a/kombu/transport/amqplib.py
+++ b/kombu/transport/amqplib.py
@@ -17,11 +17,26 @@ except ImportError:
pass
from struct import unpack
-from amqplib import client_0_8 as amqp
-from amqplib.client_0_8 import transport
-from amqplib.client_0_8.channel import Channel as _Channel
-from amqplib.client_0_8.exceptions import AMQPConnectionException
-from amqplib.client_0_8.exceptions import AMQPChannelException
+try:
+ from amqplib import client_0_8 as amqp
+ from amqplib.client_0_8 import transport
+ from amqplib.client_0_8.channel import Channel as _Channel
+ from amqplib.client_0_8.exceptions import AMQPConnectionException
+ from amqplib.client_0_8.exceptions import AMQPChannelException
+except ImportError: # pragma: no cover
+
+ class NA(object):
+ pass
+
+ class NAx(object):
+ pass
+ amqp = NA
+ amqp.Connection = NA
+ transport = _Channel = NA # noqa
+ # Sphinx crashes if this is NA, must be different class
+ transport.TCPTransport = transport.SSLTransport = NAx
+ AMQPConnectionException = AMQPChannelException = NA # noqa
+
from kombu.five import items
from kombu.utils.encoding import str_to_bytes
@@ -325,6 +340,9 @@ class Transport(base.Transport):
self.client = client
self.default_port = kwargs.get('default_port') or self.default_port
+ if amqp is NA:
+ raise ImportError('Missing amqplib library (pip install amqplib)')
+
def create_channel(self, connection):
return connection.channel()
diff --git a/kombu/transport/base.py b/kombu/transport/base.py
index 699a70f0..8e2948ba 100644
--- a/kombu/transport/base.py
+++ b/kombu/transport/base.py
@@ -13,7 +13,6 @@ import socket
from kombu.exceptions import ChannelError, ConnectionError
from kombu.message import Message
from kombu.utils import cached_property
-from kombu.utils.compat import get_errno
__all__ = ['Message', 'StdChannel', 'Management', 'Transport']
@@ -150,8 +149,7 @@ class Transport(object):
return True
def _make_reader(self, connection, timeout=socket.timeout,
- error=socket.error, get_errno=get_errno,
- _unavail=(errno.EAGAIN, errno.EINTR)):
+ error=socket.error, _unavail=(errno.EAGAIN, errno.EINTR)):
drain_events = connection.drain_events
def _read(loop):
@@ -162,7 +160,7 @@ class Transport(object):
except timeout:
return
except error as exc:
- if get_errno(exc) in _unavail:
+ if exc.errno in _unavail:
return
raise
loop.call_soon(_read, loop)
diff --git a/kombu/transport/beanstalk.py b/kombu/transport/beanstalk.py
index 9dff8b49..fd12575d 100644
--- a/kombu/transport/beanstalk.py
+++ b/kombu/transport/beanstalk.py
@@ -10,16 +10,19 @@ Beanstalk transport.
"""
from __future__ import absolute_import
-import beanstalkc
import socket
-from anyjson import loads, dumps
-
from kombu.five import Empty
from kombu.utils.encoding import bytes_to_str
+from kombu.utils.json import loads, dumps
from . import virtual
+try:
+ import beanstalkc
+except ImportError: # pragma: no cover
+ beanstalkc = None # noqa
+
DEFAULT_PORT = 11300
__author__ = 'David Ziegler <david.ziegler@gmail.com>'
@@ -44,7 +47,7 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
extra = {}
- priority = message['properties']['delivery_info']['priority']
+ priority = self._get_message_priority(message)
ttr = message['properties'].get('ttr')
if ttr is not None:
extra['ttr'] = ttr
@@ -127,16 +130,25 @@ class Transport(virtual.Transport):
default_port = DEFAULT_PORT
connection_errors = (
virtual.Transport.connection_errors + (
- socket.error, beanstalkc.SocketError, IOError)
+ socket.error, IOError,
+ getattr(beanstalkc, 'SocketError', None),
+ )
)
channel_errors = (
virtual.Transport.channel_errors + (
socket.error, IOError,
- beanstalkc.SocketError,
- beanstalkc.BeanstalkcException)
+ getattr(beanstalkc, 'SocketError', None),
+ getattr(beanstalkc, 'BeanstalkcException', None),
+ )
)
driver_type = 'beanstalk'
driver_name = 'beanstalkc'
+ def __init__(self, *args, **kwargs):
+ if beanstalkc is None:
+ raise ImportError(
+ 'Missing beanstalkc library (pip install beanstalkc)')
+ super(Transport, self).__init__(*args, **kwargs)
+
def driver_version(self):
return beanstalkc.__version__
diff --git a/kombu/transport/couchdb.py b/kombu/transport/couchdb.py
index 009dbbdc..c7811ab9 100644
--- a/kombu/transport/couchdb.py
+++ b/kombu/transport/couchdb.py
@@ -11,16 +11,19 @@ CouchDB transport.
from __future__ import absolute_import
import socket
-import couchdb
-
-from anyjson import loads, dumps
from kombu.five import Empty
from kombu.utils import uuid4
from kombu.utils.encoding import bytes_to_str
+from kombu.utils.json import loads, dumps
from . import virtual
+try:
+ import couchdb
+except ImportError: # pragma: no cover
+ couchdb = None # noqa
+
DEFAULT_PORT = 5984
DEFAULT_DATABASE = 'kombu_default'
@@ -80,7 +83,9 @@ class Channel(virtual.Channel):
port))
# Use username and password if avaliable
try:
- server.resource.credentials = (conninfo.userid, conninfo.password)
+ if conninfo.userid:
+ server.resource.credentials = (conninfo.userid,
+ conninfo.password)
except AttributeError:
pass
try:
@@ -110,20 +115,27 @@ class Transport(virtual.Transport):
connection_errors = (
virtual.Transport.connection_errors + (
socket.error,
- couchdb.HTTPError,
- couchdb.ServerError,
- couchdb.Unauthorized)
+ getattr(couchdb, 'HTTPError', None),
+ getattr(couchdb, 'ServerError', None),
+ getattr(couchdb, 'Unauthorized', None),
+ )
)
channel_errors = (
virtual.Transport.channel_errors + (
- couchdb.HTTPError,
- couchdb.ServerError,
- couchdb.PreconditionFailed,
- couchdb.ResourceConflict,
- couchdb.ResourceNotFound)
+ getattr(couchdb, 'HTTPError', None),
+ getattr(couchdb, 'ServerError', None),
+ getattr(couchdb, 'PreconditionFailed', None),
+ getattr(couchdb, 'ResourceConflict', None),
+ getattr(couchdb, 'ResourceNotFound', None),
+ )
)
driver_type = 'couchdb'
driver_name = 'couchdb'
+ def __init__(self, *args, **kwargs):
+ if couchdb is None:
+ raise ImportError('Missing couchdb library (pip install couchdb)')
+ super(Transport, self).__init__(*args, **kwargs)
+
def driver_version(self):
return couchdb.__version__
diff --git a/kombu/transport/django/__init__.py b/kombu/transport/django/__init__.py
index 67bfa576..9dde51a2 100644
--- a/kombu/transport/django/__init__.py
+++ b/kombu/transport/django/__init__.py
@@ -1,14 +1,14 @@
"""Kombu transport using the Django database as a message store."""
from __future__ import absolute_import
-from anyjson import loads, dumps
-
from django.conf import settings
from django.core import exceptions as errors
from kombu.five import Empty
from kombu.transport import virtual
from kombu.utils.encoding import bytes_to_str
+from kombu.utils.json import loads, dumps
+
from .models import Queue
@@ -35,7 +35,6 @@ class Channel(virtual.Channel):
super(Channel, self).basic_consume(queue, *args, **kwargs)
def _get(self, queue):
- #self.refresh_connection()
m = Queue.objects.fetch(queue)
if m:
return loads(bytes_to_str(m))
diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py
index c83dcdc3..92917ab4 100644
--- a/kombu/transport/filesystem.py
+++ b/kombu/transport/filesystem.py
@@ -7,8 +7,6 @@ Transport using the file system as the message store.
"""
from __future__ import absolute_import
-from anyjson import loads, dumps
-
import os
import shutil
import uuid
@@ -19,6 +17,8 @@ from kombu.exceptions import ChannelError
from kombu.five import Empty, monotonic
from kombu.utils import cached_property
from kombu.utils.encoding import bytes_to_str, str_to_bytes
+from kombu.utils.json import loads, dumps
+
VERSION = (1, 0, 0)
__version__ = '.'.join(map(str, VERSION))
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index eeef8756..34b0cab0 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -13,12 +13,12 @@ from __future__ import absolute_import
import pymongo
from pymongo import errors
-from anyjson import loads, dumps
from pymongo import MongoClient, uri_parser
from kombu.five import Empty
from kombu.syn import _detect_environment
from kombu.utils.encoding import bytes_to_str
+from kombu.utils.json import loads, dumps
from . import virtual
@@ -99,7 +99,8 @@ class Channel(virtual.Channel):
else:
msg = self.get_messages().find_and_modify(
query={'queue': queue},
- sort={'_id': pymongo.ASCENDING},
+ sort=[('priority', pymongo.ASCENDING),
+ ('_id', pymongo.ASCENDING)],
remove=True,
)
@@ -115,8 +116,11 @@ class Channel(virtual.Channel):
return self.get_messages().find({'queue': queue}).count()
def _put(self, queue, message, **kwargs):
- self.get_messages().insert({'payload': dumps(message),
- 'queue': queue})
+ self.get_messages().insert({
+ 'payload': dumps(message),
+ 'queue': queue,
+ 'priority': self._get_message_priority(message, reverse=True),
+ })
def _purge(self, queue):
size = self._size(queue)
@@ -202,14 +206,15 @@ class Channel(virtual.Channel):
def _ensure_indexes(self):
'''Ensure indexes on collections.'''
self.get_messages().ensure_index(
- [('queue', 1), ('_id', 1)], background=True,
+ [('queue', 1), ('priority', 1), ('_id', 1)], background=True,
)
self.get_broadcast().ensure_index([('queue', 1)])
self.get_routing().ensure_index([('queue', 1), ('exchange', 1)])
- #TODO Store a more complete exchange metatable in the routing collection
def get_table(self, exchange):
"""Get table of bindings for ``exchange``."""
+ # TODO Store a more complete exchange metatable in the
+ # routing collection
localRoutes = frozenset(self.state.exchanges[exchange]['table'])
brokerRoutes = self.get_messages().routing.find(
{'exchange': exchange}
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 9753c19e..e07a0da9 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -16,7 +16,6 @@ from contextlib import contextmanager
from time import time
from amqp import promise
-from anyjson import loads, dumps
from kombu.exceptions import InconsistencyError, VersionMismatch
from kombu.five import Empty, values, string_t
@@ -24,6 +23,7 @@ from kombu.log import get_logger
from kombu.utils import cached_property, uuid
from kombu.utils.eventio import poll, READ, ERR
from kombu.utils.encoding import bytes_to_str
+from kombu.utils.json import loads, dumps
from kombu.utils.url import _parse_url
NO_ROUTE_ERROR = """
@@ -475,6 +475,8 @@ class Channel(virtual.Channel):
crit('Could not restore message: %r', payload, exc_info=True)
def _restore(self, message, leftmost=False):
+ if not self.ack_emulation:
+ return super(Channel, self)._restore(message)
tag = message.delivery_tag
with self.conn_or_acquire() as client:
P, _ = client.pipeline() \
@@ -661,11 +663,8 @@ class Channel(virtual.Channel):
def _put(self, queue, message, **kwargs):
"""Deliver message."""
- try:
- pri = max(min(int(
- message['properties']['delivery_info']['priority']), 9), 0)
- except (TypeError, ValueError, KeyError):
- pri = 0
+ pri = self._get_message_priority(message)
+
with self.conn_or_acquire() as client:
client.lpush(self._q_for_pri(queue, pri), dumps(message))
@@ -896,8 +895,8 @@ class Channel(virtual.Channel):
@property
def active_queues(self):
"""Set of queues being consumed from (excluding fanout queues)."""
- return set(queue for queue in self._active_queues
- if queue not in self.active_fanout_queues)
+ return {queue for queue in self._active_queues
+ if queue not in self.active_fanout_queues}
class Transport(virtual.Transport):
@@ -914,6 +913,8 @@ class Transport(virtual.Transport):
)
def __init__(self, *args, **kwargs):
+ if redis is None:
+ raise ImportError('Missing redis library (pip install redis)')
super(Transport, self).__init__(*args, **kwargs)
# Get redis-py exceptions.
diff --git a/kombu/transport/sqlalchemy/__init__.py b/kombu/transport/sqlalchemy/__init__.py
index c085b469..27c6e65d 100644
--- a/kombu/transport/sqlalchemy/__init__.py
+++ b/kombu/transport/sqlalchemy/__init__.py
@@ -4,7 +4,6 @@
from __future__ import absolute_import
-from anyjson import loads, dumps
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import sessionmaker
@@ -13,6 +12,7 @@ from kombu.five import Empty
from kombu.transport import virtual
from kombu.utils import cached_property
from kombu.utils.encoding import bytes_to_str
+from kombu.utils.json import loads, dumps
from .models import (ModelBase, Queue as QueueBase, Message as MessageBase,
class_registry, metadata)
@@ -153,6 +153,7 @@ class Transport(virtual.Transport):
default_port = 0
driver_type = 'sql'
driver_name = 'sqlalchemy'
+ connection_errors = (OperationalError, )
def driver_version(self):
import sqlalchemy
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py
index 502f8c9f..1b0dd471 100644
--- a/kombu/transport/virtual/__init__.py
+++ b/kombu/transport/virtual/__init__.py
@@ -7,7 +7,7 @@ Virtual transport implementation.
Emulates the AMQ API for non-AMQ transports.
"""
-from __future__ import absolute_import, unicode_literals
+from __future__ import absolute_import, print_function, unicode_literals
import base64
import socket
@@ -15,6 +15,7 @@ import sys
import warnings
from array import array
+from collections import OrderedDict
from itertools import count
from multiprocessing.util import Finalize
from time import sleep
@@ -23,8 +24,7 @@ from amqp.protocol import queue_declare_ok_t
from kombu.exceptions import ResourceError, ChannelError
from kombu.five import Empty, items, monotonic
-from kombu.utils import emergency_dump_state, kwdict, say, uuid
-from kombu.utils.compat import OrderedDict
+from kombu.utils import emergency_dump_state, uuid
from kombu.utils.encoding import str_to_bytes, bytes_to_str
from kombu.transport import base
@@ -44,6 +44,9 @@ Cannot redeclare exchange {0!r} in vhost {1!r} with \
different type, durable, autodelete or arguments value.\
"""
+RESTORING_FMT = 'Restoring {0!r} unacknowledged message(s)'
+RESTORE_PANIC_FMT = 'UNABLE TO RESTORE {0} MESSAGES: {1}'
+
class Base64(object):
@@ -196,7 +199,7 @@ class QoS(object):
delivered.clear()
return errors
- def restore_unacked_once(self):
+ def restore_unacked_once(self, stderr=None):
"""Restores all unacknowledged messages at shutdown/gc collect.
Will only be done once for each instance.
@@ -204,6 +207,7 @@ class QoS(object):
"""
self._on_collect.cancel()
self._flush()
+ stderr = sys.stderr if stderr is None else stderr
state = self._delivered
if not self.restore_at_shutdown or not self.channel.do_restore:
@@ -213,15 +217,15 @@ class QoS(object):
return
try:
if state:
- say('Restoring {0!r} unacknowledged message(s).',
- len(self._delivered))
+ print(RESTORING_FMT.format(len(self._delivered)),
+ file=stderr)
unrestored = self.restore_unacked()
if unrestored:
errors, messages = list(zip(*unrestored))
- say('UNABLE TO RESTORE {0} MESSAGES: {1}',
- len(errors), errors)
- emergency_dump_state(messages)
+ print(RESTORE_PANIC_FMT.format(len(errors), errors),
+ file=stderr)
+ emergency_dump_state(messages, stderr=stderr)
finally:
state.restored = True
@@ -253,7 +257,7 @@ class Message(base.Message):
'delivery_info': properties.get('delivery_info'),
'postencode': 'utf-8',
})
- super(Message, self).__init__(channel, **kwdict(kwargs))
+ super(Message, self).__init__(channel, **kwargs)
def serializable(self):
props = self.properties
@@ -367,6 +371,11 @@ class Channel(AbstractChannel, base.StdChannel):
# List of options to transfer from :attr:`transport_options`.
from_transport_options = ('body_encoding', 'deadletter_queue')
+ # Priority defaults
+ default_priority = 0
+ min_priority = 0
+ max_priority = 9
+
def __init__(self, connection, **kwargs):
self.connection = connection
self._consumers = set()
@@ -520,7 +529,7 @@ class Channel(AbstractChannel, base.StdChannel):
return self.typeof(exchange).deliver(
message, exchange, routing_key, **kwargs
)
- # anon exchange: routing_key is the destintaion queue
+ # anon exchange: routing_key is the destination queue
return self._put(routing_key, message, **kwargs)
def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
@@ -653,7 +662,7 @@ class Channel(AbstractChannel, base.StdChannel):
"""Prepare message data."""
properties = properties or {}
info = properties.setdefault('delivery_info', {})
- info['priority'] = priority or 0
+ info['priority'] = priority or self.default_priority
return {'body': body,
'content-encoding': content_encoding,
@@ -723,6 +732,24 @@ class Channel(AbstractChannel, base.StdChannel):
self._reset_cycle()
return self._cycle
+ def _get_message_priority(self, message, reverse=False):
+ """Get priority from message and limit the value within a
+ boundary of 0 to 9.
+
+ Higher value has more priority.
+
+ """
+ try:
+ priority = max(
+ min(int(message['properties']['delivery_info']['priority']),
+ self.max_priority),
+ self.min_priority,
+ )
+ except (TypeError, ValueError, KeyError):
+ priority = self.default_priority
+
+ return (self.max_priority - priority) if reverse else priority
+
class Management(base.Management):
diff --git a/kombu/transport/zookeeper.py b/kombu/transport/zookeeper.py
index 2d1c8abc..ab6d72f8 100644
--- a/kombu/transport/zookeeper.py
+++ b/kombu/transport/zookeeper.py
@@ -30,15 +30,12 @@ from __future__ import absolute_import
import os
import socket
-from anyjson import loads, dumps
-
from kombu.five import Empty
from kombu.utils.encoding import bytes_to_str
+from kombu.utils.json import loads, dumps
from . import virtual
-MAX_PRIORITY = 9
-
try:
import kazoo
from kazoo.client import KazooClient
@@ -103,13 +100,10 @@ class Channel(virtual.Channel):
return queue
def _put(self, queue, message, **kwargs):
- try:
- priority = message['properties']['delivery_info']['priority']
- except KeyError:
- priority = 0
-
- queue = self._get_queue(queue)
- queue.put(dumps(message), priority=(MAX_PRIORITY - priority))
+ return self._get_queue(queue).put(
+ dumps(message),
+ priority=self._get_message_priority(message, reverse=True),
+ )
def _get(self, queue):
queue = self._get_queue(queue)
diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py
index 5d5e7bfa..39137894 100644
--- a/kombu/utils/__init__.py
+++ b/kombu/utils/__init__.py
@@ -5,7 +5,7 @@ kombu.utils
Internal utilities.
"""
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
import importlib
import numbers
@@ -35,7 +35,7 @@ except ImportError: # pragma: no cover
FILENO_ERRORS = (AttributeError, ValueError) # noqa
-__all__ = ['EqualityDict', 'say', 'uuid', 'kwdict', 'maybe_list',
+__all__ = ['EqualityDict', 'uuid', 'maybe_list',
'fxrange', 'fxrangemax', 'retry_over_time',
'emergency_dump_state', 'cached_property',
'reprkwargs', 'reprcall', 'nested', 'fileno', 'maybe_fileno']
@@ -136,10 +136,6 @@ class EqualityDict(dict):
return dict.__delitem__(self, eqhash(key))
-def say(m, *fargs, **fkwargs):
- print(str(m).format(*fargs, **fkwargs), file=sys.stderr)
-
-
def uuid4():
# Workaround for http://bugs.python.org/issue4607
if ctypes and _uuid_generate_random: # pragma: no cover
@@ -159,22 +155,6 @@ def uuid():
gen_unique_id = uuid
-if sys.version_info >= (2, 6, 5):
-
- def kwdict(kwargs):
- return kwargs
-else:
- def kwdict(kwargs): # pragma: no cover # noqa
- """Make sure keyword arguments are not in Unicode.
-
- This should be fixed in newer Python versions,
- see: http://bugs.python.org/issue4978.
-
- """
- return dict((key.encode('utf-8'), value)
- for key, value in items(kwargs))
-
-
def maybe_list(v):
if v is None:
return []
@@ -257,21 +237,26 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
sleep(abs(int(tts) - tts))
-def emergency_dump_state(state, open_file=open, dump=None):
+def emergency_dump_state(state, open_file=open, dump=None, stderr=None):
from pprint import pformat
from tempfile import mktemp
+ stderr = sys.stderr if stderr is None else stderr
if dump is None:
import pickle
dump = pickle.dump
persist = mktemp()
- say('EMERGENCY DUMP STATE TO FILE -> {0} <-', persist)
+ print('EMERGENCY DUMP STATE TO FILE -> {0} <-'.format(persist), ## noqa
+ file=stderr)
fh = open_file(persist, 'w')
try:
try:
dump(state, fh, protocol=0)
except Exception as exc:
- say('Cannot pickle state: {0!r}. Fallback to pformat.', exc)
+ print( # noqa
+ 'Cannot pickle state: {0!r}. Fallback to pformat.'.format(exc),
+ file=stderr,
+ )
fh.write(default_encode(pformat(state)))
finally:
fh.flush()
diff --git a/kombu/utils/compat.py b/kombu/utils/compat.py
deleted file mode 100644
index 16028997..00000000
--- a/kombu/utils/compat.py
+++ /dev/null
@@ -1,60 +0,0 @@
-"""
-kombu.utils.compat
-==================
-
-Helps compatibility with older Python versions.
-
-"""
-from __future__ import absolute_import
-
-
-############## timedelta_seconds() -> delta.total_seconds ####################
-from datetime import timedelta
-
-HAVE_TIMEDELTA_TOTAL_SECONDS = hasattr(timedelta, 'total_seconds')
-
-
-if HAVE_TIMEDELTA_TOTAL_SECONDS: # pragma: no cover
-
- def timedelta_seconds(delta):
- """Convert :class:`datetime.timedelta` to seconds.
-
- Doesn't account for negative values.
-
- """
- return max(delta.total_seconds(), 0)
-
-else: # pragma: no cover
-
- def timedelta_seconds(delta): # noqa
- """Convert :class:`datetime.timedelta` to seconds.
-
- Doesn't account for negative values.
-
- """
- if delta.days < 0:
- return 0
- return delta.days * 86400 + delta.seconds + (delta.microseconds / 10e5)
-
-############## socket.error.errno ############################################
-
-
-def get_errno(exc):
- """:exc:`socket.error` and :exc:`IOError` first got
- the ``.errno`` attribute in Py2.7"""
- try:
- return exc.errno
- except AttributeError:
- try:
- # e.args = (errno, reason)
- if isinstance(exc.args, tuple) and len(exc.args) == 2:
- return exc.args[0]
- except AttributeError:
- pass
- return 0
-
-############## collections.OrderedDict #######################################
-try:
- from collections import OrderedDict
-except ImportError:
- from ordereddict import OrderedDict # noqa
diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py
index e4961cdc..6ed352d3 100644
--- a/kombu/utils/eventio.py
+++ b/kombu/utils/eventio.py
@@ -44,7 +44,6 @@ KQ_NOTE_REVOKE = getattr(__select__, 'kQ_NOTE_REVOKE', 64)
from kombu.syn import detect_environment
from . import fileno
-from .compat import get_errno
__all__ = ['poll']
@@ -53,9 +52,9 @@ WRITE = POLL_WRITE = 0x004
ERR = POLL_ERR = 0x008 | 0x010
try:
- SELECT_BAD_FD = set((errno.EBADF, errno.WSAENOTSOCK))
+ SELECT_BAD_FD = {errno.EBADF, errno.WSAENOTSOCK}
except AttributeError:
- SELECT_BAD_FD = set((errno.EBADF,))
+ SELECT_BAD_FD = {errno.EBADF}
class Poller(object):
@@ -64,7 +63,7 @@ class Poller(object):
try:
return self._poll(timeout)
except Exception as exc:
- if get_errno(exc) != errno.EINTR:
+ if exc.errno != errno.EINTR:
raise
@@ -77,7 +76,7 @@ class _epoll(Poller):
try:
self._epoll.register(fd, events)
except Exception as exc:
- if get_errno(exc) != errno.EEXIST:
+ if exc.errno != errno.EEXIST:
raise
def unregister(self, fd):
@@ -86,7 +85,7 @@ class _epoll(Poller):
except (socket.error, ValueError, KeyError, TypeError):
pass
except (IOError, OSError) as exc:
- if get_errno(exc) != errno.ENOENT:
+ if exc.errno != errno.ENOENT:
raise
def _poll(self, timeout):
@@ -198,7 +197,7 @@ class _select(Poller):
try:
_selectf([fd], [], [], 0)
except (_selecterr, socket.error) as exc:
- if get_errno(exc) in SELECT_BAD_FD:
+ if exc.errno in SELECT_BAD_FD:
self.unregister(fd)
def unregister(self, fd):
@@ -207,7 +206,7 @@ class _select(Poller):
except socket.error as exc:
# we don't know the previous fd of this object
# but it will be removed by the next poll iteration.
- if get_errno(exc) in SELECT_BAD_FD:
+ if exc.errno in SELECT_BAD_FD:
return
raise
self._rfd.discard(fd)
@@ -220,9 +219,9 @@ class _select(Poller):
self._rfd, self._wfd, self._efd, timeout,
)
except (_selecterr, socket.error) as exc:
- if get_errno(exc) == errno.EINTR:
+ if exc.errno == errno.EINTR:
return
- elif get_errno(exc) in SELECT_BAD_FD:
+ elif exc.errno in SELECT_BAD_FD:
return self._remove_bad()
raise
diff --git a/kombu/utils/json.py b/kombu/utils/json.py
new file mode 100644
index 00000000..a5227467
--- /dev/null
+++ b/kombu/utils/json.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+import sys
+
+from kombu.five import buffer_t, text_t, bytes_t
+
+try:
+ import simplejson as json
+except ImportError: # pragma: no cover
+ import json # noqa
+
+IS_PY3 = sys.version_info[0] == 3
+
+
+class JSONEncoder(json.JSONEncoder):
+
+ def default(self, obj, _super=json.JSONEncoder.default):
+ try:
+ reducer = obj.__json__
+ except AttributeError:
+ return _super(self, obj)
+ else:
+ return reducer()
+
+
+def dumps(s, _dumps=json.dumps, cls=JSONEncoder):
+ return _dumps(s, cls=cls)
+
+
+def loads(s, _loads=json.loads, decode_bytes=IS_PY3):
+ # None of the json implementations supports decoding from
+ # a buffer/memoryview, or even reading from a stream
+ # (load is just loads(fp.read()))
+ # but this is Python, we love copying strings, preferably many times
+ # over. Note that pickle does support buffer/memoryview
+ # </rant>
+ if isinstance(s, memoryview):
+ s = s.tobytes().decode('utf-8')
+ elif isinstance(s, bytearray):
+ s = s.decode('utf-8')
+ elif decode_bytes and isinstance(s, bytes_t):
+ s = s.decode('utf-8')
+ elif isinstance(s, buffer_t):
+ s = text_t(s) # ... awwwwwww :(
+ return _loads(s)
diff --git a/kombu/utils/url.py b/kombu/utils/url.py
index 8839fca2..d7197215 100644
--- a/kombu/utils/url.py
+++ b/kombu/utils/url.py
@@ -1,12 +1,16 @@
from __future__ import absolute_import
+from functools import partial
+
try:
- from urllib.parse import unquote, urlparse, parse_qsl
+ from urllib.parse import parse_qsl, quote, unquote, urlparse
except ImportError:
- from urllib import unquote # noqa
+ from urllib import quote, unquote # noqa
from urlparse import urlparse, parse_qsl # noqa
-from . import kwdict
+from kombu.five import string_t
+
+safequote = partial(quote, safe='')
def _parse_url(url):
@@ -14,21 +18,13 @@ def _parse_url(url):
schemeless = url[len(scheme) + 3:]
# parse with HTTP URL semantics
parts = urlparse('http://' + schemeless)
-
- # The first pymongo.Connection() argument (host) can be
- # a mongodb connection URI. If this is the case, don't
- # use port but let pymongo get the port(s) from the URI instead.
- # This enables the use of replica sets and sharding.
- # See pymongo.Connection() for more info.
- port = scheme != 'mongodb' and parts.port or None
- hostname = schemeless if scheme == 'mongodb' else parts.hostname
path = parts.path or ''
path = path[1:] if path and path[0] == '/' else path
- return (scheme, unquote(hostname or '') or None, port,
+ return (scheme, unquote(parts.hostname or '') or None, parts.port,
unquote(parts.username or '') or None,
unquote(parts.password or '') or None,
unquote(path or '') or None,
- kwdict(dict(parse_qsl(parts.query))))
+ dict(parse_qsl(parts.query)))
def parse_url(url):
@@ -36,3 +32,32 @@ def parse_url(url):
return dict(transport=scheme, hostname=host,
port=port, userid=user,
password=password, virtual_host=path, **query)
+
+
+def as_url(scheme, host=None, port=None, user=None, password=None,
+ path=None, query=None, sanitize=False, mask='**'):
+ parts = ['{0}://'.format(scheme)]
+ if user or password:
+ if user:
+ parts.append(safequote(user))
+ if password:
+ if sanitize:
+ parts.extend([':', mask] if mask else [':'])
+ else:
+ parts.extend([':', safequote(password)])
+ parts.append('@')
+ parts.append(safequote(host) if host else '')
+ if port:
+ parts.extend([':', port])
+ parts.extend(['/', path])
+ return ''.join(str(part) for part in parts if part)
+
+
+def sanitize_url(url, mask='**'):
+ return as_url(*_parse_url(url), sanitize=True, mask=mask)
+
+
+def maybe_sanitize_url(url, mask='**'):
+ if isinstance(url, string_t) and '://' in url:
+ return sanitize_url(url, mask)
+ return url
diff --git a/pavement.py b/pavement.py
deleted file mode 100644
index 017ad7d5..00000000
--- a/pavement.py
+++ /dev/null
@@ -1,190 +0,0 @@
-import os
-
-from paver.easy import * # noqa
-from paver import doctools # noqa
-from paver.setuputils import setup # noqa
-
-PYCOMPILE_CACHES = ['*.pyc', '*$py.class']
-
-options(
- sphinx=Bunch(builddir='.build'),
-)
-
-
-def sphinx_builddir(options):
- return path('docs') / options.sphinx.builddir / 'html'
-
-
-@task
-def clean_docs(options):
- sphinx_builddir(options).rmtree()
-
-
-@task
-@needs('clean_docs', 'paver.doctools.html')
-def html(options):
- destdir = path('Documentation')
- destdir.rmtree()
- builtdocs = sphinx_builddir(options)
- builtdocs.move(destdir)
-
-
-@task
-@needs('paver.doctools.html')
-def qhtml(options):
- destdir = path('Documentation')
- builtdocs = sphinx_builddir(options)
- sh('rsync -az %s/ %s' % (builtdocs, destdir))
-
-
-@task
-@needs('clean_docs', 'paver.doctools.html')
-def ghdocs(options):
- builtdocs = sphinx_builddir(options)
- sh("git checkout gh-pages && \
- cp -r %s/* . && \
- git commit . -m 'Rendered documentation for Github Pages.' && \
- git push origin gh-pages && \
- git checkout master" % builtdocs)
-
-
-@task
-@needs('clean_docs', 'paver.doctools.html')
-def upload_pypi_docs(options):
- builtdocs = path('docs') / options.builddir / 'html'
- sh("python setup.py upload_sphinx --upload-dir='%s'" % (builtdocs))
-
-
-@task
-@needs('upload_pypi_docs', 'ghdocs')
-def upload_docs(options):
- pass
-
-
-@task
-def autodoc(options):
- sh('extra/release/doc4allmods kombu')
-
-
-@task
-def verifyindex(options):
- sh('extra/release/verify-reference-index.sh')
-
-
-@task
-def clean_readme(options):
- path('README').unlink()
- path('README.rst').unlink()
-
-
-@task
-@needs('clean_readme')
-def readme(options):
- sh('python extra/release/sphinx-to-rst.py docs/templates/readme.txt \
- > README.rst')
- sh('ln -sf README.rst README')
-
-
-@task
-@cmdopts([
- ('custom=', 'C', 'custom version'),
-])
-def bump(options):
- s = ("-- '%s'" % (options.custom, ) if getattr(options, 'custom', None)
- else '')
- sh('extra/release/bump_version.py \
- kombu/__init__.py README.rst %s' % (s, ))
-
-
-@task
-@cmdopts([
- ('coverage', 'c', 'Enable coverage'),
- ('quick', 'q', 'Quick test'),
- ('verbose', 'V', 'Make more noise'),
-])
-def test(options):
- cmd = 'nosetests'
- if getattr(options, 'coverage', False):
- cmd += ' --with-coverage3'
- if getattr(options, 'quick', False):
- cmd = 'QUICKTEST=1 SKIP_RLIMITS=1 %s' % cmd
- if getattr(options, 'verbose', False):
- cmd += ' --verbosity=2'
- sh(cmd)
-
-
-@task
-@cmdopts([
- ('noerror', 'E', 'Ignore errors'),
-])
-def flake8(options):
- noerror = getattr(options, 'noerror', False)
- complexity = getattr(options, 'complexity', 22)
- migrations_path = os.path.join('kombu', 'transport', 'django',
- 'migrations', '0.+?\.py')
- sh("""flake8 kombu | perl -mstrict -mwarnings -nle'
- my $ignore = (m/too complex \((\d+)\)/ && $1 le %s)
- || (m{^%s});
- if (! $ignore) { print STDERR; our $FOUND_FLAKE = 1 }
- }{exit $FOUND_FLAKE;
- '""" % (complexity, migrations_path), ignore_error=noerror)
-
-
-@task
-@cmdopts([
- ('noerror', 'E', 'Ignore errors'),
-])
-def flakeplus(options):
- noerror = getattr(options, 'noerror', False)
- sh('flakeplus kombu --2.6',
- ignore_error=noerror)
-
-
-@task
-@cmdopts([
- ('noerror', 'E', 'Ignore errors'),
-])
-def flakes(options):
- flake8(options)
- flakeplus(options)
-
-
-@task
-@cmdopts([
- ('noerror', 'E', 'Ignore errors'),
-])
-def pep8(options):
- noerror = getattr(options, 'noerror', False)
- return sh("""find kombu -name "*.py" | xargs pep8 | perl -nle'\
- print; $a=1 if $_}{exit($a)'""", ignore_error=noerror)
-
-
-@task
-def removepyc(options):
- sh('find . -type f -a \\( %s \\) | xargs rm' % (
- ' -o '.join("-name '%s'" % (pat, ) for pat in PYCOMPILE_CACHES), ))
- sh('find . -type d -name "__pycache__" | xargs rm -r')
-
-
-@task
-@needs('removepyc')
-def gitclean(options):
- sh('git clean -xdn')
-
-
-@task
-@needs('removepyc')
-def gitcleanforce(options):
- sh('git clean -xdf')
-
-
-@task
-@needs('flakes', 'autodoc', 'verifyindex', 'test', 'gitclean')
-def releaseok(options):
- pass
-
-
-@task
-@needs('releaseok', 'removepyc', 'upload_docs')
-def release(options):
- pass
diff --git a/requirements/default.txt b/requirements/default.txt
index 520bd90c..dc144e79 100644
--- a/requirements/default.txt
+++ b/requirements/default.txt
@@ -1,2 +1 @@
-anyjson>=0.3.3
amqp>=1.4.5,<2.0
diff --git a/requirements/extras/librabbitmq.txt b/requirements/extras/librabbitmq.txt
index 8f9a2dbc..866d11bc 100644
--- a/requirements/extras/librabbitmq.txt
+++ b/requirements/extras/librabbitmq.txt
@@ -1 +1 @@
-librabbitmq>=1.5.0
+librabbitmq>=1.5.2
diff --git a/requirements/pkgutils.txt b/requirements/pkgutils.txt
index 5da811f1..de53de22 100644
--- a/requirements/pkgutils.txt
+++ b/requirements/pkgutils.txt
@@ -1,3 +1,2 @@
-paver
flake8
Sphinx
diff --git a/requirements/py26.txt b/requirements/py26.txt
deleted file mode 100644
index 1807d7cb..00000000
--- a/requirements/py26.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-importlib
-ordereddict
diff --git a/setup.cfg b/setup.cfg
index 2d042cdb..8db09a39 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -13,7 +13,4 @@ upload-dir = docs/.build/html
[bdist_rpm]
-requires = anyjson >= 0.3.3
- amqp >= 1.4.5
- importlib
- ordereddict
+requires = amqp >= 1.4.5
diff --git a/setup.py b/setup.py
index e0cbe40c..e5e7b60b 100644
--- a/setup.py
+++ b/setup.py
@@ -7,8 +7,8 @@ import codecs
extra = {}
PY3 = sys.version_info[0] == 3
-if sys.version_info < (2, 6):
- raise Exception('Kombu requires Python 2.6 or higher.')
+if sys.version_info < (2, 7):
+ raise Exception('Kombu requires Python 2.7 or higher.')
try:
from setuptools import setup
@@ -113,8 +113,6 @@ def reqs(*f):
) if r]
install_requires = reqs('default.txt')
-if py_version[0:2] == (2, 6):
- install_requires.extend(reqs('py26.txt'))
# -*- Tests Requires -*-
@@ -160,7 +158,6 @@ setup(
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 2.7',
- 'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python :: Implementation :: PyPy',
diff --git a/tox.ini b/tox.ini
index 74100c2f..5b8c32d7 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,6 +1,5 @@
[tox]
envlist =
- 2.6,
2.7,
3.3,
3.4,
@@ -39,16 +38,6 @@ commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
pip install -U -r{toxinidir}/requirements/dev.txt
nosetests --with-coverage --cover-inclusive --cover-erase []
-[testenv:2.6]
-basepython = python2.6
-deps = -r{toxinidir}/requirements/default.txt
- -r{toxinidir}/requirements/py26.txt
- -r{toxinidir}/requirements/test.txt
- -r{toxinidir}/requirements/test-ci.txt
-commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
- pip install -U -r{toxinidir}/requirements/dev.txt
- nosetests --with-coverage --cover-inclusive --cover-erase []
-
[testenv:pypy]
basepython = pypy
deps = -r{toxinidir}/requirements/default.txt