diff options
author | Ask Solem <ask@celeryproject.org> | 2017-02-10 12:52:33 -0800 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2017-02-10 12:52:33 -0800 |
commit | 932bb6caff01d15ec9d02a376e90647ec39293e9 (patch) | |
tree | fb85bf3c12c4d087f57c03d36d5346c6e31fff94 | |
parent | aed8e8431bb61f0937447b5c8ef497ff931d8693 (diff) | |
download | py-amqp-3.0-devel.tar.gz |
Use keyword-only arguments3.0-devel
-rw-r--r-- | amqp/__init__.py | 2 | ||||
-rw-r--r-- | amqp/abstract_channel.py | 2 | ||||
-rw-r--r-- | amqp/channel.py | 44 | ||||
-rw-r--r-- | amqp/protocol.py | 4 | ||||
-rw-r--r-- | amqp/spec.py | 4 | ||||
-rw-r--r-- | amqp/transport.py | 4 | ||||
-rw-r--r-- | amqp/types.py | 31 | ||||
-rw-r--r-- | requirements/test-ci.txt | 2 | ||||
-rw-r--r-- | t/unit/__init__.py | 0 | ||||
-rw-r--r-- | t/unit/test_abstract_channel.py | 118 | ||||
-rw-r--r-- | t/unit/test_basic_message.py | 16 | ||||
-rw-r--r-- | t/unit/test_channel.py | 403 | ||||
-rw-r--r-- | t/unit/test_connection.py | 310 | ||||
-rw-r--r-- | t/unit/test_exceptions.py | 19 | ||||
-rw-r--r-- | t/unit/test_method_framing.py | 97 | ||||
-rw-r--r-- | t/unit/test_platform.py | 13 | ||||
-rw-r--r-- | t/unit/test_serialization.py | 191 | ||||
-rw-r--r-- | t/unit/test_transport.py | 295 | ||||
-rw-r--r-- | t/unit/test_utils.py | 57 |
19 files changed, 56 insertions, 1556 deletions
diff --git a/amqp/__init__.py b/amqp/__init__.py index a9a3c8a..1a326c6 100644 --- a/amqp/__init__.py +++ b/amqp/__init__.py @@ -99,8 +99,6 @@ __all__ = [ class version_info_t(NamedTuple): - """Version description tuple.""" - major: int minor: int micro: int diff --git a/amqp/abstract_channel.py b/amqp/abstract_channel.py index 169410d..e2d6824 100644 --- a/amqp/abstract_channel.py +++ b/amqp/abstract_channel.py @@ -26,7 +26,7 @@ from .utils import AsyncToggle, toggle_blocking __all__ = ['ChannelBase'] -class ChannelBase(AsyncToggle, AbstractChannelT): +class ChannelBase(AbstractChannelT, AsyncToggle): """Superclass for Connection and Channel. The connection is treated as channel 0, then comes diff --git a/amqp/channel.py b/amqp/channel.py index f554a30..965e867 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -163,9 +163,8 @@ class Channel(ChannelBase, ChannelT): # set first time basic_publish_confirm is called # and publisher confirms are enabled for this channel. + self._confirm_publish = self.connection.confirm_publish self._confirm_selected = False - if self.connection.confirm_publish: - self.basic_publish = self.basic_publish_confirm async def __aenter__(self) -> 'Channel': await self.open() @@ -215,6 +214,7 @@ class Channel(ChannelBase, ChannelT): @toggle_blocking async def close(self, + *, reply_code: int = 0, reply_text: str = '', method_sig: method_sig_t = method_sig_t(0, 0), @@ -544,6 +544,7 @@ class Channel(ChannelBase, ChannelT): @toggle_blocking async def exchange_declare( self, exchange: str, type: str, + *, passive: bool = False, durable: bool = False, auto_delete: bool = True, @@ -684,6 +685,7 @@ class Channel(ChannelBase, ChannelT): async def exchange_delete( self, exchange: str, + *, if_unused: bool = False, nowait: bool = False, argsig: str = 'Bsbb') -> None: @@ -738,6 +740,7 @@ class Channel(ChannelBase, ChannelT): self, destination: str, source: str = '', routing_key: str = '', + *, nowait: bool = False, arguments: Mapping[str, Any] = None, argsig: str = 'BsssbF') -> None: @@ -822,6 +825,7 @@ class Channel(ChannelBase, ChannelT): self, destination: str, source: str = '', routing_key: str = '', + *, nowait: bool = False, arguments: Mapping[str, Any] = None, argsig: str = 'BsssbF') -> None: @@ -911,6 +915,7 @@ class Channel(ChannelBase, ChannelT): self, queue: str, exchange: str = '', routing_key: str = '', + *, nowait: bool = False, arguments: Mapping[str, Any] = None, argsig: str = 'BsssbF') -> None: @@ -1021,6 +1026,7 @@ class Channel(ChannelBase, ChannelT): async def queue_unbind( self, queue: str, exchange: str, routing_key: str = '', + *, nowait: bool = False, arguments: Mapping[str, Any] = None, argsig: str = 'BsssF') -> None: @@ -1083,6 +1089,7 @@ class Channel(ChannelBase, ChannelT): async def queue_declare( self, queue: str = '', + *, passive: bool = False, durable: bool = False, exclusive: bool = False, @@ -1257,6 +1264,7 @@ class Channel(ChannelBase, ChannelT): async def queue_delete( self, queue: str = '', + *, if_unused: bool = False, if_empty: bool = False, nowait: bool = False, @@ -1335,6 +1343,7 @@ class Channel(ChannelBase, ChannelT): async def queue_purge( self, queue: str = '', + *, nowait: bool = False, argsig: str = 'Bsb') -> Optional[int]: """Purge a queue. @@ -1459,6 +1468,7 @@ class Channel(ChannelBase, ChannelT): @toggle_blocking async def basic_ack( self, delivery_tag: str, + *, multiple: bool = False, argsig: str = 'Lb') -> None: """Acknowledge one or more messages. @@ -1513,6 +1523,7 @@ class Channel(ChannelBase, ChannelT): @toggle_blocking async def basic_cancel( self, consumer_tag: str, + *, nowait: bool = False, argsig: str = 'sb') -> None: """End a queue consumer. @@ -1586,6 +1597,7 @@ class Channel(ChannelBase, ChannelT): self, queue: str = '', consumer_tag: str = '', + *, no_local: bool = False, no_ack: bool = False, exclusive: bool = False, @@ -1740,6 +1752,7 @@ class Channel(ChannelBase, ChannelT): async def basic_get( self, queue: str = '', + *, no_ack: bool = False, argsig: str = 'Bsb') -> Optional[MessageT]: """Direct access to a queue. @@ -1804,10 +1817,11 @@ class Channel(ChannelBase, ChannelT): return msg @toggle_blocking - async def _basic_publish( + async def basic_publish( self, msg: MessageT, exchange: str = '', routing_key: str = '', + *, mandatory: bool = False, immediate: bool = False, timeout: float = None, @@ -1879,6 +1893,9 @@ class Channel(ChannelBase, ChannelT): if not self.connection: raise RecoverableConnectionError( 'basic_publish: connection closed') + if self._confirm_publish and not self._confirm_selected: + self._confirm_selected = True + await self.confirm_select() try: await self.send_method( spec.Basic.Publish, argsig, @@ -1887,15 +1904,8 @@ class Channel(ChannelBase, ChannelT): ) except socket.timeout: raise RecoverableChannelError('basic_publish: timed out') - basic_publish = _basic_publish - - @toggle_blocking - async def basic_publish_confirm(self, *args, **kwargs) -> None: - if not self._confirm_selected: - self._confirm_selected = True - await self.confirm_select() - await self._basic_publish(*args, **kwargs) - await self.wait(spec.Basic.Ack) + if self._confirm_publish: + await self.wait(spec.Basic.Ack) @toggle_blocking async def basic_qos( @@ -1972,7 +1982,7 @@ class Channel(ChannelBase, ChannelT): ) @toggle_blocking - async def basic_recover(self, requeue: bool = False) -> None: + async def basic_recover(self, *, requeue: bool = False) -> None: """Redeliver unacknowledged messages. This method asks the broker to redeliver all unacknowledged @@ -2004,12 +2014,14 @@ class Channel(ChannelBase, ChannelT): await self.send_method(spec.Basic.Recover, 'b', (requeue,)) @toggle_blocking - async def basic_recover_async(self, requeue: bool = False) -> None: + async def basic_recover_async(self, *, requeue: bool = False) -> None: await self.send_method( spec.Basic.RecoverAsync, 'b', (requeue,)) @toggle_blocking - async def basic_reject(self, delivery_tag: str, requeue: bool, + async def basic_reject(self, delivery_tag: str, + *, + requeue: bool = False, argsig: str = 'Lb') -> None: """Reject an incoming message. @@ -2187,7 +2199,7 @@ class Channel(ChannelBase, ChannelT): await self.send_method(spec.Tx.Select, wait=spec.Tx.SelectOk) @toggle_blocking - async def confirm_select(self, nowait: bool = False) -> None: + async def confirm_select(self, *, nowait: bool = False) -> None: """Enable publisher confirms for this channel. Note: This is an RabbitMQ extension. diff --git a/amqp/protocol.py b/amqp/protocol.py index ad26138..ac0575b 100644 --- a/amqp/protocol.py +++ b/amqp/protocol.py @@ -3,16 +3,12 @@ from typing import NamedTuple class queue_declare_ok_t(NamedTuple): - """Tuple returned by Queue.Declare.""" - queue: str message_count: int consumer_count: int class basic_return_t(NamedTuple): - """Tuple provided by Basic.Return.""" - reply_code: int reply_text: str exchange: str diff --git a/amqp/spec.py b/amqp/spec.py index 9fb20bd..d1deeff 100644 --- a/amqp/spec.py +++ b/amqp/spec.py @@ -3,15 +3,11 @@ from typing import Mapping, NamedTuple class method_sig_t(NamedTuple): - """AMQP Method signature tuple.""" - major: int minor: int class method_t(NamedTuple): - """AMQP Method invocation tuple.""" - method_sig: method_sig_t args: str content: bool diff --git a/amqp/transport.py b/amqp/transport.py index 037d66c..59876a0 100644 --- a/amqp/transport.py +++ b/amqp/transport.py @@ -24,7 +24,7 @@ from struct import pack, unpack from typing import Any, Callable, Dict, Mapping, MutableMapping, Set, Tuple from .exceptions import UnexpectedFrame from .platform import SOL_TCP, TCP_USER_TIMEOUT, HAS_TCP_USER_TIMEOUT -from .types import Frame +from .types import Frame, TransportT from .utils import AsyncToggle, set_cloexec, toggle_blocking AMQP_PORT = 5672 @@ -86,7 +86,7 @@ def to_host_port(host: str, default: int = AMQP_PORT) -> Tuple[str, int]: return host, port -class Transport(AsyncToggle): +class Transport(TransportT, AsyncToggle): """Network transport.""" def __init__(self, host: str, diff --git a/amqp/types.py b/amqp/types.py index d73bb52..219e21f 100644 --- a/amqp/types.py +++ b/amqp/types.py @@ -18,8 +18,6 @@ Int = TypeVar('Int', SupportsInt, str) class Frame(NamedTuple): - """Frame tuple.""" - type: int channel: int data: bytes @@ -137,6 +135,7 @@ class MessageT(ContentT, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, body: bytes=b'', + *, children: Any = None, channel: 'ChannelT' = None) -> None: ... @@ -185,6 +184,7 @@ class AbstractChannelT(metaclass=abc.ABCMeta): self, sig: method_sig_t, format: str = None, args: Sequence = None, + *, content: MessageT = None, wait: WaitMethodT = None, callback: Callable = None, @@ -194,6 +194,7 @@ class AbstractChannelT(metaclass=abc.ABCMeta): @abc.abstractmethod async def close( self, + *, reply_code: int = 0, reply_text: str = '', method_sig: method_sig_t = method_sig_t(0, 0), @@ -208,6 +209,7 @@ class AbstractChannelT(metaclass=abc.ABCMeta): async def wait( self, method: WaitMethodT, + *, callback: Callable = None, timeout: float = None, returns_tuple: bool = False) -> Any: @@ -277,6 +279,7 @@ class ConnectionT(AbstractChannelT): host: str = 'localhost:5672', userid: str = 'guest', password: str = 'guest', + *, login_method: str = 'AMQPLAIN', login_response: Any = None, virtual_host: str = '/', @@ -374,6 +377,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta): @abc.abstractmethod async def exchange_declare( self, exchange: str, type: str, + *, passive: bool = False, durable: bool = False, auto_delete: bool = True, @@ -385,6 +389,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta): @abc.abstractmethod async def exchange_delete( self, exchange: str, + *, if_unused: bool = False, nowait: bool = False, argsig: str = 'Bsbb') -> None: @@ -395,6 +400,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta): self, destination: str, source: str = '', routing_key: str = '', + *, nowait: bool = False, arguments: Mapping[str, Any] = None, argsig: str = 'BsssbF') -> None: @@ -405,6 +411,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta): self, destination: str, source: str = '', routing_key: str = '', + *, nowait: bool = False, arguments: Mapping[str, Any] = None, argsig: str = 'BsssbF') -> None: @@ -415,6 +422,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta): self, queue: str, exchange: str = '', routing_key: str = '', + *, nowait: bool = False, arguments: Mapping[str, Any] = None, argsig: str = 'BsssbF') -> None: @@ -424,6 +432,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta): async def queue_unbind( self, queue: str, exchange: str, routing_key: str = '', + *, nowait: bool = False, arguments: Mapping[str, Any] = None, argsig: str = 'BsssF') -> None: @@ -433,6 +442,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta): async def queue_declare( self, queue: str = '', + *, passive: bool = False, durable: bool = False, exclusive: bool = False, @@ -446,6 +456,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta): async def queue_delete( self, queue: str = '', + *, if_unused: bool = False, if_empty: bool = False, nowait: bool = False, @@ -456,6 +467,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta): async def queue_purge( self, queue: str = '', + *, nowait: bool = False, argsig: str = 'Bsb') -> Optional[int]: ... @@ -463,6 +475,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta): @abc.abstractmethod async def basic_ack( self, delivery_tag: str, + *, multiple: bool = False, argsig: str = 'Lb') -> None: ... @@ -470,6 +483,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta): @abc.abstractmethod async def basic_cancel( self, consumer_tag: str, + *, nowait: bool = False, argsig: str = 'sb') -> None: ... @@ -479,6 +493,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta): self, queue: str = '', consumer_tag: str = '', + *, no_local: bool = False, no_ack: bool = False, exclusive: bool = False, @@ -493,6 +508,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta): async def basic_get( self, queue: str = '', + *, no_ack: bool = False, argsig: str = 'Bsb') -> Optional[MessageT]: ... @@ -502,6 +518,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta): self, msg: MessageT, exchange: str = '', routing_key: str = '', + *, mandatory: bool = False, immediate: bool = False, timeout: float = None, @@ -518,15 +535,17 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta): ... @abc.abstractmethod - async def basic_recover(self, requeue: bool = False) -> None: + async def basic_recover(self, *, requeue: bool = False) -> None: ... @abc.abstractmethod - async def basic_recover_async(self, requeue: bool = False) -> None: + async def basic_recover_async(self, *, requeue: bool = False) -> None: ... @abc.abstractmethod - async def basic_reject(self, delivery_tag: str, requeue: bool, + async def basic_reject(self, delivery_tag: str, + *, + requeue: bool = False, argsig: str = 'Lb') -> None: ... @@ -543,5 +562,5 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta): ... @abc.abstractmethod - async def confirm_select(self, nowait: bool = False) -> None: + async def confirm_select(self, *, nowait: bool = False) -> None: ... diff --git a/requirements/test-ci.txt b/requirements/test-ci.txt index 5f867d4..f0aa93a 100644 --- a/requirements/test-ci.txt +++ b/requirements/test-ci.txt @@ -1,3 +1 @@ -pytest-cov -codecov mypy diff --git a/t/unit/__init__.py b/t/unit/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/t/unit/__init__.py +++ /dev/null diff --git a/t/unit/test_abstract_channel.py b/t/unit/test_abstract_channel.py deleted file mode 100644 index 5c009aa..0000000 --- a/t/unit/test_abstract_channel.py +++ /dev/null @@ -1,118 +0,0 @@ -import pytest -from case import Mock, patch -from vine import promise -from amqp.abstract_channel import ChannelBase -from amqp.exceptions import AMQPNotImplementedError, RecoverableConnectionError -from amqp.serialization import dumps - - -class test_ChannelBase: - - class Channel(ChannelBase): - - def _setup_listeners(self): - ... - - @pytest.fixture(autouse=True) - def setup_conn(self): - self.conn = Mock(name='connection') - self.conn.channels = {} - self.channel_id = 1 - self.c = self.Channel(self.conn, self.channel_id) - self.method = Mock(name='method') - self.content = Mock(name='content') - self.content.content_encoding = 'utf-8' - self.c._METHODS = {(50, 61): self.method} - - def test_enter_exit(self): - self.c.close = Mock(name='close') - with self.c: - ... - self.c.close.assert_called_with() - - def test_send_method(self): - self.c.send_method((50, 60), 'iB', (30, 0)) - self.conn.frame_writer.assert_called_with( - 1, self.channel_id, (50, 60), dumps('iB', (30, 0)), None, - ) - - def test_send_method__callback(self): - callback = Mock(name='callback') - p = promise(callback) - self.c.send_method((50, 60), 'iB', (30, 0), callback=p) - callback.assert_called_with() - - def test_send_method__wait(self): - self.c.wait = Mock(name='wait') - self.c.send_method((50, 60), 'iB', (30, 0), wait=(50, 61)) - self.c.wait.assert_called_with((50, 61), returns_tuple=False) - - def test_send_method__no_connection(self): - self.c.connection = None - with pytest.raises(RecoverableConnectionError): - self.c.send_method((50, 60)) - - def test_close(self): - with pytest.raises(NotImplementedError): - self.c.close() - - @patch('amqp.abstract_channel.ensure_promise') - def test_wait(self, ensure_promise): - p = ensure_promise.return_value - p.ready = False - - def on_drain(*args, **kwargs): - p.ready = True - self.conn.drain_events.side_effect = on_drain - - p.value = (1,), {'arg': 2} - self.c.wait((50, 61), timeout=1) - self.conn.drain_events.assert_called_with(timeout=1) - - prev = self.c._pending[(50, 61)] = Mock(name='p2') - p.value = None - self.c.wait([(50, 61)]) - assert self.c._pending[(50, 61)] is prev - - def test_dispatch_method__content_encoding(self): - self.c.auto_decode = True - self.method.args = None - self.c.dispatch_method((50, 61), 'payload', self.content) - self.content.body.decode.side_effect = KeyError() - self.c.dispatch_method((50, 61), 'payload', self.content) - - def test_dispatch_method__unknown_method(self): - with pytest.raises(AMQPNotImplementedError): - self.c.dispatch_method((100, 131), 'payload', self.content) - - def test_dispatch_method__one_shot(self): - self.method.args = None - p = self.c._pending[(50, 61)] = Mock(name='oneshot') - self.c.dispatch_method((50, 61), 'payload', self.content) - p.assert_called_with(self.content) - - def test_dispatch_method__one_shot_no_content(self): - self.method.args = None - self.method.content = None - p = self.c._pending[(50, 61)] = Mock(name='oneshot') - self.c.dispatch_method((50, 61), 'payload', self.content) - p.assert_called_with() - assert not self.c._pending - - @patch('amqp.abstract_channel.loads') - def test_dispatch_method__listeners(self, loads): - loads.return_value = [1, 2, 3], 'foo' - p = self.c._callbacks[(50, 61)] = Mock(name='p') - self.c.dispatch_method((50, 61), 'payload', self.content) - p.assert_called_with(1, 2, 3, self.content) - - @patch('amqp.abstract_channel.loads') - def test_dispatch_method__listeners_and_one_shot(self, loads): - loads.return_value = [1, 2, 3], 'foo' - p1 = self.c._callbacks[(50, 61)] = Mock(name='p') - p2 = self.c._pending[(50, 61)] = Mock(name='oneshot') - self.c.dispatch_method((50, 61), 'payload', self.content) - p1.assert_called_with(1, 2, 3, self.content) - p2.assert_called_with(1, 2, 3, self.content) - assert not self.c._pending - assert self.c._callbacks[(50, 61)] diff --git a/t/unit/test_basic_message.py b/t/unit/test_basic_message.py deleted file mode 100644 index 013c9cc..0000000 --- a/t/unit/test_basic_message.py +++ /dev/null @@ -1,16 +0,0 @@ -from case import Mock -from amqp.basic_message import Message - - -class test_Message: - - def test_message(self): - m = Message( - 'foo', - channel=Mock(name='channel'), - application_headers={'h': 'v'}, - ) - m.delivery_info = {'delivery_tag': '1234'}, - assert m.body == 'foo' - assert m.channel - assert m.headers == {'h': 'v'} diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py deleted file mode 100644 index 34d0dd9..0000000 --- a/t/unit/test_channel.py +++ /dev/null @@ -1,403 +0,0 @@ -import pytest -from case import ContextMock, Mock, patch -from amqp import spec -from amqp.channel import Channel -from amqp.exceptions import ConsumerCancelled, NotFound - - -class test_Channel: - - @pytest.fixture(autouse=True) - def setup_conn(self): - self.conn = Mock(name='connection') - self.conn.channels = {} - self.conn._get_free_channel_id.return_value = 2 - self.c = Channel(self.conn, 1) - self.c.send_method = Mock(name='send_method') - - def test_init_confirm_enabled(self): - self.conn.confirm_publish = True - c = Channel(self.conn, 2) - assert c.basic_publish == c.basic_publish_confirm - - def test_init_confirm_disabled(self): - self.conn.confirm_publish = False - c = Channel(self.conn, 2) - assert c.basic_publish == c._basic_publish - - def test_init_auto_channel(self): - c = Channel(self.conn, None) - self.conn._get_free_channel_id.assert_called_with() - assert c.channel_id is self.conn._get_free_channel_id() - - def test_init_explicit_channel(self): - Channel(self.conn, 3) - self.conn._claim_channel_id.assert_called_with(3) - - def test_then(self): - self.c.on_open = Mock(name='on_open') - on_success = Mock(name='on_success') - on_error = Mock(name='on_error') - self.c.then(on_success, on_error) - self.c.on_open.then.assert_called_with(on_success, on_error) - - def test_collect(self): - self.c.callbacks[(50, 61)] = Mock() - self.c.cancel_callbacks['foo'] = Mock() - self.c.events['bar'].add(Mock()) - self.c.no_ack_consumers.add('foo') - self.c.collect() - assert not self.c.callbacks - assert not self.c.cancel_callbacks - assert not self.c.events - assert not self.c.no_ack_consumers - assert not self.c.is_open - self.c.collect() - - def test_do_revive(self): - self.c.open = Mock(name='open') - self.c.is_open = True - self.c._do_revive() - assert not self.c.is_open - self.c.open.assert_called_with() - - def test_close__not_open(self): - self.c.is_open = False - self.c.close() - - def test_close__no_connection(self): - self.c.connection = None - self.c.close() - - def test_close(self): - self.c.is_open = True - self.c.close(30, 'text', spec.Queue.Declare) - self.c.send_method.assert_called_with( - spec.Channel.Close, 'BsBB', - (30, 'text', spec.Queue.Declare[0], spec.Queue.Declare[1]), - wait=spec.Channel.CloseOk, - ) - assert self.c.connection is None - - def test_on_close(self): - self.c._do_revive = Mock(name='_do_revive') - with pytest.raises(NotFound): - self.c._on_close(404, 'text', 50, 61) - self.c.send_method.assert_called_with(spec.Channel.CloseOk) - self.c._do_revive.assert_called_with() - - def test_on_close_ok(self): - self.c.collect = Mock(name='collect') - self.c._on_close_ok() - self.c.collect.assert_called_with() - - def test_flow(self): - self.c.flow(0) - self.c.send_method.assert_called_with( - spec.Channel.Flow, 'b', (0,), wait=spec.Channel.FlowOk, - ) - - def test_on_flow(self): - self.c._x_flow_ok = Mock(name='_x_flow_ok') - self.c._on_flow(0) - assert not self.c.active - self.c._x_flow_ok.assert_called_with(0) - - def test_x_flow_ok(self): - self.c._x_flow_ok(1) - self.c.send_method.assert_called_with(spec.Channel.FlowOk, 'b', (1,)) - - def test_open(self): - self.c.is_open = True - self.c.open() - self.c.is_open = False - self.c.open() - self.c.send_method.assert_called_with( - spec.Channel.Open, 's', ('',), wait=spec.Channel.OpenOk, - ) - - def test_on_open_ok(self): - self.c.on_open = Mock(name='on_open') - self.c.is_open = False - self.c._on_open_ok() - assert self.c.is_open - self.c.on_open.assert_called_with(self.c) - - def test_exchange_declare(self): - self.c.exchange_declare( - 'foo', 'direct', False, True, - auto_delete=False, nowait=False, arguments={'x': 1}, - ) - self.c.send_method.assert_called_with( - spec.Exchange.Declare, 'BssbbbbbF', - (0, 'foo', 'direct', False, True, False, - False, False, {'x': 1}), - wait=spec.Exchange.DeclareOk, - ) - - @patch('amqp.channel.warn') - def test_exchange_declare__auto_delete(self, warn): - self.c.exchange_declare( - 'foo', 'direct', False, True, - auto_delete=True, nowait=False, arguments={'x': 1}, - ) - warn.assert_called() - - def test_exchange_delete(self): - self.c.exchange_delete('foo') - self.c.send_method.assert_called_with( - spec.Exchange.Delete, 'Bsbb', - (0, 'foo', False, False), - wait=spec.Exchange.DeleteOk, - ) - - def test_exchange_bind(self): - self.c.exchange_bind('dest', 'source', 'rkey', arguments={'x': 1}) - self.c.send_method.assert_called_with( - spec.Exchange.Bind, 'BsssbF', - (0, 'dest', 'source', 'rkey', False, {'x': 1}), - wait=spec.Exchange.BindOk, - ) - - def test_exchange_unbind(self): - self.c.exchange_unbind('dest', 'source', 'rkey', arguments={'x': 1}) - self.c.send_method.assert_called_with( - spec.Exchange.Unbind, 'BsssbF', - (0, 'dest', 'source', 'rkey', False, {'x': 1}), - wait=spec.Exchange.UnbindOk, - ) - - def test_queue_bind(self): - self.c.queue_bind('q', 'ex', 'rkey', arguments={'x': 1}) - self.c.send_method.assert_called_with( - spec.Queue.Bind, 'BsssbF', - (0, 'q', 'ex', 'rkey', False, {'x': 1}), - wait=spec.Queue.BindOk, - ) - - def test_queue_unbind(self): - self.c.queue_unbind('q', 'ex', 'rkey', arguments={'x': 1}) - self.c.send_method.assert_called_with( - spec.Queue.Unbind, 'BsssF', - (0, 'q', 'ex', 'rkey', {'x': 1}), - wait=spec.Queue.UnbindOk, - ) - - def test_queue_declare(self): - self.c.queue_declare('q', False, True, False, False, True, {'x': 1}) - self.c.send_method.assert_called_with( - spec.Queue.Declare, 'BsbbbbbF', - (0, 'q', False, True, False, False, True, {'x': 1}), - ) - - def test_queue_declare__sync(self): - self.c.wait = Mock(name='wait') - self.c.wait.return_value = ('name', 123, 45) - ret = self.c.queue_declare( - 'q', False, True, False, False, False, {'x': 1}, - ) - self.c.send_method.assert_called_with( - spec.Queue.Declare, 'BsbbbbbF', - (0, 'q', False, True, False, False, False, {'x': 1}), - ) - assert ret.queue == 'name' - assert ret.message_count == 123 - assert ret.consumer_count == 45 - self.c.wait.assert_called_with( - spec.Queue.DeclareOk, returns_tuple=True) - - def test_queue_delete(self): - self.c.queue_delete('q') - self.c.send_method.assert_called_with( - spec.Queue.Delete, 'Bsbbb', - (0, 'q', False, False, False), - wait=spec.Queue.DeleteOk, - ) - - def test_queue_purge(self): - self.c.queue_purge('q') - self.c.send_method.assert_called_with( - spec.Queue.Purge, 'Bsb', (0, 'q', False), - wait=spec.Queue.PurgeOk, - ) - - def test_basic_ack(self): - self.c.basic_ack(123, multiple=1) - self.c.send_method.assert_called_with( - spec.Basic.Ack, 'Lb', (123, 1), - ) - - def test_basic_cancel(self): - self.c.basic_cancel(123) - self.c.send_method.assert_called_with( - spec.Basic.Cancel, 'sb', (123, False), - wait=spec.Basic.CancelOk, - ) - self.c.connection = None - self.c.basic_cancel(123) - - def test_on_basic_cancel(self): - self.c._remove_tag = Mock(name='_remove_tag') - self.c._on_basic_cancel(123) - self.c._remove_tag.return_value.assert_called_with(123) - self.c._remove_tag.return_value = None - with pytest.raises(ConsumerCancelled): - self.c._on_basic_cancel(123) - - def test_on_basic_cancel_ok(self): - self.c._remove_tag = Mock(name='remove_tag') - self.c._on_basic_cancel_ok(123) - self.c._remove_tag.assert_called_with(123) - - def test_remove_tag(self): - self.c.callbacks[123] = Mock() - p = self.c.cancel_callbacks[123] = Mock() - assert self.c._remove_tag(123) is p - assert 123 not in self.c.callbacks - assert 123 not in self.c.cancel_callbacks - - def test_basic_consume(self): - callback = Mock() - on_cancel = Mock() - self.c.basic_consume( - 'q', 123, arguments={'x': 1}, - callback=callback, - on_cancel=on_cancel, - ) - self.c.send_method.assert_called_with( - spec.Basic.Consume, 'BssbbbbF', - (0, 'q', 123, False, False, False, False, {'x': 1}), - wait=spec.Basic.ConsumeOk, - ) - assert self.c.callbacks[123] is callback - assert self.c.cancel_callbacks[123] is on_cancel - - def test_basic_consume__no_ack(self): - self.c.basic_consume( - 'q', 123, arguments={'x': 1}, no_ack=True, - ) - assert 123 in self.c.no_ack_consumers - - def test_on_basic_deliver(self): - msg = Mock() - self.c._on_basic_deliver(123, '321', False, 'ex', 'rkey', msg) - callback = self.c.callbacks[123] = Mock(name='cb') - self.c._on_basic_deliver(123, '321', False, 'ex', 'rkey', msg) - callback.assert_called_with(msg) - - def test_basic_get(self): - self.c._on_get_empty = Mock() - self.c._on_get_ok = Mock() - self.c.send_method.return_value = ('cluster_id',) - self.c.basic_get('q') - self.c.send_method.assert_called_with( - spec.Basic.Get, 'Bsb', (0, 'q', False), - wait=[spec.Basic.GetOk, spec.Basic.GetEmpty], returns_tuple=True, - ) - self.c._on_get_empty.assert_called_with('cluster_id') - self.c.send_method.return_value = ( - 'dtag', 'redelivered', 'ex', 'rkey', 'mcount', 'msg', - ) - self.c.basic_get('q') - self.c._on_get_ok.assert_called_with( - 'dtag', 'redelivered', 'ex', 'rkey', 'mcount', 'msg', - ) - - def test_on_get_empty(self): - self.c._on_get_empty(1) - - def test_on_get_ok(self): - msg = Mock() - m = self.c._on_get_ok( - 'dtag', 'redelivered', 'ex', 'rkey', 'mcount', msg, - ) - assert m is msg - - def test_basic_publish(self): - self.c._basic_publish('msg', 'ex', 'rkey') - self.c.send_method.assert_called_with( - spec.Basic.Publish, 'Bssbb', - (0, 'ex', 'rkey', False, False), 'msg', - ) - - def test_basic_publish_confirm(self): - self.c._confirm_selected = False - self.c.confirm_select = Mock(name='confirm_select') - self.c._basic_publish = Mock(name='_basic_publish') - self.c.wait = Mock(name='wait') - ret = self.c.basic_publish_confirm(1, 2, arg=1) - self.c.confirm_select.assert_called_with() - assert self.c._confirm_selected - self.c._basic_publish.assert_called_with(1, 2, arg=1) - assert ret is self.c._basic_publish() - self.c.wait.assert_called_with(spec.Basic.Ack) - self.c.basic_publish_confirm(1, 2, arg=1) - - def test_basic_qos(self): - self.c.basic_qos(0, 123, False) - self.c.send_method.assert_called_with( - spec.Basic.Qos, 'lBb', (0, 123, False), - wait=spec.Basic.QosOk, - ) - - def test_basic_recover(self): - self.c.basic_recover(requeue=True) - self.c.send_method.assert_called_with( - spec.Basic.Recover, 'b', (True,), - ) - - def test_basic_recover_async(self): - self.c.basic_recover_async(requeue=True) - self.c.send_method.assert_called_with( - spec.Basic.RecoverAsync, 'b', (True,), - ) - - def test_basic_reject(self): - self.c.basic_reject(123, requeue=True) - self.c.send_method.assert_called_with( - spec.Basic.Reject, 'Lb', (123, True), - ) - - def test_on_basic_return(self): - with pytest.raises(NotFound): - self.c._on_basic_return(404, 'text', 'ex', 'rkey', 'msg') - - @patch('amqp.channel.error_for_code') - def test_on_basic_return__handled(self, error_for_code): - callback = Mock(name='callback') - self.c.events['basic_return'].add(callback) - self.c._on_basic_return(404, 'text', 'ex', 'rkey', 'msg') - callback.assert_called_with( - error_for_code(), 'ex', 'rkey', 'msg', - ) - - def test_tx_commit(self): - self.c.tx_commit() - self.c.send_method.assert_called_with( - spec.Tx.Commit, wait=spec.Tx.CommitOk, - ) - - def test_tx_rollback(self): - self.c.tx_rollback() - self.c.send_method.assert_called_with( - spec.Tx.Rollback, wait=spec.Tx.RollbackOk, - ) - - def test_tx_select(self): - self.c.tx_select() - self.c.send_method.assert_called_with( - spec.Tx.Select, wait=spec.Tx.SelectOk, - ) - - def test_confirm_select(self): - self.c.confirm_select() - self.c.send_method.assert_called_with( - spec.Confirm.Select, 'b', (False,), - wait=spec.Confirm.SelectOk, - ) - - def test_on_basic_ack(self): - callback = Mock(name='callback') - self.c.events['basic_ack'].add(callback) - self.c._on_basic_ack(123, True) - callback.assert_called_with(123, True) diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py deleted file mode 100644 index 87d2126..0000000 --- a/t/unit/test_connection.py +++ /dev/null @@ -1,310 +0,0 @@ -import pytest -import socket -from case import ContextMock, Mock, call -from amqp import Connection -from amqp import spec -from amqp.connection import SSLError -from amqp.exceptions import ConnectionError, NotFound, ResourceError -from amqp.transport import Transport - - -class test_Connection: - - @pytest.fixture(autouse=True) - def setup_conn(self): - self.frame_handler = Mock(name='frame_handler') - self.frame_writer = Mock(name='frame_writer_cls') - self.conn = Connection( - frame_handler=self.frame_handler, - frame_writer=self.frame_writer, - ) - self.conn.Channel = Mock(name='Channel') - self.conn.Transport = Mock(name='Transport') - self.conn.transport = self.conn.Transport.return_value - self.conn.send_method = Mock(name='send_method') - self.conn.frame_writer = Mock(name='frame_writer') - - def test_login_response(self): - self.conn = Connection(login_response='foo') - assert self.conn.login_response == 'foo' - - def test_enter_exit(self): - self.conn.connect = Mock(name='connect') - self.conn.close = Mock(name='close') - with self.conn: - self.conn.connect.assert_called_with() - self.conn.close.assert_called_with() - - def test_then(self): - self.conn.on_open = Mock(name='on_open') - on_success = Mock(name='on_success') - on_error = Mock(name='on_error') - self.conn.then(on_success, on_error) - self.conn.on_open.then.assert_called_with(on_success, on_error) - - def test_connect(self): - self.conn.transport.connected = False - self.conn.drain_events = Mock(name='drain_events') - - def on_drain(*args, **kwargs): - self.conn._handshake_complete = True - self.conn.drain_events.side_effect = on_drain - self.conn.connect() - self.conn.Transport.assert_called_with( - self.conn.host, self.conn.connect_timeout, self.conn.ssl, - self.conn.read_timeout, self.conn.write_timeout, - socket_settings=self.conn.socket_settings, - ) - - def test_connect__already_connected(self): - callback = Mock(name='callback') - self.conn.transport.connected = True - assert self.conn.connect(callback) == callback.return_value - callback.assert_called_with() - - def test_on_start(self): - self.conn._on_start(3, 4, {'foo': 'bar'}, 'x y z', 'en_US en_GB') - assert self.conn.version_major == 3 - assert self.conn.version_minor == 4 - assert self.conn.server_properties == {'foo': 'bar'} - assert self.conn.mechanisms == ['x', 'y', 'z'] - assert self.conn.locales == ['en_US', 'en_GB'] - self.conn.send_method.assert_called_with( - spec.Connection.StartOk, 'FsSs', ( - self.conn.client_properties, self.conn.login_method, - self.conn.login_response, self.conn.locale, - ), - ) - - def test_on_start__consumer_cancel_notify(self): - self.conn._on_start( - 3, 4, {'capabilities': {'consumer_cancel_notify': 1}}, - '', '', - ) - cap = self.conn.client_properties['capabilities'] - assert cap['consumer_cancel_notify'] - - def test_on_start__connection_blocked(self): - self.conn._on_start( - 3, 4, {'capabilities': {'connection.blocked': 1}}, - '', '', - ) - cap = self.conn.client_properties['capabilities'] - assert cap['connection.blocked'] - - def test_on_start__authentication_failure_close(self): - self.conn._on_start( - 3, 4, {'capabilities': {'authentication_failure_close': 1}}, - '', '', - ) - cap = self.conn.client_properties['capabilities'] - assert cap['authentication_failure_close'] - - def test_on_start__authentication_failure_close__disabled(self): - self.conn._on_start( - 3, 4, {'capabilities': {}}, - '', '', - ) - assert 'capabilities' not in self.conn.client_properties - - def test_on_secure(self): - self.conn._on_secure('vfz') - - def test_on_tune(self): - self.conn.client_heartbeat = 16 - self.conn._on_tune(345, 16, 10) - assert self.conn.channel_max == 345 - assert self.conn.frame_max == 16 - assert self.conn.server_heartbeat == 10 - assert self.conn.heartbeat == 10 - self.conn.send_method.assert_called_with( - spec.Connection.TuneOk, 'BlB', ( - self.conn.channel_max, self.conn.frame_max, - self.conn.heartbeat, - ), - callback=self.conn._on_tune_sent, - ) - - def test_on_tune__client_heartbeat_disabled(self): - self.conn.client_heartbeat = 0 - self.conn._on_tune(345, 16, 10) - assert self.conn.heartbeat == 0 - - def test_on_tune_sent(self): - self.conn._on_tune_sent() - self.conn.send_method.assert_called_with( - spec.Connection.Open, 'ssb', (self.conn.virtual_host, '', False), - ) - - def test_on_open_ok(self): - self.conn.on_open = Mock(name='on_open') - self.conn._on_open_ok() - assert self.conn._handshake_complete - self.conn.on_open.assert_called_with(self.conn) - - def test_connected(self): - self.conn.transport.connected = False - assert not self.conn.connected - self.conn.transport.connected = True - assert self.conn.connected - self.conn.transport = None - assert not self.conn.connected - - def test_collect(self): - channels = self.conn.channels = { - 0: self.conn, 1: Mock(name='c1'), 2: Mock(name='c2'), - } - transport = self.conn.transport - self.conn.collect() - transport.close.assert_called_with() - for i, channel in channels.items(): - if i: - channel.collect.assert_called_with() - - def test_collect__channel_raises_socket_error(self): - self.conn.channels = self.conn.channels = {1: Mock(name='c1')} - self.conn.channels[1].collect.side_effect = socket.error() - self.conn.collect() - - def test_get_free_channel_id__raises_IndexError(self): - self.conn._avail_channel_ids = [] - with pytest.raises(ResourceError): - self.conn._get_free_channel_id() - - def test_claim_channel_id(self): - self.conn._claim_channel_id(30) - with pytest.raises(ConnectionError): - self.conn._claim_channel_id(30) - - def test_channel(self): - callback = Mock(name='callback') - c = self.conn.channel(3, callback) - self.conn.Channel.assert_called_with(self.conn, 3, on_open=callback) - c2 = self.conn.channel(3, callback) - assert c2 is c - - def test_is_alive(self): - with pytest.raises(NotImplementedError): - self.conn.is_alive() - - def test_drain_events(self): - self.conn.blocking_read = Mock(name='blocking_read') - self.conn.drain_events(30) - self.conn.blocking_read.assert_called_with(30) - - def test_blocking_read__no_timeout(self): - self.conn.on_inbound_frame = Mock(name='on_inbound_frame') - ret = self.conn.blocking_read(None) - self.conn.transport.read_frame.assert_called_with() - self.conn.on_inbound_frame.assert_called_with( - self.conn.transport.read_frame(), - ) - assert ret is self.conn.on_inbound_frame() - - def test_blocking_read__timeout(self): - self.conn.transport = Transport('localhost:5672') - sock = self.conn.transport.sock = Mock(name='sock') - sock.gettimeout.return_value = 1 - self.conn.transport.read_frame = Mock(name='read_frame') - self.conn.on_inbound_frame = Mock(name='on_inbound_frame') - self.conn.blocking_read(3) - sock.gettimeout.assert_called_with() - sock.settimeout.assert_has_calls([call(3), call(1)]) - self.conn.transport.read_frame.assert_called_with() - self.conn.on_inbound_frame.assert_called_with( - self.conn.transport.read_frame(), - ) - sock.gettimeout.return_value = 3 - self.conn.blocking_read(3) - - def test_blocking_read__SSLError(self): - self.conn.on_inbound_frame = Mock(name='on_inbound_frame') - self.conn.transport = Transport('localhost:5672') - sock = self.conn.transport.sock = Mock(name='sock') - sock.gettimeout.return_value = 1 - self.conn.transport.read_frame = Mock(name='read_frame') - self.conn.transport.read_frame.side_effect = SSLError( - 'operation timed out') - with pytest.raises(socket.timeout): - self.conn.blocking_read(3) - self.conn.transport.read_frame.side_effect = SSLError( - 'The operation did not complete foo bar') - with pytest.raises(socket.timeout): - self.conn.blocking_read(3) - self.conn.transport.read_frame.side_effect = SSLError( - 'oh noes') - with pytest.raises(SSLError): - self.conn.blocking_read(3) - - def test_on_inbound_method(self): - self.conn.channels[1] = self.conn.channel(1) - self.conn.on_inbound_method(1, (50, 60), 'payload', 'content') - self.conn.channels[1].dispatch_method.assert_called_with( - (50, 60), 'payload', 'content', - ) - - def test_close(self): - self.conn.close(reply_text='foo', method_sig=spec.Channel.Open) - self.conn.send_method.assert_called_with( - spec.Connection.Close, 'BsBB', - (0, 'foo', spec.Channel.Open[0], spec.Channel.Open[1]), - wait=spec.Connection.CloseOk, - ) - - def test_close__already_closed(self): - self.conn.transport = None - self.conn.close() - - def test_on_close(self): - self.conn._x_close_ok = Mock(name='_x_close_ok') - with pytest.raises(NotFound): - self.conn._on_close(404, 'bah not found', 50, 60) - - def test_x_close_ok(self): - self.conn._x_close_ok() - self.conn.send_method.assert_called_with( - spec.Connection.CloseOk, callback=self.conn._on_close_ok, - ) - - def test_on_close_ok(self): - self.conn.collect = Mock(name='collect') - self.conn._on_close_ok() - self.conn.collect.assert_called_with() - - def test_on_blocked(self): - self.conn._on_blocked() - self.conn.on_blocked = Mock(name='on_blocked') - self.conn._on_blocked() - self.conn.on_blocked.assert_called_with( - 'connection blocked, see broker logs') - - def test_on_unblocked(self): - self.conn._on_unblocked() - self.conn.on_unblocked = Mock(name='on_unblocked') - self.conn._on_unblocked() - self.conn.on_unblocked.assert_called_with() - - def test_send_heartbeat(self): - self.conn.send_heartbeat() - self.conn.frame_writer.assert_called_with( - 8, 0, None, None, None, - ) - - def test_heartbeat_tick__no_heartbeat(self): - self.conn.heartbeat = 0 - self.conn.heartbeat_tick() - - def test_heartbeat_tick(self): - self.conn.heartbeat = 3 - self.conn.heartbeat_tick() - self.conn.bytes_sent = 3124 - self.conn.bytes_recv = 123 - self.conn.heartbeat_tick() - self.conn.last_heartbeat_received -= 1000 - self.conn.last_heartbeat_sent -= 1000 - with pytest.raises(ConnectionError): - self.conn.heartbeat_tick() - - def test_server_capabilities(self): - self.conn.server_properties['capabilities'] = {'foo': 1} - assert self.conn.server_capabilities == {'foo': 1} diff --git a/t/unit/test_exceptions.py b/t/unit/test_exceptions.py deleted file mode 100644 index ca16b9a..0000000 --- a/t/unit/test_exceptions.py +++ /dev/null @@ -1,19 +0,0 @@ -from case import Mock -from amqp.exceptions import AMQPError, error_for_code - - -class test_AMQPError: - - def test_str(self): - assert str(AMQPError()) - x = AMQPError(method_sig=(50, 60)) - assert str(x) - - -class test_error_for_code: - - def test_unknown_error(self): - default = Mock(name='default') - x = error_for_code(2134214314, 't', 'm', default) - default.assert_called_with('t', 'm', reply_code=2134214314) - assert x is default() diff --git a/t/unit/test_method_framing.py b/t/unit/test_method_framing.py deleted file mode 100644 index 3d6756b..0000000 --- a/t/unit/test_method_framing.py +++ /dev/null @@ -1,97 +0,0 @@ -import pytest -from case import Mock -from struct import pack -from amqp import spec -from amqp.basic_message import Message -from amqp.exceptions import UnexpectedFrame -from amqp.method_framing import frame_handler, frame_writer - - -class test_frame_handler: - - @pytest.fixture(autouse=True) - def setup_conn(self): - self.conn = Mock(name='connection') - self.conn.bytes_recv = 0 - self.callback = Mock(name='callback') - self.g = frame_handler(self.conn, self.callback) - - def test_header(self): - buf = pack('>HH', 60, 51) - self.g((1, 1, buf)) - self.callback.assert_called_with(1, (60, 51), buf, None) - assert self.conn.bytes_recv - - def test_header_message_empty_body(self): - self.g((1, 1, pack('>HH', *spec.Basic.Deliver))) - self.callback.assert_not_called() - - with pytest.raises(UnexpectedFrame): - self.g((1, 1, pack('>HH', *spec.Basic.Deliver))) - - m = Message() - m.properties = {} - buf = pack('>HxxQ', m.CLASS_ID, 0) - buf += m._serialize_properties() - self.g((2, 1, buf)) - - self.callback.assert_called() - msg = self.callback.call_args[0][3] - self.callback.assert_called_with( - 1, msg.frame_method, msg.frame_args, msg, - ) - - def test_header_message_content(self): - self.g((1, 1, pack('>HH', *spec.Basic.Deliver))) - self.callback.assert_not_called() - - m = Message() - m.properties = {} - buf = pack('>HxxQ', m.CLASS_ID, 16) - buf += m._serialize_properties() - self.g((2, 1, buf)) - self.callback.assert_not_called() - - self.g((3, 1, b'thequick')) - self.callback.assert_not_called() - - self.g((3, 1, b'brownfox')) - self.callback.assert_called() - msg = self.callback.call_args[0][3] - self.callback.assert_called_with( - 1, msg.frame_method, msg.frame_args, msg, - ) - assert msg.body == b'thequickbrownfox' - - def test_heartbeat_frame(self): - self.g((8, 1, '')) - assert self.conn.bytes_recv - - -class test_frame_writer: - - @pytest.fixture(autouse=True) - def setup_conn(self): - self.connection = Mock(name='connection') - self.transport = self.connection.Transport() - self.connection.frame_max = 512 - self.connection.bytes_sent = 0 - self.g = frame_writer(self.connection, self.transport) - self.write = self.transport.write - - def test_write_fast_header(self): - frame = 1, 1, spec.Queue.Declare, b'x' * 30, None - self.g(*frame) - self.write.assert_called() - - def test_write_fast_content(self): - msg = Message(body=b'y' * 10, content_type='utf-8') - frame = 2, 1, spec.Basic.Publish, b'x' * 10, msg - self.g(*frame) - self.write.assert_called() - - def test_write_slow_content(self): - msg = Message(body=b'y' * 2048, content_type='utf-8') - frame = 2, 1, spec.Basic.Publish, b'x' * 10, msg - self.g(*frame) - self.write.assert_called() diff --git a/t/unit/test_platform.py b/t/unit/test_platform.py deleted file mode 100644 index 0bbfc3a..0000000 --- a/t/unit/test_platform.py +++ /dev/null @@ -1,13 +0,0 @@ -import pytest -from amqp.platform import _linux_version_to_tuple - - -@pytest.mark.parametrize('s,expected', [ - ('3.13.0-46-generic', (3, 13, 0)), - ('3.19.43-1-amd64', (3, 19, 43)), - ('4.4.34+', (4, 4, 34)), - ('4.4.what', (4, 4, 0)), - ('4.what.what', (4, 0, 0)), -]) -def test_linux_version_to_tuple(s, expected): - assert _linux_version_to_tuple(s) == expected diff --git a/t/unit/test_serialization.py b/t/unit/test_serialization.py deleted file mode 100644 index 6d03bde..0000000 --- a/t/unit/test_serialization.py +++ /dev/null @@ -1,191 +0,0 @@ -import pytest -from datetime import datetime -from decimal import Decimal -from math import ceil -from struct import pack -from amqp.basic_message import Message -from amqp.exceptions import FrameSyntaxError -from amqp.serialization import GenericContent, _read_item, dumps, loads - - -class _ANY: - - def __eq__(self, other): - return other is not None - - def __ne__(self, other): - return other is None - - -class test_serialization: - - @pytest.mark.parametrize('descr,frame,expected,cast', [ - ('S', b's8thequick', 'thequick', None), - ('b', b'b' + pack('>B', True), True, None), - ('B', b'B' + pack('>b', 123), 123, None), - ('U', b'U' + pack('>h', -321), -321, None), - ('u', b'u' + pack('>H', 321), 321, None), - ('i', b'i' + pack('>I', 1234), 1234, None), - ('L', b'L' + pack('>q', -32451), -32451, None), - ('l', b'l' + pack('>Q', 32451), 32451, None), - ('f', b'f' + pack('>f', 33.3), 34.0, ceil), - ]) - def test_read_item(self, descr, frame, expected, cast): - actual = _read_item(frame)[0] - actual = cast(actual) if cast else actual - assert actual == expected - - def test_read_item_V(self): - assert _read_item(b'V')[0] is None - - def test_roundtrip(self): - format = b'bobBlLbsbST' - x = dumps(format, [ - True, 32, False, 3415, 4513134, 13241923419, - True, b'thequickbrownfox', False, 'jumpsoverthelazydog', - datetime(2015, 3, 13, 10, 23), - ]) - y = loads(format, x) - assert [ - True, 32, False, 3415, 4513134, 13241923419, - True, 'thequickbrownfox', False, 'jumpsoverthelazydog', - datetime(2015, 3, 13, 10, 23), - ] == y[0] - - def test_int_boundaries(self): - format = b'F' - x = dumps(format, [ - {'a': -2147483649, 'b': 2147483648}, # celery/celery#3121 - ]) - y = loads(format, x) - assert y[0] == [{ - 'a': -2147483649, 'b': 2147483648, # celery/celery#3121 - }] - - def test_loads_unknown_type(self): - with pytest.raises(FrameSyntaxError): - loads('x', 'asdsad') - - def test_float(self): - assert (int(loads(b'fb', dumps(b'fb', [32.31, False]))[0][0] * 100) == - 3231) - - def test_table(self): - table = {'foo': 32, 'bar': 'baz', 'nil': None} - assert loads(b'F', dumps(b'F', [table]))[0][0] == table - - def test_array(self): - array = [ - 'A', 1, True, 33.3, - Decimal('55.5'), Decimal('-3.4'), - datetime(2015, 3, 13, 10, 23), - {'quick': 'fox', 'amount': 1}, - [3, 'hens'], - None, - ] - expected = list(array) - expected[6] = _ANY() - - assert expected == loads('A', dumps('A', [array]))[0][0] - - def test_array_unknown_type(self): - with pytest.raises(FrameSyntaxError): - dumps('A', [[object()]]) - - -class test_GenericContent: - - @pytest.fixture(autouse=True) - def setup_content(self): - self.g = GenericContent() - - def test_getattr(self): - self.g.properties['foo'] = 30 - with pytest.raises(AttributeError): - self.g.__setstate__ - assert self.g.foo == 30 - with pytest.raises(AttributeError): - self.g.bar - - def test_load_properties(self): - m = Message() - m.properties = { - 'content_type': 'application/json', - 'content_encoding': 'utf-8', - 'application_headers': { - 'foo': 1, - 'id': 'id#1', - }, - 'delivery_mode': 1, - 'priority': 255, - 'correlation_id': 'df31-142f-34fd-g42d', - 'reply_to': 'cosmo', - 'expiration': '2015-12-23', - 'message_id': '3312', - 'timestamp': 3912491234, - 'type': 'generic', - 'user_id': 'george', - 'app_id': 'vandelay', - 'cluster_id': 'NYC', - } - s = m._serialize_properties() - m2 = Message() - m2._load_properties(m2.CLASS_ID, s) - assert m2.properties == m.properties - - def test_load_properties__some_missing(self): - m = Message() - m.properties = { - 'content_type': 'application/json', - 'content_encoding': 'utf-8', - 'delivery_mode': 1, - 'correlation_id': 'df31-142f-34fd-g42d', - 'reply_to': 'cosmo', - 'expiration': '2015-12-23', - 'message_id': '3312', - 'type': None, - 'app_id': None, - 'cluster_id': None, - } - s = m._serialize_properties() - m2 = Message() - m2._load_properties(m2.CLASS_ID, s) - - def test_inbound_header(self): - m = Message() - m.properties = { - 'content_type': 'application/json', - 'content_encoding': 'utf-8', - } - body = 'the quick brown fox' - buf = b'\0' * 30 + pack('>HxxQ', m.CLASS_ID, len(body)) - buf += m._serialize_properties() - assert m.inbound_header(buf, offset=30) == 42 - assert m.body_size == len(body) - assert m.properties['content_type'] == 'application/json' - assert not m.ready - - def test_inbound_header__empty_body(self): - m = Message() - m.properties = {} - buf = pack('>HxxQ', m.CLASS_ID, 0) - buf += m._serialize_properties() - assert m.inbound_header(buf, offset=0) == 12 - assert m.ready - - def test_inbound_body(self): - m = Message() - m.body_size = 16 - m.body_received = 8 - m._pending_chunks = [b'the', b'quick'] - m.inbound_body(b'brown') - assert not m.ready - m.inbound_body(b'fox') - assert m.ready - assert m.body == b'thequickbrownfox' - - def test_inbound_body__no_chunks(self): - m = Message() - m.body_size = 16 - m.inbound_body('thequickbrownfox') - assert m.ready diff --git a/t/unit/test_transport.py b/t/unit/test_transport.py deleted file mode 100644 index 86014ae..0000000 --- a/t/unit/test_transport.py +++ /dev/null @@ -1,295 +0,0 @@ -import errno -import socket -import pytest -from struct import pack -from case import Mock -from amqp import transport -from amqp.exceptions import UnexpectedFrame - - -class MockSocket: - options = {} - - def setsockopt(self, family, key, value): - if not isinstance(value, int): - raise socket.error() - self.options[key] = value - - def getsockopt(self, family, key): - return self.options.get(key, 0) - - -TCP_KEEPIDLE = 4 -TCP_KEEPINTVL = 5 -TCP_KEEPCNT = 6 - - -class test_socket_options: - - @pytest.fixture(autouse=True) - def setup_self(self, patching): - self.host = '127.0.0.1' - self.connect_timeout = 3 - self.socket = MockSocket() - try: - import fcntl - except ImportError: - fcntl = None - if fcntl is not None: - patching('fcntl.fcntl') - socket = patching('socket.socket') - socket().getsockopt = self.socket.getsockopt - socket().setsockopt = self.socket.setsockopt - - self.tcp_keepidle = 20 - self.tcp_keepintvl = 30 - self.tcp_keepcnt = 40 - self.socket.setsockopt( - socket.SOL_TCP, socket.TCP_NODELAY, 1, - ) - self.socket.setsockopt( - socket.SOL_TCP, TCP_KEEPIDLE, self.tcp_keepidle, - ) - self.socket.setsockopt( - socket.SOL_TCP, TCP_KEEPINTVL, self.tcp_keepintvl, - ) - self.socket.setsockopt( - socket.SOL_TCP, TCP_KEEPCNT, self.tcp_keepcnt, - ) - - patching('amqp.transport.TCPTransport._write') - patching('amqp.transport.TCPTransport._setup_transport') - patching('amqp.transport.SSLTransport._write') - patching('amqp.transport.SSLTransport._setup_transport') - - def test_backward_compatibility_tcp_transport(self): - self.transp = transport.Transport( - self.host, self.connect_timeout, ssl=False, - ) - self.transp.connect() - expected = 1 - result = self.socket.getsockopt(socket.SOL_TCP, socket.TCP_NODELAY) - assert result == expected - - def test_backward_compatibility_SSL_transport(self): - self.transp = transport.Transport( - self.host, self.connect_timeout, ssl=True, - ) - assert self.transp.sslopts is not None - self.transp.connect() - assert self.transp.sock is not None - - def test_use_default_sock_tcp_opts(self): - self.transp = transport.Transport( - self.host, self.connect_timeout, socket_settings={}, - ) - self.transp.connect() - assert (socket.TCP_NODELAY in - self.transp._get_tcp_socket_defaults(self.transp.sock)) - - def test_set_single_sock_tcp_opt_tcp_transport(self): - tcp_keepidle = self.tcp_keepidle + 5 - socket_settings = {TCP_KEEPIDLE: tcp_keepidle} - self.transp = transport.Transport( - self.host, self.connect_timeout, - ssl=False, socket_settings=socket_settings, - ) - self.transp.connect() - expected = tcp_keepidle - result = self.socket.getsockopt(socket.SOL_TCP, TCP_KEEPIDLE) - assert result == expected - - def test_set_single_sock_tcp_opt_SSL_transport(self): - self.tcp_keepidle += 5 - socket_settings = {TCP_KEEPIDLE: self.tcp_keepidle} - self.transp = transport.Transport( - self.host, self.connect_timeout, - ssl=True, socket_settings=socket_settings, - ) - self.transp.connect() - expected = self.tcp_keepidle - result = self.socket.getsockopt(socket.SOL_TCP, TCP_KEEPIDLE) - assert result == expected - - def test_values_are_set(self): - socket_settings = { - TCP_KEEPIDLE: 10, - TCP_KEEPINTVL: 4, - TCP_KEEPCNT: 2 - } - - self.transp = transport.Transport( - self.host, self.connect_timeout, - socket_settings=socket_settings, - ) - self.transp.connect() - expected = socket_settings - tcp_keepidle = self.socket.getsockopt(socket.SOL_TCP, TCP_KEEPIDLE) - tcp_keepintvl = self.socket.getsockopt(socket.SOL_TCP, TCP_KEEPINTVL) - tcp_keepcnt = self.socket.getsockopt(socket.SOL_TCP, TCP_KEEPCNT) - result = { - TCP_KEEPIDLE: tcp_keepidle, - TCP_KEEPINTVL: tcp_keepintvl, - TCP_KEEPCNT: tcp_keepcnt - } - assert result == expected - - def test_passing_wrong_options(self): - socket_settings = object() - self.transp = transport.Transport( - self.host, self.connect_timeout, - socket_settings=socket_settings, - ) - with pytest.raises(TypeError): - self.transp.connect() - - def test_passing_wrong_value_options(self): - socket_settings = {TCP_KEEPINTVL: 'a'.encode()} - self.transp = transport.Transport( - self.host, self.connect_timeout, - socket_settings=socket_settings, - ) - with pytest.raises(socket.error): - self.transp.connect() - - def test_passing_value_as_string(self): - socket_settings = {TCP_KEEPIDLE: '5'.encode()} - self.transp = transport.Transport( - self.host, self.connect_timeout, - socket_settings=socket_settings, - ) - with pytest.raises(socket.error): - self.transp.connect() - - def test_passing_tcp_nodelay(self): - socket_settings = {socket.TCP_NODELAY: 0} - self.transp = transport.Transport( - self.host, self.connect_timeout, - socket_settings=socket_settings, - ) - self.transp.connect() - expected = 0 - result = self.socket.getsockopt(socket.SOL_TCP, socket.TCP_NODELAY) - assert result == expected - - -class test_Transport: - - class Transport(transport.Transport): - - def _connect(self, *args): - ... - - def _init_socket(self, *args): - ... - - @pytest.fixture(autouse=True) - def setup_transport(self): - self.t = self.Transport('localhost:5672', 10) - self.t.connect() - - def test_port(self): - assert self.Transport('localhost').port == 5672 - assert self.Transport('localhost:5672').port == 5672 - assert self.Transport('[fe80::1]:5432').port == 5432 - - def test_read(self): - with pytest.raises(NotImplementedError): - self.t._read(1024) - - def test_setup_transport(self): - self.t._setup_transport() - - def test_shutdown_transport(self): - self.t._shutdown_transport() - - def test_write(self): - with pytest.raises(NotImplementedError): - self.t._write('foo') - - def test_close(self): - sock = self.t.sock = Mock() - self.t.close() - sock.shutdown.assert_called_with(socket.SHUT_RDWR) - sock.close.assert_called_with() - assert self.t.sock is None - self.t.close() - - def test_read_frame__timeout(self): - self.t._read = Mock() - self.t._read.side_effect = socket.timeout() - with pytest.raises(socket.timeout): - self.t.read_frame() - - def test_read_frame__SSLError(self): - self.t._read = Mock() - self.t._read.side_effect = transport.SSLError('timed out') - with pytest.raises(socket.timeout): - self.t.read_frame() - - def test_read_frame__EINTR(self): - self.t._read = Mock() - self.t.connected = True - exc = OSError() - exc.errno = errno.EINTR - self.t._read.side_effect = exc - with pytest.raises(OSError): - self.t.read_frame() - assert self.t.connected - - def test_read_frame__EBADF(self): - self.t._read = Mock() - self.t.connected = True - exc = OSError() - exc.errno = errno.EBADF - self.t._read.side_effect = exc - with pytest.raises(OSError): - self.t.read_frame() - assert not self.t.connected - - def test_read_frame__simple(self): - self.t._read = Mock() - checksum = [b'\xce'] - - def on_read2(size, *args): - return checksum[0] - - def on_read1(size, *args): - ret = self.t._read.return_value - self.t._read.return_value = b'thequickbrownfox' - self.t._read.side_effect = on_read2 - return ret - self.t._read.return_value = pack('>BHI', 1, 1, 16) - self.t._read.side_effect = on_read1 - - self.t.read_frame() - self.t._read.return_value = pack('>BHI', 1, 1, 16) - self.t._read.side_effect = on_read1 - checksum[0] = b'\x13' - with pytest.raises(UnexpectedFrame): - self.t.read_frame() - - def test_write__success(self): - self.t._write = Mock() - self.t.write('foo') - self.t._write.assert_called_with('foo') - - def test_write__socket_timeout(self): - self.t._write = Mock() - self.t._write.side_effect = socket.timeout - with pytest.raises(socket.timeout): - self.t.write('foo') - - def test_write__EINTR(self): - self.t.connected = True - self.t._write = Mock() - exc = OSError() - exc.errno = errno.EINTR - self.t._write.side_effect = exc - with pytest.raises(OSError): - self.t.write('foo') - assert self.t.connected - exc.errno = errno.EBADF - with pytest.raises(OSError): - self.t.write('foo') - assert not self.t.connected diff --git a/t/unit/test_utils.py b/t/unit/test_utils.py deleted file mode 100644 index 418e4df..0000000 --- a/t/unit/test_utils.py +++ /dev/null @@ -1,57 +0,0 @@ -from case import Mock, patch -from amqp.utils import get_errno, want_bytes, want_str, get_logger - - -class test_get_errno: - - def test_has_attr(self): - exc = KeyError('foo') - exc.errno = 23 - assert get_errno(exc) == 23 - - def test_in_args(self): - exc = KeyError(34, 'foo') - exc.args = (34, 'foo') - assert get_errno(exc) == 34 - - def test_args_short(self): - exc = KeyError(34) - assert not get_errno(exc) - - def test_no_args(self): - assert not get_errno(object()) - - -class test_want_bytes: - - def test_from_unicode(self): - assert isinstance(want_bytes('foo'), bytes) - - def test_from_bytes(self): - assert isinstance(want_bytes(b'foo'), bytes) - - -class test_want_str: - - def test_from_unicode(self): - assert isinstance(want_str('foo'), str) - - def test_from_bytes(self): - assert want_str(b'foo') - - -class test_get_logger: - - @patch('logging.getLogger') - def test_as_str(self, getLogger): - x = get_logger('foo.bar') - getLogger.assert_called_with('foo.bar') - assert x is getLogger() - - @patch('logging.NullHandler') - def test_as_logger(self, _NullHandler): - m = Mock(name='logger') - m.handlers = None - x = get_logger(m) - assert x is m - x.addHandler.assert_called_with(_NullHandler()) |