diff options
Diffstat (limited to 't/unit/transport/virtual/test_exchange.py')
-rw-r--r-- | t/unit/transport/virtual/test_exchange.py | 169 |
1 files changed, 169 insertions, 0 deletions
diff --git a/t/unit/transport/virtual/test_exchange.py b/t/unit/transport/virtual/test_exchange.py new file mode 100644 index 00000000..93d48228 --- /dev/null +++ b/t/unit/transport/virtual/test_exchange.py @@ -0,0 +1,169 @@ +from __future__ import absolute_import, unicode_literals + +import pytest + +from case import Mock + +from kombu import Connection +from kombu.transport.virtual import exchange + +from t.mocks import Transport + + +class ExchangeCase: + type = None + + def setup(self): + if self.type: + self.e = self.type(Connection(transport=Transport).channel()) + + +class test_Direct(ExchangeCase): + type = exchange.DirectExchange + table = [('rFoo', None, 'qFoo'), + ('rFoo', None, 'qFox'), + ('rBar', None, 'qBar'), + ('rBaz', None, 'qBaz')] + + @pytest.mark.parametrize('exchange,routing_key,default,expected', [ + ('eFoo', 'rFoo', None, {'qFoo', 'qFox'}), + ('eMoz', 'rMoz', 'DEFAULT', set()), + ('eBar', 'rBar', None, {'qBar'}), + ]) + def test_lookup(self, exchange, routing_key, default, expected): + assert self.e.lookup( + self.table, exchange, routing_key, default) == expected + + +class test_Fanout(ExchangeCase): + type = exchange.FanoutExchange + table = [(None, None, 'qFoo'), + (None, None, 'qFox'), + (None, None, 'qBar')] + + def test_lookup(self): + assert self.e.lookup(self.table, 'eFoo', 'rFoo', None) == { + 'qFoo', 'qFox', 'qBar', + } + + def test_deliver_when_fanout_supported(self): + self.e.channel = Mock() + self.e.channel.supports_fanout = True + message = Mock() + + self.e.deliver(message, 'exchange', 'rkey') + self.e.channel._put_fanout.assert_called_with( + 'exchange', message, 'rkey', + ) + + def test_deliver_when_fanout_unsupported(self): + self.e.channel = Mock() + self.e.channel.supports_fanout = False + + self.e.deliver(Mock(), 'exchange', None) + self.e.channel._put_fanout.assert_not_called() + + +class test_Topic(ExchangeCase): + type = exchange.TopicExchange + table = [ + ('stock.#', None, 'rFoo'), + ('stock.us.*', None, 'rBar'), + ] + + def setup(self): + ExchangeCase.setup(self) + self.table = [(rkey, self.e.key_to_pattern(rkey), queue) + for rkey, _, queue in self.table] + + def test_prepare_bind(self): + x = self.e.prepare_bind('qFoo', 'eFoo', 'stock.#', {}) + assert x == ('stock.#', r'^stock\..*?$', 'qFoo') + + @pytest.mark.parametrize('exchange,routing_key,default,expected', [ + ('eFoo', 'stock.us.nasdaq', None, {'rFoo', 'rBar'}), + ('eFoo', 'stock.europe.OSE', None, {'rFoo'}), + ('eFoo', 'stockxeuropexOSE', None, set()), + ('eFoo', 'candy.schleckpulver.snap_crackle', None, set()), + ]) + def test_lookup(self, exchange, routing_key, default, expected): + assert self.e.lookup( + self.table, exchange, routing_key, default) == expected + assert self.e._compiled + + def test_deliver(self): + self.e.channel = Mock() + self.e.channel._lookup.return_value = ('a', 'b') + message = Mock() + self.e.deliver(message, 'exchange', 'rkey') + + assert self.e.channel._put.call_args_list == [ + (('a', message), {}), + (('b', message), {}), + ] + + +class test_TopicMultibind(ExchangeCase): + # Testing message delivery in case of multiple overlapping + # bindings for the same queue. As AMQP states, in case of + # overlapping bindings, a message must be delivered once to + # each matching queue. + type = exchange.TopicExchange + table = [ + ('stock', None, 'rFoo'), + ('stock.#', None, 'rFoo'), + ('stock.us.*', None, 'rFoo'), + ('#', None, 'rFoo'), + ] + + def setup(self): + ExchangeCase.setup(self) + self.table = [(rkey, self.e.key_to_pattern(rkey), queue) + for rkey, _, queue in self.table] + + @pytest.mark.parametrize('exchange,routing_key,default,expected', [ + ('eFoo', 'stock.us.nasdaq', None, {'rFoo'}), + ('eFoo', 'stock.europe.OSE', None, {'rFoo'}), + ('eFoo', 'stockxeuropexOSE', None, {'rFoo'}), + ('eFoo', 'candy.schleckpulver.snap_crackle', None, {'rFoo'}), + ]) + def test_lookup(self, exchange, routing_key, default, expected): + assert self.e._compiled + assert self.e.lookup( + self.table, exchange, routing_key, default) == expected + + +class test_ExchangeType(ExchangeCase): + type = exchange.ExchangeType + + def test_lookup(self): + with pytest.raises(NotImplementedError): + self.e.lookup([], 'eFoo', 'rFoo', None) + + def test_prepare_bind(self): + assert self.e.prepare_bind('qFoo', 'eFoo', 'rFoo', {}) == ( + 'rFoo', None, 'qFoo', + ) + + e1 = dict( + type='direct', + durable=True, + auto_delete=True, + arguments={}, + ) + e2 = dict(e1, arguments={'expires': 3000}) + + @pytest.mark.parametrize('ex,eq,name,type,durable,auto_delete,arguments', [ + (e1, True, 'eFoo', 'direct', True, True, {}), + (e1, False, 'eFoo', 'topic', True, True, {}), + (e1, False, 'eFoo', 'direct', False, True, {}), + (e1, False, 'eFoo', 'direct', True, False, {}), + (e1, False, 'eFoo', 'direct', True, True, {'expires': 3000}), + (e2, True, 'eFoo', 'direct', True, True, {'expires': 3000}), + (e2, False, 'eFoo', 'direct', True, True, {'expires': 6000}), + ]) + def test_equivalent( + self, ex, eq, name, type, durable, auto_delete, arguments): + is_eq = self.e.equivalent( + ex, name, type, durable, auto_delete, arguments) + assert is_eq if eq else not is_eq |