diff options
author | Ask Solem <ask@celeryproject.org> | 2016-04-01 15:04:01 -0700 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2016-04-01 15:04:01 -0700 |
commit | e851974bc6e0dadc549b976ef6217dd037cf168d (patch) | |
tree | 7e3da4e9fe4617d21981fe0e2ecc7c6cb6f5bf2a | |
parent | 7674b8b4298834b6dbde8ef1789583d8ece0bbed (diff) | |
download | kombu-e851974bc6e0dadc549b976ef6217dd037cf168d.tar.gz |
94% coverage
26 files changed, 1263 insertions, 71 deletions
diff --git a/.coveragerc b/.coveragerc index 8088bc90..ba44c433 100644 --- a/.coveragerc +++ b/.coveragerc @@ -9,9 +9,11 @@ omit = */python?.?/* */site-packages/* */pypy/* + *kombu/async/http/curl.py *kombu/five.py *kombu/transport/mongodb.py - *Kombu/transport/filesystem.py + *kombu/transport/filesystem.py + *kombu/transport/sqlalchemy/* *kombu/utils.compat.py *kombu/utils/eventio.py *kombu/transport/amqplib.py diff --git a/kombu/async/aws/connection.py b/kombu/async/aws/connection.py index 87e8e10e..97adf222 100644 --- a/kombu/async/aws/connection.py +++ b/kombu/async/aws/connection.py @@ -4,7 +4,7 @@ from __future__ import absolute_import try: # pragma: no cover from email import message_from_file from email.mime.message import MIMEMessage -except ImportError: # Py2 +except ImportError: # pragma: no cover from mimetools import Message as MIMEMessage # noqa def message_from_file(m): # noqa diff --git a/kombu/async/http/curl.py b/kombu/async/http/curl.py index a55b86dd..fc5e1040 100644 --- a/kombu/async/http/curl.py +++ b/kombu/async/http/curl.py @@ -14,7 +14,7 @@ from .base import BaseClient try: import pycurl # noqa -except ImportError: +except ImportError: # pragma: no cover pycurl = Curl = METH_TO_CURL = None # noqa else: from pycurl import Curl # noqa diff --git a/kombu/async/timer.py b/kombu/async/timer.py index 4f9217ef..8c5d671c 100644 --- a/kombu/async/timer.py +++ b/kombu/async/timer.py @@ -13,7 +13,7 @@ import sys from collections import namedtuple from datetime import datetime -from functools import wraps +from functools import total_ordering, wraps from time import time from weakref import proxy as weakrefproxy @@ -22,7 +22,7 @@ from kombu.log import get_logger try: from pytz import utc -except ImportError: +except ImportError: # pragma: no cover utc = None DEFAULT_MAX_INTERVAL = 2 @@ -44,6 +44,7 @@ def to_timestamp(d, default_timezone=utc): return d +@total_ordering class Entry(object): if not IS_PYPY: # pragma: no cover __slots__ = ( @@ -79,15 +80,6 @@ class Entry(object): def __lt__(self, other): return id(self) < id(other) - def __gt__(self, other): - return id(self) > id(other) - - def __le__(self, other): - return id(self) <= id(other) - - def __ge__(self, other): - return id(self) >= id(other) - def __eq__(self, other): return hash(self) == hash(other) diff --git a/kombu/message.py b/kombu/message.py index 229823d0..b5d2faa5 100644 --- a/kombu/message.py +++ b/kombu/message.py @@ -13,6 +13,7 @@ from .compression import decompress from .exceptions import MessageStateError from .five import reraise, text_t from .serialization import loads +from .utils.functional import dictfilter ACK_STATES = frozenset(['ACK', 'REJECTED', 'REQUEUED']) @@ -162,20 +163,19 @@ class Message(object): return self._decoded_cache if self._decoded_cache else self.decode() def __repr__(self): - details = { - 'state': self._state, - 'content_type': self.content_type, - 'delivery_tag': self.delivery_tag, - 'properties': {}, - } - if self.body is not None: - details['body_length'] = len(self.body) - for k in ('correlation_id', 'type'): - if k in self.properties: - details['properties'][k] = self.properties[k] - if 'routing_key' in self.delivery_info: - details['delivery_info'] = { - 'routing_key': self.delivery_info['routing_key'], - } - return '<%s object at 0x%x with details %s>' % ( - self.__class__.__name__, id(self), details) + return '<{0} object at {1:#x} with details {2!r}>'.format( + type(self).__name__, id(self), dictfilter( + state=self._state, + content_type=self.content_type, + delivery_tag=self.delivery_tag, + body_length=len(self.body) if self.body is not None else None, + properties=dictfilter( + correlation_id=self.properties.get('correlation_id'), + type=self.properties.get('type'), + ), + delivery_info=dictfilter( + exchange=self.delivery_info.get('exchange'), + routing_key=self.delivery_info.get('routing_key'), + ), + ), + ) diff --git a/kombu/tests/async/aws/sqs/test_connection.py b/kombu/tests/async/aws/sqs/test_connection.py index 7aafa4db..cc0f9c43 100644 --- a/kombu/tests/async/aws/sqs/test_connection.py +++ b/kombu/tests/async/aws/sqs/test_connection.py @@ -21,6 +21,15 @@ class test_AsyncSQSConnection(AWSCase): self.x.get_list = Mock(nanme='X.get_list') self.callback = PromiseMock(name='callback') + def test_without_boto(self): + from kombu.async.aws.sqs import connection + prev, connection.boto = connection.boto, None + try: + with self.assertRaises(ImportError): + AsyncSQSConnection('ak', 'sk', http_client=Mock()) + finally: + connection.boto = prev + def test_default_region(self): self.assertTrue(self.x.region) self.assertTrue(issubclass( diff --git a/kombu/tests/async/test_hub.py b/kombu/tests/async/test_hub.py index 9476ac48..3e295ff5 100644 --- a/kombu/tests/async/test_hub.py +++ b/kombu/tests/async/test_hub.py @@ -1,9 +1,131 @@ from __future__ import absolute_import +import errno + from kombu.async import hub as _hub -from kombu.async.hub import Hub, get_event_loop, set_event_loop +from kombu.async import Hub, READ, WRITE, ERR +from kombu.async.debug import callback_for, repr_flag, _rcb +from kombu.async.hub import ( + Stop, get_event_loop, set_event_loop, + _raise_stop_error, _dummy_context +) +from kombu.async.semaphore import DummyLock, LaxBoundedSemaphore + +from kombu.tests.case import Case, Mock, call, patch + + +class File(object): + + def __init__(self, fd): + self.fd = fd + + def fileno(self): + return self.fd + + def __eq__(self, other): + if isinstance(other, File): + return self.fd == other.fd + return NotImplemented + + def __hash__(self): + return hash(self.fd) + + +class test_DummyLock(Case): + + def test_context(self): + mutex = DummyLock() + with mutex: + pass + + +class test_LaxBoundedSemaphore(Case): + + def test_acquire_release(self): + x = LaxBoundedSemaphore(2) + + c1 = Mock() + x.acquire(c1, 1) + self.assertEqual(x.value, 1) + c1.assert_called_with(1) + + c2 = Mock() + x.acquire(c2, 2) + self.assertEqual(x.value, 0) + c2.assert_called_with(2) + + c3 = Mock() + x.acquire(c3, 3) + self.assertEqual(x.value, 0) + self.assertFalse(c3.called) + + x.release() + self.assertEqual(x.value, 0) + x.release() + self.assertEqual(x.value, 1) + x.release() + self.assertEqual(x.value, 2) + c3.assert_called_with(3) + + def test_repr(self): + self.assertTrue(repr(LaxBoundedSemaphore(2))) + + def test_bounded(self): + x = LaxBoundedSemaphore(2) + for i in range(100): + x.release() + self.assertEqual(x.value, 2) + + def test_grow_shrink(self): + x = LaxBoundedSemaphore(1) + self.assertEqual(x.initial_value, 1) + cb1 = Mock() + x.acquire(cb1, 1) + cb1.assert_called_with(1) + self.assertEqual(x.value, 0) + + cb2 = Mock() + x.acquire(cb2, 2) + self.assertFalse(cb2.called) + self.assertEqual(x.value, 0) -from kombu.tests.case import Case + cb3 = Mock() + x.acquire(cb3, 3) + self.assertFalse(cb3.called) + + x.grow(2) + cb2.assert_called_with(2) + cb3.assert_called_with(3) + self.assertEqual(x.value, 2) + self.assertEqual(x.initial_value, 3) + + self.assertFalse(x._waiting) + x.grow(3) + for i in range(x.initial_value): + self.assertTrue(x.acquire(Mock())) + self.assertFalse(x.acquire(Mock())) + x.clear() + + x.shrink(3) + for i in range(x.initial_value): + self.assertTrue(x.acquire(Mock())) + self.assertFalse(x.acquire(Mock())) + self.assertEqual(x.value, 0) + + for i in range(100): + x.release() + self.assertEqual(x.value, x.initial_value) + + def test_clear(self): + x = LaxBoundedSemaphore(10) + for i in range(11): + x.acquire(Mock()) + self.assertTrue(x._waiting) + self.assertEqual(x.value, 0) + + x.clear() + self.assertFalse(x._waiting) + self.assertEqual(x.value, x.initial_value) class test_Utils(Case): @@ -23,6 +145,14 @@ class test_Utils(Case): self.assertIs(_hub._current_loop, hub) self.assertIs(get_event_loop(), hub) + def test_dummy_context(self): + with _dummy_context(): + pass + + def test_raise_stop_error(self): + with self.assertRaises(Stop): + _raise_stop_error() + class test_Hub(Case): @@ -31,3 +161,319 @@ class test_Hub(Case): def teardown(self): self.hub.close() + + def test_reset(self): + self.hub.close = Mock(name='close') + self.hub._create_poller = Mock(name='_create_poller') + self.hub.reset() + self.hub.close.assert_called_with() + self.hub._create_poller.assert_called_with() + + def test__close_poller__no_poller(self): + self.hub.poller = None + self.hub._close_poller() + + def test__close_poller(self): + poller = self.hub.poller = Mock(name='poller') + self.hub._close_poller() + poller.close.assert_called_with() + self.assertIsNone(self.hub.poller) + + def test_stop(self): + self.hub.call_soon = Mock(name='call_soon') + self.hub.stop() + self.hub.call_soon.assert_called_with(_raise_stop_error) + + def test_call_later(self): + callback = Mock(name='callback') + self.hub.timer = Mock(name='hub.timer') + self.hub.call_later(10.0, callback, 1, 2) + self.hub.timer.call_after.assert_called_with(10.0, callback, (1, 2)) + + def test_call_at(self): + callback = Mock(name='callback') + self.hub.timer = Mock(name='hub.timer') + self.hub.call_at(21231122, callback, 1, 2) + self.hub.timer.call_at.assert_called_with(21231122, callback, (1, 2)) + + def test_repr(self): + self.assertTrue(repr(self.hub)) + + def test_repr_flag(self): + self.assertEqual(repr_flag(READ), 'R') + self.assertEqual(repr_flag(WRITE), 'W') + self.assertEqual(repr_flag(ERR), '!') + self.assertEqual(repr_flag(READ | WRITE), 'RW') + self.assertEqual(repr_flag(READ | ERR), 'R!') + self.assertEqual(repr_flag(WRITE | ERR), 'W!') + self.assertEqual(repr_flag(READ | WRITE | ERR), 'RW!') + + def test_repr_callback_rcb(self): + + def f(): + pass + + self.assertEqual(_rcb(f), f.__name__) + self.assertEqual(_rcb('foo'), 'foo') + + @patch('kombu.async.hub.poll') + def test_start_stop(self, poll): + self.hub = Hub() + poll.assert_called_with() + + poller = self.hub.poller + self.hub.stop() + self.hub.close() + poller.close.assert_called_with() + + def test_fire_timers(self): + self.hub.timer = Mock() + self.hub.timer._queue = [] + self.assertEqual( + self.hub.fire_timers(min_delay=42.324, max_delay=32.321), + 32.321, + ) + + self.hub.timer._queue = [1] + self.hub.scheduler = iter([(3.743, None)]) + self.assertEqual(self.hub.fire_timers(), 3.743) + + e1, e2, e3 = Mock(), Mock(), Mock() + entries = [e1, e2, e3] + + def reset(): + return [m.reset() for m in [e1, e2, e3]] + + def se(): + while 1: + while entries: + yield None, entries.pop() + yield 3.982, None + self.hub.scheduler = se() + + self.assertEqual(self.hub.fire_timers(max_timers=10), 3.982) + for E in [e3, e2, e1]: + E.assert_called_with() + reset() + + entries[:] = [Mock() for _ in range(11)] + keep = list(entries) + self.assertEqual( + self.hub.fire_timers(max_timers=10, min_delay=1.13), + 1.13, + ) + for E in reversed(keep[1:]): + E.assert_called_with() + reset() + self.assertEqual(self.hub.fire_timers(max_timers=10), 3.982) + keep[0].assert_called_with() + + def test_fire_timers_raises(self): + eback = Mock() + eback.side_effect = KeyError('foo') + self.hub.timer = Mock() + self.hub.scheduler = iter([(0, eback)]) + with self.assertRaises(KeyError): + self.hub.fire_timers(propagate=(KeyError,)) + + eback.side_effect = ValueError('foo') + self.hub.scheduler = iter([(0, eback)]) + with patch('kombu.async.hub.logger') as logger: + with self.assertRaises(StopIteration): + self.hub.fire_timers() + self.assertTrue(logger.error.called) + + eback.side_effect = MemoryError('foo') + self.hub.scheduler = iter([(0, eback)]) + with self.assertRaises(MemoryError): + self.hub.fire_timers() + + eback.side_effect = OSError() + eback.side_effect.errno = errno.ENOMEM + self.hub.scheduler = iter([(0, eback)]) + with self.assertRaises(OSError): + self.hub.fire_timers() + + eback.side_effect = OSError() + eback.side_effect.errno = errno.ENOENT + self.hub.scheduler = iter([(0, eback)]) + with patch('kombu.async.hub.logger') as logger: + with self.assertRaises(StopIteration): + self.hub.fire_timers() + self.assertTrue(logger.error.called) + + def test_add_raises_ValueError(self): + self.hub.poller = Mock(name='hub.poller') + self.hub.poller.register.side_effect = ValueError() + self.hub._discard = Mock(name='hub.discard') + with self.assertRaises(ValueError): + self.hub.add(2, Mock(), READ) + self.hub._discard.assert_called_with(2) + + def test_remove_reader(self): + self.hub.poller = Mock(name='hub.poller') + self.hub.add(2, Mock(), READ) + self.hub.add(2, Mock(), WRITE) + self.hub.remove_reader(2) + self.assertNotIn(2, self.hub.readers) + self.assertIn(2, self.hub.writers) + + def test_remove_writer(self): + self.hub.poller = Mock(name='hub.poller') + self.hub.add(2, Mock(), READ) + self.hub.add(2, Mock(), WRITE) + self.hub.remove_writer(2) + self.assertIn(2, self.hub.readers) + self.assertNotIn(2, self.hub.writers) + + def test_add__consolidate(self): + self.hub.poller = Mock(name='hub.poller') + self.hub.add(2, Mock(), WRITE, consolidate=True) + self.assertIn(2, self.hub.consolidate) + self.assertIsNone(self.hub.writers[2]) + + @patch('kombu.async.hub.logger') + def test_on_callback_error(self, logger): + self.hub.on_callback_error(Mock(name='callback'), KeyError()) + self.assertTrue(logger.error.called) + + def test_loop_property(self): + self.hub._loop = None + self.hub.create_loop = Mock(name='hub.create_loop') + self.assertIs(self.hub.loop, self.hub.create_loop()) + self.assertIs(self.hub._loop, self.hub.create_loop()) + + def test_run_forever(self): + self.hub.run_once = Mock(name='hub.run_once') + self.hub.run_once.side_effect = Stop() + self.hub.run_forever() + + def test_run_once(self): + self.hub._loop = iter([1]) + self.hub.run_once() + self.hub.run_once() + self.assertIsNone(self.hub._loop) + + def test_repr_active(self): + self.hub.readers = {1: Mock(), 2: Mock()} + self.hub.writers = {3: Mock(), 4: Mock()} + for value in list( + self.hub.readers.values()) + list(self.hub.writers.values()): + value.__name__ = 'mock' + self.assertTrue(self.hub.repr_active()) + + def test_repr_events(self): + self.hub.readers = {6: Mock(), 7: Mock(), 8: Mock()} + self.hub.writers = {9: Mock()} + for value in list( + self.hub.readers.values()) + list(self.hub.writers.values()): + value.__name__ = 'mock' + self.assertTrue(self.hub.repr_events([ + (6, READ), + (7, ERR), + (8, READ | ERR), + (9, WRITE), + (10, 13213), + ])) + + def test_callback_for(self): + reader, writer = Mock(), Mock() + self.hub.readers = {6: reader} + self.hub.writers = {7: writer} + + self.assertEqual(callback_for(self.hub, 6, READ), reader) + self.assertEqual(callback_for(self.hub, 7, WRITE), writer) + with self.assertRaises(KeyError): + callback_for(self.hub, 6, WRITE) + self.assertEqual(callback_for(self.hub, 6, WRITE, 'foo'), 'foo') + + def test_add_remove_readers(self): + P = self.hub.poller = Mock() + + read_A = Mock() + read_B = Mock() + self.hub.add_reader(10, read_A, 10) + self.hub.add_reader(File(11), read_B, 11) + + P.register.assert_has_calls([ + call(10, self.hub.READ | self.hub.ERR), + call(11, self.hub.READ | self.hub.ERR), + ], any_order=True) + + self.assertEqual(self.hub.readers[10], (read_A, (10,))) + self.assertEqual(self.hub.readers[11], (read_B, (11,))) + + self.hub.remove(10) + self.assertNotIn(10, self.hub.readers) + self.hub.remove(File(11)) + self.assertNotIn(11, self.hub.readers) + P.unregister.assert_has_calls([ + call(10), call(11), + ]) + + def test_can_remove_unknown_fds(self): + self.hub.poller = Mock() + self.hub.remove(30) + self.hub.remove(File(301)) + + def test_remove__unregister_raises(self): + self.hub.poller = Mock() + self.hub.poller.unregister.side_effect = OSError() + + self.hub.remove(313) + + def test_add_writers(self): + P = self.hub.poller = Mock() + + write_A = Mock() + write_B = Mock() + self.hub.add_writer(20, write_A) + self.hub.add_writer(File(21), write_B) + + P.register.assert_has_calls([ + call(20, self.hub.WRITE), + call(21, self.hub.WRITE), + ], any_order=True) + + self.assertEqual(self.hub.writers[20], (write_A, ())) + self.assertEqual(self.hub.writers[21], (write_B, ())) + + self.hub.remove(20) + self.assertNotIn(20, self.hub.writers) + self.hub.remove(File(21)) + self.assertNotIn(21, self.hub.writers) + P.unregister.assert_has_calls([ + call(20), call(21), + ]) + + def test_enter__exit(self): + P = self.hub.poller = Mock() + on_close = Mock() + self.hub.on_close.add(on_close) + + try: + read_A = Mock() + read_B = Mock() + self.hub.add_reader(10, read_A) + self.hub.add_reader(File(11), read_B) + write_A = Mock() + write_B = Mock() + self.hub.add_writer(20, write_A) + self.hub.add_writer(File(21), write_B) + self.assertTrue(self.hub.readers) + self.assertTrue(self.hub.writers) + finally: + assert self.hub.poller + self.hub.close() + self.assertFalse(self.hub.readers) + self.assertFalse(self.hub.writers) + + P.unregister.assert_has_calls([ + call(10), call(11), call(20), call(21), + ], any_order=True) + + on_close.assert_called_with(self.hub) + + def test_scheduler_property(self): + hub = Hub(timer=[1, 2, 3]) + self.assertEqual(list(hub.scheduler), [1, 2, 3]) diff --git a/kombu/tests/async/test_timer.py b/kombu/tests/async/test_timer.py new file mode 100644 index 00000000..c30a3289 --- /dev/null +++ b/kombu/tests/async/test_timer.py @@ -0,0 +1,158 @@ +from __future__ import absolute_import, unicode_literals + +from kombu.async.timer import Entry, Timer, to_timestamp + +from kombu.tests.case import Case, Mock, patch, redirect_stdouts + + +class test_to_timestamp(Case): + + def test_to_timestamp(self): + self.assertIs(to_timestamp(3.13), 3.13) + + +class test_Entry(Case): + + def test_call(self): + scratch = [None] + + def timed(x, y, moo='foo'): + scratch[0] = (x, y, moo) + + tref = Entry(timed, (4, 4), {'moo': 'baz'}) + tref() + + self.assertTupleEqual(scratch[0], (4, 4, 'baz')) + + def test_cancel(self): + tref = Entry(lambda x: x, (1,), {}) + self.assertFalse(tref.canceled) + self.assertFalse(tref.cancelled) + tref.cancel() + self.assertTrue(tref.canceled) + self.assertTrue(tref.cancelled) + + def test_repr(self): + tref = Entry(lambda x: x(1,), {}) + self.assertTrue(repr(tref)) + + def test_hash(self): + self.assertTrue(hash(Entry(lambda: None))) + + def test_ordering(self): + # we don't care about results, just that it's possible + Entry(lambda x: 1) < Entry(lambda x: 2) + Entry(lambda x: 1) > Entry(lambda x: 2) + Entry(lambda x: 1) >= Entry(lambda x: 2) + Entry(lambda x: 1) <= Entry(lambda x: 2) + + def test_eq(self): + x = Entry(lambda x: 1) + y = Entry(lambda x: 1) + self.assertEqual(x, x) + self.assertNotEqual(x, y) + + +class test_Timer(Case): + + def test_enter_exit(self): + x = Timer() + x.stop = Mock(name='timer.stop') + with x: + pass + x.stop.assert_called_with() + + def test_supports_Timer_interface(self): + x = Timer() + x.stop() + + tref = Mock() + x.cancel(tref) + tref.cancel.assert_called_with() + + self.assertIs(x.schedule, x) + + def test_handle_error(self): + from datetime import datetime + scratch = [None] + + def on_error(exc_info): + scratch[0] = exc_info + + s = Timer(on_error=on_error) + + with patch('kombu.async.timer.to_timestamp') as tot: + tot.side_effect = OverflowError() + s.enter_at(Entry(lambda: None, (), {}), + eta=datetime.now()) + s.enter_at(Entry(lambda: None, (), {}), eta=None) + s.on_error = None + with self.assertRaises(OverflowError): + s.enter_at(Entry(lambda: None, (), {}), + eta=datetime.now()) + exc = scratch[0] + self.assertIsInstance(exc, OverflowError) + + def test_call_repeatedly(self): + t = Timer() + try: + t.schedule.enter_after = Mock() + + myfun = Mock() + myfun.__name__ = b'myfun' + t.call_repeatedly(0.03, myfun) + + self.assertEqual(t.schedule.enter_after.call_count, 1) + args1, _ = t.schedule.enter_after.call_args_list[0] + sec1, tref1, _ = args1 + self.assertEqual(sec1, 0.03) + tref1() + + self.assertEqual(t.schedule.enter_after.call_count, 2) + args2, _ = t.schedule.enter_after.call_args_list[1] + sec2, tref2, _ = args2 + self.assertEqual(sec2, 0.03) + tref2.canceled = True + tref2() + + self.assertEqual(t.schedule.enter_after.call_count, 2) + finally: + t.stop() + + @patch('kombu.async.timer.logger') + def test_apply_entry_error_handled(self, logger): + t = Timer() + t.schedule.on_error = None + + fun = Mock() + fun.side_effect = ValueError() + + t.schedule.apply_entry(fun) + self.assertTrue(logger.error.called) + + @redirect_stdouts + def test_apply_entry_error_not_handled(self, stdout, stderr): + t = Timer() + t.schedule.on_error = Mock() + + fun = Mock() + fun.side_effect = ValueError() + t.schedule.apply_entry(fun) + fun.assert_called_with() + self.assertFalse(stderr.getvalue()) + + def test_enter_after(self): + t = Timer() + t._enter = Mock() + fun = Mock(name='fun') + time = Mock(name='time') + time.return_value = 10 + t.enter_after(10, fun, time=time) + time.assert_called_with() + t._enter.assert_called_with(20, 0, fun) + + def test_cancel(self): + t = Timer() + tref = Mock() + t.cancel(tref) + tref.cancel.assert_called_with() diff --git a/kombu/tests/case.py b/kombu/tests/case.py index 38b6cdb2..f9a2a21e 100644 --- a/kombu/tests/case.py +++ b/kombu/tests/case.py @@ -1,12 +1,13 @@ from __future__ import absolute_import import importlib +import inspect import os import sys import types from contextlib import contextmanager -from functools import wraps +from functools import partial, wraps from io import StringIO try: @@ -16,7 +17,7 @@ except ImportError: from nose import SkipTest -from kombu.five import builtins, string_t +from kombu.five import builtins, reraise, string_t from kombu.utils.encoding import ensure_bytes try: @@ -51,6 +52,51 @@ class Case(unittest.TestCase): def teardown(self): pass + def patch(self, *path, **options): + manager = patch('.'.join(path), **options) + patched = manager.start() + self.addCleanup(manager.stop) + return patched + + def mock_modules(self, *mods): + modules = [] + for mod in mods: + mod = mod.split('.') + modules.extend(reversed([ + '.'.join(mod[:-i] if i else mod) for i in range(len(mod)) + ])) + modules = sorted(set(modules)) + return self.wrap_context(mock_module(*modules)) + + def on_nth_call_do(self, mock, side_effect, n=1): + + def on_call(*args, **kwargs): + if mock.call_count >= n: + mock.side_effect = side_effect + return mock.return_value + mock.side_effect = on_call + return mock + + def on_nth_call_return(self, mock, retval, n=1): + + def on_call(*args, **kwargs): + if mock.call_count >= n: + mock.return_value = retval + return mock.return_value + mock.side_effect = on_call + return mock + + def mask_modules(self, *modules): + self.wrap_context(mask_modules(*modules)) + + def wrap_context(self, context): + ret = context.__enter__() + self.addCleanup(partial(context.__exit__, None, None, None)) + return ret + + def mock_environ(self, env_name, env_value): + return self.wrap_context(mock_environ(env_name, env_value)) + def PromiseMock(*args, **kwargs): m = Mock(*args, **kwargs) @@ -97,14 +143,18 @@ def case_no_pypy(cls): def case_no_python3(cls): - setup = cls.setUp + wrap, is_cls = cls, False + if inspect.isclass(cls): + wrap, is_cls = cls.setUp, True - @wraps(setup) + @wraps(wrap) def around_setup(self): if PY3: raise SkipTest('Python 3 incompatible') - setup(self) - cls.setUp = around_setup + return wrap(self) if is_cls else wrap() + + if is_cls: + cls.setUp = around_setup return cls @@ -306,3 +356,65 @@ def set_module_symbol(module, key, value): yield finally: setattr(module, key, prev) + + +@contextmanager +def mock_module(*names): + prev = {} + + class MockModule(types.ModuleType): + + def __getattr__(self, attr): + setattr(self, attr, Mock()) + return types.ModuleType.__getattribute__(self, attr) + + mods = [] + for name in names: + try: + prev[name] = sys.modules[name] + except KeyError: + pass + mod = sys.modules[name] = MockModule(name) + mods.append(mod) + try: + yield mods + finally: + for name in names: + try: + sys.modules[name] = prev[name] + except KeyError: + try: + del(sys.modules[name]) + except KeyError: + pass + + +@contextmanager +def mock_context(mock, typ=Mock): + context = mock.return_value = Mock() + context.__enter__ = typ() + context.__exit__ = typ() + + def on_exit(*x): + if x[0]: + reraise(x[0], x[1], x[2]) + context.__exit__.side_effect = on_exit + context.__enter__.return_value = context + try: + yield context + finally: + context.reset() + + +@contextmanager +def mock_environ(env_name, env_value): + sentinel = object() + prev_val = os.environ.get(env_name, sentinel) + os.environ[env_name] = env_value + try: + yield env_value + finally: + if prev_val is sentinel: + os.environ.pop(env_name, None) + else: + os.environ[env_name] = prev_val diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py index 84786424..44c8749d 100644 --- a/kombu/tests/test_connection.py +++ b/kombu/tests/test_connection.py @@ -8,6 +8,7 @@ from copy import copy from kombu import Connection, Consumer, Producer, parse_url from kombu.connection import Resource from kombu.five import items, range +from kombu.utils.functional import lazy from .case import Case, Mock, SkipTest, patch, skip_if_not_module from .mocks import Transport @@ -332,6 +333,28 @@ class test_Connection(Case): self.assertFalse(c.completes_cycle(1)) self.assertTrue(c.completes_cycle(2)) + def test_get_heartbeat_interval(self): + self.conn.transport.get_heartbeat_interval = Mock(name='ghi') + self.assertIs( + self.conn.get_heartbeat_interval(), + self.conn.transport.get_heartbeat_interval.return_value, + ) + self.conn.transport.get_heartbeat_interval.assert_called_with( + self.conn.connection) + + def test_supports_exchange_type(self): + self.conn.transport.implements.exchange_type = {'topic'} + self.assertTrue(self.conn.supports_exchange_type('topic')) + self.assertFalse(self.conn.supports_exchange_type('fanout')) + + def test_qos_semantics_matches_spec(self): + qsms = self.conn.transport.qos_semantics_matches_spec = Mock(name='qsms') + self.assertIs( + self.conn.qos_semantics_matches_spec, + qsms.return_value, + ) + qsms.assert_called_with(self.conn.connection) + def test__enter____exit__(self): conn = self.conn context = conn.__enter__() @@ -622,6 +645,19 @@ class test_ConnectionPool(ResourceCase): def create_resource(self, limit): return Connection(port=5672, transport=Transport).Pool(limit) + def test_collect_resource__does_not_collect_lazy_resource(self): + P = self.create_resource(10) + res = lazy(object()) + res.collect = Mock(name='collect') + P.collect_resource(res) + self.assertFalse(res.collect.called) + + def test_collect_resource(self): + res = Mock(name='res') + P = self.create_resource(10) + P.collect_resource(res, socket_timeout=10.3) + res.collect.assert_called_with(10.3) + def test_setup(self): P = self.create_resource(10) q = P._resource.queue diff --git a/kombu/tests/test_exceptions.py b/kombu/tests/test_exceptions.py new file mode 100644 index 00000000..1d04b359 --- /dev/null +++ b/kombu/tests/test_exceptions.py @@ -0,0 +1,11 @@ +from __future__ import absolute_import, unicode_literals + +from kombu.exceptions import HttpError + +from kombu.tests.case import Case, Mock + + +class test_HttpError(Case): + + def test_str(self): + self.assertTrue(str(HttpError(200, 'msg', Mock(name='response')))) diff --git a/kombu/tests/test_message.py b/kombu/tests/test_message.py new file mode 100644 index 00000000..a27d6e0f --- /dev/null +++ b/kombu/tests/test_message.py @@ -0,0 +1,44 @@ +from __future__ import absolute_import, unicode_literals + +import sys + +from kombu.message import Message + +from .case import Case, Mock, patch + + +class test_Message(Case): + + def test_repr(self): + self.assertTrue(repr(Message(Mock(), 'b'))) + + def test_decode(self): + m = Message(Mock(), 'body') + decode = m._decode = Mock() + self.assertIsNone(m._decoded_cache) + self.assertIs(m.decode(), m._decode.return_value) + self.assertIs(m._decoded_cache, m._decode.return_value) + m._decode.assert_called_with() + m._decode = Mock() + self.assertIs(m.decode(), decode.return_value) + + def test_reraise_error(self): + m = Message(Mock(), 'body') + callback = Mock(name='callback') + try: + raise KeyError('foo') + except KeyError as exc: + m.errors.append(sys.exc_info()) + m._reraise_error(callback) + self.assertTrue(callback.called) + + with self.assertRaises(KeyError): + m._reraise_error(None) + + @patch('kombu.message.decompress') + def test_decompression_stores_error(self, decompress): + decompress.side_effect = RuntimeError() + m = Message(Mock(), 'body', headers={'compression': 'zlib'}) + with self.assertRaises(RuntimeError): + m._reraise_error(None) + diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index 5d7a2711..4e1a24e1 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -1,6 +1,7 @@ from __future__ import absolute_import, unicode_literals import pickle +import sys from collections import defaultdict @@ -129,6 +130,19 @@ class test_Producer(Case): p._channel.basic_publish.call_args[1]['exchange'], 'foo', ) + def test_publish_with_expiration(self): + p = self.connection.Producer() + p.channel = Mock() + p.publish('hello', exchange=Exchange('foo'), expiration=10) + properties = p._channel.prepare_message.call_args[0][5] + self.assertEqual(properties['expiration'], '10000') + + def test_publish_with_reply_to(self): + p = self.connection.Producer() + p.channel = Mock() + p.publish('hello', exchange=Exchange('foo'), reply_to=Queue('foo')) + properties = p._channel.prepare_message.call_args[0][5] + self.assertEqual(properties['reply_to'], 'foo') def test_set_on_return(self): chan = Mock() chan.events = defaultdict(Mock) @@ -349,6 +363,19 @@ class test_Consumer(Case): finally: channel.message_to_python = m2p + def test_receive_callback__message_errors(self): + channel = self.connection.channel() + channel.message_to_python = None + c = channel.Consumer() + message = Mock() + try: + raise KeyError('foo') + except KeyError: + message.errors = [sys.exc_info()] + message._reraise_error.side_effect = KeyError() + with self.assertRaises(KeyError): + c._receive_callback(message) + def test_set_callbacks(self): channel = self.connection.channel() queue = Queue('qname', self.exchange, 'rkey') diff --git a/kombu/tests/transport/test_sqlalchemy.py b/kombu/tests/transport/test_sqlalchemy.py index 72ae2d98..374d7a19 100644 --- a/kombu/tests/transport/test_sqlalchemy.py +++ b/kombu/tests/transport/test_sqlalchemy.py @@ -1,6 +1,7 @@ from __future__ import absolute_import from kombu import Connection + from kombu.tests.case import Case, SkipTest, patch, case_requires diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index 33764341..a012c857 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -9,7 +9,7 @@ from kombu.transport import virtual from kombu.utils import uuid from kombu.compression import compress -from kombu.tests.case import Case, Mock, patch, redirect_stdouts +from kombu.tests.case import Case, MagicMock, Mock, patch, redirect_stdouts PY3 = sys.version_info[0] == 3 PRINT_FQDN = 'builtins.print' if PY3 else '__builtin__.print' @@ -49,6 +49,10 @@ class test_QoS(Case): self.assertFalse(self.q._delivered.restored) self.assertTrue(self.q._on_collect) + def test_restore_visible__interface(self): + qos = virtual.QoS(client().channel()) + qos.restore_visible() + @redirect_stdouts def test_can_consume(self, stdout, stderr): _restored = [] @@ -297,6 +301,15 @@ class test_Channel(Case): c.queue_purge(n) self.assertIn(n, c.purged) + def test_basic_publish__anon_exchange(self): + c = memory_client().channel() + msg = MagicMock(name='msg') + c.encode_body = Mock(name='c.encode_body') + c.encode_body.return_value = (1, 2) + c._put = Mock(name='c._put') + c.basic_publish(msg, None, 'rkey', kw=1) + c._put.assert_called_with('rkey', msg, kw=1) + def test_basic_publish_unique_delivery_tags(self, n='test_uniq_tag'): c1 = memory_client().channel() c2 = memory_client().channel() @@ -573,3 +586,38 @@ class test_Transport(Case): channel = self.transport.create_channel(self.transport) with self.assertRaises(virtual.Empty): self.transport._drain_channel(channel) + + def test__deliver__no_queue(self): + with self.assertRaises(KeyError): + self.transport._deliver(Mock(name='msg'), queue=None) + + def test__reject_inbound_message(self): + channel = Mock(name='channel') + self.transport.channels = [None, channel] + self.transport._reject_inbound_message({'foo': 'bar'}) + channel.Message.assert_called_with(channel, {'foo': 'bar'}) + channel.qos.append.assert_called_with( + channel.Message(), channel.Message().delivery_tag, + ) + channel.basic_reject.assert_called_with( + channel.Message().delivery_tag, requeue=True, + ) + + def test_on_message_ready(self): + channel = Mock(name='channel') + msg = Mock(name='msg') + callback = Mock(name='callback') + self.transport._callbacks = {'q1': callback} + self.transport.on_message_ready(channel, msg, queue='q1') + callback.assert_called_with(msg) + + def test_on_message_ready__no_queue(self): + with self.assertRaises(KeyError): + self.transport.on_message_ready( + Mock(name='channel'), Mock(name='msg'), queue=None) + + def test_on_message_ready__no_callback(self): + self.transport._callbacks = {} + with self.assertRaises(KeyError): + self.transport.on_message_ready( + Mock(name='channel'), Mock(name='msg'), queue='q1') diff --git a/kombu/tests/utils/test_encoding.py b/kombu/tests/utils/test_encoding.py index cc7e514e..9d99f0e5 100644 --- a/kombu/tests/utils/test_encoding.py +++ b/kombu/tests/utils/test_encoding.py @@ -7,7 +7,10 @@ import sys from contextlib import contextmanager from kombu.five import bytes_t, string_t -from kombu.utils.encoding import safe_str, default_encoding +from kombu.utils.encoding import ( + get_default_encoding_file, safe_str, + set_default_encoding_file, default_encoding, +) from kombu.tests.case import Case, SkipTest, patch @@ -25,6 +28,14 @@ def clean_encoding(): class test_default_encoding(Case): + def test_set_default_file(self): + prev = get_default_encoding_file() + try: + set_default_encoding_file('/foo.txt') + self.assertEqual(get_default_encoding_file(), '/foo.txt') + finally: + set_default_encoding_file(prev) + @patch('sys.getfilesystemencoding') def test_default(self, getdefaultencoding): getdefaultencoding.return_value = 'ascii' diff --git a/kombu/tests/utils/test_functional.py b/kombu/tests/utils/test_functional.py index f215dea5..fa782764 100644 --- a/kombu/tests/utils/test_functional.py +++ b/kombu/tests/utils/test_functional.py @@ -3,7 +3,10 @@ from __future__ import absolute_import import pickle import sys -from kombu.utils.functional import lazy, maybe_evaluate +from itertools import count + +from kombu.five import THREAD_TIMEOUT_MAX, items +from kombu.utils.functional import LRUCache, memoize, lazy, maybe_evaluate from kombu.tests.case import Case, SkipTest @@ -12,6 +15,132 @@ def double(x): return x * 2 +class test_LRUCache(Case): + + def test_expires(self): + limit = 100 + x = LRUCache(limit=limit) + slots = list(range(limit * 2)) + for i in slots: + x[i] = i + self.assertListEqual(list(x.keys()), list(slots[limit:])) + self.assertTrue(x.items()) + self.assertTrue(x.values()) + + def test_is_pickleable(self): + x = LRUCache(limit=10) + x.update(luke=1, leia=2) + y = pickle.loads(pickle.dumps(x)) + self.assertEqual(y.limit, y.limit) + self.assertEqual(y, x) + + def test_update_expires(self): + limit = 100 + x = LRUCache(limit=limit) + slots = list(range(limit * 2)) + for i in slots: + x.update({i: i}) + + self.assertListEqual(list(x.keys()), list(slots[limit:])) + + def test_least_recently_used(self): + x = LRUCache(3) + + x[1], x[2], x[3] = 1, 2, 3 + self.assertEqual(list(x.keys()), [1, 2, 3]) + + x[4], x[5] = 4, 5 + self.assertEqual(list(x.keys()), [3, 4, 5]) + + # access 3, which makes it the last used key. + x[3] + x[6] = 6 + self.assertEqual(list(x.keys()), [5, 3, 6]) + + x[7] = 7 + self.assertEqual(list(x.keys()), [3, 6, 7]) + + def test_update_larger_than_cache_size(self): + x = LRUCache(2) + x.update({x: x for x in range(100)}) + self.assertEqual(list(x.keys()), [98, 99]) + + def assertSafeIter(self, method, interval=0.01, size=10000): + if sys.version_info >= (3, 5): + raise SkipTest('Fails on Py3.5') + from threading import Thread, Event + from time import sleep + x = LRUCache(size) + x.update(zip(range(size), range(size))) + + class Burglar(Thread): + + def __init__(self, cache): + self.cache = cache + self.__is_shutdown = Event() + self.__is_stopped = Event() + Thread.__init__(self) + + def run(self): + while not self.__is_shutdown.isSet(): + try: + self.cache.popitem(last=False) + except KeyError: + break + self.__is_stopped.set() + + def stop(self): + self.__is_shutdown.set() + self.__is_stopped.wait() + self.join(THREAD_TIMEOUT_MAX) + + burglar = Burglar(x) + burglar.start() + try: + for _ in getattr(x, method)(): + sleep(0.0001) + finally: + burglar.stop() + + def test_safe_to_remove_while_iteritems(self): + self.assertSafeIter('iteritems') + + def test_safe_to_remove_while_keys(self): + self.assertSafeIter('keys') + + def test_safe_to_remove_while_itervalues(self): + self.assertSafeIter('itervalues') + + def test_items(self): + c = LRUCache() + c.update(a=1, b=2, c=3) + self.assertTrue(list(items(c))) + + def test_incr(self): + c = LRUCache() + c.update(a='1') + c.incr('a') + self.assertEqual(c['a'], '2') + + +class test_memoize(Case): + + def test_memoize(self): + counter = count(1) + + @memoize(maxsize=2) + def x(i): + return next(counter) + + self.assertEqual(x(1), 1) + self.assertEqual(x(1), 1) + self.assertEqual(x(2), 2) + self.assertEqual(x(3), 3) + self.assertEqual(x(1), 4) + x.clear() + self.assertEqual(x(3), 5) + + class test_lazy(Case): def test__str__(self): diff --git a/kombu/tests/utils/test_json.py b/kombu/tests/utils/test_json.py new file mode 100644 index 00000000..00f38e7d --- /dev/null +++ b/kombu/tests/utils/test_json.py @@ -0,0 +1,49 @@ +from __future__ import absolute_import, unicode_literals + +from kombu.utils.encoding import str_to_bytes +from kombu.utils.json import _DecodeError, dumps, loads + +from kombu.tests.case import Case, MagicMock, Mock, case_no_python3 + + +class Custom(object): + + def __init__(self, data): + self.data = data + + def __json__(self): + return self.data + + +class test_dumps_loads(Case): + + def test_dumps_custom_object(self): + x = {'foo': Custom({'a': 'b'})} + self.assertEqual(loads(dumps(x)), {'foo': x['foo'].__json__()}) + + def test_dumps_custom_object_no_json(self): + x = {'foo': object()} + with self.assertRaises(TypeError): + dumps(x) + + def test_loads_memoryview(self): + self.assertEqual(loads(memoryview(dumps({'x': 'z'}))), {'x': 'z'}) + + def test_loads_bytearray(self): + self.assertEqual(loads(bytearray(dumps({'x': 'z'}))), {'x': 'z'}) + + def test_loads_bytes(self): + self.assertEqual( + loads(str_to_bytes(dumps({'x': 'z'})), decode_bytes=True), + {'x': 'z'}, + ) + + @case_no_python3 + def test_loads_buffer(self): + self.assertEqual(loads(buffer(dumps({'x': 'z'}))), {'x': 'z'}) + + def test_loads_DecodeError(self): + _loads = Mock(name='_loads') + _loads.side_effect = _DecodeError( + MagicMock(), MagicMock(), MagicMock()) + self.assertEqual(loads(dumps({'x': 'z'}), _loads=_loads), {'x': 'z'}) diff --git a/kombu/tests/utils/test_scheduling.py b/kombu/tests/utils/test_scheduling.py index e1f1fe5a..70277a31 100644 --- a/kombu/tests/utils/test_scheduling.py +++ b/kombu/tests/utils/test_scheduling.py @@ -1,6 +1,6 @@ from __future__ import absolute_import -from kombu.utils.scheduling import FairCycle +from kombu.utils.scheduling import FairCycle, cycle_by_name from kombu.tests.case import Case @@ -65,3 +65,48 @@ class test_FairCycle(Case): def test__repr__(self): self.assertTrue(repr(FairCycle(lambda x: x, [1, 2, 3], MyEmpty))) + + +class test_round_robin_cycle(Case): + + def test_round_robin_cycle(self): + it = cycle_by_name('round_robin')(['A', 'B', 'C']) + self.assertListEqual(it.consume(3), ['A', 'B', 'C']) + it.rotate('B') + self.assertListEqual(it.consume(3), ['A', 'C', 'B']) + it.rotate('A') + self.assertListEqual(it.consume(3), ['C', 'B', 'A']) + it.rotate('A') + self.assertListEqual(it.consume(3), ['C', 'B', 'A']) + it.rotate('C') + self.assertListEqual(it.consume(3), ['B', 'A', 'C']) + + +class test_priority_cycle(Case): + + def test_priority_cycle(self): + it = cycle_by_name('priority')(['A', 'B', 'C']) + self.assertListEqual(it.consume(3), ['A', 'B', 'C']) + it.rotate('B') + self.assertListEqual(it.consume(3), ['A', 'B', 'C']) + it.rotate('A') + self.assertListEqual(it.consume(3), ['A', 'B', 'C']) + it.rotate('A') + self.assertListEqual(it.consume(3), ['A', 'B', 'C']) + it.rotate('C') + self.assertListEqual(it.consume(3), ['A', 'B', 'C']) + + +class test_sorted_cycle(Case): + + def test_sorted_cycle(self): + it = cycle_by_name('sorted')(['B', 'C', 'A']) + self.assertListEqual(it.consume(3), ['A', 'B', 'C']) + it.rotate('B') + self.assertListEqual(it.consume(3), ['A', 'B', 'C']) + it.rotate('A') + self.assertListEqual(it.consume(3), ['A', 'B', 'C']) + it.rotate('A') + self.assertListEqual(it.consume(3), ['A', 'B', 'C']) + it.rotate('C') + self.assertListEqual(it.consume(3), ['A', 'B', 'C']) diff --git a/kombu/tests/utils/test_url.py b/kombu/tests/utils/test_url.py new file mode 100644 index 00000000..67c7efce --- /dev/null +++ b/kombu/tests/utils/test_url.py @@ -0,0 +1,50 @@ +from __future__ import absolute_import, unicode_literals + +from kombu.utils.url import as_url, parse_url, maybe_sanitize_url + +from kombu.tests.case import Case + + +class test_parse_url(Case): + + def test_parse_url(self): + result = parse_url('amqp://user:pass@localhost:5672/my/vhost') + self.assertDictEqual(result, { + 'transport': 'amqp', + 'userid': 'user', + 'password': 'pass', + 'hostname': 'localhost', + 'port': 5672, + 'virtual_host': 'my/vhost', + }) + + +class test_as_url(Case): + + def test_as_url(self): + self.assertEqual(as_url('https'), 'https:///') + self.assertEqual(as_url('https', 'e.com'), 'https://e.com/') + self.assertEqual(as_url('https', 'e.com', 80), 'https://e.com:80/') + self.assertEqual( + as_url('https', 'e.com', 80, 'u'), 'https://u@e.com:80/', + ) + self.assertEqual( + as_url('https', 'e.com', 80, 'u', 'p'), 'https://u:p@e.com:80/', + ) + self.assertEqual( + as_url('https', 'e.com', 80, None, 'p'), 'https://:p@e.com:80/', + ) + self.assertEqual( + as_url('https', 'e.com', 80, None, 'p', '/foo'), + 'https://:p@e.com:80//foo', + ) + + +class test_maybe_sanitize_url(Case): + + def test_maybe_sanitize_url(self): + self.assertEqual(maybe_sanitize_url('foo'), 'foo') + self.assertEqual( + maybe_sanitize_url('http://u:p@e.com//foo'), + 'http://u:**@e.com//foo', + ) diff --git a/kombu/tests/utils/test_utils.py b/kombu/tests/utils/test_utils.py index 29a0a9b1..6900e131 100644 --- a/kombu/tests/utils/test_utils.py +++ b/kombu/tests/utils/test_utils.py @@ -373,3 +373,13 @@ class test_version_string_as_tuple(Case): version_string_as_tuple('3.3.1.a3.40c32'), version_info_t(3, 3, 1, 'a3', '40c32'), ) + + +class test_maybe_fileno(Case): + + def test_maybe_fileno(self): + self.assertEqual(utils.maybe_fileno(3), 3) + f = Mock(name='file') + self.assertIs(utils.maybe_fileno(f), f.fileno()) + f.fileno.side_effect = ValueError() + self.assertIsNone(utils.maybe_fileno(f)) diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index bb305b0b..3184fad1 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -881,7 +881,7 @@ class Transport(base.Transport): def _deliver(self, message, queue): if not queue: raise KeyError( - 'Received message without destination queue: {1}'.format( + 'Received message without destination queue: {0}'.format( message)) try: callback = self._callbacks[queue] diff --git a/kombu/utils/encoding.py b/kombu/utils/encoding.py index d054257a..72f91539 100644 --- a/kombu/utils/encoding.py +++ b/kombu/utils/encoding.py @@ -100,7 +100,7 @@ def safe_str(s, errors='replace'): return _safe_str(s, errors) -if is_py3k: +if is_py3k: # pragma: no cover def _safe_str(s, errors='replace', file=None): if isinstance(s, str): diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py index 83aca99a..0dc7f8ac 100644 --- a/kombu/utils/functional.py +++ b/kombu/utils/functional.py @@ -5,7 +5,6 @@ import threading from collections import Iterable, Mapping, OrderedDict from functools import wraps -from itertools import islice from kombu.five import UserDict, items, keys, string_t @@ -32,7 +31,7 @@ class LRUCache(UserDict): def __getitem__(self, key): with self.mutex: value = self[key] = self.data.pop(key) - return value + return value def update(self, *args, **kwargs): with self.mutex: @@ -40,9 +39,12 @@ class LRUCache(UserDict): data.update(*args, **kwargs) if limit and len(data) > limit: # pop additional items in case limit exceeded - # negative overflow will lead to an empty list - for item in islice(iter(data), len(data) - limit): - data.pop(item) + for _ in range(len(data) - limit): + data.popitem(last=False) + + def popitem(self, last=True): + with self.mutex: + return self.data.popitem(last) def __setitem__(self, key, value): # remove least recently used key. @@ -55,24 +57,28 @@ class LRUCache(UserDict): return iter(self.data) def _iterate_items(self): - for k in self: - try: - yield (k, self.data[k]) - except KeyError: # pragma: no cover - pass + with self.mutex: + for k in self: + try: + yield (k, self.data[k]) + except KeyError: # pragma: no cover + pass iteritems = _iterate_items def _iterate_values(self): - for k in self: - try: - yield self.data[k] - except KeyError: # pragma: no cover - pass + with self.mutex: + for k in self: + try: + yield self.data[k] + except KeyError: # pragma: no cover + pass + itervalues = _iterate_values def _iterate_keys(self): # userdict.keys in py3k calls __getitem__ - return keys(self.data) + with self.mutex: + return keys(self.data) iterkeys = _iterate_keys def incr(self, key, delta=1): @@ -81,7 +87,7 @@ class LRUCache(UserDict): # integer as long as it exists and we can cast it newval = int(self.data.pop(key)) + delta self[key] = str(newval) - return newval + return newval def __getstate__(self): d = dict(vars(self)) @@ -108,7 +114,7 @@ class LRUCache(UserDict): return list(self._iterate_items()) -def memoize(maxsize=None, Cache=LRUCache): +def memoize(maxsize=None, keyfun=None, Cache=LRUCache): def _memoize(fun): mutex = threading.Lock() @@ -116,7 +122,10 @@ def memoize(maxsize=None, Cache=LRUCache): @wraps(fun) def _M(*args, **kwargs): - key = args + (KEYWORD_MARK,) + tuple(sorted(kwargs.items())) + if keyfun: + key = keyfun(args, kwargs) + else: + key = args + (KEYWORD_MARK,) + tuple(sorted(kwargs.items())) try: with mutex: value = cache[key] diff --git a/kombu/utils/json.py b/kombu/utils/json.py index be4e6a05..3b3d1911 100644 --- a/kombu/utils/json.py +++ b/kombu/utils/json.py @@ -11,6 +11,11 @@ try: except ImportError: # pragma: no cover import json # noqa + class _DecodeError(Exception): # noqa + pass +else: + from simplejson.decoder import JSONDecodeError as _DecodeError + IS_PY3 = sys.version_info[0] == 3 _encoder_cls = type(json._default_encoder) @@ -47,11 +52,8 @@ def loads(s, _loads=json.loads, decode_bytes=IS_PY3): elif isinstance(s, buffer_t): s = text_t(s) # ... awwwwwww :( - if json.__name__ == 'simplejson': - try: - return _loads(s) - # catch simplejson.decoder.JSONDecodeError: Unpaired high surrogate - except json.decoder.JSONDecodeError: - return stdjson.loads(s) - else: + try: return _loads(s) + except _DecodeError: + # catch "Unpaired high surrogate" error + return stdjson.loads(s) diff --git a/kombu/utils/scheduling.py b/kombu/utils/scheduling.py index bc788489..7e179e8a 100644 --- a/kombu/utils/scheduling.py +++ b/kombu/utils/scheduling.py @@ -71,6 +71,7 @@ class round_robin_cycle(object): items.append(items.pop(items.index(last_used))) except ValueError: pass + return last_used class priority_cycle(round_robin_cycle): |