summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/conf.py1
-rw-r--r--docs/reference/index.rst17
-rw-r--r--docs/reference/kombu.utils.collections.rst11
-rw-r--r--docs/reference/kombu.utils.compat.rst11
-rw-r--r--docs/reference/kombu.utils.div.rst (renamed from docs/reference/kombu.utils.rst)6
-rw-r--r--docs/reference/kombu.utils.imports.rst11
-rw-r--r--docs/reference/kombu.utils.objects.rst11
-rw-r--r--docs/reference/kombu.utils.uuid.rst11
-rw-r--r--docs/userguide/consumers.rst2
-rw-r--r--examples/rpc-tut6/rpc_client.py3
-rw-r--r--examples/simple_task_queue/worker.py2
-rw-r--r--funtests/tests/test_mongodb.py2
-rw-r--r--kombu/abstract.py2
-rw-r--r--kombu/async/debug.py2
-rw-r--r--kombu/async/http/base.py4
-rw-r--r--kombu/async/hub.py3
-rw-r--r--kombu/common.py2
-rw-r--r--kombu/connection.py6
-rw-r--r--kombu/log.py2
-rw-r--r--kombu/messaging.py10
-rw-r--r--kombu/mixins.py3
-rw-r--r--kombu/pidbox.py5
-rw-r--r--kombu/pools.py3
-rw-r--r--kombu/resource.py2
-rw-r--r--kombu/serialization.py2
-rw-r--r--kombu/tests/async/aws/sqs/test_connection.py2
-rw-r--r--kombu/tests/async/aws/sqs/test_message.py2
-rw-r--r--kombu/tests/test_messaging.py2
-rw-r--r--kombu/tests/test_pidbox.py2
-rw-r--r--kombu/tests/test_pools.py2
-rw-r--r--kombu/tests/transport/virtual/test_base.py4
-rw-r--r--kombu/tests/utils/test_compat.py32
-rw-r--r--kombu/tests/utils/test_div.py51
-rw-r--r--kombu/tests/utils/test_functional.py148
-rw-r--r--kombu/tests/utils/test_imports.py37
-rw-r--r--kombu/tests/utils/test_objects.py55
-rw-r--r--kombu/tests/utils/test_utils.py328
-rw-r--r--kombu/tests/utils/test_uuid.py17
-rw-r--r--kombu/transport/SLMQ.py2
-rw-r--r--kombu/transport/SQS.py4
-rw-r--r--kombu/transport/__init__.py2
-rw-r--r--kombu/transport/base.py2
-rw-r--r--kombu/transport/consul.py2
-rw-r--r--kombu/transport/filesystem.py2
-rw-r--r--kombu/transport/mongodb.py2
-rw-r--r--kombu/transport/pyro.py2
-rw-r--r--kombu/transport/redis.py4
-rw-r--r--kombu/transport/virtual/__init__.py3
-rw-r--r--kombu/transport/virtual/exchange.py4
-rw-r--r--kombu/utils/__init__.py432
-rw-r--r--kombu/utils/collections.py37
-rw-r--r--kombu/utils/compat.py92
-rw-r--r--kombu/utils/div.py32
-rw-r--r--kombu/utils/functional.py123
-rw-r--r--kombu/utils/imports.py65
-rw-r--r--kombu/utils/objects.py63
-rw-r--r--kombu/utils/scheduling.py2
-rw-r--r--kombu/utils/text.py7
-rw-r--r--kombu/utils/uuid.py13
59 files changed, 901 insertions, 810 deletions
diff --git a/docs/conf.py b/docs/conf.py
index f2f7482a..75fefc1c 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -25,5 +25,6 @@ globals().update(conf.build_config(
'kombu.async.aws.ext',
'kombu.async.aws.sqs.ext',
'kombu.transport.qpid_patches',
+ 'kombu.utils',
],
))
diff --git a/docs/reference/index.rst b/docs/reference/index.rst
index 6c3285ca..5e963c9b 100644
--- a/docs/reference/index.rst
+++ b/docs/reference/index.rst
@@ -55,15 +55,20 @@
kombu.transport.virtual
kombu.transport.virtual.exchange
kombu.serialization
- kombu.utils
- kombu.utils.scheduling
- kombu.utils.eventio
- kombu.utils.limits
+ kombu.utils.amq_manager
+ kombu.utils.collections
+ kombu.utils.compat
kombu.utils.debug
+ kombu.utils.div
kombu.utils.encoding
+ kombu.utils.eventio
kombu.utils.functional
+ kombu.utils.imports
kombu.utils.json
- kombu.utils.url
+ kombu.utils.limits
+ kombu.utils.objects
+ kombu.utils.scheduling
kombu.utils.text
- kombu.utils.amq_manager
+ kombu.utils.url
+ kombu.utils.uuid
kombu.five
diff --git a/docs/reference/kombu.utils.collections.rst b/docs/reference/kombu.utils.collections.rst
new file mode 100644
index 00000000..dc84ce32
--- /dev/null
+++ b/docs/reference/kombu.utils.collections.rst
@@ -0,0 +1,11 @@
+==========================================================
+ Custom Collections - ``kombu.utils.collections``
+==========================================================
+
+.. contents::
+ :local:
+.. currentmodule:: kombu.utils.collections
+
+.. automodule:: kombu.utils.collections
+ :members:
+ :undoc-members:
diff --git a/docs/reference/kombu.utils.compat.rst b/docs/reference/kombu.utils.compat.rst
new file mode 100644
index 00000000..1935a1c6
--- /dev/null
+++ b/docs/reference/kombu.utils.compat.rst
@@ -0,0 +1,11 @@
+==========================================================
+ Python Compatibility - ``kombu.utils.compat``
+==========================================================
+
+.. contents::
+ :local:
+.. currentmodule:: kombu.utils.compat
+
+.. automodule:: kombu.utils.compat
+ :members:
+ :undoc-members:
diff --git a/docs/reference/kombu.utils.rst b/docs/reference/kombu.utils.div.rst
index a267d380..6fb07d8e 100644
--- a/docs/reference/kombu.utils.rst
+++ b/docs/reference/kombu.utils.div.rst
@@ -1,11 +1,11 @@
==========================================================
- Utilities - ``kombu.utils``
+ Div Utilities - ``kombu.utils.div``
==========================================================
.. contents::
:local:
-.. currentmodule:: kombu.utils
+.. currentmodule:: kombu.utils.div
-.. automodule:: kombu.utils
+.. automodule:: kombu.utils.div
:members:
:undoc-members:
diff --git a/docs/reference/kombu.utils.imports.rst b/docs/reference/kombu.utils.imports.rst
new file mode 100644
index 00000000..36fdafe1
--- /dev/null
+++ b/docs/reference/kombu.utils.imports.rst
@@ -0,0 +1,11 @@
+==========================================================
+ Module Importing Utilities - ``kombu.utils.imports``
+==========================================================
+
+.. contents::
+ :local:
+.. currentmodule:: kombu.utils.imports
+
+.. automodule:: kombu.utils.imports
+ :members:
+ :undoc-members:
diff --git a/docs/reference/kombu.utils.objects.rst b/docs/reference/kombu.utils.objects.rst
new file mode 100644
index 00000000..3f411ae7
--- /dev/null
+++ b/docs/reference/kombu.utils.objects.rst
@@ -0,0 +1,11 @@
+==========================================================
+ Object/Property Utilities - ``kombu.utils.objects``
+==========================================================
+
+.. contents::
+ :local:
+.. currentmodule:: kombu.utils.objects
+
+.. automodule:: kombu.utils.objects
+ :members:
+ :undoc-members:
diff --git a/docs/reference/kombu.utils.uuid.rst b/docs/reference/kombu.utils.uuid.rst
new file mode 100644
index 00000000..5af7ff6b
--- /dev/null
+++ b/docs/reference/kombu.utils.uuid.rst
@@ -0,0 +1,11 @@
+==========================================================
+ UUID Utilities - ``kombu.utils.uuid``
+==========================================================
+
+.. contents::
+ :local:
+.. currentmodule:: kombu.utils.uuid
+
+.. automodule:: kombu.utils.uuid
+ :members:
+ :undoc-members:
diff --git a/docs/userguide/consumers.rst b/docs/userguide/consumers.rst
index e76a88b6..02ed2cf0 100644
--- a/docs/userguide/consumers.rst
+++ b/docs/userguide/consumers.rst
@@ -37,7 +37,7 @@ Draining events from several consumers:
.. code-block:: python
- from kombu.utils import nested
+ from kombu.utils.compat import nested
with connection.channel(), connection.channel() as (channel1, channel2):
with nested(Consumer(channel1, queues1, accept=['json']),
diff --git a/examples/rpc-tut6/rpc_client.py b/examples/rpc-tut6/rpc_client.py
index 27a23903..0d90bed6 100644
--- a/examples/rpc-tut6/rpc_client.py
+++ b/examples/rpc-tut6/rpc_client.py
@@ -1,8 +1,7 @@
#!/usr/bin/env python
from __future__ import absolute_import, unicode_literals
-from kombu import Connection, Producer, Consumer, Queue
-from kombu.utils import uuid
+from kombu import Connection, Producer, Consumer, Queue, uuid
class FibonacciRpcClient(object):
diff --git a/examples/simple_task_queue/worker.py b/examples/simple_task_queue/worker.py
index 6b617cf0..bd2dd7f8 100644
--- a/examples/simple_task_queue/worker.py
+++ b/examples/simple_task_queue/worker.py
@@ -2,7 +2,7 @@ from __future__ import absolute_import, unicode_literals
from kombu.mixins import ConsumerMixin
from kombu.log import get_logger
-from kombu.utils import reprcall
+from kombu.utils.functional import reprcall
from .queues import task_queues
diff --git a/funtests/tests/test_mongodb.py b/funtests/tests/test_mongodb.py
index 2f41972c..dcf00ef1 100644
--- a/funtests/tests/test_mongodb.py
+++ b/funtests/tests/test_mongodb.py
@@ -2,7 +2,7 @@ from __future__ import absolute_import, unicode_literals
from kombu import Consumer, Producer, Exchange, Queue
from kombu.five import range
-from kombu.utils import nested
+from kombu.utils.compat import nested
from funtests import transport
diff --git a/kombu/abstract.py b/kombu/abstract.py
index 6f9423fd..e120fbd6 100644
--- a/kombu/abstract.py
+++ b/kombu/abstract.py
@@ -6,7 +6,7 @@ from copy import copy
from .connection import maybe_channel
from .exceptions import NotBoundError
from .five import python_2_unicode_compatible
-from .utils import ChannelPromise
+from .utils.functional import ChannelPromise
__all__ = ['Object', 'MaybeChannelBound']
diff --git a/kombu/async/debug.py b/kombu/async/debug.py
index d1aebe69..6385d233 100644
--- a/kombu/async/debug.py
+++ b/kombu/async/debug.py
@@ -2,8 +2,8 @@
from __future__ import absolute_import, unicode_literals
from kombu.five import items, string_t
-from kombu.utils import reprcall
from kombu.utils.eventio import READ, WRITE, ERR
+from kombu.utils.functional import reprcall
def repr_flag(flag):
diff --git a/kombu/async/http/base.py b/kombu/async/http/base.py
index e39f0ae9..92c6bdf4 100644
--- a/kombu/async/http/base.py
+++ b/kombu/async/http/base.py
@@ -6,7 +6,7 @@ from vine import Thenable, promise, maybe_promise
from kombu.exceptions import HttpError
from kombu.five import items, python_2_unicode_compatible
-from kombu.utils import coro
+from kombu.utils.compat import coro
from kombu.utils.encoding import bytes_to_str
from kombu.utils.functional import maybe_list, memoize
@@ -223,7 +223,7 @@ class BaseClient(object):
self._header_parser = header_parser()
def perform(self, request, **kwargs):
- for req in maybe_list(request):
+ for req in maybe_list(request) or []:
if not isinstance(req, self.Request):
req = self.Request(req, **kwargs)
self.add_request(req)
diff --git a/kombu/async/hub.py b/kombu/async/hub.py
index cf2abde8..5e3da78d 100644
--- a/kombu/async/hub.py
+++ b/kombu/async/hub.py
@@ -12,8 +12,9 @@ from vine import Thenable, promise
from kombu.five import Empty, python_2_unicode_compatible, range
from kombu.log import get_logger
-from kombu.utils import cached_property, fileno
+from kombu.utils.compat import fileno
from kombu.utils.eventio import READ, WRITE, ERR, poll
+from kombu.utils.objects import cached_property
from .timer import Timer
diff --git a/kombu/common.py b/kombu/common.py
index b2d811c0..fa75cd9f 100644
--- a/kombu/common.py
+++ b/kombu/common.py
@@ -17,7 +17,7 @@ from .entity import Exchange, Queue
from .five import bytes_if_py2, range
from .log import get_logger
from .serialization import registry as serializers
-from .utils import uuid
+from .utils.uuid import uuid
try:
from _thread import get_ident
diff --git a/kombu/connection.py b/kombu/connection.py
index d6e4b0eb..3f08663d 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -12,12 +12,14 @@ from operator import itemgetter
# jython breaks on relative import for .exceptions for some reason
# (Issue #112)
from kombu import exceptions
+
from .five import bytes_if_py2, python_2_unicode_compatible, string_t, text_t
from .log import get_logger
from .resource import Resource
from .transport import get_transport_cls, supports_librabbitmq
-from .utils import cached_property, retry_over_time, shufflecycle, HashedSeq
-from .utils.functional import dictfilter, lazy
+from .utils.collections import HashedSeq
+from .utils.functional import dictfilter, lazy, retry_over_time, shufflecycle
+from .utils.objects import cached_property
from .utils.url import as_url, parse_url, quote, urlparse
__all__ = ['Connection', 'ConnectionPool', 'ChannelPool']
diff --git a/kombu/log.py b/kombu/log.py
index 92a9bb21..7428e5e1 100644
--- a/kombu/log.py
+++ b/kombu/log.py
@@ -8,9 +8,9 @@ import sys
from logging.handlers import WatchedFileHandler
from .five import string_t
-from .utils import cached_property
from .utils.encoding import safe_repr, safe_str
from .utils.functional import maybe_evaluate
+from .utils.objects import cached_property
__all__ = ['LogMixin', 'LOG_LEVELS', 'get_loglevel', 'setup_logging']
diff --git a/kombu/messaging.py b/kombu/messaging.py
index a99e1723..ee2ab7e0 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -1,10 +1,4 @@
-"""
-kombu.messaging
-===============
-
-Sending and receiving messages.
-
-"""
+"""Sending and receiving messages."""
from __future__ import absolute_import, unicode_literals
from itertools import count
@@ -16,7 +10,7 @@ from .entity import Exchange, Queue, maybe_delivery_mode
from .exceptions import ContentDisallowed
from .five import items, python_2_unicode_compatible, text_t, values
from .serialization import dumps, prepare_accept_content
-from .utils import ChannelPromise, maybe_list
+from .utils.functional import ChannelPromise, maybe_list
__all__ = ['Exchange', 'Queue', 'Producer', 'Consumer']
diff --git a/kombu/mixins.py b/kombu/mixins.py
index bb930c1f..112ce9f4 100644
--- a/kombu/mixins.py
+++ b/kombu/mixins.py
@@ -19,9 +19,10 @@ from .common import ignore_errors
from .five import range
from .messaging import Consumer, Producer
from .log import get_logger
-from .utils import cached_property, nested
+from .utils.compat import nested
from .utils.encoding import safe_repr
from .utils.limits import TokenBucket
+from .utils.objects import cached_property
__all__ = ['ConsumerMixin']
diff --git a/kombu/pidbox.py b/kombu/pidbox.py
index ee443495..99485ca8 100644
--- a/kombu/pidbox.py
+++ b/kombu/pidbox.py
@@ -17,8 +17,9 @@ from .common import maybe_declare, oid_from
from .exceptions import InconsistencyError
from .five import range
from .log import get_logger
-from .utils import cached_property, uuid, reprcall
-from .utils.functional import maybe_evaluate
+from .utils.functional import maybe_evaluate, reprcall
+from .utils.objects import cached_property
+from .utils.uuid import uuid
REPLY_QUEUE_EXPIRES = 10
diff --git a/kombu/pools.py b/kombu/pools.py
index bc1bcb48..ed5fc521 100644
--- a/kombu/pools.py
+++ b/kombu/pools.py
@@ -8,7 +8,8 @@ from itertools import chain
from .connection import Resource
from .five import range, values
from .messaging import Producer
-from .utils import EqualityDict, register_after_fork
+from .utils.collections import EqualityDict
+from .utils.compat import register_after_fork
from .utils.functional import lazy
__all__ = ['ProducerPool', 'PoolGroup', 'register_group',
diff --git a/kombu/resource.py b/kombu/resource.py
index 43d76911..11e44366 100644
--- a/kombu/resource.py
+++ b/kombu/resource.py
@@ -7,7 +7,7 @@ from collections import deque
from . import exceptions
from .five import Empty, LifoQueue as _LifoQueue
-from .utils import register_after_fork
+from .utils.compat import register_after_fork
from .utils.functional import lazy
diff --git a/kombu/serialization.py b/kombu/serialization.py
index a4660f71..78233ad6 100644
--- a/kombu/serialization.py
+++ b/kombu/serialization.py
@@ -19,7 +19,7 @@ from .exceptions import (
ContentDisallowed, DecodeError, EncodeError, SerializerNotInstalled
)
from .five import reraise, text_t
-from .utils import entrypoints
+from .utils.compat import entrypoints
from .utils.encoding import bytes_to_str, str_to_bytes, bytes_t
__all__ = ['pickle', 'loads', 'dumps', 'register', 'unregister']
diff --git a/kombu/tests/async/aws/sqs/test_connection.py b/kombu/tests/async/aws/sqs/test_connection.py
index a36c8085..9a9dcdf0 100644
--- a/kombu/tests/async/aws/sqs/test_connection.py
+++ b/kombu/tests/async/aws/sqs/test_connection.py
@@ -6,7 +6,7 @@ from kombu.async.aws.sqs.connection import (
)
from kombu.async.aws.sqs.message import AsyncMessage
from kombu.async.aws.sqs.queue import AsyncQueue
-from kombu.utils import uuid
+from kombu.utils.uuid import uuid
from kombu.tests.async.aws.case import AWSCase
from kombu.tests.case import PromiseMock, Mock
diff --git a/kombu/tests/async/aws/sqs/test_message.py b/kombu/tests/async/aws/sqs/test_message.py
index 29107332..0f1a033b 100644
--- a/kombu/tests/async/aws/sqs/test_message.py
+++ b/kombu/tests/async/aws/sqs/test_message.py
@@ -5,7 +5,7 @@ from kombu.async.aws.sqs.message import AsyncMessage
from kombu.tests.async.aws.case import AWSCase
from kombu.tests.case import PromiseMock, Mock
-from kombu.utils import uuid
+from kombu.utils.uuid import uuid
class test_AsyncMessage(AWSCase):
diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py
index 7a0fc134..8f48f7cd 100644
--- a/kombu/tests/test_messaging.py
+++ b/kombu/tests/test_messaging.py
@@ -7,8 +7,8 @@ from collections import defaultdict
from kombu import Connection, Consumer, Producer, Exchange, Queue
from kombu.exceptions import MessageStateError
-from kombu.utils import ChannelPromise
from kombu.utils import json
+from kombu.utils.functional import ChannelPromise
from .case import Case, Mock, patch
from .mocks import Transport
diff --git a/kombu/tests/test_pidbox.py b/kombu/tests/test_pidbox.py
index 1fa02ead..852dbbf5 100644
--- a/kombu/tests/test_pidbox.py
+++ b/kombu/tests/test_pidbox.py
@@ -6,7 +6,7 @@ import warnings
from kombu import Connection
from kombu import pidbox
from kombu.exceptions import ContentDisallowed, InconsistencyError
-from kombu.utils import uuid
+from kombu.utils.uuid import uuid
from .case import Case, Mock, patch
diff --git a/kombu/tests/test_pools.py b/kombu/tests/test_pools.py
index 05bf5bdc..7a632eee 100644
--- a/kombu/tests/test_pools.py
+++ b/kombu/tests/test_pools.py
@@ -3,7 +3,7 @@ from __future__ import absolute_import, unicode_literals
from kombu import Connection, Producer
from kombu import pools
from kombu.connection import ConnectionPool
-from kombu.utils import eqhash
+from kombu.utils.collections import eqhash
from .case import Case, Mock
diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py
index a0c7b945..8b73a7fa 100644
--- a/kombu/tests/transport/virtual/test_base.py
+++ b/kombu/tests/transport/virtual/test_base.py
@@ -4,10 +4,10 @@ import sys
import warnings
from kombu import Connection
+from kombu.compression import compress
from kombu.exceptions import ResourceError, ChannelError
from kombu.transport import virtual
-from kombu.utils import uuid
-from kombu.compression import compress
+from kombu.utils.uuid import uuid
from kombu.tests.case import Case, MagicMock, Mock, mock, patch
diff --git a/kombu/tests/utils/test_compat.py b/kombu/tests/utils/test_compat.py
new file mode 100644
index 00000000..e4bd2c7a
--- /dev/null
+++ b/kombu/tests/utils/test_compat.py
@@ -0,0 +1,32 @@
+from __future__ import absolute_import, unicode_literals
+
+from kombu.utils.compat import entrypoints, maybe_fileno
+
+from kombu.tests.case import Case, Mock, mock, patch
+
+
+class test_entrypoints(Case):
+
+ @mock.mask_modules('pkg_resources')
+ def test_without_pkg_resources(self):
+ self.assertListEqual(list(entrypoints('kombu.test')), [])
+
+ @mock.module_exists('pkg_resources')
+ def test_with_pkg_resources(self):
+ with patch('pkg_resources.iter_entry_points', create=True) as iterep:
+ eps = iterep.return_value = [Mock(), Mock()]
+
+ self.assertTrue(list(entrypoints('kombu.test')))
+ iterep.assert_called_with('kombu.test')
+ eps[0].load.assert_called_with()
+ eps[1].load.assert_called_with()
+
+
+class test_maybe_fileno(Case):
+
+ def test_maybe_fileno(self):
+ self.assertEqual(maybe_fileno(3), 3)
+ f = Mock(name='file')
+ self.assertIs(maybe_fileno(f), f.fileno())
+ f.fileno.side_effect = ValueError()
+ self.assertIsNone(maybe_fileno(f))
diff --git a/kombu/tests/utils/test_div.py b/kombu/tests/utils/test_div.py
new file mode 100644
index 00000000..513cb1b7
--- /dev/null
+++ b/kombu/tests/utils/test_div.py
@@ -0,0 +1,51 @@
+from __future__ import absolute_import, unicode_literals
+
+import pickle
+
+from io import StringIO, BytesIO
+
+from kombu.utils.div import emergency_dump_state
+
+from kombu.tests.case import Case, mock
+
+
+class MyStringIO(StringIO):
+
+ def close(self):
+ pass
+
+
+class MyBytesIO(BytesIO):
+
+ def close(self):
+ pass
+
+
+class test_emergency_dump_state(Case):
+
+ @mock.stdouts
+ def test_dump(self, stdout, stderr):
+ fh = MyBytesIO()
+
+ emergency_dump_state(
+ {'foo': 'bar'}, open_file=lambda n, m: fh)
+ self.assertDictEqual(
+ pickle.loads(fh.getvalue()), {'foo': 'bar'})
+ self.assertTrue(stderr.getvalue())
+ self.assertFalse(stdout.getvalue())
+
+ @mock.stdouts
+ def test_dump_second_strategy(self, stdout, stderr):
+ fh = MyStringIO()
+
+ def raise_something(*args, **kwargs):
+ raise KeyError('foo')
+
+ emergency_dump_state(
+ {'foo': 'bar'},
+ open_file=lambda n, m: fh, dump=raise_something
+ )
+ self.assertIn('foo', fh.getvalue())
+ self.assertIn('bar', fh.getvalue())
+ self.assertTrue(stderr.getvalue())
+ self.assertFalse(stdout.getvalue())
diff --git a/kombu/tests/utils/test_functional.py b/kombu/tests/utils/test_functional.py
index 5f26df32..e953979f 100644
--- a/kombu/tests/utils/test_functional.py
+++ b/kombu/tests/utils/test_functional.py
@@ -5,9 +5,44 @@ import pickle
from itertools import count
from kombu.five import items
-from kombu.utils.functional import LRUCache, memoize, lazy, maybe_evaluate
+from kombu.utils import functional as utils
+from kombu.utils.functional import (
+ ChannelPromise, LRUCache, fxrange, fxrangemax, memoize, lazy,
+ maybe_evaluate, maybe_list, reprcall, reprkwargs, retry_over_time,
+)
-from kombu.tests.case import Case, skip
+from kombu.tests.case import Case, Mock, mock, skip
+
+
+class test_ChannelPromise(Case):
+
+ def test_repr(self):
+ obj = Mock(name='cb')
+ self.assertIn(
+ 'promise',
+ repr(ChannelPromise(obj)),
+ )
+ obj.assert_not_called()
+
+
+class test_shufflecycle(Case):
+
+ def test_shuffles(self):
+ prev_repeat, utils.repeat = utils.repeat, Mock()
+ try:
+ utils.repeat.return_value = list(range(10))
+ values = {'A', 'B', 'C'}
+ cycle = utils.shufflecycle(values)
+ seen = set()
+ for i in range(10):
+ next(cycle)
+ utils.repeat.assert_called_with(None)
+ self.assertTrue(seen.issubset(values))
+ with self.assertRaises(StopIteration):
+ next(cycle)
+ next(cycle)
+ finally:
+ utils.repeat = prev_repeat
def double(x):
@@ -141,3 +176,112 @@ class test_maybe_evaluate(Case):
def test_evaluates(self):
self.assertEqual(maybe_evaluate(lazy(lambda: 10)), 10)
self.assertEqual(maybe_evaluate(20), 20)
+
+
+class test_retry_over_time(Case):
+
+ def setup(self):
+ self.index = 0
+
+ class Predicate(Exception):
+ pass
+
+ def myfun(self):
+ if self.index < 9:
+ raise self.Predicate()
+ return 42
+
+ def errback(self, exc, intervals, retries):
+ interval = next(intervals)
+ sleepvals = (None, 2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0, 16.0)
+ self.index += 1
+ self.assertEqual(interval, sleepvals[self.index])
+ return interval
+
+ @mock.sleepdeprived(module=utils)
+ def test_simple(self):
+ prev_count, utils.count = utils.count, Mock()
+ try:
+ utils.count.return_value = list(range(1))
+ x = retry_over_time(self.myfun, self.Predicate,
+ errback=None, interval_max=14)
+ self.assertIsNone(x)
+ utils.count.return_value = list(range(10))
+ cb = Mock()
+ x = retry_over_time(self.myfun, self.Predicate,
+ errback=self.errback, callback=cb,
+ interval_max=14)
+ self.assertEqual(x, 42)
+ self.assertEqual(self.index, 9)
+ cb.assert_called_with()
+ finally:
+ utils.count = prev_count
+
+ @mock.sleepdeprived(module=utils)
+ def test_retry_once(self):
+ with self.assertRaises(self.Predicate):
+ retry_over_time(
+ self.myfun, self.Predicate,
+ max_retries=1, errback=self.errback, interval_max=14,
+ )
+ self.assertEqual(self.index, 1)
+ # no errback
+ with self.assertRaises(self.Predicate):
+ retry_over_time(
+ self.myfun, self.Predicate,
+ max_retries=1, errback=None, interval_max=14,
+ )
+
+ @mock.sleepdeprived(module=utils)
+ def test_retry_always(self):
+ Predicate = self.Predicate
+
+ class Fun(object):
+
+ def __init__(self):
+ self.calls = 0
+
+ def __call__(self, *args, **kwargs):
+ try:
+ if self.calls >= 10:
+ return 42
+ raise Predicate()
+ finally:
+ self.calls += 1
+ fun = Fun()
+
+ self.assertEqual(
+ retry_over_time(
+ fun, self.Predicate,
+ max_retries=0, errback=None, interval_max=14,
+ ),
+ 42,
+ )
+ self.assertEqual(fun.calls, 11)
+
+
+class test_utils(Case):
+
+ def test_maybe_list(self):
+ self.assertIsNone(maybe_list(None))
+ self.assertEqual(maybe_list(1), [1])
+ self.assertEqual(maybe_list([1, 2, 3]), [1, 2, 3])
+
+ def test_fxrange_no_repeatlast(self):
+ self.assertEqual(list(fxrange(1.0, 3.0, 1.0)),
+ [1.0, 2.0, 3.0])
+
+ def test_fxrangemax(self):
+ self.assertEqual(list(fxrangemax(1.0, 3.0, 1.0, 30.0)),
+ [1.0, 2.0, 3.0, 3.0, 3.0, 3.0,
+ 3.0, 3.0, 3.0, 3.0, 3.0])
+ self.assertEqual(list(fxrangemax(1.0, None, 1.0, 30.0)),
+ [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0])
+
+ def test_reprkwargs(self):
+ self.assertTrue(reprkwargs({'foo': 'bar', 1: 2, 'k': 'v'}))
+
+ def test_reprcall(self):
+ self.assertTrue(
+ reprcall('add', (2, 2), {'copy': True}),
+ )
diff --git a/kombu/tests/utils/test_imports.py b/kombu/tests/utils/test_imports.py
new file mode 100644
index 00000000..49e2a72f
--- /dev/null
+++ b/kombu/tests/utils/test_imports.py
@@ -0,0 +1,37 @@
+from __future__ import absolute_import, unicode_literals
+
+from kombu import Exchange
+from kombu.utils.imports import symbol_by_name
+
+from kombu.tests.case import Case, Mock
+
+
+class test_symbol_by_name(Case):
+
+ def test_instance_returns_instance(self):
+ instance = object()
+ self.assertIs(symbol_by_name(instance), instance)
+
+ def test_returns_default(self):
+ default = object()
+ self.assertIs(
+ symbol_by_name('xyz.ryx.qedoa.weq:foz', default=default),
+ default,
+ )
+
+ def test_no_default(self):
+ with self.assertRaises(ImportError):
+ symbol_by_name('xyz.ryx.qedoa.weq:foz')
+
+ def test_imp_reraises_ValueError(self):
+ imp = Mock()
+ imp.side_effect = ValueError()
+ with self.assertRaises(ValueError):
+ symbol_by_name('kombu.Connection', imp=imp)
+
+ def test_package(self):
+ self.assertIs(
+ symbol_by_name('.entity:Exchange', package='kombu'),
+ Exchange,
+ )
+ self.assertTrue(symbol_by_name(':Consumer', package='kombu'))
diff --git a/kombu/tests/utils/test_objects.py b/kombu/tests/utils/test_objects.py
new file mode 100644
index 00000000..22893e48
--- /dev/null
+++ b/kombu/tests/utils/test_objects.py
@@ -0,0 +1,55 @@
+from __future__ import absolute_import, unicode_literals
+
+from kombu.utils.objects import cached_property
+
+from kombu.tests.case import Case
+
+
+class test_cached_property(Case):
+
+ def test_deleting(self):
+
+ class X(object):
+ xx = False
+
+ @cached_property
+ def foo(self):
+ return 42
+
+ @foo.deleter # noqa
+ def foo(self, value):
+ self.xx = value
+
+ x = X()
+ del(x.foo)
+ self.assertFalse(x.xx)
+ x.__dict__['foo'] = 'here'
+ del(x.foo)
+ self.assertEqual(x.xx, 'here')
+
+ def test_when_access_from_class(self):
+
+ class X(object):
+ xx = None
+
+ @cached_property
+ def foo(self):
+ return 42
+
+ @foo.setter # noqa
+ def foo(self, value):
+ self.xx = 10
+
+ desc = X.__dict__['foo']
+ self.assertIs(X.foo, desc)
+
+ self.assertIs(desc.__get__(None), desc)
+ self.assertIs(desc.__set__(None, 1), desc)
+ self.assertIs(desc.__delete__(None), desc)
+ self.assertTrue(desc.setter(1))
+
+ x = X()
+ x.foo = 30
+ self.assertEqual(x.xx, 10)
+
+ del(x.foo)
diff --git a/kombu/tests/utils/test_utils.py b/kombu/tests/utils/test_utils.py
index 806fb042..84adf6c9 100644
--- a/kombu/tests/utils/test_utils.py
+++ b/kombu/tests/utils/test_utils.py
@@ -1,31 +1,9 @@
from __future__ import absolute_import, unicode_literals
-import pickle
-
-from io import StringIO, BytesIO
-
from kombu import version_info_t
-from kombu import utils
-from kombu.five import python_2_unicode_compatible
from kombu.utils.text import version_string_as_tuple
-from kombu.tests.case import Case, Mock, patch, mock
-
-
-@python_2_unicode_compatible
-class OldString(object):
-
- def __init__(self, value):
- self.value = value
-
- def __str__(self):
- return self.value
-
- def split(self, *args, **kwargs):
- return self.value.split(*args, **kwargs)
-
- def rsplit(self, *args, **kwargs):
- return self.value.rsplit(*args, **kwargs)
+from kombu.tests.case import Case
class test_kombu_module(Case):
@@ -35,300 +13,6 @@ class test_kombu_module(Case):
self.assertTrue(dir(kombu))
-class test_utils(Case):
-
- def test_maybe_list(self):
- self.assertEqual(utils.maybe_list(None), [])
- self.assertEqual(utils.maybe_list(1), [1])
- self.assertEqual(utils.maybe_list([1, 2, 3]), [1, 2, 3])
-
- def test_fxrange_no_repeatlast(self):
- self.assertEqual(list(utils.fxrange(1.0, 3.0, 1.0)),
- [1.0, 2.0, 3.0])
-
- def test_fxrangemax(self):
- self.assertEqual(list(utils.fxrangemax(1.0, 3.0, 1.0, 30.0)),
- [1.0, 2.0, 3.0, 3.0, 3.0, 3.0,
- 3.0, 3.0, 3.0, 3.0, 3.0])
- self.assertEqual(list(utils.fxrangemax(1.0, None, 1.0, 30.0)),
- [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0])
-
- def test_reprkwargs(self):
- self.assertTrue(utils.reprkwargs({'foo': 'bar', 1: 2, 'k': 'v'}))
-
- def test_reprcall(self):
- self.assertTrue(
- utils.reprcall('add', (2, 2), {'copy': True}),
- )
-
-
-class test_UUID(Case):
-
- def test_uuid4(self):
- self.assertNotEqual(utils.uuid4(),
- utils.uuid4())
-
- def test_uuid(self):
- i1 = utils.uuid()
- i2 = utils.uuid()
- self.assertIsInstance(i1, str)
- self.assertNotEqual(i1, i2)
-
-
-class MyStringIO(StringIO):
-
- def close(self):
- pass
-
-
-class MyBytesIO(BytesIO):
-
- def close(self):
- pass
-
-
-class test_emergency_dump_state(Case):
-
- @mock.stdouts
- def test_dump(self, stdout, stderr):
- fh = MyBytesIO()
-
- utils.emergency_dump_state(
- {'foo': 'bar'}, open_file=lambda n, m: fh)
- self.assertDictEqual(
- pickle.loads(fh.getvalue()), {'foo': 'bar'})
- self.assertTrue(stderr.getvalue())
- self.assertFalse(stdout.getvalue())
-
- @mock.stdouts
- def test_dump_second_strategy(self, stdout, stderr):
- fh = MyStringIO()
-
- def raise_something(*args, **kwargs):
- raise KeyError('foo')
-
- utils.emergency_dump_state(
- {'foo': 'bar'},
- open_file=lambda n, m: fh, dump=raise_something
- )
- self.assertIn('foo', fh.getvalue())
- self.assertIn('bar', fh.getvalue())
- self.assertTrue(stderr.getvalue())
- self.assertFalse(stdout.getvalue())
-
-
-class test_retry_over_time(Case):
-
- def setup(self):
- self.index = 0
-
- class Predicate(Exception):
- pass
-
- def myfun(self):
- if self.index < 9:
- raise self.Predicate()
- return 42
-
- def errback(self, exc, intervals, retries):
- interval = next(intervals)
- sleepvals = (None, 2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0, 16.0)
- self.index += 1
- self.assertEqual(interval, sleepvals[self.index])
- return interval
-
- @mock.sleepdeprived(module=utils)
- def test_simple(self):
- prev_count, utils.count = utils.count, Mock()
- try:
- utils.count.return_value = list(range(1))
- x = utils.retry_over_time(self.myfun, self.Predicate,
- errback=None, interval_max=14)
- self.assertIsNone(x)
- utils.count.return_value = list(range(10))
- cb = Mock()
- x = utils.retry_over_time(self.myfun, self.Predicate,
- errback=self.errback, callback=cb,
- interval_max=14)
- self.assertEqual(x, 42)
- self.assertEqual(self.index, 9)
- cb.assert_called_with()
- finally:
- utils.count = prev_count
-
- @mock.sleepdeprived(module=utils)
- def test_retry_once(self):
- with self.assertRaises(self.Predicate):
- utils.retry_over_time(
- self.myfun, self.Predicate,
- max_retries=1, errback=self.errback, interval_max=14,
- )
- self.assertEqual(self.index, 1)
- # no errback
- with self.assertRaises(self.Predicate):
- utils.retry_over_time(
- self.myfun, self.Predicate,
- max_retries=1, errback=None, interval_max=14,
- )
-
- @mock.sleepdeprived(module=utils)
- def test_retry_always(self):
- Predicate = self.Predicate
-
- class Fun(object):
-
- def __init__(self):
- self.calls = 0
-
- def __call__(self, *args, **kwargs):
- try:
- if self.calls >= 10:
- return 42
- raise Predicate()
- finally:
- self.calls += 1
- fun = Fun()
-
- self.assertEqual(
- utils.retry_over_time(
- fun, self.Predicate,
- max_retries=0, errback=None, interval_max=14,
- ),
- 42,
- )
- self.assertEqual(fun.calls, 11)
-
-
-class test_cached_property(Case):
-
- def test_deleting(self):
-
- class X(object):
- xx = False
-
- @utils.cached_property
- def foo(self):
- return 42
-
- @foo.deleter # noqa
- def foo(self, value):
- self.xx = value
-
- x = X()
- del(x.foo)
- self.assertFalse(x.xx)
- x.__dict__['foo'] = 'here'
- del(x.foo)
- self.assertEqual(x.xx, 'here')
-
- def test_when_access_from_class(self):
-
- class X(object):
- xx = None
-
- @utils.cached_property
- def foo(self):
- return 42
-
- @foo.setter # noqa
- def foo(self, value):
- self.xx = 10
-
- desc = X.__dict__['foo']
- self.assertIs(X.foo, desc)
-
- self.assertIs(desc.__get__(None), desc)
- self.assertIs(desc.__set__(None, 1), desc)
- self.assertIs(desc.__delete__(None), desc)
- self.assertTrue(desc.setter(1))
-
- x = X()
- x.foo = 30
- self.assertEqual(x.xx, 10)
-
- del(x.foo)
-
-
-class test_symbol_by_name(Case):
-
- def test_instance_returns_instance(self):
- instance = object()
- self.assertIs(utils.symbol_by_name(instance), instance)
-
- def test_returns_default(self):
- default = object()
- self.assertIs(
- utils.symbol_by_name('xyz.ryx.qedoa.weq:foz', default=default),
- default,
- )
-
- def test_no_default(self):
- with self.assertRaises(ImportError):
- utils.symbol_by_name('xyz.ryx.qedoa.weq:foz')
-
- def test_imp_reraises_ValueError(self):
- imp = Mock()
- imp.side_effect = ValueError()
- with self.assertRaises(ValueError):
- utils.symbol_by_name('kombu.Connection', imp=imp)
-
- def test_package(self):
- from kombu.entity import Exchange
- self.assertIs(
- utils.symbol_by_name('.entity:Exchange', package='kombu'),
- Exchange,
- )
- self.assertTrue(utils.symbol_by_name(':Consumer', package='kombu'))
-
-
-class test_ChannelPromise(Case):
-
- def test_repr(self):
- obj = Mock(name='cb')
- self.assertIn(
- 'promise',
- repr(utils.ChannelPromise(obj)),
- )
- obj.assert_not_called()
-
-
-class test_entrypoints(Case):
-
- @mock.mask_modules('pkg_resources')
- def test_without_pkg_resources(self):
- self.assertListEqual(list(utils.entrypoints('kombu.test')), [])
-
- @mock.module_exists('pkg_resources')
- def test_with_pkg_resources(self):
- with patch('pkg_resources.iter_entry_points', create=True) as iterep:
- eps = iterep.return_value = [Mock(), Mock()]
-
- self.assertTrue(list(utils.entrypoints('kombu.test')))
- iterep.assert_called_with('kombu.test')
- eps[0].load.assert_called_with()
- eps[1].load.assert_called_with()
-
-
-class test_shufflecycle(Case):
-
- def test_shuffles(self):
- prev_repeat, utils.repeat = utils.repeat, Mock()
- try:
- utils.repeat.return_value = list(range(10))
- values = {'A', 'B', 'C'}
- cycle = utils.shufflecycle(values)
- seen = set()
- for i in range(10):
- next(cycle)
- utils.repeat.assert_called_with(None)
- self.assertTrue(seen.issubset(values))
- with self.assertRaises(StopIteration):
- next(cycle)
- next(cycle)
- finally:
- utils.repeat = prev_repeat
-
-
class test_version_string_as_tuple(Case):
def test_versions(self):
@@ -356,13 +40,3 @@ class test_version_string_as_tuple(Case):
version_string_as_tuple('3.3.1.a3.40c32'),
version_info_t(3, 3, 1, 'a3', '40c32'),
)
-
-
-class test_maybe_fileno(Case):
-
- def test_maybe_fileno(self):
- self.assertEqual(utils.maybe_fileno(3), 3)
- f = Mock(name='file')
- self.assertIs(utils.maybe_fileno(f), f.fileno())
- f.fileno.side_effect = ValueError()
- self.assertIsNone(utils.maybe_fileno(f))
diff --git a/kombu/tests/utils/test_uuid.py b/kombu/tests/utils/test_uuid.py
new file mode 100644
index 00000000..ac018f09
--- /dev/null
+++ b/kombu/tests/utils/test_uuid.py
@@ -0,0 +1,17 @@
+from __future__ import absolute_import, unicode_literals
+
+from kombu.utils.uuid import uuid
+
+from kombu.tests.case import Case
+
+
+class test_UUID(Case):
+
+ def test_uuid4(self):
+ self.assertNotEqual(uuid(), uuid())
+
+ def test_uuid(self):
+ i1 = uuid()
+ i2 = uuid()
+ self.assertIsInstance(i1, str)
+ self.assertNotEqual(i1, i2)
diff --git a/kombu/transport/SLMQ.py b/kombu/transport/SLMQ.py
index 4515e0d3..689046f7 100644
--- a/kombu/transport/SLMQ.py
+++ b/kombu/transport/SLMQ.py
@@ -7,9 +7,9 @@ import string
import os
from kombu.five import Empty, text_t
-from kombu.utils import cached_property # , uuid
from kombu.utils.encoding import bytes_to_str, safe_str
from kombu.utils.json import loads, dumps
+from kombu.utils.objects import cached_property
from . import virtual
diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py
index ac2f58a7..0f1679c1 100644
--- a/kombu/transport/SQS.py
+++ b/kombu/transport/SQS.py
@@ -51,10 +51,10 @@ from kombu.async.aws.sqs.ext import regions
from kombu.async.aws.sqs.message import Message
from kombu.five import Empty, range, string_t, text_t
from kombu.log import get_logger
-from kombu.utils import cached_property
+from kombu.utils import scheduling
from kombu.utils.encoding import bytes_to_str, safe_str
from kombu.utils.json import loads, dumps
-from kombu.utils import scheduling
+from kombu.utils.objects import cached_property
from . import virtual
diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py
index ff4bf69c..57b67236 100644
--- a/kombu/transport/__init__.py
+++ b/kombu/transport/__init__.py
@@ -3,7 +3,7 @@ from __future__ import absolute_import, unicode_literals
from kombu.five import string_t
from kombu.syn import _detect_environment
-from kombu.utils import symbol_by_name
+from kombu.utils.imports import symbol_by_name
def supports_librabbitmq():
diff --git a/kombu/transport/base.py b/kombu/transport/base.py
index 7eeace88..4e2d6a9d 100644
--- a/kombu/transport/base.py
+++ b/kombu/transport/base.py
@@ -8,7 +8,7 @@ from amqp.exceptions import RecoverableConnectionError
from kombu.exceptions import ChannelError, ConnectionError
from kombu.message import Message
-from kombu.utils import cached_property
+from kombu.utils.objects import cached_property
__all__ = ['Message', 'StdChannel', 'Management', 'Transport']
diff --git a/kombu/transport/consul.py b/kombu/transport/consul.py
index 898d68fa..2598e473 100644
--- a/kombu/transport/consul.py
+++ b/kombu/transport/consul.py
@@ -15,8 +15,8 @@ from contextlib import contextmanager
from kombu.exceptions import ChannelError
from kombu.five import Empty, monotonic
from kombu.log import get_logger
-from kombu.utils import cached_property
from kombu.utils.json import loads, dumps
+from kombu.utils.objects import cached_property
from . import virtual
diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py
index 55a32f7e..1bace4cf 100644
--- a/kombu/transport/filesystem.py
+++ b/kombu/transport/filesystem.py
@@ -12,9 +12,9 @@ import tempfile
from . import virtual
from kombu.exceptions import ChannelError
from kombu.five import Empty, monotonic
-from kombu.utils import cached_property
from kombu.utils.encoding import bytes_to_str, str_to_bytes
from kombu.utils.json import loads, dumps
+from kombu.utils.objects import cached_property
VERSION = (1, 0, 0)
diff --git a/kombu/transport/mongodb.py b/kombu/transport/mongodb.py
index 528b9970..1c1e0280 100644
--- a/kombu/transport/mongodb.py
+++ b/kombu/transport/mongodb.py
@@ -17,7 +17,7 @@ from kombu.five import Empty
from kombu.syn import _detect_environment
from kombu.utils.encoding import bytes_to_str
from kombu.utils.json import loads, dumps
-from kombu.utils import cached_property
+from kombu.utils.objects import cached_property
from . import virtual
diff --git a/kombu/transport/pyro.py b/kombu/transport/pyro.py
index 5a726dac..8fb15ad6 100644
--- a/kombu/transport/pyro.py
+++ b/kombu/transport/pyro.py
@@ -7,7 +7,7 @@ from __future__ import absolute_import, unicode_literals
import sys
from kombu.five import reraise
-from kombu.utils import cached_property
+from kombu.utils.objects import cached_property
from . import virtual
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index d020eaa0..f1dc3d48 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -14,12 +14,14 @@ from vine import promise
from kombu.exceptions import InconsistencyError, VersionMismatch
from kombu.five import Empty, values, string_t
from kombu.log import get_logger
-from kombu.utils import cached_property, register_after_fork, uuid
+from kombu.utils.compat import register_after_fork
from kombu.utils.eventio import poll, READ, ERR
from kombu.utils.encoding import bytes_to_str
from kombu.utils.json import loads, dumps
+from kombu.utils.objects import cached_property
from kombu.utils.scheduling import cycle_by_name
from kombu.utils.url import _parse_url
+from kombu.utils.uuid import uuid
from . import virtual
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py
index d6bd6ef9..5940159b 100644
--- a/kombu/transport/virtual/__init__.py
+++ b/kombu/transport/virtual/__init__.py
@@ -20,9 +20,10 @@ from amqp.protocol import queue_declare_ok_t
from kombu.exceptions import ResourceError, ChannelError
from kombu.five import Empty, items, monotonic
from kombu.log import get_logger
-from kombu.utils import emergency_dump_state, uuid
from kombu.utils.encoding import str_to_bytes, bytes_to_str
+from kombu.utils.div import emergency_dump_state
from kombu.utils.scheduling import FairCycle
+from kombu.utils.uuid import uuid
from kombu.transport import base
diff --git a/kombu/transport/virtual/exchange.py b/kombu/transport/virtual/exchange.py
index 3f2de089..c61d7147 100644
--- a/kombu/transport/virtual/exchange.py
+++ b/kombu/transport/virtual/exchange.py
@@ -5,10 +5,10 @@ by the AMQ protocol (excluding the `headers` exchange).
"""
from __future__ import absolute_import, unicode_literals
-from kombu.utils import escape_regex
-
import re
+from kombu.utils.text import escape_regex
+
class ExchangeType(object):
"""Implements the specifics for an exchange type.
diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py
index 4ecb7314..b1a37ab7 100644
--- a/kombu/utils/__init__.py
+++ b/kombu/utils/__init__.py
@@ -1,36 +1,14 @@
-"""Internal utilities."""
+"""DEPRECATED - Import from modules below"""
from __future__ import absolute_import, print_function, unicode_literals
-import importlib
-import numbers
-import random
-import sys
-
-from contextlib import contextmanager
-from itertools import count, repeat
-from time import sleep
-from uuid import uuid4
-
-from vine.utils import wraps
-
-from kombu.five import items, python_2_unicode_compatible, reraise, string_t
-
-from .encoding import default_encode, safe_repr as _safe_repr
-
-try:
- from io import UnsupportedOperation
- FILENO_ERRORS = (AttributeError, ValueError, UnsupportedOperation)
-except ImportError: # pragma: no cover
- # Py2
- FILENO_ERRORS = (AttributeError, ValueError) # noqa
-
-try:
- from billiard.util import register_after_fork
-except ImportError: # pragma: no cover
- try:
- from multiprocessing.util import register_after_fork # noqa
- except ImportError:
- register_after_fork = None # noqa
+from .collections import EqualityDict
+from .compat import fileno, maybe_fileno, nested, register_after_fork
+from .div import emergency_dump_state
+from .functional import (
+ fxrange, fxrangemax, maybe_list, reprcall, retry_over_time,
+)
+from .objects import cached_property
+from .uuid import uuid
__all__ = [
'EqualityDict', 'uuid', 'maybe_list',
@@ -38,395 +16,3 @@ __all__ = [
'emergency_dump_state', 'cached_property', 'register_after_fork',
'reprkwargs', 'reprcall', 'nested', 'fileno', 'maybe_fileno',
]
-
-
-def symbol_by_name(name, aliases={}, imp=None, package=None,
- sep='.', default=None, **kwargs):
- """Get symbol by qualified name.
-
- The name should be the full dot-separated path to the class::
-
- modulename.ClassName
-
- Example::
-
- celery.concurrency.processes.TaskPool
- ^- class name
-
- or using ':' to separate module and symbol::
-
- celery.concurrency.processes:TaskPool
-
- If `aliases` is provided, a dict containing short name/long name
- mappings, the name is looked up in the aliases first.
-
- Examples:
- >>> symbol_by_name('celery.concurrency.processes.TaskPool')
- <class 'celery.concurrency.processes.TaskPool'>
-
- >>> symbol_by_name('default', {
- ... 'default': 'celery.concurrency.processes.TaskPool'})
- <class 'celery.concurrency.processes.TaskPool'>
-
- # Does not try to look up non-string names.
- >>> from celery.concurrency.processes import TaskPool
- >>> symbol_by_name(TaskPool) is TaskPool
- True
- """
- if imp is None:
- imp = importlib.import_module
-
- if not isinstance(name, string_t):
- return name # already a class
-
- name = aliases.get(name) or name
- sep = ':' if ':' in name else sep
- module_name, _, cls_name = name.rpartition(sep)
- if not module_name:
- cls_name, module_name = None, package if package else cls_name
- try:
- try:
- module = imp(module_name, package=package, **kwargs)
- except ValueError as exc:
- reraise(ValueError,
- ValueError("Couldn't import {0!r}: {1}".format(name, exc)),
- sys.exc_info()[2])
- return getattr(module, cls_name) if cls_name else module
- except (ImportError, AttributeError):
- if default is None:
- raise
- return default
-
-
-class HashedSeq(list):
- """type used for hash() to make sure the hash is not generated
- multiple times."""
- __slots__ = 'hashvalue'
-
- def __init__(self, *seq):
- self[:] = seq
- self.hashvalue = hash(seq)
-
- def __hash__(self):
- return self.hashvalue
-
-
-def eqhash(o):
- try:
- return o.__eqhash__()
- except AttributeError:
- return hash(o)
-
-
-class EqualityDict(dict):
-
- def __getitem__(self, key):
- h = eqhash(key)
- if h not in self:
- return self.__missing__(key)
- return dict.__getitem__(self, h)
-
- def __setitem__(self, key, value):
- return dict.__setitem__(self, eqhash(key), value)
-
- def __delitem__(self, key):
- return dict.__delitem__(self, eqhash(key))
-
-
-def uuid():
- """Generate a unique id, having - hopefully - a very small chance of
- collision.
-
- For now this is provided by :func:`uuid.uuid4`.
- """
- return str(uuid4())
-
-
-def maybe_list(v):
- if v is None:
- return []
- if hasattr(v, '__iter__'):
- return v
- return [v]
-
-
-def fxrange(start=1.0, stop=None, step=1.0, repeatlast=False):
- cur = start * 1.0
- while 1:
- if not stop or cur <= stop:
- yield cur
- cur += step
- else:
- if not repeatlast:
- break
- yield cur - step
-
-
-def fxrangemax(start=1.0, stop=None, step=1.0, max=100.0):
- sum_, cur = 0, start * 1.0
- while 1:
- if sum_ >= max:
- break
- yield cur
- if stop:
- cur = min(cur + step, stop)
- else:
- cur += step
- sum_ += cur
-
-
-def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
- max_retries=None, interval_start=2, interval_step=2,
- interval_max=30, callback=None):
- """Retry the function over and over until max retries is exceeded.
-
- For each retry we sleep a for a while before we try again, this interval
- is increased for every retry until the max seconds is reached.
-
- Arguments:
- fun (Callable): The function to try
- catch (Tuple[BaseException]): Exceptions to catch, can be either
- tuple or a single exception class.
-
- Keyword Arguments:
- args (Tuple): Positional arguments passed on to the function.
- kwargs (Dict): Keyword arguments passed on to the function.
- errback (Callable): Callback for when an exception in ``catch``
- is raised. The callback must take three arguments:
- ``exc``, ``interval_range`` and ``retries``, where ``exc``
- is the exception instance, ``interval_range`` is an iterator
- which return the time in seconds to sleep next, and ``retries``
- is the number of previous retries.
- max_retries (int): Maximum number of retries before we give up.
- If this is not set, we will retry forever.
- interval_start (float): How long (in seconds) we start sleeping
- between retries.
- interval_step (float): By how much the interval is increased for
- each retry.
- interval_max (float): Maximum number of seconds to sleep
- between retries.
- """
- retries = 0
- interval_range = fxrange(interval_start,
- interval_max + interval_start,
- interval_step, repeatlast=True)
- for retries in count():
- try:
- return fun(*args, **kwargs)
- except catch as exc:
- if max_retries and retries >= max_retries:
- raise
- if callback:
- callback()
- tts = float(errback(exc, interval_range, retries) if errback
- else next(interval_range))
- if tts:
- for _ in range(int(tts)):
- if callback:
- callback()
- sleep(1.0)
- # sleep remainder after int truncation above.
- sleep(abs(int(tts) - tts))
-
-
-def emergency_dump_state(state, open_file=open, dump=None, stderr=None):
- from pprint import pformat
- from tempfile import mktemp
- stderr = sys.stderr if stderr is None else stderr
-
- if dump is None:
- import pickle
- dump = pickle.dump
- persist = mktemp()
- print('EMERGENCY DUMP STATE TO FILE -> {0} <-'.format(persist), ## noqa
- file=stderr)
- fh = open_file(persist, 'w')
- try:
- try:
- dump(state, fh, protocol=0)
- except Exception as exc:
- print( # noqa
- 'Cannot pickle state: {0!r}. Fallback to pformat.'.format(exc),
- file=stderr,
- )
- fh.write(default_encode(pformat(state)))
- finally:
- fh.flush()
- fh.close()
- return persist
-
-
-class cached_property(object):
- """Property descriptor that caches the return value
- of the get function.
-
- Examples:
- .. code-block:: python
-
- @cached_property
- def connection(self):
- return Connection()
-
- @connection.setter # Prepares stored value
- def connection(self, value):
- if value is None:
- raise TypeError('Connection must be a connection')
- return value
-
- @connection.deleter
- def connection(self, value):
- # Additional action to do at del(self.attr)
- if value is not None:
- print('Connection {0!r} deleted'.format(value)
- """
-
- def __init__(self, fget=None, fset=None, fdel=None, doc=None):
- self.__get = fget
- self.__set = fset
- self.__del = fdel
- self.__doc__ = doc or fget.__doc__
- self.__name__ = fget.__name__
- self.__module__ = fget.__module__
-
- def __get__(self, obj, type=None):
- if obj is None:
- return self
- try:
- return obj.__dict__[self.__name__]
- except KeyError:
- value = obj.__dict__[self.__name__] = self.__get(obj)
- return value
-
- def __set__(self, obj, value):
- if obj is None:
- return self
- if self.__set is not None:
- value = self.__set(obj, value)
- obj.__dict__[self.__name__] = value
-
- def __delete__(self, obj, _sentinel=object()):
- if obj is None:
- return self
- value = obj.__dict__.pop(self.__name__, _sentinel)
- if self.__del is not None and value is not _sentinel:
- self.__del(obj, value)
-
- def setter(self, fset):
- return self.__class__(self.__get, fset, self.__del)
-
- def deleter(self, fdel):
- return self.__class__(self.__get, self.__set, fdel)
-
-
-def reprkwargs(kwargs, sep=', ', fmt='{0}={1}'):
- return sep.join(fmt.format(k, _safe_repr(v)) for k, v in items(kwargs))
-
-
-def reprcall(name, args=(), kwargs={}, sep=', '):
- return '{0}({1}{2}{3})'.format(
- name, sep.join(map(_safe_repr, args or ())),
- (args and kwargs) and sep or '',
- reprkwargs(kwargs, sep),
- )
-
-
-@contextmanager
-def nested(*managers): # pragma: no cover
- # flake8: noqa
- """Combine multiple context managers into a single nested
- context manager."""
- exits = []
- vars = []
- exc = (None, None, None)
- try:
- try:
- for mgr in managers:
- exit = mgr.__exit__
- enter = mgr.__enter__
- vars.append(enter())
- exits.append(exit)
- yield vars
- except:
- exc = sys.exc_info()
- finally:
- while exits:
- exit = exits.pop()
- try:
- if exit(*exc):
- exc = (None, None, None)
- except:
- exc = sys.exc_info()
- if exc != (None, None, None):
- # Don't rely on sys.exc_info() still containing
- # the right information. Another exception may
- # have been raised and caught by an exit method
- reraise(exc[0], exc[1], exc[2])
- finally:
- del(exc)
-
-
-def shufflecycle(it):
- it = list(it) # don't modify callers list
- shuffle = random.shuffle
- for _ in repeat(None):
- shuffle(it)
- yield it[0]
-
-
-def entrypoints(namespace):
- try:
- from pkg_resources import iter_entry_points
- except ImportError:
- return iter([])
- return ((ep, ep.load()) for ep in iter_entry_points(namespace))
-
-
-@python_2_unicode_compatible
-class ChannelPromise(object):
-
- def __init__(self, contract):
- self.__contract__ = contract
-
- def __call__(self):
- try:
- return self.__value__
- except AttributeError:
- value = self.__value__ = self.__contract__()
- return value
-
- def __repr__(self):
- try:
- return repr(self.__value__)
- except AttributeError:
- return '<promise: 0x{0:x}>'.format(id(self.__contract__))
-
-
-def escape_regex(p, white=''):
- # what's up with re.escape? that code must be neglected or someting
- return ''.join(c if c.isalnum() or c in white
- else ('\\000' if c == '\000' else '\\' + c)
- for c in p)
-
-
-def fileno(f):
- if isinstance(f, numbers.Integral):
- return f
- return f.fileno()
-
-
-def maybe_fileno(f):
- """Get object fileno, or :const:`None` if not defined."""
- try:
- return fileno(f)
- except FILENO_ERRORS:
- pass
-
-
-def coro(gen):
-
- @wraps(gen)
- def wind_up(*args, **kwargs):
- it = gen(*args, **kwargs)
- next(it)
- return it
- return wind_up
diff --git a/kombu/utils/collections.py b/kombu/utils/collections.py
new file mode 100644
index 00000000..7d8fd1ee
--- /dev/null
+++ b/kombu/utils/collections.py
@@ -0,0 +1,37 @@
+"""Custom maps, sequences, etc."""
+from __future__ import absolute_import, unicode_literals
+
+
+class HashedSeq(list):
+ """type used for hash() to make sure the hash is not generated
+ multiple times."""
+ __slots__ = 'hashvalue'
+
+ def __init__(self, *seq):
+ self[:] = seq
+ self.hashvalue = hash(seq)
+
+ def __hash__(self):
+ return self.hashvalue
+
+
+def eqhash(o):
+ try:
+ return o.__eqhash__()
+ except AttributeError:
+ return hash(o)
+
+
+class EqualityDict(dict):
+
+ def __getitem__(self, key):
+ h = eqhash(key)
+ if h not in self:
+ return self.__missing__(key)
+ return dict.__getitem__(self, h)
+
+ def __setitem__(self, key, value):
+ return dict.__setitem__(self, eqhash(key), value)
+
+ def __delitem__(self, key):
+ return dict.__delitem__(self, eqhash(key))
diff --git a/kombu/utils/compat.py b/kombu/utils/compat.py
new file mode 100644
index 00000000..7d21222d
--- /dev/null
+++ b/kombu/utils/compat.py
@@ -0,0 +1,92 @@
+from __future__ import absolute_import, unicode_literals
+
+import numbers
+import sys
+
+from functools import wraps
+
+from contextlib import contextmanager
+
+from kombu.five import reraise
+
+try:
+ from io import UnsupportedOperation
+ FILENO_ERRORS = (AttributeError, ValueError, UnsupportedOperation)
+except ImportError: # pragma: no cover
+ # Py2
+ FILENO_ERRORS = (AttributeError, ValueError) # noqa
+
+try:
+ from billiard.util import register_after_fork
+except ImportError: # pragma: no cover
+ try:
+ from multiprocessing.util import register_after_fork # noqa
+ except ImportError:
+ register_after_fork = None # noqa
+
+
+def coro(gen):
+
+ @wraps(gen)
+ def wind_up(*args, **kwargs):
+ it = gen(*args, **kwargs)
+ next(it)
+ return it
+ return wind_up
+
+
+def entrypoints(namespace):
+ try:
+ from pkg_resources import iter_entry_points
+ except ImportError:
+ return iter([])
+ return ((ep, ep.load()) for ep in iter_entry_points(namespace))
+
+
+def fileno(f):
+ if isinstance(f, numbers.Integral):
+ return f
+ return f.fileno()
+
+
+def maybe_fileno(f):
+ """Get object fileno, or :const:`None` if not defined."""
+ try:
+ return fileno(f)
+ except FILENO_ERRORS:
+ pass
+
+
+@contextmanager
+def nested(*managers): # pragma: no cover
+ # flake8: noqa
+ """Combine multiple context managers into a single nested
+ context manager."""
+ exits = []
+ vars = []
+ exc = (None, None, None)
+ try:
+ try:
+ for mgr in managers:
+ exit = mgr.__exit__
+ enter = mgr.__enter__
+ vars.append(enter())
+ exits.append(exit)
+ yield vars
+ except:
+ exc = sys.exc_info()
+ finally:
+ while exits:
+ exit = exits.pop()
+ try:
+ if exit(*exc):
+ exc = (None, None, None)
+ except:
+ exc = sys.exc_info()
+ if exc != (None, None, None):
+ # Don't rely on sys.exc_info() still containing
+ # the right information. Another exception may
+ # have been raised and caught by an exit method
+ reraise(exc[0], exc[1], exc[2])
+ finally:
+ del(exc)
diff --git a/kombu/utils/div.py b/kombu/utils/div.py
new file mode 100644
index 00000000..d73afe07
--- /dev/null
+++ b/kombu/utils/div.py
@@ -0,0 +1,32 @@
+from __future__ import absolute_import, unicode_literals, print_function
+
+from .encoding import default_encode
+
+import sys
+
+
+def emergency_dump_state(state, open_file=open, dump=None, stderr=None):
+ from pprint import pformat
+ from tempfile import mktemp
+ stderr = sys.stderr if stderr is None else stderr
+
+ if dump is None:
+ import pickle
+ dump = pickle.dump
+ persist = mktemp()
+ print('EMERGENCY DUMP STATE TO FILE -> {0} <-'.format(persist), # noqa
+ file=stderr)
+ fh = open_file(persist, 'w')
+ try:
+ try:
+ dump(state, fh, protocol=0)
+ except Exception as exc:
+ print( # noqa
+ 'Cannot pickle state: {0!r}. Fallback to pformat.'.format(exc),
+ file=stderr,
+ )
+ fh.write(default_encode(pformat(state)))
+ finally:
+ fh.flush()
+ fh.close()
+ return persist
diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py
index 6ea7ba96..114ecb95 100644
--- a/kombu/utils/functional.py
+++ b/kombu/utils/functional.py
@@ -1,9 +1,12 @@
from __future__ import absolute_import, unicode_literals
+import random
import sys
import threading
from collections import Iterable, Mapping, OrderedDict
+from itertools import count, repeat
+from time import sleep
from vine.utils import wraps
@@ -11,6 +14,8 @@ from kombu.five import (
UserDict, items, keys, python_2_unicode_compatible, string_t,
)
+from .encoding import safe_repr as _safe_repr
+
__all__ = [
'LRUCache', 'memoize', 'lazy', 'maybe_evaluate',
'is_list', 'maybe_list', 'dictfilter',
@@ -19,6 +24,26 @@ __all__ = [
KEYWORD_MARK = object()
+@python_2_unicode_compatible
+class ChannelPromise(object):
+
+ def __init__(self, contract):
+ self.__contract__ = contract
+
+ def __call__(self):
+ try:
+ return self.__value__
+ except AttributeError:
+ value = self.__value__ = self.__contract__()
+ return value
+
+ def __repr__(self):
+ try:
+ return repr(self.__value__)
+ except AttributeError:
+ return '<promise: 0x{0:x}>'.format(id(self.__contract__))
+
+
class LRUCache(UserDict):
"""LRU Cache implementation using a doubly linked list to track access.
@@ -231,6 +256,104 @@ def dictfilter(d=None, **kw):
return {k: v for k, v in items(d) if v is not None}
+def shufflecycle(it):
+ it = list(it) # don't modify callers list
+ shuffle = random.shuffle
+ for _ in repeat(None):
+ shuffle(it)
+ yield it[0]
+
+
+def fxrange(start=1.0, stop=None, step=1.0, repeatlast=False):
+ cur = start * 1.0
+ while 1:
+ if not stop or cur <= stop:
+ yield cur
+ cur += step
+ else:
+ if not repeatlast:
+ break
+ yield cur - step
+
+
+def fxrangemax(start=1.0, stop=None, step=1.0, max=100.0):
+ sum_, cur = 0, start * 1.0
+ while 1:
+ if sum_ >= max:
+ break
+ yield cur
+ if stop:
+ cur = min(cur + step, stop)
+ else:
+ cur += step
+ sum_ += cur
+
+
+def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
+ max_retries=None, interval_start=2, interval_step=2,
+ interval_max=30, callback=None):
+ """Retry the function over and over until max retries is exceeded.
+
+ For each retry we sleep a for a while before we try again, this interval
+ is increased for every retry until the max seconds is reached.
+
+ Arguments:
+ fun (Callable): The function to try
+ catch (Tuple[BaseException]): Exceptions to catch, can be either
+ tuple or a single exception class.
+
+ Keyword Arguments:
+ args (Tuple): Positional arguments passed on to the function.
+ kwargs (Dict): Keyword arguments passed on to the function.
+ errback (Callable): Callback for when an exception in ``catch``
+ is raised. The callback must take three arguments:
+ ``exc``, ``interval_range`` and ``retries``, where ``exc``
+ is the exception instance, ``interval_range`` is an iterator
+ which return the time in seconds to sleep next, and ``retries``
+ is the number of previous retries.
+ max_retries (int): Maximum number of retries before we give up.
+ If this is not set, we will retry forever.
+ interval_start (float): How long (in seconds) we start sleeping
+ between retries.
+ interval_step (float): By how much the interval is increased for
+ each retry.
+ interval_max (float): Maximum number of seconds to sleep
+ between retries.
+ """
+ retries = 0
+ interval_range = fxrange(interval_start,
+ interval_max + interval_start,
+ interval_step, repeatlast=True)
+ for retries in count():
+ try:
+ return fun(*args, **kwargs)
+ except catch as exc:
+ if max_retries and retries >= max_retries:
+ raise
+ if callback:
+ callback()
+ tts = float(errback(exc, interval_range, retries) if errback
+ else next(interval_range))
+ if tts:
+ for _ in range(int(tts)):
+ if callback:
+ callback()
+ sleep(1.0)
+ # sleep remainder after int truncation above.
+ sleep(abs(int(tts) - tts))
+
+
+def reprkwargs(kwargs, sep=', ', fmt='{0}={1}'):
+ return sep.join(fmt.format(k, _safe_repr(v)) for k, v in items(kwargs))
+
+
+def reprcall(name, args=(), kwargs={}, sep=', '):
+ return '{0}({1}{2}{3})'.format(
+ name, sep.join(map(_safe_repr, args or ())),
+ (args and kwargs) and sep or '',
+ reprkwargs(kwargs, sep),
+ )
+
# Compat names (before kombu 3.0)
promise = lazy
maybe_promise = maybe_evaluate
diff --git a/kombu/utils/imports.py b/kombu/utils/imports.py
new file mode 100644
index 00000000..05f48d1d
--- /dev/null
+++ b/kombu/utils/imports.py
@@ -0,0 +1,65 @@
+"""Import related utilities."""
+from __future__ import absolute_import, unicode_literals
+
+import importlib
+import sys
+
+from kombu.five import reraise, string_t
+
+
+def symbol_by_name(name, aliases={}, imp=None, package=None,
+ sep='.', default=None, **kwargs):
+ """Get symbol by qualified name.
+
+ The name should be the full dot-separated path to the class::
+
+ modulename.ClassName
+
+ Example::
+
+ celery.concurrency.processes.TaskPool
+ ^- class name
+
+ or using ':' to separate module and symbol::
+
+ celery.concurrency.processes:TaskPool
+
+ If `aliases` is provided, a dict containing short name/long name
+ mappings, the name is looked up in the aliases first.
+
+ Examples:
+ >>> symbol_by_name('celery.concurrency.processes.TaskPool')
+ <class 'celery.concurrency.processes.TaskPool'>
+
+ >>> symbol_by_name('default', {
+ ... 'default': 'celery.concurrency.processes.TaskPool'})
+ <class 'celery.concurrency.processes.TaskPool'>
+
+ # Does not try to look up non-string names.
+ >>> from celery.concurrency.processes import TaskPool
+ >>> symbol_by_name(TaskPool) is TaskPool
+ True
+ """
+ if imp is None:
+ imp = importlib.import_module
+
+ if not isinstance(name, string_t):
+ return name # already a class
+
+ name = aliases.get(name) or name
+ sep = ':' if ':' in name else sep
+ module_name, _, cls_name = name.rpartition(sep)
+ if not module_name:
+ cls_name, module_name = None, package if package else cls_name
+ try:
+ try:
+ module = imp(module_name, package=package, **kwargs)
+ except ValueError as exc:
+ reraise(ValueError,
+ ValueError("Couldn't import {0!r}: {1}".format(name, exc)),
+ sys.exc_info()[2])
+ return getattr(module, cls_name) if cls_name else module
+ except (ImportError, AttributeError):
+ if default is None:
+ raise
+ return default
diff --git a/kombu/utils/objects.py b/kombu/utils/objects.py
new file mode 100644
index 00000000..b1f09e96
--- /dev/null
+++ b/kombu/utils/objects.py
@@ -0,0 +1,63 @@
+from __future__ import absolute_import, unicode_literals
+
+
+class cached_property(object):
+ """Property descriptor that caches the return value
+ of the get function.
+
+ Examples:
+ .. code-block:: python
+
+ @cached_property
+ def connection(self):
+ return Connection()
+
+ @connection.setter # Prepares stored value
+ def connection(self, value):
+ if value is None:
+ raise TypeError('Connection must be a connection')
+ return value
+
+ @connection.deleter
+ def connection(self, value):
+ # Additional action to do at del(self.attr)
+ if value is not None:
+ print('Connection {0!r} deleted'.format(value)
+ """
+
+ def __init__(self, fget=None, fset=None, fdel=None, doc=None):
+ self.__get = fget
+ self.__set = fset
+ self.__del = fdel
+ self.__doc__ = doc or fget.__doc__
+ self.__name__ = fget.__name__
+ self.__module__ = fget.__module__
+
+ def __get__(self, obj, type=None):
+ if obj is None:
+ return self
+ try:
+ return obj.__dict__[self.__name__]
+ except KeyError:
+ value = obj.__dict__[self.__name__] = self.__get(obj)
+ return value
+
+ def __set__(self, obj, value):
+ if obj is None:
+ return self
+ if self.__set is not None:
+ value = self.__set(obj, value)
+ obj.__dict__[self.__name__] = value
+
+ def __delete__(self, obj, _sentinel=object()):
+ if obj is None:
+ return self
+ value = obj.__dict__.pop(self.__name__, _sentinel)
+ if self.__del is not None and value is not _sentinel:
+ self.__del(obj, value)
+
+ def setter(self, fset):
+ return self.__class__(self.__get, fset, self.__del)
+
+ def deleter(self, fdel):
+ return self.__class__(self.__get, self.__set, fdel)
diff --git a/kombu/utils/scheduling.py b/kombu/utils/scheduling.py
index d2ccf272..f44222db 100644
--- a/kombu/utils/scheduling.py
+++ b/kombu/utils/scheduling.py
@@ -5,7 +5,7 @@ from itertools import count
from kombu.five import python_2_unicode_compatible
-from . import symbol_by_name
+from .imports import symbol_by_name
__all__ = [
'FairCycle', 'priority_cycle', 'round_robin_cycle', 'sorted_cycle',
diff --git a/kombu/utils/text.py b/kombu/utils/text.py
index 1b450c03..717b7ecc 100644
--- a/kombu/utils/text.py
+++ b/kombu/utils/text.py
@@ -7,6 +7,13 @@ from kombu import version_info_t
from kombu.five import string_t
+def escape_regex(p, white=''):
+ # what's up with re.escape? that code must be neglected or someting
+ return ''.join(c if c.isalnum() or c in white
+ else ('\\000' if c == '\000' else '\\' + c)
+ for c in p)
+
+
def fmatch_iter(needle, haystack, min_ratio=0.6):
for key in haystack:
ratio = SequenceMatcher(None, needle, key).ratio()
diff --git a/kombu/utils/uuid.py b/kombu/utils/uuid.py
new file mode 100644
index 00000000..286f07fb
--- /dev/null
+++ b/kombu/utils/uuid.py
@@ -0,0 +1,13 @@
+from __future__ import absolute_import, unicode_literals
+
+from uuid import uuid4
+
+
+def uuid(_uuid=uuid4):
+ """Generate a unique id, having - hopefully - a very small chance of
+ collision.
+
+ See Also:
+ For now this is provided by :func:`uuid.uuid4`.
+ """
+ return str(_uuid())