diff options
author | Matus Valo <matusvalo@users.noreply.github.com> | 2018-11-26 14:52:39 +0100 |
---|---|---|
committer | Asif Saif Uddin <auvipy@gmail.com> | 2018-11-26 19:52:39 +0600 |
commit | f16df2a17630c9804a6da614443c5e862271823f (patch) | |
tree | 3cf9a5e5d7b7f88815283dc811fe21bf96d6d2c9 | |
parent | 0e793de205e447d57214b32ea121ff2078c2819d (diff) | |
download | py-amqp-f16df2a17630c9804a6da614443c5e862271823f.tar.gz |
Fix basic_consume() with no consumer_tag provided (#221)
* Use consumer tag sent by broker instead of using directly parameter value
* Added more unittests of basic_consume() method
* Make pep8 happy
* Split integration tests to connection tests and channel tests
* Added connection closed integration test
* Added integration tests for basic_consume()
* Fix typos
-rw-r--r-- | amqp/channel.py | 22 | ||||
-rw-r--r-- | t/integration/test_integration.py | 136 | ||||
-rw-r--r-- | t/unit/test_channel.py | 41 |
3 files changed, 171 insertions, 28 deletions
diff --git a/amqp/channel.py b/amqp/channel.py index 785ad7b..c829a2c 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -1559,14 +1559,26 @@ class Channel(AbstractChannel): """ p = self.send_method( spec.Basic.Consume, argsig, - (0, queue, consumer_tag, no_local, no_ack, exclusive, - nowait, arguments), + ( + 0, queue, consumer_tag, no_local, no_ack, exclusive, + nowait, arguments + ), wait=None if nowait else spec.Basic.ConsumeOk, + returns_tuple=True ) - # XXX Fix this hack - if not nowait and not consumer_tag: - consumer_tag = p + if not nowait: + # send_method() returns (spec.Basic.ConsumeOk, consumer_tag) tuple. + # consumer_tag is returned by broker using following rules: + # * consumer_tag is not specified by client, random one + # is generated by Broker + # * consumer_tag is provided by client, the same one + # is returned by broker + consumer_tag = p[1] + elif nowait and not consumer_tag: + raise ValueError( + 'Consumer tag must be specified when nowait is True' + ) self.callbacks[consumer_tag] = callback diff --git a/t/integration/test_integration.py b/t/integration/test_integration.py index 6bcd9e2..2969d2b 100644 --- a/t/integration/test_integration.py +++ b/t/integration/test_integration.py @@ -4,6 +4,7 @@ import pytest from case import patch, call, Mock from amqp import spec, Connection, Channel, sasl, Message from amqp.platform import pack +from amqp.exceptions import ConnectionError from amqp.serialization import dumps, loads @@ -96,7 +97,19 @@ def handshake(conn, transport_mock): transport_mock().read_frame.side_effect = None -class test_integration: +def create_channel(channel_id, conn, transport_mock): + transport_mock().read_frame.return_value = ret_factory( + spec.Channel.OpenOk, + channel=channel_id, + args=(1, False), + arg_format='Lb' + ) + ch = conn.channel(channel_id=channel_id) + transport_mock().read_frame.side_effect = None + return ch + + +class test_connection: # Integration tests. Tests verify the correctness of communication between # library and broker. # * tests mocks broker responses mocking return values of @@ -173,6 +186,38 @@ class test_integration: ) t.close.assert_called_once_with() + def test_connection_closed_by_broker(self): + # Test that library response correctly CloseOk when + # close method is received and _on_close_ok() method is called. + frame_writer_cls_mock = Mock() + frame_writer_mock = frame_writer_cls_mock() + with patch.object(Connection, '_on_close_ok') as callback_mock: + conn = Connection(frame_writer=frame_writer_cls_mock) + with patch.object(conn, 'Transport') as transport_mock: + handshake(conn, transport_mock) + frame_writer_mock.reset_mock() + # Inject Close response from broker + transport_mock().read_frame.return_value = ret_factory( + spec.Connection.Close, + args=(1, False), + arg_format='Lb' + ) + with pytest.raises(ConnectionError): + conn.drain_events(0) + frame_writer_mock.assert_called_once_with( + 1, 0, spec.Connection.CloseOk, '', None + ) + callback_mock.assert_called_once_with() + + +class test_channel: + # Integration tests. Tests verify the correctness of communication between + # library and broker. + # * tests mocks broker responses mocking return values of + # amqp.transport.Transport.read_frame() method + # * tests asserts expected library responses to broker via calls of + # amqp.method_framing.frame_writer() function + @pytest.mark.parametrize("method, callback", connection_testdata) def test_connection_methods(self, method, callback): # Test verifying that proper Connection callback is called when @@ -244,22 +289,12 @@ class test_integration: conn = Connection() with patch.object(conn, 'Transport') as transport_mock: handshake(conn, transport_mock) - - channel_id = 1 - # Inject Open Handshake - transport_mock().read_frame.return_value = ret_factory( - spec.Channel.OpenOk, - channel=channel_id, - args=(1, False), - arg_format='Lb' - ) - - conn.channel(channel_id=channel_id) + create_channel(1, conn, transport_mock) # Inject desired method transport_mock().read_frame.return_value = ret_factory( method, - channel=channel_id, + channel=1, args=(1, False), arg_format='Lb' ) @@ -272,17 +307,8 @@ class test_integration: conn = Connection(frame_writer=frame_writer_cls_mock) with patch.object(conn, 'Transport') as transport_mock: handshake(conn, transport_mock) + ch = create_channel(1, conn, transport_mock) - channel_id = 1 - # Inject Open Handshake - transport_mock().read_frame.return_value = ret_factory( - spec.Channel.OpenOk, - channel=channel_id, - args=(1, False), - arg_format='Lb' - ) - - ch = conn.channel(channel_id=channel_id) frame_writer_mock = frame_writer_cls_mock() frame_writer_mock.reset_mock() msg = Message('test') @@ -291,3 +317,67 @@ class test_integration: 1, 1, spec.Basic.Publish, dumps('Bssbb', (0, '', '', False, False)), msg ) + + def test_consume_no_consumer_tag(self): + # Test verifing starting consuming without specified consumer_tag + callback_mock = Mock() + frame_writer_cls_mock = Mock() + conn = Connection(frame_writer=frame_writer_cls_mock) + consumer_tag = 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg' + with patch.object(conn, 'Transport') as transport_mock: + handshake(conn, transport_mock) + ch = create_channel(1, conn, transport_mock) + + # Inject ConsumeOk response from Broker + transport_mock().read_frame.return_value = ret_factory( + spec.Basic.ConsumeOk, + channel=1, + args=(consumer_tag,), + arg_format='s' + ) + frame_writer_mock = frame_writer_cls_mock() + frame_writer_mock.reset_mock() + ch.basic_consume('my_queue', callback=callback_mock) + frame_writer_mock.assert_called_once_with( + 1, 1, spec.Basic.Consume, + dumps( + 'BssbbbbF', + (0, 'my_queue', '', False, False, False, False, None) + ), + None + ) + assert ch.callbacks[consumer_tag] == callback_mock + + def test_consume_with_consumer_tag(self): + # Test verifing starting consuming with specified consumer_tag + callback_mock = Mock() + frame_writer_cls_mock = Mock() + conn = Connection(frame_writer=frame_writer_cls_mock) + with patch.object(conn, 'Transport') as transport_mock: + handshake(conn, transport_mock) + ch = create_channel(1, conn, transport_mock) + + # Inject ConcumeOk response from Broker + transport_mock().read_frame.return_value = ret_factory( + spec.Basic.ConsumeOk, + channel=1, + args=('my_tag',), + arg_format='s' + ) + frame_writer_mock = frame_writer_cls_mock() + frame_writer_mock.reset_mock() + ch.basic_consume( + 'my_queue', callback=callback_mock, consumer_tag='my_tag' + ) + frame_writer_mock.assert_called_once_with( + 1, 1, spec.Basic.Consume, + dumps( + 'BssbbbbF', + ( + 0, 'my_queue', 'my_tag', + False, False, False, False, None + ) + ), + None + ) + assert ch.callbacks['my_tag'] == callback_mock diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py index 3e7e5ff..173e0d7 100644 --- a/t/unit/test_channel.py +++ b/t/unit/test_channel.py @@ -266,6 +266,7 @@ class test_Channel: def test_basic_consume(self): callback = Mock() on_cancel = Mock() + self.c.send_method.return_value = (spec.Basic.ConsumeOk, 123) self.c.basic_consume( 'q', 123, arguments={'x': 1}, callback=callback, @@ -275,16 +276,56 @@ class test_Channel: spec.Basic.Consume, 'BssbbbbF', (0, 'q', 123, False, False, False, False, {'x': 1}), wait=spec.Basic.ConsumeOk, + returns_tuple=True ) assert self.c.callbacks[123] is callback assert self.c.cancel_callbacks[123] is on_cancel def test_basic_consume__no_ack(self): + self.c.send_method.return_value = (spec.Basic.ConsumeOk, 123) self.c.basic_consume( 'q', 123, arguments={'x': 1}, no_ack=True, ) assert 123 in self.c.no_ack_consumers + def test_basic_consume_no_consumer_tag(self): + callback = Mock() + self.c.send_method.return_value = (spec.Basic.ConsumeOk, 123) + self.c.basic_consume( + 'q', arguments={'x': 1}, + callback=callback, + ) + self.c.send_method.assert_called_with( + spec.Basic.Consume, 'BssbbbbF', + (0, 'q', '', False, False, False, False, {'x': 1}), + wait=spec.Basic.ConsumeOk, + returns_tuple=True + ) + assert self.c.callbacks[123] is callback + + def test_basic_consume_no_wait(self): + callback = Mock() + self.c.basic_consume( + 'q', 123, arguments={'x': 1}, + callback=callback, nowait=True + ) + self.c.send_method.assert_called_with( + spec.Basic.Consume, 'BssbbbbF', + (0, 'q', 123, False, False, False, True, {'x': 1}), + wait=None, + returns_tuple=True + ) + assert self.c.callbacks[123] is callback + + def test_basic_consume_no_wait_no_consumer_tag(self): + callback = Mock() + with pytest.raises(ValueError): + self.c.basic_consume( + 'q', arguments={'x': 1}, + callback=callback, nowait=True + ) + assert 123 not in self.c.callbacks + def test_on_basic_deliver(self): msg = Mock() self.c._on_basic_deliver(123, '321', False, 'ex', 'rkey', msg) |