diff options
Diffstat (limited to 'kombu/tests')
-rw-r--r-- | kombu/tests/mocks.py | 2 | ||||
-rw-r--r-- | kombu/tests/test_clocks.py | 27 | ||||
-rw-r--r-- | kombu/tests/test_common.py | 172 | ||||
-rw-r--r-- | kombu/tests/test_connection.py | 174 | ||||
-rw-r--r-- | kombu/tests/test_entities.py | 109 | ||||
-rw-r--r-- | kombu/tests/test_messaging.py | 39 | ||||
-rw-r--r-- | kombu/tests/test_pools.py | 35 | ||||
-rw-r--r-- | kombu/tests/test_utils.py | 125 | ||||
-rw-r--r-- | kombu/tests/transport/test_amqplib.py | 2 | ||||
-rw-r--r-- | kombu/tests/transport/test_filesystem.py | 4 | ||||
-rw-r--r-- | kombu/tests/transport/test_memory.py | 4 | ||||
-rw-r--r-- | kombu/tests/transport/test_mongodb.py | 2 | ||||
-rw-r--r-- | kombu/tests/transport/test_pyamqp.py | 26 | ||||
-rw-r--r-- | kombu/tests/transport/test_redis.py | 33 | ||||
-rw-r--r-- | kombu/tests/transport/test_sqlalchemy.py | 2 | ||||
-rw-r--r-- | kombu/tests/transport/virtual/test_base.py | 29 | ||||
-rw-r--r-- | kombu/tests/utilities/test_amq_manager.py | 35 | ||||
-rw-r--r-- | kombu/tests/utilities/test_debug.py | 58 |
18 files changed, 825 insertions, 53 deletions
diff --git a/kombu/tests/mocks.py b/kombu/tests/mocks.py index 6e1be5eb..c3f1e882 100644 --- a/kombu/tests/mocks.py +++ b/kombu/tests/mocks.py @@ -22,6 +22,7 @@ class Message(base.Message): class Channel(base.StdChannel): open = True throw_decode_error = False + _ids = count(1) def __init__(self, connection): self.connection = connection @@ -29,6 +30,7 @@ class Channel(base.StdChannel): self.deliveries = count(1) self.to_deliver = [] self.events = {'basic_return': set()} + self.channel_id = next(self._ids) def _called(self, name): self.called.append(name) diff --git a/kombu/tests/test_clocks.py b/kombu/tests/test_clocks.py index 27b68f93..ed8c9fa8 100644 --- a/kombu/tests/test_clocks.py +++ b/kombu/tests/test_clocks.py @@ -1,5 +1,7 @@ from __future__ import absolute_import +from heapq import heappush + from kombu.clocks import LamportClock from .utils import TestCase @@ -17,6 +19,7 @@ class test_LamportClock(TestCase): c1.forward() c2.adjust(c1.value) self.assertEqual(c2.value, c1.value + 1) + self.assertTrue(repr(c1)) c2_val = c2.value c2.forward() @@ -26,3 +29,27 @@ class test_LamportClock(TestCase): c1.adjust(c2.value) self.assertEqual(c1.value, c2.value + 1) + + def test_sort(self): + c = LamportClock() + pid1 = 'a.example.com:312' + pid2 = 'b.example.com:311' + + events = [] + + m1 = (c.forward(), pid1) + heappush(events, m1) + m2 = (c.forward(), pid2) + heappush(events, m2) + m3 = (c.forward(), pid1) + heappush(events, m3) + m4 = (30, pid1) + heappush(events, m4) + m5 = (30, pid2) + heappush(events, m5) + + self.assertEqual(str(c), str(c.value)) + + self.assertEqual(c.sort_heap(events), m1) + self.assertEqual(c.sort_heap([m4, m5]), m4) + self.assertEqual(c.sort_heap([m4, m5, m1]), m4) diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py index 524f60b9..ffcf9424 100644 --- a/kombu/tests/test_common.py +++ b/kombu/tests/test_common.py @@ -5,13 +5,55 @@ import socket from mock import patch from kombu import common -from kombu.common import (Broadcast, maybe_declare, - send_reply, isend_reply, collect_replies) +from kombu.common import ( + Broadcast, maybe_declare, + send_reply, isend_reply, collect_replies, + declaration_cached, ignore_errors, + QoS, PREFETCH_COUNT_MAX, + entry_to_queue, +) +from kombu.exceptions import StdChannelError from .utils import TestCase from .utils import ContextMock, Mock, MockPool +class test_ignore_errors(TestCase): + + def test_ignored(self): + connection = Mock() + connection.channel_errors = (KeyError, ) + connection.connection_errors = (KeyError, ) + + with ignore_errors(connection): + raise KeyError() + + def raising(): + raise KeyError() + + ignore_errors(connection, raising) + + connection.channel_errors = connection.connection_errors = \ + () + + with self.assertRaises(KeyError): + with ignore_errors(connection): + raise KeyError() + + +class test_declaration_cached(TestCase): + + def test_when_cached(self): + chan = Mock() + chan.connection.client.declared_entities = ['foo'] + self.assertTrue(declaration_cached('foo', chan)) + + def test_when_not_cached(self): + chan = Mock() + chan.connection.client.declared_entities = ['bar'] + self.assertFalse(declaration_cached('foo', chan)) + + class test_Broadcast(TestCase): def test_arguments(self): @@ -46,6 +88,10 @@ class test_maybe_declare(TestCase): maybe_declare(entity, channel) self.assertEqual(entity.declare.call_count, 1) + entity.channel.connection = None + with self.assertRaises(StdChannelError): + maybe_declare(entity) + def test_binds_entities(self): channel = Mock() channel.connection.client.declared_entities = set() @@ -298,3 +344,125 @@ class test_itermessages(TestCase): with self.assertRaises(StopIteration): next(it) + + +class test_entry_to_queue(TestCase): + + def test_calls_Queue_from_dict(self): + with patch('kombu.common.Queue') as Queue: + entry_to_queue('name', exchange='bar') + Queue.from_dict.assert_called_with('name', exchange='bar') + + +class test_QoS(TestCase): + + class _QoS(QoS): + def __init__(self, value): + self.value = value + QoS.__init__(self, None, value) + + def set(self, value): + return value + + def test_qos_exceeds_16bit(self): + with patch('kombu.common.logger') as logger: + callback = Mock() + qos = QoS(callback, 10) + qos.prev = 100 + qos.set(2 ** 32) + self.assertTrue(logger.warn.called) + callback.assert_called_with(prefetch_count=0) + + def test_qos_increment_decrement(self): + qos = self._QoS(10) + self.assertEqual(qos.increment_eventually(), 11) + self.assertEqual(qos.increment_eventually(3), 14) + self.assertEqual(qos.increment_eventually(-30), 14) + self.assertEqual(qos.decrement_eventually(7), 7) + self.assertEqual(qos.decrement_eventually(), 6) + + def test_qos_disabled_increment_decrement(self): + qos = self._QoS(0) + self.assertEqual(qos.increment_eventually(), 0) + self.assertEqual(qos.increment_eventually(3), 0) + self.assertEqual(qos.increment_eventually(-30), 0) + self.assertEqual(qos.decrement_eventually(7), 0) + self.assertEqual(qos.decrement_eventually(), 0) + self.assertEqual(qos.decrement_eventually(10), 0) + + def test_qos_thread_safe(self): + qos = self._QoS(10) + + def add(): + for i in range(1000): + qos.increment_eventually() + + def sub(): + for i in range(1000): + qos.decrement_eventually() + + def threaded(funs): + from threading import Thread + threads = [Thread(target=fun) for fun in funs] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + threaded([add, add]) + self.assertEqual(qos.value, 2010) + + qos.value = 1000 + threaded([add, sub]) # n = 2 + self.assertEqual(qos.value, 1000) + + def test_exceeds_short(self): + qos = QoS(Mock(), PREFETCH_COUNT_MAX - 1) + qos.update() + self.assertEqual(qos.value, PREFETCH_COUNT_MAX - 1) + qos.increment_eventually() + self.assertEqual(qos.value, PREFETCH_COUNT_MAX) + qos.increment_eventually() + self.assertEqual(qos.value, PREFETCH_COUNT_MAX + 1) + qos.decrement_eventually() + self.assertEqual(qos.value, PREFETCH_COUNT_MAX) + qos.decrement_eventually() + self.assertEqual(qos.value, PREFETCH_COUNT_MAX - 1) + + def test_consumer_increment_decrement(self): + mconsumer = Mock() + qos = QoS(mconsumer.qos, 10) + qos.update() + self.assertEqual(qos.value, 10) + mconsumer.qos.assert_called_with(prefetch_count=10) + qos.decrement_eventually() + qos.update() + self.assertEqual(qos.value, 9) + mconsumer.qos.assert_called_with(prefetch_count=9) + qos.decrement_eventually() + self.assertEqual(qos.value, 8) + mconsumer.qos.assert_called_with(prefetch_count=9) + self.assertIn({'prefetch_count': 9}, mconsumer.qos.call_args) + + # Does not decrement 0 value + qos.value = 0 + qos.decrement_eventually() + self.assertEqual(qos.value, 0) + qos.increment_eventually() + self.assertEqual(qos.value, 0) + + def test_consumer_decrement_eventually(self): + mconsumer = Mock() + qos = QoS(mconsumer.qos, 10) + qos.decrement_eventually() + self.assertEqual(qos.value, 9) + qos.value = 0 + qos.decrement_eventually() + self.assertEqual(qos.value, 0) + + def test_set(self): + mconsumer = Mock() + qos = QoS(mconsumer.qos, 10) + qos.set(12) + self.assertEqual(qos.prev, 12) + qos.set(qos.prev) diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py index 9bdf6343..4af9eb37 100644 --- a/kombu/tests/test_connection.py +++ b/kombu/tests/test_connection.py @@ -1,7 +1,11 @@ from __future__ import absolute_import +import errno import pickle +import socket +from copy import copy +from mock import patch from nose import SkipTest from kombu import Connection, Consumer, Producer, parse_url @@ -10,6 +14,7 @@ from kombu.five import items from .mocks import Transport from .utils import TestCase + from .utils import Mock, skip_if_not_module @@ -154,6 +159,157 @@ class test_Connection(TestCase): self.assertFalse(_connection.connected) self.assertIsInstance(conn.transport, Transport) + def test_multiple_urls(self): + conn1 = Connection('amqp://foo;amqp://bar') + self.assertEqual(conn1.hostname, 'foo') + self.assertListEqual(conn1.alt, ['amqp://foo', 'amqp://bar']) + + conn2 = Connection(['amqp://foo', 'amqp://bar']) + self.assertEqual(conn2.hostname, 'foo') + self.assertListEqual(conn2.alt, ['amqp://foo', 'amqp://bar']) + + def test_uri_passthrough(self): + from kombu import connection as mod + prev, mod.URI_PASSTHROUGH = mod.URI_PASSTHROUGH, set(['foo']) + try: + with patch('kombu.connection.parse_url') as parse_url: + c = Connection('foo+mysql://some_host') + self.assertEqual(c.transport_cls, 'foo') + self.assertFalse(parse_url.called) + self.assertEqual(c.hostname, 'mysql://some_host') + self.assertTrue(c.as_uri().startswith('foo+')) + with patch('kombu.connection.parse_url') as parse_url: + c = Connection('mysql://some_host', transport='foo') + self.assertEqual(c.transport_cls, 'foo') + self.assertFalse(parse_url.called) + self.assertEqual(c.hostname, 'mysql://some_host') + finally: + mod.URI_PASSTHROUGH = prev + c = Connection('amqp+sqlite://some_host') + self.assertTrue(c.as_uri().startswith('amqp+')) + + def test_default_ensure_callback(self): + with patch('kombu.connection.logger') as logger: + c = Connection(transport=Mock) + c._default_ensure_callback(KeyError(), 3) + self.assertTrue(logger.error.called) + + def test_ensure_connection_on_error(self): + c = Connection('amqp://A;amqp://B') + with patch('kombu.connection.retry_over_time') as rot: + c.ensure_connection() + self.assertTrue(rot.called) + + args = rot.call_args[0] + cb = args[4] + intervals = iter([1, 2, 3, 4, 5]) + self.assertEqual(cb(KeyError(), intervals, 0), 0) + self.assertEqual(cb(KeyError(), intervals, 1), 1) + self.assertEqual(cb(KeyError(), intervals, 2), 0) + self.assertEqual(cb(KeyError(), intervals, 3), 2) + self.assertEqual(cb(KeyError(), intervals, 4), 0) + self.assertEqual(cb(KeyError(), intervals, 5), 3) + self.assertEqual(cb(KeyError(), intervals, 6), 0) + self.assertEqual(cb(KeyError(), intervals, 7), 4) + + errback = Mock() + c.ensure_connection(errback=errback) + args = rot.call_args[0] + cb = args[4] + self.assertEqual(cb(KeyError(), intervals, 0), 0) + self.assertTrue(errback.called) + + def test_drain_nowait(self): + c = Connection(transport=Mock) + c.drain_events = Mock() + c.drain_events.side_effect = socket.timeout() + + c.more_to_read = True + self.assertFalse(c.drain_nowait()) + self.assertFalse(c.more_to_read) + + c.drain_events.side_effect = socket.error() + c.drain_events.side_effect.errno = errno.EAGAIN + c.more_to_read = True + self.assertFalse(c.drain_nowait()) + self.assertFalse(c.more_to_read) + + c.drain_events.side_effect = socket.error() + c.drain_events.side_effect.errno = errno.EPERM + with self.assertRaises(socket.error): + c.drain_nowait() + + c.more_to_read = False + c.drain_events = Mock() + self.assertTrue(c.drain_nowait()) + c.drain_events.assert_called_with(timeout=0) + self.assertTrue(c.more_to_read) + + def test_supports_heartbeats(self): + c = Connection(transport=Mock) + c.transport.supports_heartbeats = False + self.assertFalse(c.supports_heartbeats) + + def test_is_evented(self): + c = Connection(transport=Mock) + c.transport.supports_ev = False + self.assertFalse(c.is_evented) + + def test_eventmap(self): + c = Connection(transport=Mock) + c.transport.eventmap.return_value = {1: 1, 2: 2} + self.assertDictEqual(c.eventmap, {1: 1, 2: 2}) + c.transport.eventmap.assert_called_with(c.connection) + + def test_manager(self): + c = Connection(transport=Mock) + self.assertIs(c.manager, c.transport.manager) + + def test_copy(self): + c = Connection('amqp://example.com') + self.assertEqual(copy(c).info(), c.info()) + + def test_switch(self): + c = Connection('amqp://foo') + c._closed = True + c.switch('redis://example.com//3') + self.assertFalse(c._closed) + self.assertEqual(c.hostname, 'example.com') + self.assertEqual(c.transport_cls, 'redis') + self.assertEqual(c.virtual_host, '/3') + + def test_maybe_switch_next(self): + c = Connection('amqp://foo;redis://example.com//3') + c.maybe_switch_next() + self.assertFalse(c._closed) + self.assertEqual(c.hostname, 'example.com') + self.assertEqual(c.transport_cls, 'redis') + self.assertEqual(c.virtual_host, '/3') + + def test_maybe_switch_next_no_cycle(self): + c = Connection('amqp://foo') + c.maybe_switch_next() + self.assertFalse(c._closed) + self.assertEqual(c.hostname, 'foo') + self.assertIn(c.transport_cls, ('librabbitmq', 'pyamqp')) + + def test_heartbeat_check(self): + c = Connection(transport=Transport) + c.transport.heartbeat_check = Mock() + c.heartbeat_check(3) + c.transport.heartbeat_check.assert_called_with(c.connection, rate=3) + + def test_completes_cycle_no_cycle(self): + c = Connection('amqp://') + self.assertTrue(c.completes_cycle(0)) + self.assertTrue(c.completes_cycle(1)) + + def test_completes_cycle(self): + c = Connection('amqp://a;amqp://b;amqp://c') + self.assertFalse(c.completes_cycle(0)) + self.assertFalse(c.completes_cycle(1)) + self.assertTrue(c.completes_cycle(2)) + def test__enter____exit__(self): conn = self.conn context = conn.__enter__() @@ -357,6 +513,18 @@ class ResourceCase(TestCase): [chan.release() for chan in chans] self.assertState(P, 10, 0) + def test_acquire_prepare_raises(self): + if self.abstract: + return + P = self.create_resource(10, 0) + + self.assertEqual(len(P._resource.queue), 10) + P.prepare = Mock() + P.prepare.side_effect = IOError() + with self.assertRaises(IOError): + P.acquire(block=True) + self.assertEqual(len(P._resource.queue), 10) + def test_acquire_no_limit(self): if self.abstract: return @@ -440,6 +608,12 @@ class test_ConnectionPool(ResourceCase): self.assertIsNotNone(q[1]._connection) self.assertIsNone(q[2]()._connection) + def test_release_no__debug(self): + P = self.create_resource(10, 2) + R = Mock() + R._debug.side_effect = AttributeError() + P.release_resource(R) + def test_setup_no_limit(self): P = self.create_resource(None, None) self.assertFalse(P._resource.queue) diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py index cdd9c34a..fadaf358 100644 --- a/kombu/tests/test_entities.py +++ b/kombu/tests/test_entities.py @@ -1,7 +1,10 @@ from __future__ import absolute_import -from kombu import Connection -from kombu.entity import Exchange, Queue +import pickle + +from mock import call + +from kombu import Connection, Exchange, Queue, binding from kombu.exceptions import NotBoundError from .mocks import Transport @@ -13,6 +16,48 @@ def get_conn(): return Connection(transport=Transport) +class test_binding(TestCase): + + def test_constructor(self): + x = binding(Exchange('foo'), 'rkey', + arguments={'barg': 'bval'}, + unbind_arguments={'uarg': 'uval'}, + ) + self.assertEqual(x.exchange, Exchange('foo')) + self.assertEqual(x.routing_key, 'rkey') + self.assertDictEqual(x.arguments, {'barg': 'bval'}) + self.assertDictEqual(x.unbind_arguments, {'uarg': 'uval'}) + + def test_declare(self): + chan = get_conn().channel() + x = binding(Exchange('foo'), 'rkey') + x.declare(chan) + self.assertIn('exchange_declare', chan) + + def test_declare_no_exchange(self): + chan = get_conn().channel() + x = binding() + x.declare(chan) + self.assertNotIn('exchange_declare', chan) + + def test_bind(self): + chan = get_conn().channel() + x = binding(Exchange('foo')) + x.bind(Exchange('bar')(chan)) + self.assertIn('exchange_bind', chan) + + def test_unbind(self): + chan = get_conn().channel() + x = binding(Exchange('foo')) + x.unbind(Exchange('bar')(chan)) + self.assertIn('exchange_unbind', chan) + + def test_repr(self): + b = binding(Exchange('foo'), 'rkey') + self.assertIn('foo', repr(b)) + self.assertIn('rkey', repr(b)) + + class test_Exchange(TestCase): def test_bound(self): @@ -24,7 +69,8 @@ class test_Exchange(TestCase): bound = exchange.bind(chan) self.assertTrue(bound.is_bound) self.assertIs(bound.channel, chan) - self.assertIn('<bound', repr(bound)) + self.assertIn('bound to chan:%r' % (chan.channel_id, ), + repr(bound)) def test_hash(self): self.assertEqual(hash(Exchange('a')), hash(Exchange('a'))) @@ -34,6 +80,11 @@ class test_Exchange(TestCase): self.assertTrue(Exchange('a', durable=True).can_cache_declaration) self.assertFalse(Exchange('a', durable=False).can_cache_declaration) + def test_pickle(self): + e1 = Exchange('foo', 'direct') + e2 = pickle.loads(pickle.dumps(e1)) + self.assertEqual(e1, e2) + def test_eq(self): e1 = Exchange('foo', 'direct') e2 = Exchange('foo', 'direct') @@ -111,6 +162,12 @@ class test_Exchange(TestCase): foo(chan).bind_to(bar) self.assertIn('exchange_bind', chan) + def test_bind_to_by_name(self): + chan = get_conn().channel() + foo = Exchange('foo', 'topic') + foo(chan).bind_to('bar') + self.assertIn('exchange_bind', chan) + def test_unbind_from(self): chan = get_conn().channel() foo = Exchange('foo', 'topic') @@ -118,6 +175,12 @@ class test_Exchange(TestCase): foo(chan).unbind_from(bar) self.assertIn('exchange_unbind', chan) + def test_unbind_from_by_name(self): + chan = get_conn().channel() + foo = Exchange('foo', 'topic') + foo(chan).unbind_from('bar') + self.assertIn('exchange_unbind', chan) + class test_Queue(TestCase): @@ -128,6 +191,14 @@ class test_Queue(TestCase): self.assertEqual(hash(Queue('a')), hash(Queue('a'))) self.assertNotEqual(hash(Queue('a')), hash(Queue('b'))) + def test_anonymous(self): + chan = Mock() + x = Queue(bindings=[binding(Exchange('foo'), 'rkey')]) + chan.queue_declare.return_value = 'generated', 0, 0 + xx = x(chan) + xx.declare() + self.assertEqual(xx.name, 'generated') + def test_when_bound_but_no_exchange(self): q = Queue('a') q.exchange = None @@ -141,7 +212,37 @@ class test_Queue(TestCase): q.declare() q.queue_declare.assert_called_with(False, passive=False) - q.queue_bind.assert_called_with(False) + + def test_bind_to_when_name(self): + chan = Mock() + q = Queue('a') + q(chan).bind_to('ex') + self.assertTrue(chan.queue_bind.called) + + def test_get_when_no_m2p(self): + chan = Mock() + q = Queue('a')(chan) + chan.message_to_python = None + self.assertTrue(q.get()) + + def test_multiple_bindings(self): + chan = Mock() + q = Queue('mul', [ + binding(Exchange('mul1'), 'rkey1'), + binding(Exchange('mul2'), 'rkey2'), + binding(Exchange('mul3'), 'rkey3'), + ]) + q(chan).declare() + self.assertIn( + call(nowait=False, + exchange='mul1', + auto_delete=False, + passive=False, + arguments=None, + type='direct', + durable=True, + ), chan.exchange_declare.call_args_list, + ) def test_can_cache_declaration(self): self.assertTrue(Queue('a', durable=True).can_cache_declaration) diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py index 4c5fe619..249af87f 100644 --- a/kombu/tests/test_messaging.py +++ b/kombu/tests/test_messaging.py @@ -1,13 +1,14 @@ from __future__ import absolute_import, unicode_literals import anyjson +import pickle +from collections import defaultdict from mock import patch -from kombu import Connection +from kombu import Connection, Consumer, Producer, Exchange, Queue from kombu.exceptions import MessageStateError -from kombu.messaging import Consumer, Producer -from kombu.entity import Exchange, Queue +from kombu.utils import ChannelPromise from .mocks import Transport from .utils import TestCase @@ -23,6 +24,16 @@ class test_Producer(TestCase): self.assertTrue(self.connection.connection.connected) self.assertFalse(self.exchange.is_bound) + def test_pickle(self): + chan = Mock() + producer = Producer(chan, serializer='pickle') + p2 = pickle.loads(pickle.dumps(producer)) + self.assertEqual(p2.serializer, producer.serializer) + + def test_no_channel(self): + p = Producer(None) + self.assertFalse(p._channel) + @patch('kombu.common.maybe_declare') def test_maybe_declare(self, maybe_declare): p = self.connection.Producer() @@ -109,11 +120,25 @@ class test_Producer(TestCase): def test_publish_with_Exchange_instance(self): p = self.connection.Producer() p.channel = Mock() - p.publish('hello', exchange=Exchange('foo')) + p.publish('hello', exchange=Exchange('foo'), delivery_mode='transient') self.assertEqual( p._channel.basic_publish.call_args[1]['exchange'], 'foo', ) + def test_set_on_return(self): + chan = Mock() + chan.events = defaultdict(Mock) + p = Producer(ChannelPromise(lambda: chan), on_return='on_return') + p.channel + chan.events['basic_return'].add.assert_called_with('on_return') + + def test_publish_retry_calls_ensure(self): + p = Producer(Mock()) + p._connection = Mock() + ensure = p.connection.ensure = Mock() + p.publish('foo', exchange='foo', retry=True) + self.assertTrue(ensure.called) + def test_publish_retry_with_declare(self): p = self.connection.Producer() p.maybe_declare = Mock() @@ -195,6 +220,12 @@ class test_Consumer(TestCase): self.assertTrue(self.connection.connection.connected) self.exchange = Exchange('foo', 'direct') + def test_set_no_channel(self): + c = Consumer(None) + self.assertIsNone(c.channel) + c.revive(Mock()) + self.assertTrue(c.channel) + def test_set_no_ack(self): channel = self.connection.channel() queue = Queue('qname', self.exchange, 'rkey') diff --git a/kombu/tests/test_pools.py b/kombu/tests/test_pools.py index 32d1294f..a9386602 100644 --- a/kombu/tests/test_pools.py +++ b/kombu/tests/test_pools.py @@ -1,9 +1,8 @@ from __future__ import absolute_import -from kombu import Connection +from kombu import Connection, Producer from kombu import pools from kombu.connection import ConnectionPool -from kombu.messaging import Producer from kombu.utils import eqhash from .utils import TestCase @@ -26,6 +25,34 @@ class test_ProducerPool(TestCase): self.connections = Mock() self.pool = self.Pool(self.connections, limit=10) + def test_releases_connection_when_Producer_raises(self): + self.pool.Producer = Mock() + self.pool.Producer.side_effect = IOError() + acq = self.pool._acquire_connection = Mock() + conn = acq.return_value = Mock() + with self.assertRaises(IOError): + self.pool.create_producer() + conn.release.assert_called_with() + + def test_prepare_release_connection_on_error(self): + pp = Mock() + p = pp.return_value = Mock() + p.revive.side_effect = IOError() + acq = self.pool._acquire_connection = Mock() + conn = acq.return_value = Mock() + p._channel = None + with self.assertRaises(IOError): + self.pool.prepare(pp) + conn.release.assert_called_with() + + def test_release_releases_connection(self): + p = Mock() + p.__connection__ = Mock() + self.pool.release(p) + p.__connection__.release.assert_called_with() + p.__connection__ = None + self.pool.release(p) + def test_init(self): self.assertIs(self.pool.connections, self.connections) @@ -57,7 +84,7 @@ class test_ProducerPool(TestCase): def test_prepare(self): connection = self.connections.acquire.return_value = Mock() pool = self.MyPool(self.connections, limit=10) - pool.instance.channel = None + pool.instance._channel = None first = pool._resource.get_nowait() producer = pool.prepare(first) self.assertTrue(self.connections.acquire.called) @@ -66,7 +93,7 @@ class test_ProducerPool(TestCase): def test_prepare_channel_already_created(self): self.connections.acquire.return_value = Mock() pool = self.MyPool(self.connections, limit=10) - pool.instance.channel = Mock() + pool.instance._channel = Mock() first = pool._resource.get_nowait() self.connections.acquire.reset() producer = pool.prepare(first) diff --git a/kombu/tests/test_utils.py b/kombu/tests/test_utils.py index 3633f8b1..38b0f581 100644 --- a/kombu/tests/test_utils.py +++ b/kombu/tests/test_utils.py @@ -5,12 +5,15 @@ import sys from functools import wraps from io import BytesIO, StringIO +from mock import Mock, patch from kombu import utils from kombu.five import string_t -from .utils import redirect_stdouts, mask_modules, skip_if_module -from .utils import TestCase +from .utils import ( + TestCase, + redirect_stdouts, mask_modules, module_exists, skip_if_module, +) class OldString(object): @@ -28,6 +31,13 @@ class OldString(object): return self.value.rsplit(*args, **kwargs) +class test_kombu_module(TestCase): + + def test_dir(self): + import kombu + self.assertTrue(dir(kombu)) + + class test_utils(TestCase): def test_maybe_list(self): @@ -174,10 +184,21 @@ class test_retry_over_time(TestCase): @insomnia def test_simple(self): - x = utils.retry_over_time(self.myfun, self.Predicate, - errback=self.errback, interval_max=14) - self.assertEqual(x, 42) - self.assertEqual(self.index, 9) + prev_count, utils.count = utils.count, Mock() + try: + utils.count.return_value = range(1) + x = utils.retry_over_time(self.myfun, self.Predicate, + errback=None, interval_max=14) + self.assertIsNone(x) + utils.count.return_value = range(10) + cb = Mock() + x = utils.retry_over_time(self.myfun, self.Predicate, + errback=self.errback, callback=cb, interval_max=14) + self.assertEqual(x, 42) + self.assertEqual(self.index, 9) + cb.assert_called_with() + finally: + utils.count = prev_count @insomnia def test_retry_once(self): @@ -200,6 +221,26 @@ class test_retry_over_time(TestCase): class test_cached_property(TestCase): + def test_deleting(self): + + class X(object): + xx = False + + @utils.cached_property + def foo(self): + return 42 + + @foo.deleter # noqa + def foo(self, value): + self.xx = value + + x = X() + del(x.foo) + self.assertFalse(x.xx) + x.__dict__['foo'] = 'here' + del(x.foo) + self.assertEqual(x.xx, 'here') + def test_when_access_from_class(self): class X(object): @@ -226,3 +267,75 @@ class test_cached_property(TestCase): self.assertEqual(x.xx, 10) del(x.foo) + + +class test_symbol_by_name(TestCase): + + def test_instance_returns_instance(self): + instance = object() + self.assertIs(utils.symbol_by_name(instance), instance) + + def test_returns_default(self): + default = object() + self.assertIs(utils.symbol_by_name('xyz.ryx.qedoa.weq:foz', + default=default), default) + + def test_no_default(self): + with self.assertRaises(ImportError): + utils.symbol_by_name('xyz.ryx.qedoa.weq:foz') + + def test_imp_reraises_ValueError(self): + imp = Mock() + imp.side_effect = ValueError() + with self.assertRaises(ValueError): + utils.symbol_by_name('kombu.Connection', imp=imp) + + def test_package(self): + from kombu.entity import Exchange + self.assertIs(utils.symbol_by_name('.entity:Exchange', + package='kombu'), Exchange) + self.assertTrue(utils.symbol_by_name(':Consumer', package='kombu')) + + +class test_ChannelPromise(TestCase): + + def test_repr(self): + self.assertEqual(repr(utils.ChannelPromise(lambda: 'foo')), + "<promise: 'foo'>") + + +class test_entrypoints(TestCase): + + @mask_modules('pkg_resources') + def test_without_pkg_resources(self): + self.assertListEqual(list(utils.entrypoints('kombu.test')), []) + + @module_exists('pkg_resources') + def test_with_pkg_resources(self): + with patch('pkg_resources.iter_entry_points', create=True) as iterep: + eps = iterep.return_value = [Mock(), Mock()] + + self.assertTrue(list(utils.entrypoints('kombu.test'))) + iterep.assert_called_with('kombu.test') + eps[0].load.assert_called_with() + eps[1].load.assert_called_with() + + +class test_shufflecycle(TestCase): + + def test_shuffles(self): + prev_repeat, utils.repeat = utils.repeat, Mock() + try: + utils.repeat.return_value = range(10) + values = set(['A', 'B', 'C']) + cycle = utils.shufflecycle(values) + seen = set() + for i in xrange(10): + cycle.next() + utils.repeat.assert_called_with(None) + self.assertTrue(seen.issubset(values)) + with self.assertRaises(StopIteration): + cycle.next() + cycle.next() + finally: + utils.repeat = prev_repeat diff --git a/kombu/tests/transport/test_amqplib.py b/kombu/tests/transport/test_amqplib.py index 6ce95cc9..fb6c0141 100644 --- a/kombu/tests/transport/test_amqplib.py +++ b/kombu/tests/transport/test_amqplib.py @@ -4,7 +4,7 @@ import sys from nose import SkipTest -from kombu.connection import Connection +from kombu import Connection from kombu.tests.utils import TestCase from kombu.tests.utils import mask_modules, Mock diff --git a/kombu/tests/transport/test_filesystem.py b/kombu/tests/transport/test_filesystem.py index 6a8d64d1..7bdad8f5 100644 --- a/kombu/tests/transport/test_filesystem.py +++ b/kombu/tests/transport/test_filesystem.py @@ -4,9 +4,7 @@ import tempfile from nose import SkipTest -from kombu.connection import Connection -from kombu.entity import Exchange, Queue -from kombu.messaging import Consumer, Producer +from kombu import Connection, Exchange, Queue, Consumer, Producer from kombu.tests.utils import TestCase diff --git a/kombu/tests/transport/test_memory.py b/kombu/tests/transport/test_memory.py index 95e3432e..970681d7 100644 --- a/kombu/tests/transport/test_memory.py +++ b/kombu/tests/transport/test_memory.py @@ -2,9 +2,7 @@ from __future__ import absolute_import import socket -from kombu.connection import Connection -from kombu.entity import Exchange, Queue -from kombu.messaging import Consumer, Producer +from kombu import Connection, Exchange, Queue, Consumer, Producer from kombu.tests.utils import TestCase diff --git a/kombu/tests/transport/test_mongodb.py b/kombu/tests/transport/test_mongodb.py index b05e7d9f..522df04a 100644 --- a/kombu/tests/transport/test_mongodb.py +++ b/kombu/tests/transport/test_mongodb.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from nose import SkipTest -from kombu.connection import Connection +from kombu import Connection from kombu.tests.utils import TestCase, skip_if_not_module diff --git a/kombu/tests/transport/test_pyamqp.py b/kombu/tests/transport/test_pyamqp.py index b48ec153..7936484f 100644 --- a/kombu/tests/transport/test_pyamqp.py +++ b/kombu/tests/transport/test_pyamqp.py @@ -2,6 +2,7 @@ from __future__ import absolute_import import sys +from mock import patch from nose import SkipTest try: @@ -10,7 +11,7 @@ except ImportError: pyamqp = None # noqa else: from kombu.transport import pyamqp -from kombu.connection import Connection +from kombu import Connection from kombu.tests.utils import TestCase from kombu.tests.utils import mask_modules, Mock @@ -160,3 +161,26 @@ class test_pyamqp(TestCase): c = Connection(port=1337, transport=Transport).connect() self.assertEqual(c['host'], '127.0.0.1:1337') + + def test_eventmap(self): + t = pyamqp.Transport(Mock()) + conn = Mock() + self.assertDictEqual(t.eventmap(conn), + {conn.sock: t.client.drain_nowait}) + + def test_event_interface(self): + t = pyamqp.Transport(Mock()) + t.on_poll_init(Mock()) + t.on_poll_start() + + def test_heartbeat_check(self): + t = pyamqp.Transport(Mock()) + conn = Mock() + t.heartbeat_check(conn, rate=4.331) + conn.heartbeat_tick.assert_called_with(rate=4.331) + + def test_get_manager(self): + with patch('kombu.transport.pyamqp.get_manager') as get_manager: + t = pyamqp.Transport(Mock()) + t.get_manager(1, kw=2) + get_manager.assert_called_with(t.client, 1, kw=2) diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py index 403e815e..cf166e87 100644 --- a/kombu/tests/transport/test_redis.py +++ b/kombu/tests/transport/test_redis.py @@ -7,11 +7,9 @@ from anyjson import dumps from collections import defaultdict from itertools import count -from kombu.connection import Connection -from kombu.entity import Exchange, Queue +from kombu import Connection, Exchange, Queue, Consumer, Producer from kombu.exceptions import InconsistencyError, VersionMismatch from kombu.five import Empty, Queue as _Queue -from kombu.messaging import Consumer, Producer from kombu.utils import eventio # patch poll from kombu.tests.utils import TestCase @@ -42,9 +40,7 @@ class Client(object): hashes = defaultdict(dict) shard_hint = None - def __init__(self, db=None, port=None, **kwargs): - self.port = port - self.db = db + def __init__(self, db=None, port=None, connection_pool=None, **kwargs): self._called = [] self._connection = None self.bgsave_raises_ResponseError = False @@ -384,10 +380,9 @@ class test_Channel(TestCase): c.connection.disconnect.assert_called_with() def test_invalid_database_raises_ValueError(self): - self.channel.connection.client.virtual_host = 'xfeqwewkfk' with self.assertRaises(ValueError): - self.channel._create_client() + Connection('redis:///dwqwewqe').channel() @skip_if_not_module('redis') def test_get_client(self): @@ -395,7 +390,7 @@ class test_Channel(TestCase): KombuRedis = redis.Channel._get_client(self.channel) self.assertTrue(KombuRedis) - Rv = getattr(R, 'VERSION') + Rv = getattr(R, 'VERSION', None) try: R.VERSION = (2, 4, 0) with self.assertRaises(VersionMismatch): @@ -528,29 +523,23 @@ class test_Redis(TestCase): channel.close() def test_db_values(self): - c1 = Connection(virtual_host=1, - transport=Transport).channel() - self.assertEqual(c1.client.db, 1) + Connection(virtual_host=1, + transport=Transport).channel() - c2 = Connection(virtual_host='1', - transport=Transport).channel() - self.assertEqual(c2.client.db, 1) + Connection(virtual_host='1', + transport=Transport).channel() - c3 = Connection(virtual_host='/1', - transport=Transport).channel() - self.assertEqual(c3.client.db, 1) + Connection(virtual_host='/1', + transport=Transport).channel() with self.assertRaises(Exception): - Connection(virtual_host='/foo', - transport=Transport).channel() + Connection('redis:///foo').channel() def test_db_port(self): c1 = Connection(port=None, transport=Transport).channel() - self.assertEqual(c1.client.port, Transport.default_port) c1.close() c2 = Connection(port=9999, transport=Transport).channel() - self.assertEqual(c2.client.port, 9999) c2.close() def test_close_poller_not_active(self): diff --git a/kombu/tests/transport/test_sqlalchemy.py b/kombu/tests/transport/test_sqlalchemy.py index 82b51ebc..f3d7d239 100644 --- a/kombu/tests/transport/test_sqlalchemy.py +++ b/kombu/tests/transport/test_sqlalchemy.py @@ -3,7 +3,7 @@ from __future__ import absolute_import from mock import patch from nose import SkipTest -from kombu.connection import Connection +from kombu import Connection from kombu.tests.utils import TestCase diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index eb7eaf6d..232be80b 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -91,6 +91,9 @@ class test_QoS(TestCase): self.assertTrue(stderr.getvalue()) self.assertFalse(stdout.getvalue()) + self.q.restore_at_shutdown = False + self.q.restore_unacked_once() + def test_get(self): self.q._delivered['foo'] = 1 self.assertEqual(self.q.get('foo'), 1) @@ -176,10 +179,34 @@ class test_Channel(TestCase): if self.channel._qos is not None: self.channel._qos._on_collect.cancel() + def test_exchange_bind_interface(self): + with self.assertRaises(NotImplementedError): + self.channel.exchange_bind('dest', 'src', 'key') + + def test_exchange_unbind_interface(self): + with self.assertRaises(NotImplementedError): + self.channel.exchange_unbind('dest', 'src', 'key') + + def test_queue_unbind_interface(self): + with self.assertRaises(NotImplementedError): + self.channel.queue_unbind('dest', 'ex', 'key') + + def test_management(self): + m = self.channel.connection.client.get_manager() + self.assertTrue(m) + m.get_bindings() + m.close() + def test_exchange_declare(self): c = self.channel + + with self.assertRaises(StdChannelError): + c.exchange_declare('test_exchange_declare', 'direct', + durable=True, auto_delete=True, passive=True) c.exchange_declare('test_exchange_declare', 'direct', durable=True, auto_delete=True) + c.exchange_declare('test_exchange_declare', 'direct', + durable=True, auto_delete=True, passive=True) self.assertIn('test_exchange_declare', c.state.exchanges) # can declare again with same values c.exchange_declare('test_exchange_declare', 'direct', @@ -348,7 +375,7 @@ class test_Channel(TestCase): exc = None try: raise KeyError() - except KeyError as exc_: + except KeyError, exc_: exc = exc_ ru.return_value = [(exc, 1)] diff --git a/kombu/tests/utilities/test_amq_manager.py b/kombu/tests/utilities/test_amq_manager.py new file mode 100644 index 00000000..8f85a2c3 --- /dev/null +++ b/kombu/tests/utilities/test_amq_manager.py @@ -0,0 +1,35 @@ +from __future__ import absolute_import + +from mock import patch + +from kombu import Connection +from kombu.tests.utils import TestCase, mask_modules, module_exists + + +class test_get_manager(TestCase): + + @mask_modules('pyrabbit') + def test_without_pyrabbit(self): + with self.assertRaises(ImportError): + Connection('amqp://').get_manager() + + @module_exists('pyrabbit') + def test_with_pyrabbit(self): + with patch('pyrabbit.Client', create=True) as Client: + manager = Connection('amqp://').get_manager() + self.assertIsNotNone(manager) + Client.assert_called_with('localhost:55672', + 'guest', 'guest') + + @module_exists('pyrabbit') + def test_transport_options(self): + with patch('pyrabbit.Client', create=True) as Client: + manager = Connection('amqp://', transport_options={ + 'manager_hostname': 'admin.mq.vandelay.com', + 'manager_port': 808, + 'manager_userid': 'george', + 'manager_password': 'bosco', + }).get_manager() + self.assertIsNotNone(manager) + Client.assert_called_with('admin.mq.vandelay.com:808', + 'george', 'bosco') diff --git a/kombu/tests/utilities/test_debug.py b/kombu/tests/utilities/test_debug.py new file mode 100644 index 00000000..d364400f --- /dev/null +++ b/kombu/tests/utilities/test_debug.py @@ -0,0 +1,58 @@ +from __future__ import absolute_import + +import logging + +from mock import Mock, patch + +from kombu.utils.debug import ( + setup_logging, + Logwrapped, +) +from kombu.tests.utils import TestCase + + +class test_setup_logging(TestCase): + + def test_adds_handlers_sets_level(self): + with patch('kombu.utils.debug.get_logger') as get_logger: + logger = get_logger.return_value = Mock() + setup_logging(loggers=['kombu.test']) + + get_logger.assert_called_with('kombu.test') + + self.assertTrue(logger.addHandler.called) + logger.setLevel.assert_called_with(logging.DEBUG) + + +class test_Logwrapped(TestCase): + + def test_wraps(self): + with patch('kombu.utils.debug.get_logger') as get_logger: + logger = get_logger.return_value = Mock() + + W = Logwrapped(Mock(), 'kombu.test') + get_logger.assert_called_with('kombu.test') + self.assertIsNotNone(W.instance) + self.assertIs(W.logger, logger) + + W.instance.__repr__ = lambda s: 'foo' + self.assertEqual(repr(W), 'foo') + self.assertListEqual(dir(W), dir(W.instance)) + + W.instance.some_attr = 303 + self.assertEqual(W.some_attr, 303) + + W.instance.some_method.__name__ = 'some_method' + W.some_method(1, 2, kw=1) + W.instance.some_method.assert_called_with(1, 2, kw=1) + + W.some_method() + W.instance.some_method.assert_called_with() + + W.some_method(kw=1) + W.instance.some_method.assert_called_with(kw=1) + + W.ident = 'ident' + W.some_method(kw=1) + self.assertTrue(logger.debug.called) + self.assertIn('ident', logger.debug.call_args[0][0]) |