summaryrefslogtreecommitdiff
path: root/t/unit/transport/virtual/test_exchange.py
diff options
context:
space:
mode:
Diffstat (limited to 't/unit/transport/virtual/test_exchange.py')
-rw-r--r--t/unit/transport/virtual/test_exchange.py169
1 files changed, 169 insertions, 0 deletions
diff --git a/t/unit/transport/virtual/test_exchange.py b/t/unit/transport/virtual/test_exchange.py
new file mode 100644
index 00000000..93d48228
--- /dev/null
+++ b/t/unit/transport/virtual/test_exchange.py
@@ -0,0 +1,169 @@
+from __future__ import absolute_import, unicode_literals
+
+import pytest
+
+from case import Mock
+
+from kombu import Connection
+from kombu.transport.virtual import exchange
+
+from t.mocks import Transport
+
+
+class ExchangeCase:
+ type = None
+
+ def setup(self):
+ if self.type:
+ self.e = self.type(Connection(transport=Transport).channel())
+
+
+class test_Direct(ExchangeCase):
+ type = exchange.DirectExchange
+ table = [('rFoo', None, 'qFoo'),
+ ('rFoo', None, 'qFox'),
+ ('rBar', None, 'qBar'),
+ ('rBaz', None, 'qBaz')]
+
+ @pytest.mark.parametrize('exchange,routing_key,default,expected', [
+ ('eFoo', 'rFoo', None, {'qFoo', 'qFox'}),
+ ('eMoz', 'rMoz', 'DEFAULT', set()),
+ ('eBar', 'rBar', None, {'qBar'}),
+ ])
+ def test_lookup(self, exchange, routing_key, default, expected):
+ assert self.e.lookup(
+ self.table, exchange, routing_key, default) == expected
+
+
+class test_Fanout(ExchangeCase):
+ type = exchange.FanoutExchange
+ table = [(None, None, 'qFoo'),
+ (None, None, 'qFox'),
+ (None, None, 'qBar')]
+
+ def test_lookup(self):
+ assert self.e.lookup(self.table, 'eFoo', 'rFoo', None) == {
+ 'qFoo', 'qFox', 'qBar',
+ }
+
+ def test_deliver_when_fanout_supported(self):
+ self.e.channel = Mock()
+ self.e.channel.supports_fanout = True
+ message = Mock()
+
+ self.e.deliver(message, 'exchange', 'rkey')
+ self.e.channel._put_fanout.assert_called_with(
+ 'exchange', message, 'rkey',
+ )
+
+ def test_deliver_when_fanout_unsupported(self):
+ self.e.channel = Mock()
+ self.e.channel.supports_fanout = False
+
+ self.e.deliver(Mock(), 'exchange', None)
+ self.e.channel._put_fanout.assert_not_called()
+
+
+class test_Topic(ExchangeCase):
+ type = exchange.TopicExchange
+ table = [
+ ('stock.#', None, 'rFoo'),
+ ('stock.us.*', None, 'rBar'),
+ ]
+
+ def setup(self):
+ ExchangeCase.setup(self)
+ self.table = [(rkey, self.e.key_to_pattern(rkey), queue)
+ for rkey, _, queue in self.table]
+
+ def test_prepare_bind(self):
+ x = self.e.prepare_bind('qFoo', 'eFoo', 'stock.#', {})
+ assert x == ('stock.#', r'^stock\..*?$', 'qFoo')
+
+ @pytest.mark.parametrize('exchange,routing_key,default,expected', [
+ ('eFoo', 'stock.us.nasdaq', None, {'rFoo', 'rBar'}),
+ ('eFoo', 'stock.europe.OSE', None, {'rFoo'}),
+ ('eFoo', 'stockxeuropexOSE', None, set()),
+ ('eFoo', 'candy.schleckpulver.snap_crackle', None, set()),
+ ])
+ def test_lookup(self, exchange, routing_key, default, expected):
+ assert self.e.lookup(
+ self.table, exchange, routing_key, default) == expected
+ assert self.e._compiled
+
+ def test_deliver(self):
+ self.e.channel = Mock()
+ self.e.channel._lookup.return_value = ('a', 'b')
+ message = Mock()
+ self.e.deliver(message, 'exchange', 'rkey')
+
+ assert self.e.channel._put.call_args_list == [
+ (('a', message), {}),
+ (('b', message), {}),
+ ]
+
+
+class test_TopicMultibind(ExchangeCase):
+ # Testing message delivery in case of multiple overlapping
+ # bindings for the same queue. As AMQP states, in case of
+ # overlapping bindings, a message must be delivered once to
+ # each matching queue.
+ type = exchange.TopicExchange
+ table = [
+ ('stock', None, 'rFoo'),
+ ('stock.#', None, 'rFoo'),
+ ('stock.us.*', None, 'rFoo'),
+ ('#', None, 'rFoo'),
+ ]
+
+ def setup(self):
+ ExchangeCase.setup(self)
+ self.table = [(rkey, self.e.key_to_pattern(rkey), queue)
+ for rkey, _, queue in self.table]
+
+ @pytest.mark.parametrize('exchange,routing_key,default,expected', [
+ ('eFoo', 'stock.us.nasdaq', None, {'rFoo'}),
+ ('eFoo', 'stock.europe.OSE', None, {'rFoo'}),
+ ('eFoo', 'stockxeuropexOSE', None, {'rFoo'}),
+ ('eFoo', 'candy.schleckpulver.snap_crackle', None, {'rFoo'}),
+ ])
+ def test_lookup(self, exchange, routing_key, default, expected):
+ assert self.e._compiled
+ assert self.e.lookup(
+ self.table, exchange, routing_key, default) == expected
+
+
+class test_ExchangeType(ExchangeCase):
+ type = exchange.ExchangeType
+
+ def test_lookup(self):
+ with pytest.raises(NotImplementedError):
+ self.e.lookup([], 'eFoo', 'rFoo', None)
+
+ def test_prepare_bind(self):
+ assert self.e.prepare_bind('qFoo', 'eFoo', 'rFoo', {}) == (
+ 'rFoo', None, 'qFoo',
+ )
+
+ e1 = dict(
+ type='direct',
+ durable=True,
+ auto_delete=True,
+ arguments={},
+ )
+ e2 = dict(e1, arguments={'expires': 3000})
+
+ @pytest.mark.parametrize('ex,eq,name,type,durable,auto_delete,arguments', [
+ (e1, True, 'eFoo', 'direct', True, True, {}),
+ (e1, False, 'eFoo', 'topic', True, True, {}),
+ (e1, False, 'eFoo', 'direct', False, True, {}),
+ (e1, False, 'eFoo', 'direct', True, False, {}),
+ (e1, False, 'eFoo', 'direct', True, True, {'expires': 3000}),
+ (e2, True, 'eFoo', 'direct', True, True, {'expires': 3000}),
+ (e2, False, 'eFoo', 'direct', True, True, {'expires': 6000}),
+ ])
+ def test_equivalent(
+ self, ex, eq, name, type, durable, auto_delete, arguments):
+ is_eq = self.e.equivalent(
+ ex, name, type, durable, auto_delete, arguments)
+ assert is_eq if eq else not is_eq