summaryrefslogtreecommitdiff
path: root/kombu/tests
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/tests')
-rw-r--r--kombu/tests/mocks.py2
-rw-r--r--kombu/tests/test_clocks.py27
-rw-r--r--kombu/tests/test_common.py172
-rw-r--r--kombu/tests/test_connection.py174
-rw-r--r--kombu/tests/test_entities.py109
-rw-r--r--kombu/tests/test_messaging.py39
-rw-r--r--kombu/tests/test_pools.py35
-rw-r--r--kombu/tests/test_utils.py125
-rw-r--r--kombu/tests/transport/test_amqplib.py2
-rw-r--r--kombu/tests/transport/test_filesystem.py4
-rw-r--r--kombu/tests/transport/test_memory.py4
-rw-r--r--kombu/tests/transport/test_mongodb.py2
-rw-r--r--kombu/tests/transport/test_pyamqp.py26
-rw-r--r--kombu/tests/transport/test_redis.py33
-rw-r--r--kombu/tests/transport/test_sqlalchemy.py2
-rw-r--r--kombu/tests/transport/virtual/test_base.py29
-rw-r--r--kombu/tests/utilities/test_amq_manager.py35
-rw-r--r--kombu/tests/utilities/test_debug.py58
18 files changed, 825 insertions, 53 deletions
diff --git a/kombu/tests/mocks.py b/kombu/tests/mocks.py
index 6e1be5eb..c3f1e882 100644
--- a/kombu/tests/mocks.py
+++ b/kombu/tests/mocks.py
@@ -22,6 +22,7 @@ class Message(base.Message):
class Channel(base.StdChannel):
open = True
throw_decode_error = False
+ _ids = count(1)
def __init__(self, connection):
self.connection = connection
@@ -29,6 +30,7 @@ class Channel(base.StdChannel):
self.deliveries = count(1)
self.to_deliver = []
self.events = {'basic_return': set()}
+ self.channel_id = next(self._ids)
def _called(self, name):
self.called.append(name)
diff --git a/kombu/tests/test_clocks.py b/kombu/tests/test_clocks.py
index 27b68f93..ed8c9fa8 100644
--- a/kombu/tests/test_clocks.py
+++ b/kombu/tests/test_clocks.py
@@ -1,5 +1,7 @@
from __future__ import absolute_import
+from heapq import heappush
+
from kombu.clocks import LamportClock
from .utils import TestCase
@@ -17,6 +19,7 @@ class test_LamportClock(TestCase):
c1.forward()
c2.adjust(c1.value)
self.assertEqual(c2.value, c1.value + 1)
+ self.assertTrue(repr(c1))
c2_val = c2.value
c2.forward()
@@ -26,3 +29,27 @@ class test_LamportClock(TestCase):
c1.adjust(c2.value)
self.assertEqual(c1.value, c2.value + 1)
+
+ def test_sort(self):
+ c = LamportClock()
+ pid1 = 'a.example.com:312'
+ pid2 = 'b.example.com:311'
+
+ events = []
+
+ m1 = (c.forward(), pid1)
+ heappush(events, m1)
+ m2 = (c.forward(), pid2)
+ heappush(events, m2)
+ m3 = (c.forward(), pid1)
+ heappush(events, m3)
+ m4 = (30, pid1)
+ heappush(events, m4)
+ m5 = (30, pid2)
+ heappush(events, m5)
+
+ self.assertEqual(str(c), str(c.value))
+
+ self.assertEqual(c.sort_heap(events), m1)
+ self.assertEqual(c.sort_heap([m4, m5]), m4)
+ self.assertEqual(c.sort_heap([m4, m5, m1]), m4)
diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py
index 524f60b9..ffcf9424 100644
--- a/kombu/tests/test_common.py
+++ b/kombu/tests/test_common.py
@@ -5,13 +5,55 @@ import socket
from mock import patch
from kombu import common
-from kombu.common import (Broadcast, maybe_declare,
- send_reply, isend_reply, collect_replies)
+from kombu.common import (
+ Broadcast, maybe_declare,
+ send_reply, isend_reply, collect_replies,
+ declaration_cached, ignore_errors,
+ QoS, PREFETCH_COUNT_MAX,
+ entry_to_queue,
+)
+from kombu.exceptions import StdChannelError
from .utils import TestCase
from .utils import ContextMock, Mock, MockPool
+class test_ignore_errors(TestCase):
+
+ def test_ignored(self):
+ connection = Mock()
+ connection.channel_errors = (KeyError, )
+ connection.connection_errors = (KeyError, )
+
+ with ignore_errors(connection):
+ raise KeyError()
+
+ def raising():
+ raise KeyError()
+
+ ignore_errors(connection, raising)
+
+ connection.channel_errors = connection.connection_errors = \
+ ()
+
+ with self.assertRaises(KeyError):
+ with ignore_errors(connection):
+ raise KeyError()
+
+
+class test_declaration_cached(TestCase):
+
+ def test_when_cached(self):
+ chan = Mock()
+ chan.connection.client.declared_entities = ['foo']
+ self.assertTrue(declaration_cached('foo', chan))
+
+ def test_when_not_cached(self):
+ chan = Mock()
+ chan.connection.client.declared_entities = ['bar']
+ self.assertFalse(declaration_cached('foo', chan))
+
+
class test_Broadcast(TestCase):
def test_arguments(self):
@@ -46,6 +88,10 @@ class test_maybe_declare(TestCase):
maybe_declare(entity, channel)
self.assertEqual(entity.declare.call_count, 1)
+ entity.channel.connection = None
+ with self.assertRaises(StdChannelError):
+ maybe_declare(entity)
+
def test_binds_entities(self):
channel = Mock()
channel.connection.client.declared_entities = set()
@@ -298,3 +344,125 @@ class test_itermessages(TestCase):
with self.assertRaises(StopIteration):
next(it)
+
+
+class test_entry_to_queue(TestCase):
+
+ def test_calls_Queue_from_dict(self):
+ with patch('kombu.common.Queue') as Queue:
+ entry_to_queue('name', exchange='bar')
+ Queue.from_dict.assert_called_with('name', exchange='bar')
+
+
+class test_QoS(TestCase):
+
+ class _QoS(QoS):
+ def __init__(self, value):
+ self.value = value
+ QoS.__init__(self, None, value)
+
+ def set(self, value):
+ return value
+
+ def test_qos_exceeds_16bit(self):
+ with patch('kombu.common.logger') as logger:
+ callback = Mock()
+ qos = QoS(callback, 10)
+ qos.prev = 100
+ qos.set(2 ** 32)
+ self.assertTrue(logger.warn.called)
+ callback.assert_called_with(prefetch_count=0)
+
+ def test_qos_increment_decrement(self):
+ qos = self._QoS(10)
+ self.assertEqual(qos.increment_eventually(), 11)
+ self.assertEqual(qos.increment_eventually(3), 14)
+ self.assertEqual(qos.increment_eventually(-30), 14)
+ self.assertEqual(qos.decrement_eventually(7), 7)
+ self.assertEqual(qos.decrement_eventually(), 6)
+
+ def test_qos_disabled_increment_decrement(self):
+ qos = self._QoS(0)
+ self.assertEqual(qos.increment_eventually(), 0)
+ self.assertEqual(qos.increment_eventually(3), 0)
+ self.assertEqual(qos.increment_eventually(-30), 0)
+ self.assertEqual(qos.decrement_eventually(7), 0)
+ self.assertEqual(qos.decrement_eventually(), 0)
+ self.assertEqual(qos.decrement_eventually(10), 0)
+
+ def test_qos_thread_safe(self):
+ qos = self._QoS(10)
+
+ def add():
+ for i in range(1000):
+ qos.increment_eventually()
+
+ def sub():
+ for i in range(1000):
+ qos.decrement_eventually()
+
+ def threaded(funs):
+ from threading import Thread
+ threads = [Thread(target=fun) for fun in funs]
+ for thread in threads:
+ thread.start()
+ for thread in threads:
+ thread.join()
+
+ threaded([add, add])
+ self.assertEqual(qos.value, 2010)
+
+ qos.value = 1000
+ threaded([add, sub]) # n = 2
+ self.assertEqual(qos.value, 1000)
+
+ def test_exceeds_short(self):
+ qos = QoS(Mock(), PREFETCH_COUNT_MAX - 1)
+ qos.update()
+ self.assertEqual(qos.value, PREFETCH_COUNT_MAX - 1)
+ qos.increment_eventually()
+ self.assertEqual(qos.value, PREFETCH_COUNT_MAX)
+ qos.increment_eventually()
+ self.assertEqual(qos.value, PREFETCH_COUNT_MAX + 1)
+ qos.decrement_eventually()
+ self.assertEqual(qos.value, PREFETCH_COUNT_MAX)
+ qos.decrement_eventually()
+ self.assertEqual(qos.value, PREFETCH_COUNT_MAX - 1)
+
+ def test_consumer_increment_decrement(self):
+ mconsumer = Mock()
+ qos = QoS(mconsumer.qos, 10)
+ qos.update()
+ self.assertEqual(qos.value, 10)
+ mconsumer.qos.assert_called_with(prefetch_count=10)
+ qos.decrement_eventually()
+ qos.update()
+ self.assertEqual(qos.value, 9)
+ mconsumer.qos.assert_called_with(prefetch_count=9)
+ qos.decrement_eventually()
+ self.assertEqual(qos.value, 8)
+ mconsumer.qos.assert_called_with(prefetch_count=9)
+ self.assertIn({'prefetch_count': 9}, mconsumer.qos.call_args)
+
+ # Does not decrement 0 value
+ qos.value = 0
+ qos.decrement_eventually()
+ self.assertEqual(qos.value, 0)
+ qos.increment_eventually()
+ self.assertEqual(qos.value, 0)
+
+ def test_consumer_decrement_eventually(self):
+ mconsumer = Mock()
+ qos = QoS(mconsumer.qos, 10)
+ qos.decrement_eventually()
+ self.assertEqual(qos.value, 9)
+ qos.value = 0
+ qos.decrement_eventually()
+ self.assertEqual(qos.value, 0)
+
+ def test_set(self):
+ mconsumer = Mock()
+ qos = QoS(mconsumer.qos, 10)
+ qos.set(12)
+ self.assertEqual(qos.prev, 12)
+ qos.set(qos.prev)
diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py
index 9bdf6343..4af9eb37 100644
--- a/kombu/tests/test_connection.py
+++ b/kombu/tests/test_connection.py
@@ -1,7 +1,11 @@
from __future__ import absolute_import
+import errno
import pickle
+import socket
+from copy import copy
+from mock import patch
from nose import SkipTest
from kombu import Connection, Consumer, Producer, parse_url
@@ -10,6 +14,7 @@ from kombu.five import items
from .mocks import Transport
from .utils import TestCase
+
from .utils import Mock, skip_if_not_module
@@ -154,6 +159,157 @@ class test_Connection(TestCase):
self.assertFalse(_connection.connected)
self.assertIsInstance(conn.transport, Transport)
+ def test_multiple_urls(self):
+ conn1 = Connection('amqp://foo;amqp://bar')
+ self.assertEqual(conn1.hostname, 'foo')
+ self.assertListEqual(conn1.alt, ['amqp://foo', 'amqp://bar'])
+
+ conn2 = Connection(['amqp://foo', 'amqp://bar'])
+ self.assertEqual(conn2.hostname, 'foo')
+ self.assertListEqual(conn2.alt, ['amqp://foo', 'amqp://bar'])
+
+ def test_uri_passthrough(self):
+ from kombu import connection as mod
+ prev, mod.URI_PASSTHROUGH = mod.URI_PASSTHROUGH, set(['foo'])
+ try:
+ with patch('kombu.connection.parse_url') as parse_url:
+ c = Connection('foo+mysql://some_host')
+ self.assertEqual(c.transport_cls, 'foo')
+ self.assertFalse(parse_url.called)
+ self.assertEqual(c.hostname, 'mysql://some_host')
+ self.assertTrue(c.as_uri().startswith('foo+'))
+ with patch('kombu.connection.parse_url') as parse_url:
+ c = Connection('mysql://some_host', transport='foo')
+ self.assertEqual(c.transport_cls, 'foo')
+ self.assertFalse(parse_url.called)
+ self.assertEqual(c.hostname, 'mysql://some_host')
+ finally:
+ mod.URI_PASSTHROUGH = prev
+ c = Connection('amqp+sqlite://some_host')
+ self.assertTrue(c.as_uri().startswith('amqp+'))
+
+ def test_default_ensure_callback(self):
+ with patch('kombu.connection.logger') as logger:
+ c = Connection(transport=Mock)
+ c._default_ensure_callback(KeyError(), 3)
+ self.assertTrue(logger.error.called)
+
+ def test_ensure_connection_on_error(self):
+ c = Connection('amqp://A;amqp://B')
+ with patch('kombu.connection.retry_over_time') as rot:
+ c.ensure_connection()
+ self.assertTrue(rot.called)
+
+ args = rot.call_args[0]
+ cb = args[4]
+ intervals = iter([1, 2, 3, 4, 5])
+ self.assertEqual(cb(KeyError(), intervals, 0), 0)
+ self.assertEqual(cb(KeyError(), intervals, 1), 1)
+ self.assertEqual(cb(KeyError(), intervals, 2), 0)
+ self.assertEqual(cb(KeyError(), intervals, 3), 2)
+ self.assertEqual(cb(KeyError(), intervals, 4), 0)
+ self.assertEqual(cb(KeyError(), intervals, 5), 3)
+ self.assertEqual(cb(KeyError(), intervals, 6), 0)
+ self.assertEqual(cb(KeyError(), intervals, 7), 4)
+
+ errback = Mock()
+ c.ensure_connection(errback=errback)
+ args = rot.call_args[0]
+ cb = args[4]
+ self.assertEqual(cb(KeyError(), intervals, 0), 0)
+ self.assertTrue(errback.called)
+
+ def test_drain_nowait(self):
+ c = Connection(transport=Mock)
+ c.drain_events = Mock()
+ c.drain_events.side_effect = socket.timeout()
+
+ c.more_to_read = True
+ self.assertFalse(c.drain_nowait())
+ self.assertFalse(c.more_to_read)
+
+ c.drain_events.side_effect = socket.error()
+ c.drain_events.side_effect.errno = errno.EAGAIN
+ c.more_to_read = True
+ self.assertFalse(c.drain_nowait())
+ self.assertFalse(c.more_to_read)
+
+ c.drain_events.side_effect = socket.error()
+ c.drain_events.side_effect.errno = errno.EPERM
+ with self.assertRaises(socket.error):
+ c.drain_nowait()
+
+ c.more_to_read = False
+ c.drain_events = Mock()
+ self.assertTrue(c.drain_nowait())
+ c.drain_events.assert_called_with(timeout=0)
+ self.assertTrue(c.more_to_read)
+
+ def test_supports_heartbeats(self):
+ c = Connection(transport=Mock)
+ c.transport.supports_heartbeats = False
+ self.assertFalse(c.supports_heartbeats)
+
+ def test_is_evented(self):
+ c = Connection(transport=Mock)
+ c.transport.supports_ev = False
+ self.assertFalse(c.is_evented)
+
+ def test_eventmap(self):
+ c = Connection(transport=Mock)
+ c.transport.eventmap.return_value = {1: 1, 2: 2}
+ self.assertDictEqual(c.eventmap, {1: 1, 2: 2})
+ c.transport.eventmap.assert_called_with(c.connection)
+
+ def test_manager(self):
+ c = Connection(transport=Mock)
+ self.assertIs(c.manager, c.transport.manager)
+
+ def test_copy(self):
+ c = Connection('amqp://example.com')
+ self.assertEqual(copy(c).info(), c.info())
+
+ def test_switch(self):
+ c = Connection('amqp://foo')
+ c._closed = True
+ c.switch('redis://example.com//3')
+ self.assertFalse(c._closed)
+ self.assertEqual(c.hostname, 'example.com')
+ self.assertEqual(c.transport_cls, 'redis')
+ self.assertEqual(c.virtual_host, '/3')
+
+ def test_maybe_switch_next(self):
+ c = Connection('amqp://foo;redis://example.com//3')
+ c.maybe_switch_next()
+ self.assertFalse(c._closed)
+ self.assertEqual(c.hostname, 'example.com')
+ self.assertEqual(c.transport_cls, 'redis')
+ self.assertEqual(c.virtual_host, '/3')
+
+ def test_maybe_switch_next_no_cycle(self):
+ c = Connection('amqp://foo')
+ c.maybe_switch_next()
+ self.assertFalse(c._closed)
+ self.assertEqual(c.hostname, 'foo')
+ self.assertIn(c.transport_cls, ('librabbitmq', 'pyamqp'))
+
+ def test_heartbeat_check(self):
+ c = Connection(transport=Transport)
+ c.transport.heartbeat_check = Mock()
+ c.heartbeat_check(3)
+ c.transport.heartbeat_check.assert_called_with(c.connection, rate=3)
+
+ def test_completes_cycle_no_cycle(self):
+ c = Connection('amqp://')
+ self.assertTrue(c.completes_cycle(0))
+ self.assertTrue(c.completes_cycle(1))
+
+ def test_completes_cycle(self):
+ c = Connection('amqp://a;amqp://b;amqp://c')
+ self.assertFalse(c.completes_cycle(0))
+ self.assertFalse(c.completes_cycle(1))
+ self.assertTrue(c.completes_cycle(2))
+
def test__enter____exit__(self):
conn = self.conn
context = conn.__enter__()
@@ -357,6 +513,18 @@ class ResourceCase(TestCase):
[chan.release() for chan in chans]
self.assertState(P, 10, 0)
+ def test_acquire_prepare_raises(self):
+ if self.abstract:
+ return
+ P = self.create_resource(10, 0)
+
+ self.assertEqual(len(P._resource.queue), 10)
+ P.prepare = Mock()
+ P.prepare.side_effect = IOError()
+ with self.assertRaises(IOError):
+ P.acquire(block=True)
+ self.assertEqual(len(P._resource.queue), 10)
+
def test_acquire_no_limit(self):
if self.abstract:
return
@@ -440,6 +608,12 @@ class test_ConnectionPool(ResourceCase):
self.assertIsNotNone(q[1]._connection)
self.assertIsNone(q[2]()._connection)
+ def test_release_no__debug(self):
+ P = self.create_resource(10, 2)
+ R = Mock()
+ R._debug.side_effect = AttributeError()
+ P.release_resource(R)
+
def test_setup_no_limit(self):
P = self.create_resource(None, None)
self.assertFalse(P._resource.queue)
diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py
index cdd9c34a..fadaf358 100644
--- a/kombu/tests/test_entities.py
+++ b/kombu/tests/test_entities.py
@@ -1,7 +1,10 @@
from __future__ import absolute_import
-from kombu import Connection
-from kombu.entity import Exchange, Queue
+import pickle
+
+from mock import call
+
+from kombu import Connection, Exchange, Queue, binding
from kombu.exceptions import NotBoundError
from .mocks import Transport
@@ -13,6 +16,48 @@ def get_conn():
return Connection(transport=Transport)
+class test_binding(TestCase):
+
+ def test_constructor(self):
+ x = binding(Exchange('foo'), 'rkey',
+ arguments={'barg': 'bval'},
+ unbind_arguments={'uarg': 'uval'},
+ )
+ self.assertEqual(x.exchange, Exchange('foo'))
+ self.assertEqual(x.routing_key, 'rkey')
+ self.assertDictEqual(x.arguments, {'barg': 'bval'})
+ self.assertDictEqual(x.unbind_arguments, {'uarg': 'uval'})
+
+ def test_declare(self):
+ chan = get_conn().channel()
+ x = binding(Exchange('foo'), 'rkey')
+ x.declare(chan)
+ self.assertIn('exchange_declare', chan)
+
+ def test_declare_no_exchange(self):
+ chan = get_conn().channel()
+ x = binding()
+ x.declare(chan)
+ self.assertNotIn('exchange_declare', chan)
+
+ def test_bind(self):
+ chan = get_conn().channel()
+ x = binding(Exchange('foo'))
+ x.bind(Exchange('bar')(chan))
+ self.assertIn('exchange_bind', chan)
+
+ def test_unbind(self):
+ chan = get_conn().channel()
+ x = binding(Exchange('foo'))
+ x.unbind(Exchange('bar')(chan))
+ self.assertIn('exchange_unbind', chan)
+
+ def test_repr(self):
+ b = binding(Exchange('foo'), 'rkey')
+ self.assertIn('foo', repr(b))
+ self.assertIn('rkey', repr(b))
+
+
class test_Exchange(TestCase):
def test_bound(self):
@@ -24,7 +69,8 @@ class test_Exchange(TestCase):
bound = exchange.bind(chan)
self.assertTrue(bound.is_bound)
self.assertIs(bound.channel, chan)
- self.assertIn('<bound', repr(bound))
+ self.assertIn('bound to chan:%r' % (chan.channel_id, ),
+ repr(bound))
def test_hash(self):
self.assertEqual(hash(Exchange('a')), hash(Exchange('a')))
@@ -34,6 +80,11 @@ class test_Exchange(TestCase):
self.assertTrue(Exchange('a', durable=True).can_cache_declaration)
self.assertFalse(Exchange('a', durable=False).can_cache_declaration)
+ def test_pickle(self):
+ e1 = Exchange('foo', 'direct')
+ e2 = pickle.loads(pickle.dumps(e1))
+ self.assertEqual(e1, e2)
+
def test_eq(self):
e1 = Exchange('foo', 'direct')
e2 = Exchange('foo', 'direct')
@@ -111,6 +162,12 @@ class test_Exchange(TestCase):
foo(chan).bind_to(bar)
self.assertIn('exchange_bind', chan)
+ def test_bind_to_by_name(self):
+ chan = get_conn().channel()
+ foo = Exchange('foo', 'topic')
+ foo(chan).bind_to('bar')
+ self.assertIn('exchange_bind', chan)
+
def test_unbind_from(self):
chan = get_conn().channel()
foo = Exchange('foo', 'topic')
@@ -118,6 +175,12 @@ class test_Exchange(TestCase):
foo(chan).unbind_from(bar)
self.assertIn('exchange_unbind', chan)
+ def test_unbind_from_by_name(self):
+ chan = get_conn().channel()
+ foo = Exchange('foo', 'topic')
+ foo(chan).unbind_from('bar')
+ self.assertIn('exchange_unbind', chan)
+
class test_Queue(TestCase):
@@ -128,6 +191,14 @@ class test_Queue(TestCase):
self.assertEqual(hash(Queue('a')), hash(Queue('a')))
self.assertNotEqual(hash(Queue('a')), hash(Queue('b')))
+ def test_anonymous(self):
+ chan = Mock()
+ x = Queue(bindings=[binding(Exchange('foo'), 'rkey')])
+ chan.queue_declare.return_value = 'generated', 0, 0
+ xx = x(chan)
+ xx.declare()
+ self.assertEqual(xx.name, 'generated')
+
def test_when_bound_but_no_exchange(self):
q = Queue('a')
q.exchange = None
@@ -141,7 +212,37 @@ class test_Queue(TestCase):
q.declare()
q.queue_declare.assert_called_with(False, passive=False)
- q.queue_bind.assert_called_with(False)
+
+ def test_bind_to_when_name(self):
+ chan = Mock()
+ q = Queue('a')
+ q(chan).bind_to('ex')
+ self.assertTrue(chan.queue_bind.called)
+
+ def test_get_when_no_m2p(self):
+ chan = Mock()
+ q = Queue('a')(chan)
+ chan.message_to_python = None
+ self.assertTrue(q.get())
+
+ def test_multiple_bindings(self):
+ chan = Mock()
+ q = Queue('mul', [
+ binding(Exchange('mul1'), 'rkey1'),
+ binding(Exchange('mul2'), 'rkey2'),
+ binding(Exchange('mul3'), 'rkey3'),
+ ])
+ q(chan).declare()
+ self.assertIn(
+ call(nowait=False,
+ exchange='mul1',
+ auto_delete=False,
+ passive=False,
+ arguments=None,
+ type='direct',
+ durable=True,
+ ), chan.exchange_declare.call_args_list,
+ )
def test_can_cache_declaration(self):
self.assertTrue(Queue('a', durable=True).can_cache_declaration)
diff --git a/kombu/tests/test_messaging.py b/kombu/tests/test_messaging.py
index 4c5fe619..249af87f 100644
--- a/kombu/tests/test_messaging.py
+++ b/kombu/tests/test_messaging.py
@@ -1,13 +1,14 @@
from __future__ import absolute_import, unicode_literals
import anyjson
+import pickle
+from collections import defaultdict
from mock import patch
-from kombu import Connection
+from kombu import Connection, Consumer, Producer, Exchange, Queue
from kombu.exceptions import MessageStateError
-from kombu.messaging import Consumer, Producer
-from kombu.entity import Exchange, Queue
+from kombu.utils import ChannelPromise
from .mocks import Transport
from .utils import TestCase
@@ -23,6 +24,16 @@ class test_Producer(TestCase):
self.assertTrue(self.connection.connection.connected)
self.assertFalse(self.exchange.is_bound)
+ def test_pickle(self):
+ chan = Mock()
+ producer = Producer(chan, serializer='pickle')
+ p2 = pickle.loads(pickle.dumps(producer))
+ self.assertEqual(p2.serializer, producer.serializer)
+
+ def test_no_channel(self):
+ p = Producer(None)
+ self.assertFalse(p._channel)
+
@patch('kombu.common.maybe_declare')
def test_maybe_declare(self, maybe_declare):
p = self.connection.Producer()
@@ -109,11 +120,25 @@ class test_Producer(TestCase):
def test_publish_with_Exchange_instance(self):
p = self.connection.Producer()
p.channel = Mock()
- p.publish('hello', exchange=Exchange('foo'))
+ p.publish('hello', exchange=Exchange('foo'), delivery_mode='transient')
self.assertEqual(
p._channel.basic_publish.call_args[1]['exchange'], 'foo',
)
+ def test_set_on_return(self):
+ chan = Mock()
+ chan.events = defaultdict(Mock)
+ p = Producer(ChannelPromise(lambda: chan), on_return='on_return')
+ p.channel
+ chan.events['basic_return'].add.assert_called_with('on_return')
+
+ def test_publish_retry_calls_ensure(self):
+ p = Producer(Mock())
+ p._connection = Mock()
+ ensure = p.connection.ensure = Mock()
+ p.publish('foo', exchange='foo', retry=True)
+ self.assertTrue(ensure.called)
+
def test_publish_retry_with_declare(self):
p = self.connection.Producer()
p.maybe_declare = Mock()
@@ -195,6 +220,12 @@ class test_Consumer(TestCase):
self.assertTrue(self.connection.connection.connected)
self.exchange = Exchange('foo', 'direct')
+ def test_set_no_channel(self):
+ c = Consumer(None)
+ self.assertIsNone(c.channel)
+ c.revive(Mock())
+ self.assertTrue(c.channel)
+
def test_set_no_ack(self):
channel = self.connection.channel()
queue = Queue('qname', self.exchange, 'rkey')
diff --git a/kombu/tests/test_pools.py b/kombu/tests/test_pools.py
index 32d1294f..a9386602 100644
--- a/kombu/tests/test_pools.py
+++ b/kombu/tests/test_pools.py
@@ -1,9 +1,8 @@
from __future__ import absolute_import
-from kombu import Connection
+from kombu import Connection, Producer
from kombu import pools
from kombu.connection import ConnectionPool
-from kombu.messaging import Producer
from kombu.utils import eqhash
from .utils import TestCase
@@ -26,6 +25,34 @@ class test_ProducerPool(TestCase):
self.connections = Mock()
self.pool = self.Pool(self.connections, limit=10)
+ def test_releases_connection_when_Producer_raises(self):
+ self.pool.Producer = Mock()
+ self.pool.Producer.side_effect = IOError()
+ acq = self.pool._acquire_connection = Mock()
+ conn = acq.return_value = Mock()
+ with self.assertRaises(IOError):
+ self.pool.create_producer()
+ conn.release.assert_called_with()
+
+ def test_prepare_release_connection_on_error(self):
+ pp = Mock()
+ p = pp.return_value = Mock()
+ p.revive.side_effect = IOError()
+ acq = self.pool._acquire_connection = Mock()
+ conn = acq.return_value = Mock()
+ p._channel = None
+ with self.assertRaises(IOError):
+ self.pool.prepare(pp)
+ conn.release.assert_called_with()
+
+ def test_release_releases_connection(self):
+ p = Mock()
+ p.__connection__ = Mock()
+ self.pool.release(p)
+ p.__connection__.release.assert_called_with()
+ p.__connection__ = None
+ self.pool.release(p)
+
def test_init(self):
self.assertIs(self.pool.connections, self.connections)
@@ -57,7 +84,7 @@ class test_ProducerPool(TestCase):
def test_prepare(self):
connection = self.connections.acquire.return_value = Mock()
pool = self.MyPool(self.connections, limit=10)
- pool.instance.channel = None
+ pool.instance._channel = None
first = pool._resource.get_nowait()
producer = pool.prepare(first)
self.assertTrue(self.connections.acquire.called)
@@ -66,7 +93,7 @@ class test_ProducerPool(TestCase):
def test_prepare_channel_already_created(self):
self.connections.acquire.return_value = Mock()
pool = self.MyPool(self.connections, limit=10)
- pool.instance.channel = Mock()
+ pool.instance._channel = Mock()
first = pool._resource.get_nowait()
self.connections.acquire.reset()
producer = pool.prepare(first)
diff --git a/kombu/tests/test_utils.py b/kombu/tests/test_utils.py
index 3633f8b1..38b0f581 100644
--- a/kombu/tests/test_utils.py
+++ b/kombu/tests/test_utils.py
@@ -5,12 +5,15 @@ import sys
from functools import wraps
from io import BytesIO, StringIO
+from mock import Mock, patch
from kombu import utils
from kombu.five import string_t
-from .utils import redirect_stdouts, mask_modules, skip_if_module
-from .utils import TestCase
+from .utils import (
+ TestCase,
+ redirect_stdouts, mask_modules, module_exists, skip_if_module,
+)
class OldString(object):
@@ -28,6 +31,13 @@ class OldString(object):
return self.value.rsplit(*args, **kwargs)
+class test_kombu_module(TestCase):
+
+ def test_dir(self):
+ import kombu
+ self.assertTrue(dir(kombu))
+
+
class test_utils(TestCase):
def test_maybe_list(self):
@@ -174,10 +184,21 @@ class test_retry_over_time(TestCase):
@insomnia
def test_simple(self):
- x = utils.retry_over_time(self.myfun, self.Predicate,
- errback=self.errback, interval_max=14)
- self.assertEqual(x, 42)
- self.assertEqual(self.index, 9)
+ prev_count, utils.count = utils.count, Mock()
+ try:
+ utils.count.return_value = range(1)
+ x = utils.retry_over_time(self.myfun, self.Predicate,
+ errback=None, interval_max=14)
+ self.assertIsNone(x)
+ utils.count.return_value = range(10)
+ cb = Mock()
+ x = utils.retry_over_time(self.myfun, self.Predicate,
+ errback=self.errback, callback=cb, interval_max=14)
+ self.assertEqual(x, 42)
+ self.assertEqual(self.index, 9)
+ cb.assert_called_with()
+ finally:
+ utils.count = prev_count
@insomnia
def test_retry_once(self):
@@ -200,6 +221,26 @@ class test_retry_over_time(TestCase):
class test_cached_property(TestCase):
+ def test_deleting(self):
+
+ class X(object):
+ xx = False
+
+ @utils.cached_property
+ def foo(self):
+ return 42
+
+ @foo.deleter # noqa
+ def foo(self, value):
+ self.xx = value
+
+ x = X()
+ del(x.foo)
+ self.assertFalse(x.xx)
+ x.__dict__['foo'] = 'here'
+ del(x.foo)
+ self.assertEqual(x.xx, 'here')
+
def test_when_access_from_class(self):
class X(object):
@@ -226,3 +267,75 @@ class test_cached_property(TestCase):
self.assertEqual(x.xx, 10)
del(x.foo)
+
+
+class test_symbol_by_name(TestCase):
+
+ def test_instance_returns_instance(self):
+ instance = object()
+ self.assertIs(utils.symbol_by_name(instance), instance)
+
+ def test_returns_default(self):
+ default = object()
+ self.assertIs(utils.symbol_by_name('xyz.ryx.qedoa.weq:foz',
+ default=default), default)
+
+ def test_no_default(self):
+ with self.assertRaises(ImportError):
+ utils.symbol_by_name('xyz.ryx.qedoa.weq:foz')
+
+ def test_imp_reraises_ValueError(self):
+ imp = Mock()
+ imp.side_effect = ValueError()
+ with self.assertRaises(ValueError):
+ utils.symbol_by_name('kombu.Connection', imp=imp)
+
+ def test_package(self):
+ from kombu.entity import Exchange
+ self.assertIs(utils.symbol_by_name('.entity:Exchange',
+ package='kombu'), Exchange)
+ self.assertTrue(utils.symbol_by_name(':Consumer', package='kombu'))
+
+
+class test_ChannelPromise(TestCase):
+
+ def test_repr(self):
+ self.assertEqual(repr(utils.ChannelPromise(lambda: 'foo')),
+ "<promise: 'foo'>")
+
+
+class test_entrypoints(TestCase):
+
+ @mask_modules('pkg_resources')
+ def test_without_pkg_resources(self):
+ self.assertListEqual(list(utils.entrypoints('kombu.test')), [])
+
+ @module_exists('pkg_resources')
+ def test_with_pkg_resources(self):
+ with patch('pkg_resources.iter_entry_points', create=True) as iterep:
+ eps = iterep.return_value = [Mock(), Mock()]
+
+ self.assertTrue(list(utils.entrypoints('kombu.test')))
+ iterep.assert_called_with('kombu.test')
+ eps[0].load.assert_called_with()
+ eps[1].load.assert_called_with()
+
+
+class test_shufflecycle(TestCase):
+
+ def test_shuffles(self):
+ prev_repeat, utils.repeat = utils.repeat, Mock()
+ try:
+ utils.repeat.return_value = range(10)
+ values = set(['A', 'B', 'C'])
+ cycle = utils.shufflecycle(values)
+ seen = set()
+ for i in xrange(10):
+ cycle.next()
+ utils.repeat.assert_called_with(None)
+ self.assertTrue(seen.issubset(values))
+ with self.assertRaises(StopIteration):
+ cycle.next()
+ cycle.next()
+ finally:
+ utils.repeat = prev_repeat
diff --git a/kombu/tests/transport/test_amqplib.py b/kombu/tests/transport/test_amqplib.py
index 6ce95cc9..fb6c0141 100644
--- a/kombu/tests/transport/test_amqplib.py
+++ b/kombu/tests/transport/test_amqplib.py
@@ -4,7 +4,7 @@ import sys
from nose import SkipTest
-from kombu.connection import Connection
+from kombu import Connection
from kombu.tests.utils import TestCase
from kombu.tests.utils import mask_modules, Mock
diff --git a/kombu/tests/transport/test_filesystem.py b/kombu/tests/transport/test_filesystem.py
index 6a8d64d1..7bdad8f5 100644
--- a/kombu/tests/transport/test_filesystem.py
+++ b/kombu/tests/transport/test_filesystem.py
@@ -4,9 +4,7 @@ import tempfile
from nose import SkipTest
-from kombu.connection import Connection
-from kombu.entity import Exchange, Queue
-from kombu.messaging import Consumer, Producer
+from kombu import Connection, Exchange, Queue, Consumer, Producer
from kombu.tests.utils import TestCase
diff --git a/kombu/tests/transport/test_memory.py b/kombu/tests/transport/test_memory.py
index 95e3432e..970681d7 100644
--- a/kombu/tests/transport/test_memory.py
+++ b/kombu/tests/transport/test_memory.py
@@ -2,9 +2,7 @@ from __future__ import absolute_import
import socket
-from kombu.connection import Connection
-from kombu.entity import Exchange, Queue
-from kombu.messaging import Consumer, Producer
+from kombu import Connection, Exchange, Queue, Consumer, Producer
from kombu.tests.utils import TestCase
diff --git a/kombu/tests/transport/test_mongodb.py b/kombu/tests/transport/test_mongodb.py
index b05e7d9f..522df04a 100644
--- a/kombu/tests/transport/test_mongodb.py
+++ b/kombu/tests/transport/test_mongodb.py
@@ -2,7 +2,7 @@ from __future__ import absolute_import
from nose import SkipTest
-from kombu.connection import Connection
+from kombu import Connection
from kombu.tests.utils import TestCase, skip_if_not_module
diff --git a/kombu/tests/transport/test_pyamqp.py b/kombu/tests/transport/test_pyamqp.py
index b48ec153..7936484f 100644
--- a/kombu/tests/transport/test_pyamqp.py
+++ b/kombu/tests/transport/test_pyamqp.py
@@ -2,6 +2,7 @@ from __future__ import absolute_import
import sys
+from mock import patch
from nose import SkipTest
try:
@@ -10,7 +11,7 @@ except ImportError:
pyamqp = None # noqa
else:
from kombu.transport import pyamqp
-from kombu.connection import Connection
+from kombu import Connection
from kombu.tests.utils import TestCase
from kombu.tests.utils import mask_modules, Mock
@@ -160,3 +161,26 @@ class test_pyamqp(TestCase):
c = Connection(port=1337, transport=Transport).connect()
self.assertEqual(c['host'], '127.0.0.1:1337')
+
+ def test_eventmap(self):
+ t = pyamqp.Transport(Mock())
+ conn = Mock()
+ self.assertDictEqual(t.eventmap(conn),
+ {conn.sock: t.client.drain_nowait})
+
+ def test_event_interface(self):
+ t = pyamqp.Transport(Mock())
+ t.on_poll_init(Mock())
+ t.on_poll_start()
+
+ def test_heartbeat_check(self):
+ t = pyamqp.Transport(Mock())
+ conn = Mock()
+ t.heartbeat_check(conn, rate=4.331)
+ conn.heartbeat_tick.assert_called_with(rate=4.331)
+
+ def test_get_manager(self):
+ with patch('kombu.transport.pyamqp.get_manager') as get_manager:
+ t = pyamqp.Transport(Mock())
+ t.get_manager(1, kw=2)
+ get_manager.assert_called_with(t.client, 1, kw=2)
diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py
index 403e815e..cf166e87 100644
--- a/kombu/tests/transport/test_redis.py
+++ b/kombu/tests/transport/test_redis.py
@@ -7,11 +7,9 @@ from anyjson import dumps
from collections import defaultdict
from itertools import count
-from kombu.connection import Connection
-from kombu.entity import Exchange, Queue
+from kombu import Connection, Exchange, Queue, Consumer, Producer
from kombu.exceptions import InconsistencyError, VersionMismatch
from kombu.five import Empty, Queue as _Queue
-from kombu.messaging import Consumer, Producer
from kombu.utils import eventio # patch poll
from kombu.tests.utils import TestCase
@@ -42,9 +40,7 @@ class Client(object):
hashes = defaultdict(dict)
shard_hint = None
- def __init__(self, db=None, port=None, **kwargs):
- self.port = port
- self.db = db
+ def __init__(self, db=None, port=None, connection_pool=None, **kwargs):
self._called = []
self._connection = None
self.bgsave_raises_ResponseError = False
@@ -384,10 +380,9 @@ class test_Channel(TestCase):
c.connection.disconnect.assert_called_with()
def test_invalid_database_raises_ValueError(self):
- self.channel.connection.client.virtual_host = 'xfeqwewkfk'
with self.assertRaises(ValueError):
- self.channel._create_client()
+ Connection('redis:///dwqwewqe').channel()
@skip_if_not_module('redis')
def test_get_client(self):
@@ -395,7 +390,7 @@ class test_Channel(TestCase):
KombuRedis = redis.Channel._get_client(self.channel)
self.assertTrue(KombuRedis)
- Rv = getattr(R, 'VERSION')
+ Rv = getattr(R, 'VERSION', None)
try:
R.VERSION = (2, 4, 0)
with self.assertRaises(VersionMismatch):
@@ -528,29 +523,23 @@ class test_Redis(TestCase):
channel.close()
def test_db_values(self):
- c1 = Connection(virtual_host=1,
- transport=Transport).channel()
- self.assertEqual(c1.client.db, 1)
+ Connection(virtual_host=1,
+ transport=Transport).channel()
- c2 = Connection(virtual_host='1',
- transport=Transport).channel()
- self.assertEqual(c2.client.db, 1)
+ Connection(virtual_host='1',
+ transport=Transport).channel()
- c3 = Connection(virtual_host='/1',
- transport=Transport).channel()
- self.assertEqual(c3.client.db, 1)
+ Connection(virtual_host='/1',
+ transport=Transport).channel()
with self.assertRaises(Exception):
- Connection(virtual_host='/foo',
- transport=Transport).channel()
+ Connection('redis:///foo').channel()
def test_db_port(self):
c1 = Connection(port=None, transport=Transport).channel()
- self.assertEqual(c1.client.port, Transport.default_port)
c1.close()
c2 = Connection(port=9999, transport=Transport).channel()
- self.assertEqual(c2.client.port, 9999)
c2.close()
def test_close_poller_not_active(self):
diff --git a/kombu/tests/transport/test_sqlalchemy.py b/kombu/tests/transport/test_sqlalchemy.py
index 82b51ebc..f3d7d239 100644
--- a/kombu/tests/transport/test_sqlalchemy.py
+++ b/kombu/tests/transport/test_sqlalchemy.py
@@ -3,7 +3,7 @@ from __future__ import absolute_import
from mock import patch
from nose import SkipTest
-from kombu.connection import Connection
+from kombu import Connection
from kombu.tests.utils import TestCase
diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py
index eb7eaf6d..232be80b 100644
--- a/kombu/tests/transport/virtual/test_base.py
+++ b/kombu/tests/transport/virtual/test_base.py
@@ -91,6 +91,9 @@ class test_QoS(TestCase):
self.assertTrue(stderr.getvalue())
self.assertFalse(stdout.getvalue())
+ self.q.restore_at_shutdown = False
+ self.q.restore_unacked_once()
+
def test_get(self):
self.q._delivered['foo'] = 1
self.assertEqual(self.q.get('foo'), 1)
@@ -176,10 +179,34 @@ class test_Channel(TestCase):
if self.channel._qos is not None:
self.channel._qos._on_collect.cancel()
+ def test_exchange_bind_interface(self):
+ with self.assertRaises(NotImplementedError):
+ self.channel.exchange_bind('dest', 'src', 'key')
+
+ def test_exchange_unbind_interface(self):
+ with self.assertRaises(NotImplementedError):
+ self.channel.exchange_unbind('dest', 'src', 'key')
+
+ def test_queue_unbind_interface(self):
+ with self.assertRaises(NotImplementedError):
+ self.channel.queue_unbind('dest', 'ex', 'key')
+
+ def test_management(self):
+ m = self.channel.connection.client.get_manager()
+ self.assertTrue(m)
+ m.get_bindings()
+ m.close()
+
def test_exchange_declare(self):
c = self.channel
+
+ with self.assertRaises(StdChannelError):
+ c.exchange_declare('test_exchange_declare', 'direct',
+ durable=True, auto_delete=True, passive=True)
c.exchange_declare('test_exchange_declare', 'direct',
durable=True, auto_delete=True)
+ c.exchange_declare('test_exchange_declare', 'direct',
+ durable=True, auto_delete=True, passive=True)
self.assertIn('test_exchange_declare', c.state.exchanges)
# can declare again with same values
c.exchange_declare('test_exchange_declare', 'direct',
@@ -348,7 +375,7 @@ class test_Channel(TestCase):
exc = None
try:
raise KeyError()
- except KeyError as exc_:
+ except KeyError, exc_:
exc = exc_
ru.return_value = [(exc, 1)]
diff --git a/kombu/tests/utilities/test_amq_manager.py b/kombu/tests/utilities/test_amq_manager.py
new file mode 100644
index 00000000..8f85a2c3
--- /dev/null
+++ b/kombu/tests/utilities/test_amq_manager.py
@@ -0,0 +1,35 @@
+from __future__ import absolute_import
+
+from mock import patch
+
+from kombu import Connection
+from kombu.tests.utils import TestCase, mask_modules, module_exists
+
+
+class test_get_manager(TestCase):
+
+ @mask_modules('pyrabbit')
+ def test_without_pyrabbit(self):
+ with self.assertRaises(ImportError):
+ Connection('amqp://').get_manager()
+
+ @module_exists('pyrabbit')
+ def test_with_pyrabbit(self):
+ with patch('pyrabbit.Client', create=True) as Client:
+ manager = Connection('amqp://').get_manager()
+ self.assertIsNotNone(manager)
+ Client.assert_called_with('localhost:55672',
+ 'guest', 'guest')
+
+ @module_exists('pyrabbit')
+ def test_transport_options(self):
+ with patch('pyrabbit.Client', create=True) as Client:
+ manager = Connection('amqp://', transport_options={
+ 'manager_hostname': 'admin.mq.vandelay.com',
+ 'manager_port': 808,
+ 'manager_userid': 'george',
+ 'manager_password': 'bosco',
+ }).get_manager()
+ self.assertIsNotNone(manager)
+ Client.assert_called_with('admin.mq.vandelay.com:808',
+ 'george', 'bosco')
diff --git a/kombu/tests/utilities/test_debug.py b/kombu/tests/utilities/test_debug.py
new file mode 100644
index 00000000..d364400f
--- /dev/null
+++ b/kombu/tests/utilities/test_debug.py
@@ -0,0 +1,58 @@
+from __future__ import absolute_import
+
+import logging
+
+from mock import Mock, patch
+
+from kombu.utils.debug import (
+ setup_logging,
+ Logwrapped,
+)
+from kombu.tests.utils import TestCase
+
+
+class test_setup_logging(TestCase):
+
+ def test_adds_handlers_sets_level(self):
+ with patch('kombu.utils.debug.get_logger') as get_logger:
+ logger = get_logger.return_value = Mock()
+ setup_logging(loggers=['kombu.test'])
+
+ get_logger.assert_called_with('kombu.test')
+
+ self.assertTrue(logger.addHandler.called)
+ logger.setLevel.assert_called_with(logging.DEBUG)
+
+
+class test_Logwrapped(TestCase):
+
+ def test_wraps(self):
+ with patch('kombu.utils.debug.get_logger') as get_logger:
+ logger = get_logger.return_value = Mock()
+
+ W = Logwrapped(Mock(), 'kombu.test')
+ get_logger.assert_called_with('kombu.test')
+ self.assertIsNotNone(W.instance)
+ self.assertIs(W.logger, logger)
+
+ W.instance.__repr__ = lambda s: 'foo'
+ self.assertEqual(repr(W), 'foo')
+ self.assertListEqual(dir(W), dir(W.instance))
+
+ W.instance.some_attr = 303
+ self.assertEqual(W.some_attr, 303)
+
+ W.instance.some_method.__name__ = 'some_method'
+ W.some_method(1, 2, kw=1)
+ W.instance.some_method.assert_called_with(1, 2, kw=1)
+
+ W.some_method()
+ W.instance.some_method.assert_called_with()
+
+ W.some_method(kw=1)
+ W.instance.some_method.assert_called_with(kw=1)
+
+ W.ident = 'ident'
+ W.some_method(kw=1)
+ self.assertTrue(logger.debug.called)
+ self.assertIn('ident', logger.debug.call_args[0][0])