summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2016-04-01 15:04:01 -0700
committerAsk Solem <ask@celeryproject.org>2016-04-01 15:04:01 -0700
commite851974bc6e0dadc549b976ef6217dd037cf168d (patch)
tree7e3da4e9fe4617d21981fe0e2ecc7c6cb6f5bf2a
parent7674b8b4298834b6dbde8ef1789583d8ece0bbed (diff)
downloadkombu-e851974bc6e0dadc549b976ef6217dd037cf168d.tar.gz
94% coverage
-rw-r--r--.coveragerc4
-rw-r--r--kombu/async/aws/connection.py2
-rw-r--r--kombu/async/http/curl.py2
-rw-r--r--kombu/async/timer.py14
-rw-r--r--kombu/message.py34
-rw-r--r--kombu/tests/async/aws/sqs/test_connection.py9
-rw-r--r--kombu/tests/async/test_hub.py450
-rw-r--r--kombu/tests/async/test_timer.py158
-rw-r--r--kombu/tests/case.py124
-rw-r--r--kombu/tests/test_connection.py36
-rw-r--r--kombu/tests/test_exceptions.py11
-rw-r--r--kombu/tests/test_message.py44
-rw-r--r--kombu/tests/test_messaging.py27
-rw-r--r--kombu/tests/transport/test_sqlalchemy.py1
-rw-r--r--kombu/tests/transport/virtual/test_base.py50
-rw-r--r--kombu/tests/utils/test_encoding.py13
-rw-r--r--kombu/tests/utils/test_functional.py131
-rw-r--r--kombu/tests/utils/test_json.py49
-rw-r--r--kombu/tests/utils/test_scheduling.py47
-rw-r--r--kombu/tests/utils/test_url.py50
-rw-r--r--kombu/tests/utils/test_utils.py10
-rw-r--r--kombu/transport/virtual/__init__.py2
-rw-r--r--kombu/utils/encoding.py2
-rw-r--r--kombu/utils/functional.py47
-rw-r--r--kombu/utils/json.py16
-rw-r--r--kombu/utils/scheduling.py1
26 files changed, 1263 insertions, 71 deletions
diff --git a/.coveragerc b/.coveragerc
index 8088bc90..ba44c433 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -9,9 +9,11 @@ omit =
*/python?.?/*
*/site-packages/*
*/pypy/*
+ *kombu/async/http/curl.py
*kombu/five.py
*kombu/transport/mongodb.py
- *Kombu/transport/filesystem.py
+ *kombu/transport/filesystem.py
+ *kombu/transport/sqlalchemy/*
*kombu/utils.compat.py
*kombu/utils/eventio.py
*kombu/transport/amqplib.py
diff --git a/kombu/async/aws/connection.py b/kombu/async/aws/connection.py
index 87e8e10e..97adf222 100644
--- a/kombu/async/aws/connection.py
+++ b/kombu/async/aws/connection.py
@@ -4,7 +4,7 @@ from __future__ import absolute_import
try: # pragma: no cover
from email import message_from_file
from email.mime.message import MIMEMessage
-except ImportError: # Py2
+except ImportError: # pragma: no cover
from mimetools import Message as MIMEMessage # noqa
def message_from_file(m): # noqa
diff --git a/kombu/async/http/curl.py b/kombu/async/http/curl.py
index a55b86dd..fc5e1040 100644
--- a/kombu/async/http/curl.py
+++ b/kombu/async/http/curl.py
@@ -14,7 +14,7 @@ from .base import BaseClient
try:
import pycurl # noqa
-except ImportError:
+except ImportError: # pragma: no cover
pycurl = Curl = METH_TO_CURL = None # noqa
else:
from pycurl import Curl # noqa
diff --git a/kombu/async/timer.py b/kombu/async/timer.py
index 4f9217ef..8c5d671c 100644
--- a/kombu/async/timer.py
+++ b/kombu/async/timer.py
@@ -13,7 +13,7 @@ import sys
from collections import namedtuple
from datetime import datetime
-from functools import wraps
+from functools import total_ordering, wraps
from time import time
from weakref import proxy as weakrefproxy
@@ -22,7 +22,7 @@ from kombu.log import get_logger
try:
from pytz import utc
-except ImportError:
+except ImportError: # pragma: no cover
utc = None
DEFAULT_MAX_INTERVAL = 2
@@ -44,6 +44,7 @@ def to_timestamp(d, default_timezone=utc):
return d
+@total_ordering
class Entry(object):
if not IS_PYPY: # pragma: no cover
__slots__ = (
@@ -79,15 +80,6 @@ class Entry(object):
def __lt__(self, other):
return id(self) < id(other)
- def __gt__(self, other):
- return id(self) > id(other)
-
- def __le__(self, other):
- return id(self) <= id(other)
-
- def __ge__(self, other):
- return id(self) >= id(other)
-
def __eq__(self, other):
return hash(self) == hash(other)
diff --git a/kombu/message.py b/kombu/message.py
index 229823d0..b5d2faa5 100644
--- a/kombu/message.py
+++ b/kombu/message.py
@@ -13,6 +13,7 @@ from .compression import decompress
from .exceptions import MessageStateError
from .five import reraise, text_t
from .serialization import loads
+from .utils.functional import dictfilter
ACK_STATES = frozenset(['ACK', 'REJECTED', 'REQUEUED'])
@@ -162,20 +163,19 @@ class Message(object):
return self._decoded_cache if self._decoded_cache else self.decode()
def __repr__(self):
- details = {
- 'state': self._state,
- 'content_type': self.content_type,
- 'delivery_tag': self.delivery_tag,
- 'properties': {},
- }
- if self.body is not None:
- details['body_length'] = len(self.body)
- for k in ('correlation_id', 'type'):
- if k in self.properties:
- details['properties'][k] = self.properties[k]
- if 'routing_key' in self.delivery_info:
- details['delivery_info'] = {
- 'routing_key': self.delivery_info['routing_key'],
- }
- return '<%s object at 0x%x with details %s>' % (
- self.__class__.__name__, id(self), details)
+ return '<{0} object at {1:#x} with details {2!r}>'.format(
+ type(self).__name__, id(self), dictfilter(
+ state=self._state,
+ content_type=self.content_type,
+ delivery_tag=self.delivery_tag,
+ body_length=len(self.body) if self.body is not None else None,
+ properties=dictfilter(
+ correlation_id=self.properties.get('correlation_id'),
+ type=self.properties.get('type'),
+ ),
+ delivery_info=dictfilter(
+ exchange=self.delivery_info.get('exchange'),
+ routing_key=self.delivery_info.get('routing_key'),
+ ),
+ ),
+ )
diff --git a/kombu/tests/async/aws/sqs/test_connection.py b/kombu/tests/async/aws/sqs/test_connection.py
index 7aafa4db..cc0f9c43 100644
--- a/kombu/tests/async/aws/sqs/test_connection.py
+++ b/kombu/tests/async/aws/sqs/test_connection.py
@@ -21,6 +21,15 @@ class test_AsyncSQSConnection(AWSCase):
self.x.get_list = Mock(nanme='X.get_list')
self.callback = PromiseMock(name='callback')
+ def test_without_boto(self):
+ from kombu.async.aws.sqs import connection
+ prev, connection.boto = connection.boto, None
+ try:
+ with self.assertRaises(ImportError):
+ AsyncSQSConnection('ak', 'sk', http_client=Mock())
+ finally:
+ connection.boto = prev
+
def test_default_region(self):
self.assertTrue(self.x.region)
self.assertTrue(issubclass(
diff --git a/kombu/tests/async/test_hub.py b/kombu/tests/async/test_hub.py
index 9476ac48..3e295ff5 100644
--- a/kombu/tests/async/test_hub.py
+++ b/kombu/tests/async/test_hub.py
@@ -1,9 +1,131 @@
from __future__ import absolute_import
+import errno
+
from kombu.async import hub as _hub
-from kombu.async.hub import Hub, get_event_loop, set_event_loop
+from kombu.async import Hub, READ, WRITE, ERR
+from kombu.async.debug import callback_for, repr_flag, _rcb
+from kombu.async.hub import (
+ Stop, get_event_loop, set_event_loop,
+ _raise_stop_error, _dummy_context
+)
+from kombu.async.semaphore import DummyLock, LaxBoundedSemaphore
+
+from kombu.tests.case import Case, Mock, call, patch
+
+
+class File(object):
+
+ def __init__(self, fd):
+ self.fd = fd
+
+ def fileno(self):
+ return self.fd
+
+ def __eq__(self, other):
+ if isinstance(other, File):
+ return self.fd == other.fd
+ return NotImplemented
+
+ def __hash__(self):
+ return hash(self.fd)
+
+
+class test_DummyLock(Case):
+
+ def test_context(self):
+ mutex = DummyLock()
+ with mutex:
+ pass
+
+
+class test_LaxBoundedSemaphore(Case):
+
+ def test_acquire_release(self):
+ x = LaxBoundedSemaphore(2)
+
+ c1 = Mock()
+ x.acquire(c1, 1)
+ self.assertEqual(x.value, 1)
+ c1.assert_called_with(1)
+
+ c2 = Mock()
+ x.acquire(c2, 2)
+ self.assertEqual(x.value, 0)
+ c2.assert_called_with(2)
+
+ c3 = Mock()
+ x.acquire(c3, 3)
+ self.assertEqual(x.value, 0)
+ self.assertFalse(c3.called)
+
+ x.release()
+ self.assertEqual(x.value, 0)
+ x.release()
+ self.assertEqual(x.value, 1)
+ x.release()
+ self.assertEqual(x.value, 2)
+ c3.assert_called_with(3)
+
+ def test_repr(self):
+ self.assertTrue(repr(LaxBoundedSemaphore(2)))
+
+ def test_bounded(self):
+ x = LaxBoundedSemaphore(2)
+ for i in range(100):
+ x.release()
+ self.assertEqual(x.value, 2)
+
+ def test_grow_shrink(self):
+ x = LaxBoundedSemaphore(1)
+ self.assertEqual(x.initial_value, 1)
+ cb1 = Mock()
+ x.acquire(cb1, 1)
+ cb1.assert_called_with(1)
+ self.assertEqual(x.value, 0)
+
+ cb2 = Mock()
+ x.acquire(cb2, 2)
+ self.assertFalse(cb2.called)
+ self.assertEqual(x.value, 0)
-from kombu.tests.case import Case
+ cb3 = Mock()
+ x.acquire(cb3, 3)
+ self.assertFalse(cb3.called)
+
+ x.grow(2)
+ cb2.assert_called_with(2)
+ cb3.assert_called_with(3)
+ self.assertEqual(x.value, 2)
+ self.assertEqual(x.initial_value, 3)
+
+ self.assertFalse(x._waiting)
+ x.grow(3)
+ for i in range(x.initial_value):
+ self.assertTrue(x.acquire(Mock()))
+ self.assertFalse(x.acquire(Mock()))
+ x.clear()
+
+ x.shrink(3)
+ for i in range(x.initial_value):
+ self.assertTrue(x.acquire(Mock()))
+ self.assertFalse(x.acquire(Mock()))
+ self.assertEqual(x.value, 0)
+
+ for i in range(100):
+ x.release()
+ self.assertEqual(x.value, x.initial_value)
+
+ def test_clear(self):
+ x = LaxBoundedSemaphore(10)
+ for i in range(11):
+ x.acquire(Mock())
+ self.assertTrue(x._waiting)
+ self.assertEqual(x.value, 0)
+
+ x.clear()
+ self.assertFalse(x._waiting)
+ self.assertEqual(x.value, x.initial_value)
class test_Utils(Case):
@@ -23,6 +145,14 @@ class test_Utils(Case):
self.assertIs(_hub._current_loop, hub)
self.assertIs(get_event_loop(), hub)
+ def test_dummy_context(self):
+ with _dummy_context():
+ pass
+
+ def test_raise_stop_error(self):
+ with self.assertRaises(Stop):
+ _raise_stop_error()
+
class test_Hub(Case):
@@ -31,3 +161,319 @@ class test_Hub(Case):
def teardown(self):
self.hub.close()
+
+ def test_reset(self):
+ self.hub.close = Mock(name='close')
+ self.hub._create_poller = Mock(name='_create_poller')
+ self.hub.reset()
+ self.hub.close.assert_called_with()
+ self.hub._create_poller.assert_called_with()
+
+ def test__close_poller__no_poller(self):
+ self.hub.poller = None
+ self.hub._close_poller()
+
+ def test__close_poller(self):
+ poller = self.hub.poller = Mock(name='poller')
+ self.hub._close_poller()
+ poller.close.assert_called_with()
+ self.assertIsNone(self.hub.poller)
+
+ def test_stop(self):
+ self.hub.call_soon = Mock(name='call_soon')
+ self.hub.stop()
+ self.hub.call_soon.assert_called_with(_raise_stop_error)
+
+ def test_call_later(self):
+ callback = Mock(name='callback')
+ self.hub.timer = Mock(name='hub.timer')
+ self.hub.call_later(10.0, callback, 1, 2)
+ self.hub.timer.call_after.assert_called_with(10.0, callback, (1, 2))
+
+ def test_call_at(self):
+ callback = Mock(name='callback')
+ self.hub.timer = Mock(name='hub.timer')
+ self.hub.call_at(21231122, callback, 1, 2)
+ self.hub.timer.call_at.assert_called_with(21231122, callback, (1, 2))
+
+ def test_repr(self):
+ self.assertTrue(repr(self.hub))
+
+ def test_repr_flag(self):
+ self.assertEqual(repr_flag(READ), 'R')
+ self.assertEqual(repr_flag(WRITE), 'W')
+ self.assertEqual(repr_flag(ERR), '!')
+ self.assertEqual(repr_flag(READ | WRITE), 'RW')
+ self.assertEqual(repr_flag(READ | ERR), 'R!')
+ self.assertEqual(repr_flag(WRITE | ERR), 'W!')
+ self.assertEqual(repr_flag(READ | WRITE | ERR), 'RW!')
+
+ def test_repr_callback_rcb(self):
+
+ def f():
+ pass
+
+ self.assertEqual(_rcb(f), f.__name__)
+ self.assertEqual(_rcb('foo'), 'foo')
+
+ @patch('kombu.async.hub.poll')
+ def test_start_stop(self, poll):
+ self.hub = Hub()
+ poll.assert_called_with()
+
+ poller = self.hub.poller
+ self.hub.stop()
+ self.hub.close()
+ poller.close.assert_called_with()
+
+ def test_fire_timers(self):
+ self.hub.timer = Mock()
+ self.hub.timer._queue = []
+ self.assertEqual(
+ self.hub.fire_timers(min_delay=42.324, max_delay=32.321),
+ 32.321,
+ )
+
+ self.hub.timer._queue = [1]
+ self.hub.scheduler = iter([(3.743, None)])
+ self.assertEqual(self.hub.fire_timers(), 3.743)
+
+ e1, e2, e3 = Mock(), Mock(), Mock()
+ entries = [e1, e2, e3]
+
+ def reset():
+ return [m.reset() for m in [e1, e2, e3]]
+
+ def se():
+ while 1:
+ while entries:
+ yield None, entries.pop()
+ yield 3.982, None
+ self.hub.scheduler = se()
+
+ self.assertEqual(self.hub.fire_timers(max_timers=10), 3.982)
+ for E in [e3, e2, e1]:
+ E.assert_called_with()
+ reset()
+
+ entries[:] = [Mock() for _ in range(11)]
+ keep = list(entries)
+ self.assertEqual(
+ self.hub.fire_timers(max_timers=10, min_delay=1.13),
+ 1.13,
+ )
+ for E in reversed(keep[1:]):
+ E.assert_called_with()
+ reset()
+ self.assertEqual(self.hub.fire_timers(max_timers=10), 3.982)
+ keep[0].assert_called_with()
+
+ def test_fire_timers_raises(self):
+ eback = Mock()
+ eback.side_effect = KeyError('foo')
+ self.hub.timer = Mock()
+ self.hub.scheduler = iter([(0, eback)])
+ with self.assertRaises(KeyError):
+ self.hub.fire_timers(propagate=(KeyError,))
+
+ eback.side_effect = ValueError('foo')
+ self.hub.scheduler = iter([(0, eback)])
+ with patch('kombu.async.hub.logger') as logger:
+ with self.assertRaises(StopIteration):
+ self.hub.fire_timers()
+ self.assertTrue(logger.error.called)
+
+ eback.side_effect = MemoryError('foo')
+ self.hub.scheduler = iter([(0, eback)])
+ with self.assertRaises(MemoryError):
+ self.hub.fire_timers()
+
+ eback.side_effect = OSError()
+ eback.side_effect.errno = errno.ENOMEM
+ self.hub.scheduler = iter([(0, eback)])
+ with self.assertRaises(OSError):
+ self.hub.fire_timers()
+
+ eback.side_effect = OSError()
+ eback.side_effect.errno = errno.ENOENT
+ self.hub.scheduler = iter([(0, eback)])
+ with patch('kombu.async.hub.logger') as logger:
+ with self.assertRaises(StopIteration):
+ self.hub.fire_timers()
+ self.assertTrue(logger.error.called)
+
+ def test_add_raises_ValueError(self):
+ self.hub.poller = Mock(name='hub.poller')
+ self.hub.poller.register.side_effect = ValueError()
+ self.hub._discard = Mock(name='hub.discard')
+ with self.assertRaises(ValueError):
+ self.hub.add(2, Mock(), READ)
+ self.hub._discard.assert_called_with(2)
+
+ def test_remove_reader(self):
+ self.hub.poller = Mock(name='hub.poller')
+ self.hub.add(2, Mock(), READ)
+ self.hub.add(2, Mock(), WRITE)
+ self.hub.remove_reader(2)
+ self.assertNotIn(2, self.hub.readers)
+ self.assertIn(2, self.hub.writers)
+
+ def test_remove_writer(self):
+ self.hub.poller = Mock(name='hub.poller')
+ self.hub.add(2, Mock(), READ)
+ self.hub.add(2, Mock(), WRITE)
+ self.hub.remove_writer(2)
+ self.assertIn(2, self.hub.readers)
+ self.assertNotIn(2, self.hub.writers)
+
+ def test_add__consolidate(self):
+ self.hub.poller = Mock(name='hub.poller')
+ self.hub.add(2, Mock(), WRITE, consolidate=True)
+ self.assertIn(2, self.hub.consolidate)
+ self.assertIsNone(self.hub.writers[2])
+
+ @patch('kombu.async.hub.logger')
+ def test_on_callback_error(self, logger):
+ self.hub.on_callback_error(Mock(name='callback'), KeyError())
+ self.assertTrue(logger.error.called)
+
+ def test_loop_property(self):
+ self.hub._loop = None
+ self.hub.create_loop = Mock(name='hub.create_loop')
+ self.assertIs(self.hub.loop, self.hub.create_loop())
+ self.assertIs(self.hub._loop, self.hub.create_loop())
+
+ def test_run_forever(self):
+ self.hub.run_once = Mock(name='hub.run_once')
+ self.hub.run_once.side_effect = Stop()
+ self.hub.run_forever()
+
+ def test_run_once(self):
+ self.hub._loop = iter([1])
+ self.hub.run_once()
+ self.hub.run_once()
+ self.assertIsNone(self.hub._loop)
+
+ def test_repr_active(self):
+ self.hub.readers = {1: Mock(), 2: Mock()}
+ self.hub.writers = {3: Mock(), 4: Mock()}
+ for value in list(
+ self.hub.readers.values()) + list(self.hub.writers.values()):
+ value.__name__ = 'mock'
+ self.assertTrue(self.hub.repr_active())
+
+ def test_repr_events(self):
+ self.hub.readers = {6: Mock(), 7: Mock(), 8: Mock()}
+ self.hub.writers = {9: Mock()}
+ for value in list(
+ self.hub.readers.values()) + list(self.hub.writers.values()):
+ value.__name__ = 'mock'
+ self.assertTrue(self.hub.repr_events([
+ (6, READ),
+ (7, ERR),
+ (8, READ | ERR),
+ (9, WRITE),
+ (10, 13213),
+ ]))
+
+ def test_callback_for(self):
+ reader, writer = Mock(), Mock()
+ self.hub.readers = {6: reader}
+ self.hub.writers = {7: writer}
+
+ self.assertEqual(callback_for(self.hub, 6, READ), reader)
+ self.assertEqual(callback_for(self.hub, 7, WRITE), writer)
+ with self.assertRaises(KeyError):
+ callback_for(self.hub, 6, WRITE)
+ self.assertEqual(callback_for(self.hub, 6, WRITE, 'foo'), 'foo')
+
+ def test_add_remove_readers(self):
+ P = self.hub.poller = Mock()
+
+ read_A = Mock()
+ read_B = Mock()
+ self.hub.add_reader(10, read_A, 10)
+ self.hub.add_reader(File(11), read_B, 11)
+
+ P.register.assert_has_calls([
+ call(10, self.hub.READ | self.hub.ERR),
+ call(11, self.hub.READ | self.hub.ERR),
+ ], any_order=True)
+
+ self.assertEqual(self.hub.readers[10], (read_A, (10,)))
+ self.assertEqual(self.hub.readers[11], (read_B, (11,)))
+
+ self.hub.remove(10)
+ self.assertNotIn(10, self.hub.readers)
+ self.hub.remove(File(11))
+ self.assertNotIn(11, self.hub.readers)
+ P.unregister.assert_has_calls([
+ call(10), call(11),
+ ])
+
+ def test_can_remove_unknown_fds(self):
+ self.hub.poller = Mock()
+ self.hub.remove(30)
+ self.hub.remove(File(301))
+
+ def test_remove__unregister_raises(self):
+ self.hub.poller = Mock()
+ self.hub.poller.unregister.side_effect = OSError()
+
+ self.hub.remove(313)
+
+ def test_add_writers(self):
+ P = self.hub.poller = Mock()
+
+ write_A = Mock()
+ write_B = Mock()
+ self.hub.add_writer(20, write_A)
+ self.hub.add_writer(File(21), write_B)
+
+ P.register.assert_has_calls([
+ call(20, self.hub.WRITE),
+ call(21, self.hub.WRITE),
+ ], any_order=True)
+
+ self.assertEqual(self.hub.writers[20], (write_A, ()))
+ self.assertEqual(self.hub.writers[21], (write_B, ()))
+
+ self.hub.remove(20)
+ self.assertNotIn(20, self.hub.writers)
+ self.hub.remove(File(21))
+ self.assertNotIn(21, self.hub.writers)
+ P.unregister.assert_has_calls([
+ call(20), call(21),
+ ])
+
+ def test_enter__exit(self):
+ P = self.hub.poller = Mock()
+ on_close = Mock()
+ self.hub.on_close.add(on_close)
+
+ try:
+ read_A = Mock()
+ read_B = Mock()
+ self.hub.add_reader(10, read_A)
+ self.hub.add_reader(File(11), read_B)
+ write_A = Mock()
+ write_B = Mock()
+ self.hub.add_writer(20, write_A)
+ self.hub.add_writer(File(21), write_B)
+ self.assertTrue(self.hub.readers)
+ self.assertTrue(self.hub.writers)
+ finally:
+ assert self.hub.poller
+ self.hub.close()
+ self.assertFalse(self.hub.readers)
+ self.assertFalse(self.hub.writers)
+
+ P.unregister.assert_has_calls([
+ call(10), call(11), call(20), call(21),
+ ], any_order=True)
+
+ on_close.assert_called_with(self.hub)
+
+ def test_scheduler_property(self):
+ hub = Hub(timer=[1, 2, 3])
+ self.assertEqual(list(hub.scheduler), [1, 2, 3])
diff --git a/kombu/tests/async/test_timer.py b/kombu/tests/async/test_timer.py
new file mode 100644
index 00000000..c30a3289
--- /dev/null
+++ b/kombu/tests/async/test_timer.py
@@ -0,0 +1,158 @@
+from __future__ import absolute_import, unicode_literals
+
+from kombu.async.timer import Entry, Timer, to_timestamp
+
+from kombu.tests.case import Case, Mock, patch, redirect_stdouts
+
+
+class test_to_timestamp(Case):
+
+ def test_to_timestamp(self):
+ self.assertIs(to_timestamp(3.13), 3.13)
+
+
+class test_Entry(Case):
+
+ def test_call(self):
+ scratch = [None]
+
+ def timed(x, y, moo='foo'):
+ scratch[0] = (x, y, moo)
+
+ tref = Entry(timed, (4, 4), {'moo': 'baz'})
+ tref()
+
+ self.assertTupleEqual(scratch[0], (4, 4, 'baz'))
+
+ def test_cancel(self):
+ tref = Entry(lambda x: x, (1,), {})
+ self.assertFalse(tref.canceled)
+ self.assertFalse(tref.cancelled)
+ tref.cancel()
+ self.assertTrue(tref.canceled)
+ self.assertTrue(tref.cancelled)
+
+ def test_repr(self):
+ tref = Entry(lambda x: x(1,), {})
+ self.assertTrue(repr(tref))
+
+ def test_hash(self):
+ self.assertTrue(hash(Entry(lambda: None)))
+
+ def test_ordering(self):
+ # we don't care about results, just that it's possible
+ Entry(lambda x: 1) < Entry(lambda x: 2)
+ Entry(lambda x: 1) > Entry(lambda x: 2)
+ Entry(lambda x: 1) >= Entry(lambda x: 2)
+ Entry(lambda x: 1) <= Entry(lambda x: 2)
+
+ def test_eq(self):
+ x = Entry(lambda x: 1)
+ y = Entry(lambda x: 1)
+ self.assertEqual(x, x)
+ self.assertNotEqual(x, y)
+
+
+class test_Timer(Case):
+
+ def test_enter_exit(self):
+ x = Timer()
+ x.stop = Mock(name='timer.stop')
+ with x:
+ pass
+ x.stop.assert_called_with()
+
+ def test_supports_Timer_interface(self):
+ x = Timer()
+ x.stop()
+
+ tref = Mock()
+ x.cancel(tref)
+ tref.cancel.assert_called_with()
+
+ self.assertIs(x.schedule, x)
+
+ def test_handle_error(self):
+ from datetime import datetime
+ scratch = [None]
+
+ def on_error(exc_info):
+ scratch[0] = exc_info
+
+ s = Timer(on_error=on_error)
+
+ with patch('kombu.async.timer.to_timestamp') as tot:
+ tot.side_effect = OverflowError()
+ s.enter_at(Entry(lambda: None, (), {}),
+ eta=datetime.now())
+ s.enter_at(Entry(lambda: None, (), {}), eta=None)
+ s.on_error = None
+ with self.assertRaises(OverflowError):
+ s.enter_at(Entry(lambda: None, (), {}),
+ eta=datetime.now())
+ exc = scratch[0]
+ self.assertIsInstance(exc, OverflowError)
+
+ def test_call_repeatedly(self):
+ t = Timer()
+ try:
+ t.schedule.enter_after = Mock()
+
+ myfun = Mock()
+ myfun.__name__ = b'myfun'
+ t.call_repeatedly(0.03, myfun)
+
+ self.assertEqual(t.schedule.enter_after.call_count, 1)
+ args1, _ = t.schedule.enter_after.call_args_list[0]
+ sec1, tref1, _ = args1
+ self.assertEqual(sec1, 0.03)
+ tref1()
+
+ self.assertEqual(t.schedule.enter_after.call_count, 2)
+ args2, _ = t.schedule.enter_after.call_args_list[1]
+ sec2, tref2, _ = args2
+ self.assertEqual(sec2, 0.03)
+ tref2.canceled = True
+ tref2()
+
+ self.assertEqual(t.schedule.enter_after.call_count, 2)
+ finally:
+ t.stop()
+
+ @patch('kombu.async.timer.logger')
+ def test_apply_entry_error_handled(self, logger):
+ t = Timer()
+ t.schedule.on_error = None
+
+ fun = Mock()
+ fun.side_effect = ValueError()
+
+ t.schedule.apply_entry(fun)
+ self.assertTrue(logger.error.called)
+
+ @redirect_stdouts
+ def test_apply_entry_error_not_handled(self, stdout, stderr):
+ t = Timer()
+ t.schedule.on_error = Mock()
+
+ fun = Mock()
+ fun.side_effect = ValueError()
+ t.schedule.apply_entry(fun)
+ fun.assert_called_with()
+ self.assertFalse(stderr.getvalue())
+
+ def test_enter_after(self):
+ t = Timer()
+ t._enter = Mock()
+ fun = Mock(name='fun')
+ time = Mock(name='time')
+ time.return_value = 10
+ t.enter_after(10, fun, time=time)
+ time.assert_called_with()
+ t._enter.assert_called_with(20, 0, fun)
+
+ def test_cancel(self):
+ t = Timer()
+ tref = Mock()
+ t.cancel(tref)
+ tref.cancel.assert_called_with()
diff --git a/kombu/tests/case.py b/kombu/tests/case.py
index 38b6cdb2..f9a2a21e 100644
--- a/kombu/tests/case.py
+++ b/kombu/tests/case.py
@@ -1,12 +1,13 @@
from __future__ import absolute_import
import importlib
+import inspect
import os
import sys
import types
from contextlib import contextmanager
-from functools import wraps
+from functools import partial, wraps
from io import StringIO
try:
@@ -16,7 +17,7 @@ except ImportError:
from nose import SkipTest
-from kombu.five import builtins, string_t
+from kombu.five import builtins, reraise, string_t
from kombu.utils.encoding import ensure_bytes
try:
@@ -51,6 +52,51 @@ class Case(unittest.TestCase):
def teardown(self):
pass
+ def patch(self, *path, **options):
+ manager = patch('.'.join(path), **options)
+ patched = manager.start()
+ self.addCleanup(manager.stop)
+ return patched
+
+ def mock_modules(self, *mods):
+ modules = []
+ for mod in mods:
+ mod = mod.split('.')
+ modules.extend(reversed([
+ '.'.join(mod[:-i] if i else mod) for i in range(len(mod))
+ ]))
+ modules = sorted(set(modules))
+ return self.wrap_context(mock_module(*modules))
+
+ def on_nth_call_do(self, mock, side_effect, n=1):
+
+ def on_call(*args, **kwargs):
+ if mock.call_count >= n:
+ mock.side_effect = side_effect
+ return mock.return_value
+ mock.side_effect = on_call
+ return mock
+
+ def on_nth_call_return(self, mock, retval, n=1):
+
+ def on_call(*args, **kwargs):
+ if mock.call_count >= n:
+ mock.return_value = retval
+ return mock.return_value
+ mock.side_effect = on_call
+ return mock
+
+ def mask_modules(self, *modules):
+ self.wrap_context(mask_modules(*modules))
+
+ def wrap_context(self, context):
+ ret = context.__enter__()
+ self.addCleanup(partial(context.__exit__, None, None, None))
+ return ret
+
+ def mock_environ(self, env_name, env_value):
+ return self.wrap_context(mock_environ(env_name, env_value))
+
def PromiseMock(*args, **kwargs):
m = Mock(*args, **kwargs)
@@ -97,14 +143,18 @@ def case_no_pypy(cls):
def case_no_python3(cls):
- setup = cls.setUp
+ wrap, is_cls = cls, False
+ if inspect.isclass(cls):
+ wrap, is_cls = cls.setUp, True
- @wraps(setup)
+ @wraps(wrap)
def around_setup(self):
if PY3:
raise SkipTest('Python 3 incompatible')
- setup(self)
- cls.setUp = around_setup
+ return wrap(self) if is_cls else wrap()
+
+ if is_cls:
+ cls.setUp = around_setup
return cls
@@ -306,3 +356,65 @@ def set_module_symbol(module, key, value):
yield
finally:
setattr(module, key, prev)
+
+
+@contextmanager
+def mock_module(*names):
+ prev = {}
+
+ class MockModule(types.ModuleType):
+
+ def __getattr__(self, attr):
+ setattr(self, attr, Mock())
+ return types.ModuleType.__getattribute__(self, attr)
+
+ mods = []
+ for name in names:
+ try:
+ prev[name] = sys.modules[name]
+ except KeyError:
+ pass
+ mod = sys.modules[name] = MockModule(name)
+ mods.append(mod)
+ try:
+ yield mods
+ finally:
+ for name in names:
+ try:
+ sys.modules[name] = prev[name]
+ except KeyError:
+ try:
+ del(sys.modules[name])
+ except KeyError:
+ pass
+
+
+@contextmanager
+def mock_context(mock, typ=Mock):
+ context = mock.return_value = Mock()
+ context.__enter__ = typ()
+ context.__exit__ = typ()
+
+ def on_exit(*x):
+ if x[0]:
+ reraise(x[0], x[1], x[2])
+ context.__exit__.side_effect = on_exit
+ context.__enter__.return_value = context
+ try:
+ yield context
+ finally:
+ context.reset()
+
+
+@contextmanager
+def mock_environ(env_name, env_value):
+ sentinel = object()
+ prev_val = os.environ.get(env_name, sentinel)
+ os.environ[env_name] = env_value
+ try:
+ yield env_value
+ finally:
+ if prev_val is sentinel:
+ os.environ.pop(env_name, None)
+ else:
+ os.environ[env_name] = prev_val
diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py
index 84786424..44c8749d 100644
--- a/kombu/tests/test_connection.py
+++ b/kombu/tests/test_connection.py
@@ -8,6 +8,7 @@ from copy import copy
from kombu import Connection, Consumer, Producer, parse_url
from kombu.connection import Resource
from kombu.five import items, range
+from kombu.utils.functional import lazy
from .case import Case, Mock, SkipTest, patch, skip_if_not_module
from .mocks import Transport
@@ -332,6 +333,28 @@ class test_Connection(Case):
self.assertFalse(c.completes_cycle(1))
self.assertTrue(c.completes_cycle(2))
+ def test_get_heartbeat_interval(self):
+ self.conn.transport.get_heartbeat_interval = Mock(name='ghi')
+ self.assertIs(
+ self.conn.get_heartbeat_interval(),
+ self.conn.transport.get_heartbeat_interval.return_value,
+ )
+ self.conn.transport.get_heartbeat_interval.assert_called_with(
+ self.conn.connection)
+
+ def test_supports_exchange_type(self):
+ self.conn.transport.implements.exchange_type = {'topic'}
+ self.assertTrue(self.conn.supports_exchange_type('topic'))
+ self.assertFalse(self.conn.supports_exchange_type('fanout'))
+
+ def test_qos_semantics_matches_spec(self):
+ qsms = self.conn.transport.qos_semantics_matches_spec = Mock(name='qsms')
+ self.assertIs(
+ self.conn.qos_semantics_matches_spec,
+ qsms.return_value,
+ )
+ qsms.assert_called_with(self.conn.connection)
+
def test__enter____exit__(self):
conn = self.conn
context = conn.__enter__()
@@ -622,6 +645,19 @@ class test_ConnectionPool(ResourceCase):
def create_resource(self, limit):
return Connection(port=5672, transport=Transport).Pool(limit)
+ def test_collect_resource__does_not_collect_lazy_resource(self):
+ P = self.create_resource(10)
+ res = lazy(object())
+ res.collect = Mock(name='collect')
+ P.collect_resource(res)
+ self.assertFalse(res.collect.called)
+
+ def test_collect_resource(self):
+ res = Mock(name='res')
+ P = self.create_resource(10)
+ P.collect_resource(res, socket_timeout=10.3)
+ res.collect.assert_called_with(10.3)
+
def test_setup(self):
P = self.create_resource(10)
q = P._resource.queue
diff --git a/kombu/tests/test_exceptions.py b/kombu/tests/test_exceptions.py
new file mode 100644
index 00000000..1d04b359
--- /dev/null
+++ b/kombu/tests/test_exceptions.py
@@ -0,0 +1,11 @@
+from __future__ import absolute_import, unicode_literals
+
+from kombu.exceptions import HttpError
+
+from kombu.tests.case import Case, Mock
+
+
+class test_HttpError(Case):
+
+ def test_str(self):
+ self.assertTrue(str(HttpError(200, 'msg', Mock(name='response'))))
diff --git a/kombu/tests/test_message.py b/kombu/tests/test_message.py
new file mode 100644
index 00000000..a27d6e0f
--- /dev/null
+++ b/kombu/tests/test_message.py
@@ -0,0 +1,44 @@
+from __future__ import absolute_import, unicode_literals
+
+import sys
+
+from kombu.message import Message
+
+from .case import Case, Mock, patch
+
+
+class test_Message(Case):
+
+ def test_repr(self):
+ self.assertTrue(repr(Message(Mock(), 'b')))
+
+ def test_decode(self):
+ m = Message(Mock(), 'body')
+ decode = m._decode = Mock()
+ self.assertIsNone(m._decoded_cache)
+ self.assertIs(m.decode(), m._decode.return_value)
+ self.assertIs(m._decoded_cache, m._decode.return_value)
+ m._decode.assert_called_with()
+ m._decode = Mock()
+ self.assertIs(m.decode(), decode.return_value)
+
+ def test_reraise_error(self):
+ m = Message(Mock(), 'body')
+ callback = Mock(name='callback')
+ try:
+ raise KeyError('foo')
+ except KeyError as exc:
+ m.errors.append(sys.exc_info())
+ m._reraise_error(callback)
+ self.assertTrue(callback.called)
+
+ with self.assertRaises(KeyError):
+ m._reraise_error(None)
+
+ @patch('kombu.message.decompress')
+ def test_decompression_stores_error(self, decompress):
+ decompress.side_effect = RuntimeError()
+ m = Message(Mock(), 'body', headers={'compression': 'zlib'})
+ with self.assertRaises(RuntimeError):
+ m._reraise_error(None)
+
diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py
index 5d7a2711..4e1a24e1 100644
--- a/kombu/tests/test_messaging.py
+++ b/kombu/tests/test_messaging.py
@@ -1,6 +1,7 @@
from __future__ import absolute_import, unicode_literals
import pickle
+import sys
from collections import defaultdict
@@ -129,6 +130,19 @@ class test_Producer(Case):
p._channel.basic_publish.call_args[1]['exchange'], 'foo',
)
+ def test_publish_with_expiration(self):
+ p = self.connection.Producer()
+ p.channel = Mock()
+ p.publish('hello', exchange=Exchange('foo'), expiration=10)
+ properties = p._channel.prepare_message.call_args[0][5]
+ self.assertEqual(properties['expiration'], '10000')
+
+ def test_publish_with_reply_to(self):
+ p = self.connection.Producer()
+ p.channel = Mock()
+ p.publish('hello', exchange=Exchange('foo'), reply_to=Queue('foo'))
+ properties = p._channel.prepare_message.call_args[0][5]
+ self.assertEqual(properties['reply_to'], 'foo')
def test_set_on_return(self):
chan = Mock()
chan.events = defaultdict(Mock)
@@ -349,6 +363,19 @@ class test_Consumer(Case):
finally:
channel.message_to_python = m2p
+ def test_receive_callback__message_errors(self):
+ channel = self.connection.channel()
+ channel.message_to_python = None
+ c = channel.Consumer()
+ message = Mock()
+ try:
+ raise KeyError('foo')
+ except KeyError:
+ message.errors = [sys.exc_info()]
+ message._reraise_error.side_effect = KeyError()
+ with self.assertRaises(KeyError):
+ c._receive_callback(message)
+
def test_set_callbacks(self):
channel = self.connection.channel()
queue = Queue('qname', self.exchange, 'rkey')
diff --git a/kombu/tests/transport/test_sqlalchemy.py b/kombu/tests/transport/test_sqlalchemy.py
index 72ae2d98..374d7a19 100644
--- a/kombu/tests/transport/test_sqlalchemy.py
+++ b/kombu/tests/transport/test_sqlalchemy.py
@@ -1,6 +1,7 @@
from __future__ import absolute_import
from kombu import Connection
+
from kombu.tests.case import Case, SkipTest, patch, case_requires
diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py
index 33764341..a012c857 100644
--- a/kombu/tests/transport/virtual/test_base.py
+++ b/kombu/tests/transport/virtual/test_base.py
@@ -9,7 +9,7 @@ from kombu.transport import virtual
from kombu.utils import uuid
from kombu.compression import compress
-from kombu.tests.case import Case, Mock, patch, redirect_stdouts
+from kombu.tests.case import Case, MagicMock, Mock, patch, redirect_stdouts
PY3 = sys.version_info[0] == 3
PRINT_FQDN = 'builtins.print' if PY3 else '__builtin__.print'
@@ -49,6 +49,10 @@ class test_QoS(Case):
self.assertFalse(self.q._delivered.restored)
self.assertTrue(self.q._on_collect)
+ def test_restore_visible__interface(self):
+ qos = virtual.QoS(client().channel())
+ qos.restore_visible()
+
@redirect_stdouts
def test_can_consume(self, stdout, stderr):
_restored = []
@@ -297,6 +301,15 @@ class test_Channel(Case):
c.queue_purge(n)
self.assertIn(n, c.purged)
+ def test_basic_publish__anon_exchange(self):
+ c = memory_client().channel()
+ msg = MagicMock(name='msg')
+ c.encode_body = Mock(name='c.encode_body')
+ c.encode_body.return_value = (1, 2)
+ c._put = Mock(name='c._put')
+ c.basic_publish(msg, None, 'rkey', kw=1)
+ c._put.assert_called_with('rkey', msg, kw=1)
+
def test_basic_publish_unique_delivery_tags(self, n='test_uniq_tag'):
c1 = memory_client().channel()
c2 = memory_client().channel()
@@ -573,3 +586,38 @@ class test_Transport(Case):
channel = self.transport.create_channel(self.transport)
with self.assertRaises(virtual.Empty):
self.transport._drain_channel(channel)
+
+ def test__deliver__no_queue(self):
+ with self.assertRaises(KeyError):
+ self.transport._deliver(Mock(name='msg'), queue=None)
+
+ def test__reject_inbound_message(self):
+ channel = Mock(name='channel')
+ self.transport.channels = [None, channel]
+ self.transport._reject_inbound_message({'foo': 'bar'})
+ channel.Message.assert_called_with(channel, {'foo': 'bar'})
+ channel.qos.append.assert_called_with(
+ channel.Message(), channel.Message().delivery_tag,
+ )
+ channel.basic_reject.assert_called_with(
+ channel.Message().delivery_tag, requeue=True,
+ )
+
+ def test_on_message_ready(self):
+ channel = Mock(name='channel')
+ msg = Mock(name='msg')
+ callback = Mock(name='callback')
+ self.transport._callbacks = {'q1': callback}
+ self.transport.on_message_ready(channel, msg, queue='q1')
+ callback.assert_called_with(msg)
+
+ def test_on_message_ready__no_queue(self):
+ with self.assertRaises(KeyError):
+ self.transport.on_message_ready(
+ Mock(name='channel'), Mock(name='msg'), queue=None)
+
+ def test_on_message_ready__no_callback(self):
+ self.transport._callbacks = {}
+ with self.assertRaises(KeyError):
+ self.transport.on_message_ready(
+ Mock(name='channel'), Mock(name='msg'), queue='q1')
diff --git a/kombu/tests/utils/test_encoding.py b/kombu/tests/utils/test_encoding.py
index cc7e514e..9d99f0e5 100644
--- a/kombu/tests/utils/test_encoding.py
+++ b/kombu/tests/utils/test_encoding.py
@@ -7,7 +7,10 @@ import sys
from contextlib import contextmanager
from kombu.five import bytes_t, string_t
-from kombu.utils.encoding import safe_str, default_encoding
+from kombu.utils.encoding import (
+ get_default_encoding_file, safe_str,
+ set_default_encoding_file, default_encoding,
+)
from kombu.tests.case import Case, SkipTest, patch
@@ -25,6 +28,14 @@ def clean_encoding():
class test_default_encoding(Case):
+ def test_set_default_file(self):
+ prev = get_default_encoding_file()
+ try:
+ set_default_encoding_file('/foo.txt')
+ self.assertEqual(get_default_encoding_file(), '/foo.txt')
+ finally:
+ set_default_encoding_file(prev)
+
@patch('sys.getfilesystemencoding')
def test_default(self, getdefaultencoding):
getdefaultencoding.return_value = 'ascii'
diff --git a/kombu/tests/utils/test_functional.py b/kombu/tests/utils/test_functional.py
index f215dea5..fa782764 100644
--- a/kombu/tests/utils/test_functional.py
+++ b/kombu/tests/utils/test_functional.py
@@ -3,7 +3,10 @@ from __future__ import absolute_import
import pickle
import sys
-from kombu.utils.functional import lazy, maybe_evaluate
+from itertools import count
+
+from kombu.five import THREAD_TIMEOUT_MAX, items
+from kombu.utils.functional import LRUCache, memoize, lazy, maybe_evaluate
from kombu.tests.case import Case, SkipTest
@@ -12,6 +15,132 @@ def double(x):
return x * 2
+class test_LRUCache(Case):
+
+ def test_expires(self):
+ limit = 100
+ x = LRUCache(limit=limit)
+ slots = list(range(limit * 2))
+ for i in slots:
+ x[i] = i
+ self.assertListEqual(list(x.keys()), list(slots[limit:]))
+ self.assertTrue(x.items())
+ self.assertTrue(x.values())
+
+ def test_is_pickleable(self):
+ x = LRUCache(limit=10)
+ x.update(luke=1, leia=2)
+ y = pickle.loads(pickle.dumps(x))
+ self.assertEqual(y.limit, y.limit)
+ self.assertEqual(y, x)
+
+ def test_update_expires(self):
+ limit = 100
+ x = LRUCache(limit=limit)
+ slots = list(range(limit * 2))
+ for i in slots:
+ x.update({i: i})
+
+ self.assertListEqual(list(x.keys()), list(slots[limit:]))
+
+ def test_least_recently_used(self):
+ x = LRUCache(3)
+
+ x[1], x[2], x[3] = 1, 2, 3
+ self.assertEqual(list(x.keys()), [1, 2, 3])
+
+ x[4], x[5] = 4, 5
+ self.assertEqual(list(x.keys()), [3, 4, 5])
+
+ # access 3, which makes it the last used key.
+ x[3]
+ x[6] = 6
+ self.assertEqual(list(x.keys()), [5, 3, 6])
+
+ x[7] = 7
+ self.assertEqual(list(x.keys()), [3, 6, 7])
+
+ def test_update_larger_than_cache_size(self):
+ x = LRUCache(2)
+ x.update({x: x for x in range(100)})
+ self.assertEqual(list(x.keys()), [98, 99])
+
+ def assertSafeIter(self, method, interval=0.01, size=10000):
+ if sys.version_info >= (3, 5):
+ raise SkipTest('Fails on Py3.5')
+ from threading import Thread, Event
+ from time import sleep
+ x = LRUCache(size)
+ x.update(zip(range(size), range(size)))
+
+ class Burglar(Thread):
+
+ def __init__(self, cache):
+ self.cache = cache
+ self.__is_shutdown = Event()
+ self.__is_stopped = Event()
+ Thread.__init__(self)
+
+ def run(self):
+ while not self.__is_shutdown.isSet():
+ try:
+ self.cache.popitem(last=False)
+ except KeyError:
+ break
+ self.__is_stopped.set()
+
+ def stop(self):
+ self.__is_shutdown.set()
+ self.__is_stopped.wait()
+ self.join(THREAD_TIMEOUT_MAX)
+
+ burglar = Burglar(x)
+ burglar.start()
+ try:
+ for _ in getattr(x, method)():
+ sleep(0.0001)
+ finally:
+ burglar.stop()
+
+ def test_safe_to_remove_while_iteritems(self):
+ self.assertSafeIter('iteritems')
+
+ def test_safe_to_remove_while_keys(self):
+ self.assertSafeIter('keys')
+
+ def test_safe_to_remove_while_itervalues(self):
+ self.assertSafeIter('itervalues')
+
+ def test_items(self):
+ c = LRUCache()
+ c.update(a=1, b=2, c=3)
+ self.assertTrue(list(items(c)))
+
+ def test_incr(self):
+ c = LRUCache()
+ c.update(a='1')
+ c.incr('a')
+ self.assertEqual(c['a'], '2')
+
+
+class test_memoize(Case):
+
+ def test_memoize(self):
+ counter = count(1)
+
+ @memoize(maxsize=2)
+ def x(i):
+ return next(counter)
+
+ self.assertEqual(x(1), 1)
+ self.assertEqual(x(1), 1)
+ self.assertEqual(x(2), 2)
+ self.assertEqual(x(3), 3)
+ self.assertEqual(x(1), 4)
+ x.clear()
+ self.assertEqual(x(3), 5)
+
+
class test_lazy(Case):
def test__str__(self):
diff --git a/kombu/tests/utils/test_json.py b/kombu/tests/utils/test_json.py
new file mode 100644
index 00000000..00f38e7d
--- /dev/null
+++ b/kombu/tests/utils/test_json.py
@@ -0,0 +1,49 @@
+from __future__ import absolute_import, unicode_literals
+
+from kombu.utils.encoding import str_to_bytes
+from kombu.utils.json import _DecodeError, dumps, loads
+
+from kombu.tests.case import Case, MagicMock, Mock, case_no_python3
+
+
+class Custom(object):
+
+ def __init__(self, data):
+ self.data = data
+
+ def __json__(self):
+ return self.data
+
+
+class test_dumps_loads(Case):
+
+ def test_dumps_custom_object(self):
+ x = {'foo': Custom({'a': 'b'})}
+ self.assertEqual(loads(dumps(x)), {'foo': x['foo'].__json__()})
+
+ def test_dumps_custom_object_no_json(self):
+ x = {'foo': object()}
+ with self.assertRaises(TypeError):
+ dumps(x)
+
+ def test_loads_memoryview(self):
+ self.assertEqual(loads(memoryview(dumps({'x': 'z'}))), {'x': 'z'})
+
+ def test_loads_bytearray(self):
+ self.assertEqual(loads(bytearray(dumps({'x': 'z'}))), {'x': 'z'})
+
+ def test_loads_bytes(self):
+ self.assertEqual(
+ loads(str_to_bytes(dumps({'x': 'z'})), decode_bytes=True),
+ {'x': 'z'},
+ )
+
+ @case_no_python3
+ def test_loads_buffer(self):
+ self.assertEqual(loads(buffer(dumps({'x': 'z'}))), {'x': 'z'})
+
+ def test_loads_DecodeError(self):
+ _loads = Mock(name='_loads')
+ _loads.side_effect = _DecodeError(
+ MagicMock(), MagicMock(), MagicMock())
+ self.assertEqual(loads(dumps({'x': 'z'}), _loads=_loads), {'x': 'z'})
diff --git a/kombu/tests/utils/test_scheduling.py b/kombu/tests/utils/test_scheduling.py
index e1f1fe5a..70277a31 100644
--- a/kombu/tests/utils/test_scheduling.py
+++ b/kombu/tests/utils/test_scheduling.py
@@ -1,6 +1,6 @@
from __future__ import absolute_import
-from kombu.utils.scheduling import FairCycle
+from kombu.utils.scheduling import FairCycle, cycle_by_name
from kombu.tests.case import Case
@@ -65,3 +65,48 @@ class test_FairCycle(Case):
def test__repr__(self):
self.assertTrue(repr(FairCycle(lambda x: x, [1, 2, 3], MyEmpty)))
+
+
+class test_round_robin_cycle(Case):
+
+ def test_round_robin_cycle(self):
+ it = cycle_by_name('round_robin')(['A', 'B', 'C'])
+ self.assertListEqual(it.consume(3), ['A', 'B', 'C'])
+ it.rotate('B')
+ self.assertListEqual(it.consume(3), ['A', 'C', 'B'])
+ it.rotate('A')
+ self.assertListEqual(it.consume(3), ['C', 'B', 'A'])
+ it.rotate('A')
+ self.assertListEqual(it.consume(3), ['C', 'B', 'A'])
+ it.rotate('C')
+ self.assertListEqual(it.consume(3), ['B', 'A', 'C'])
+
+
+class test_priority_cycle(Case):
+
+ def test_priority_cycle(self):
+ it = cycle_by_name('priority')(['A', 'B', 'C'])
+ self.assertListEqual(it.consume(3), ['A', 'B', 'C'])
+ it.rotate('B')
+ self.assertListEqual(it.consume(3), ['A', 'B', 'C'])
+ it.rotate('A')
+ self.assertListEqual(it.consume(3), ['A', 'B', 'C'])
+ it.rotate('A')
+ self.assertListEqual(it.consume(3), ['A', 'B', 'C'])
+ it.rotate('C')
+ self.assertListEqual(it.consume(3), ['A', 'B', 'C'])
+
+
+class test_sorted_cycle(Case):
+
+ def test_sorted_cycle(self):
+ it = cycle_by_name('sorted')(['B', 'C', 'A'])
+ self.assertListEqual(it.consume(3), ['A', 'B', 'C'])
+ it.rotate('B')
+ self.assertListEqual(it.consume(3), ['A', 'B', 'C'])
+ it.rotate('A')
+ self.assertListEqual(it.consume(3), ['A', 'B', 'C'])
+ it.rotate('A')
+ self.assertListEqual(it.consume(3), ['A', 'B', 'C'])
+ it.rotate('C')
+ self.assertListEqual(it.consume(3), ['A', 'B', 'C'])
diff --git a/kombu/tests/utils/test_url.py b/kombu/tests/utils/test_url.py
new file mode 100644
index 00000000..67c7efce
--- /dev/null
+++ b/kombu/tests/utils/test_url.py
@@ -0,0 +1,50 @@
+from __future__ import absolute_import, unicode_literals
+
+from kombu.utils.url import as_url, parse_url, maybe_sanitize_url
+
+from kombu.tests.case import Case
+
+
+class test_parse_url(Case):
+
+ def test_parse_url(self):
+ result = parse_url('amqp://user:pass@localhost:5672/my/vhost')
+ self.assertDictEqual(result, {
+ 'transport': 'amqp',
+ 'userid': 'user',
+ 'password': 'pass',
+ 'hostname': 'localhost',
+ 'port': 5672,
+ 'virtual_host': 'my/vhost',
+ })
+
+
+class test_as_url(Case):
+
+ def test_as_url(self):
+ self.assertEqual(as_url('https'), 'https:///')
+ self.assertEqual(as_url('https', 'e.com'), 'https://e.com/')
+ self.assertEqual(as_url('https', 'e.com', 80), 'https://e.com:80/')
+ self.assertEqual(
+ as_url('https', 'e.com', 80, 'u'), 'https://u@e.com:80/',
+ )
+ self.assertEqual(
+ as_url('https', 'e.com', 80, 'u', 'p'), 'https://u:p@e.com:80/',
+ )
+ self.assertEqual(
+ as_url('https', 'e.com', 80, None, 'p'), 'https://:p@e.com:80/',
+ )
+ self.assertEqual(
+ as_url('https', 'e.com', 80, None, 'p', '/foo'),
+ 'https://:p@e.com:80//foo',
+ )
+
+
+class test_maybe_sanitize_url(Case):
+
+ def test_maybe_sanitize_url(self):
+ self.assertEqual(maybe_sanitize_url('foo'), 'foo')
+ self.assertEqual(
+ maybe_sanitize_url('http://u:p@e.com//foo'),
+ 'http://u:**@e.com//foo',
+ )
diff --git a/kombu/tests/utils/test_utils.py b/kombu/tests/utils/test_utils.py
index 29a0a9b1..6900e131 100644
--- a/kombu/tests/utils/test_utils.py
+++ b/kombu/tests/utils/test_utils.py
@@ -373,3 +373,13 @@ class test_version_string_as_tuple(Case):
version_string_as_tuple('3.3.1.a3.40c32'),
version_info_t(3, 3, 1, 'a3', '40c32'),
)
+
+
+class test_maybe_fileno(Case):
+
+ def test_maybe_fileno(self):
+ self.assertEqual(utils.maybe_fileno(3), 3)
+ f = Mock(name='file')
+ self.assertIs(utils.maybe_fileno(f), f.fileno())
+ f.fileno.side_effect = ValueError()
+ self.assertIsNone(utils.maybe_fileno(f))
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py
index bb305b0b..3184fad1 100644
--- a/kombu/transport/virtual/__init__.py
+++ b/kombu/transport/virtual/__init__.py
@@ -881,7 +881,7 @@ class Transport(base.Transport):
def _deliver(self, message, queue):
if not queue:
raise KeyError(
- 'Received message without destination queue: {1}'.format(
+ 'Received message without destination queue: {0}'.format(
message))
try:
callback = self._callbacks[queue]
diff --git a/kombu/utils/encoding.py b/kombu/utils/encoding.py
index d054257a..72f91539 100644
--- a/kombu/utils/encoding.py
+++ b/kombu/utils/encoding.py
@@ -100,7 +100,7 @@ def safe_str(s, errors='replace'):
return _safe_str(s, errors)
-if is_py3k:
+if is_py3k: # pragma: no cover
def _safe_str(s, errors='replace', file=None):
if isinstance(s, str):
diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py
index 83aca99a..0dc7f8ac 100644
--- a/kombu/utils/functional.py
+++ b/kombu/utils/functional.py
@@ -5,7 +5,6 @@ import threading
from collections import Iterable, Mapping, OrderedDict
from functools import wraps
-from itertools import islice
from kombu.five import UserDict, items, keys, string_t
@@ -32,7 +31,7 @@ class LRUCache(UserDict):
def __getitem__(self, key):
with self.mutex:
value = self[key] = self.data.pop(key)
- return value
+ return value
def update(self, *args, **kwargs):
with self.mutex:
@@ -40,9 +39,12 @@ class LRUCache(UserDict):
data.update(*args, **kwargs)
if limit and len(data) > limit:
# pop additional items in case limit exceeded
- # negative overflow will lead to an empty list
- for item in islice(iter(data), len(data) - limit):
- data.pop(item)
+ for _ in range(len(data) - limit):
+ data.popitem(last=False)
+
+ def popitem(self, last=True):
+ with self.mutex:
+ return self.data.popitem(last)
def __setitem__(self, key, value):
# remove least recently used key.
@@ -55,24 +57,28 @@ class LRUCache(UserDict):
return iter(self.data)
def _iterate_items(self):
- for k in self:
- try:
- yield (k, self.data[k])
- except KeyError: # pragma: no cover
- pass
+ with self.mutex:
+ for k in self:
+ try:
+ yield (k, self.data[k])
+ except KeyError: # pragma: no cover
+ pass
iteritems = _iterate_items
def _iterate_values(self):
- for k in self:
- try:
- yield self.data[k]
- except KeyError: # pragma: no cover
- pass
+ with self.mutex:
+ for k in self:
+ try:
+ yield self.data[k]
+ except KeyError: # pragma: no cover
+ pass
+
itervalues = _iterate_values
def _iterate_keys(self):
# userdict.keys in py3k calls __getitem__
- return keys(self.data)
+ with self.mutex:
+ return keys(self.data)
iterkeys = _iterate_keys
def incr(self, key, delta=1):
@@ -81,7 +87,7 @@ class LRUCache(UserDict):
# integer as long as it exists and we can cast it
newval = int(self.data.pop(key)) + delta
self[key] = str(newval)
- return newval
+ return newval
def __getstate__(self):
d = dict(vars(self))
@@ -108,7 +114,7 @@ class LRUCache(UserDict):
return list(self._iterate_items())
-def memoize(maxsize=None, Cache=LRUCache):
+def memoize(maxsize=None, keyfun=None, Cache=LRUCache):
def _memoize(fun):
mutex = threading.Lock()
@@ -116,7 +122,10 @@ def memoize(maxsize=None, Cache=LRUCache):
@wraps(fun)
def _M(*args, **kwargs):
- key = args + (KEYWORD_MARK,) + tuple(sorted(kwargs.items()))
+ if keyfun:
+ key = keyfun(args, kwargs)
+ else:
+ key = args + (KEYWORD_MARK,) + tuple(sorted(kwargs.items()))
try:
with mutex:
value = cache[key]
diff --git a/kombu/utils/json.py b/kombu/utils/json.py
index be4e6a05..3b3d1911 100644
--- a/kombu/utils/json.py
+++ b/kombu/utils/json.py
@@ -11,6 +11,11 @@ try:
except ImportError: # pragma: no cover
import json # noqa
+ class _DecodeError(Exception): # noqa
+ pass
+else:
+ from simplejson.decoder import JSONDecodeError as _DecodeError
+
IS_PY3 = sys.version_info[0] == 3
_encoder_cls = type(json._default_encoder)
@@ -47,11 +52,8 @@ def loads(s, _loads=json.loads, decode_bytes=IS_PY3):
elif isinstance(s, buffer_t):
s = text_t(s) # ... awwwwwww :(
- if json.__name__ == 'simplejson':
- try:
- return _loads(s)
- # catch simplejson.decoder.JSONDecodeError: Unpaired high surrogate
- except json.decoder.JSONDecodeError:
- return stdjson.loads(s)
- else:
+ try:
return _loads(s)
+ except _DecodeError:
+ # catch "Unpaired high surrogate" error
+ return stdjson.loads(s)
diff --git a/kombu/utils/scheduling.py b/kombu/utils/scheduling.py
index bc788489..7e179e8a 100644
--- a/kombu/utils/scheduling.py
+++ b/kombu/utils/scheduling.py
@@ -71,6 +71,7 @@ class round_robin_cycle(object):
items.append(items.pop(items.index(last_used)))
except ValueError:
pass
+ return last_used
class priority_cycle(round_robin_cycle):