summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2017-02-10 12:52:33 -0800
committerAsk Solem <ask@celeryproject.org>2017-02-10 12:52:33 -0800
commit932bb6caff01d15ec9d02a376e90647ec39293e9 (patch)
treefb85bf3c12c4d087f57c03d36d5346c6e31fff94
parentaed8e8431bb61f0937447b5c8ef497ff931d8693 (diff)
downloadpy-amqp-3.0-devel.tar.gz
Use keyword-only arguments3.0-devel
-rw-r--r--amqp/__init__.py2
-rw-r--r--amqp/abstract_channel.py2
-rw-r--r--amqp/channel.py44
-rw-r--r--amqp/protocol.py4
-rw-r--r--amqp/spec.py4
-rw-r--r--amqp/transport.py4
-rw-r--r--amqp/types.py31
-rw-r--r--requirements/test-ci.txt2
-rw-r--r--t/unit/__init__.py0
-rw-r--r--t/unit/test_abstract_channel.py118
-rw-r--r--t/unit/test_basic_message.py16
-rw-r--r--t/unit/test_channel.py403
-rw-r--r--t/unit/test_connection.py310
-rw-r--r--t/unit/test_exceptions.py19
-rw-r--r--t/unit/test_method_framing.py97
-rw-r--r--t/unit/test_platform.py13
-rw-r--r--t/unit/test_serialization.py191
-rw-r--r--t/unit/test_transport.py295
-rw-r--r--t/unit/test_utils.py57
19 files changed, 56 insertions, 1556 deletions
diff --git a/amqp/__init__.py b/amqp/__init__.py
index a9a3c8a..1a326c6 100644
--- a/amqp/__init__.py
+++ b/amqp/__init__.py
@@ -99,8 +99,6 @@ __all__ = [
class version_info_t(NamedTuple):
- """Version description tuple."""
-
major: int
minor: int
micro: int
diff --git a/amqp/abstract_channel.py b/amqp/abstract_channel.py
index 169410d..e2d6824 100644
--- a/amqp/abstract_channel.py
+++ b/amqp/abstract_channel.py
@@ -26,7 +26,7 @@ from .utils import AsyncToggle, toggle_blocking
__all__ = ['ChannelBase']
-class ChannelBase(AsyncToggle, AbstractChannelT):
+class ChannelBase(AbstractChannelT, AsyncToggle):
"""Superclass for Connection and Channel.
The connection is treated as channel 0, then comes
diff --git a/amqp/channel.py b/amqp/channel.py
index f554a30..965e867 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -163,9 +163,8 @@ class Channel(ChannelBase, ChannelT):
# set first time basic_publish_confirm is called
# and publisher confirms are enabled for this channel.
+ self._confirm_publish = self.connection.confirm_publish
self._confirm_selected = False
- if self.connection.confirm_publish:
- self.basic_publish = self.basic_publish_confirm
async def __aenter__(self) -> 'Channel':
await self.open()
@@ -215,6 +214,7 @@ class Channel(ChannelBase, ChannelT):
@toggle_blocking
async def close(self,
+ *,
reply_code: int = 0,
reply_text: str = '',
method_sig: method_sig_t = method_sig_t(0, 0),
@@ -544,6 +544,7 @@ class Channel(ChannelBase, ChannelT):
@toggle_blocking
async def exchange_declare(
self, exchange: str, type: str,
+ *,
passive: bool = False,
durable: bool = False,
auto_delete: bool = True,
@@ -684,6 +685,7 @@ class Channel(ChannelBase, ChannelT):
async def exchange_delete(
self,
exchange: str,
+ *,
if_unused: bool = False,
nowait: bool = False,
argsig: str = 'Bsbb') -> None:
@@ -738,6 +740,7 @@ class Channel(ChannelBase, ChannelT):
self, destination: str,
source: str = '',
routing_key: str = '',
+ *,
nowait: bool = False,
arguments: Mapping[str, Any] = None,
argsig: str = 'BsssbF') -> None:
@@ -822,6 +825,7 @@ class Channel(ChannelBase, ChannelT):
self, destination: str,
source: str = '',
routing_key: str = '',
+ *,
nowait: bool = False,
arguments: Mapping[str, Any] = None,
argsig: str = 'BsssbF') -> None:
@@ -911,6 +915,7 @@ class Channel(ChannelBase, ChannelT):
self, queue: str,
exchange: str = '',
routing_key: str = '',
+ *,
nowait: bool = False,
arguments: Mapping[str, Any] = None,
argsig: str = 'BsssbF') -> None:
@@ -1021,6 +1026,7 @@ class Channel(ChannelBase, ChannelT):
async def queue_unbind(
self, queue: str, exchange: str,
routing_key: str = '',
+ *,
nowait: bool = False,
arguments: Mapping[str, Any] = None,
argsig: str = 'BsssF') -> None:
@@ -1083,6 +1089,7 @@ class Channel(ChannelBase, ChannelT):
async def queue_declare(
self,
queue: str = '',
+ *,
passive: bool = False,
durable: bool = False,
exclusive: bool = False,
@@ -1257,6 +1264,7 @@ class Channel(ChannelBase, ChannelT):
async def queue_delete(
self,
queue: str = '',
+ *,
if_unused: bool = False,
if_empty: bool = False,
nowait: bool = False,
@@ -1335,6 +1343,7 @@ class Channel(ChannelBase, ChannelT):
async def queue_purge(
self,
queue: str = '',
+ *,
nowait: bool = False,
argsig: str = 'Bsb') -> Optional[int]:
"""Purge a queue.
@@ -1459,6 +1468,7 @@ class Channel(ChannelBase, ChannelT):
@toggle_blocking
async def basic_ack(
self, delivery_tag: str,
+ *,
multiple: bool = False,
argsig: str = 'Lb') -> None:
"""Acknowledge one or more messages.
@@ -1513,6 +1523,7 @@ class Channel(ChannelBase, ChannelT):
@toggle_blocking
async def basic_cancel(
self, consumer_tag: str,
+ *,
nowait: bool = False,
argsig: str = 'sb') -> None:
"""End a queue consumer.
@@ -1586,6 +1597,7 @@ class Channel(ChannelBase, ChannelT):
self,
queue: str = '',
consumer_tag: str = '',
+ *,
no_local: bool = False,
no_ack: bool = False,
exclusive: bool = False,
@@ -1740,6 +1752,7 @@ class Channel(ChannelBase, ChannelT):
async def basic_get(
self,
queue: str = '',
+ *,
no_ack: bool = False,
argsig: str = 'Bsb') -> Optional[MessageT]:
"""Direct access to a queue.
@@ -1804,10 +1817,11 @@ class Channel(ChannelBase, ChannelT):
return msg
@toggle_blocking
- async def _basic_publish(
+ async def basic_publish(
self, msg: MessageT,
exchange: str = '',
routing_key: str = '',
+ *,
mandatory: bool = False,
immediate: bool = False,
timeout: float = None,
@@ -1879,6 +1893,9 @@ class Channel(ChannelBase, ChannelT):
if not self.connection:
raise RecoverableConnectionError(
'basic_publish: connection closed')
+ if self._confirm_publish and not self._confirm_selected:
+ self._confirm_selected = True
+ await self.confirm_select()
try:
await self.send_method(
spec.Basic.Publish, argsig,
@@ -1887,15 +1904,8 @@ class Channel(ChannelBase, ChannelT):
)
except socket.timeout:
raise RecoverableChannelError('basic_publish: timed out')
- basic_publish = _basic_publish
-
- @toggle_blocking
- async def basic_publish_confirm(self, *args, **kwargs) -> None:
- if not self._confirm_selected:
- self._confirm_selected = True
- await self.confirm_select()
- await self._basic_publish(*args, **kwargs)
- await self.wait(spec.Basic.Ack)
+ if self._confirm_publish:
+ await self.wait(spec.Basic.Ack)
@toggle_blocking
async def basic_qos(
@@ -1972,7 +1982,7 @@ class Channel(ChannelBase, ChannelT):
)
@toggle_blocking
- async def basic_recover(self, requeue: bool = False) -> None:
+ async def basic_recover(self, *, requeue: bool = False) -> None:
"""Redeliver unacknowledged messages.
This method asks the broker to redeliver all unacknowledged
@@ -2004,12 +2014,14 @@ class Channel(ChannelBase, ChannelT):
await self.send_method(spec.Basic.Recover, 'b', (requeue,))
@toggle_blocking
- async def basic_recover_async(self, requeue: bool = False) -> None:
+ async def basic_recover_async(self, *, requeue: bool = False) -> None:
await self.send_method(
spec.Basic.RecoverAsync, 'b', (requeue,))
@toggle_blocking
- async def basic_reject(self, delivery_tag: str, requeue: bool,
+ async def basic_reject(self, delivery_tag: str,
+ *,
+ requeue: bool = False,
argsig: str = 'Lb') -> None:
"""Reject an incoming message.
@@ -2187,7 +2199,7 @@ class Channel(ChannelBase, ChannelT):
await self.send_method(spec.Tx.Select, wait=spec.Tx.SelectOk)
@toggle_blocking
- async def confirm_select(self, nowait: bool = False) -> None:
+ async def confirm_select(self, *, nowait: bool = False) -> None:
"""Enable publisher confirms for this channel.
Note: This is an RabbitMQ extension.
diff --git a/amqp/protocol.py b/amqp/protocol.py
index ad26138..ac0575b 100644
--- a/amqp/protocol.py
+++ b/amqp/protocol.py
@@ -3,16 +3,12 @@ from typing import NamedTuple
class queue_declare_ok_t(NamedTuple):
- """Tuple returned by Queue.Declare."""
-
queue: str
message_count: int
consumer_count: int
class basic_return_t(NamedTuple):
- """Tuple provided by Basic.Return."""
-
reply_code: int
reply_text: str
exchange: str
diff --git a/amqp/spec.py b/amqp/spec.py
index 9fb20bd..d1deeff 100644
--- a/amqp/spec.py
+++ b/amqp/spec.py
@@ -3,15 +3,11 @@ from typing import Mapping, NamedTuple
class method_sig_t(NamedTuple):
- """AMQP Method signature tuple."""
-
major: int
minor: int
class method_t(NamedTuple):
- """AMQP Method invocation tuple."""
-
method_sig: method_sig_t
args: str
content: bool
diff --git a/amqp/transport.py b/amqp/transport.py
index 037d66c..59876a0 100644
--- a/amqp/transport.py
+++ b/amqp/transport.py
@@ -24,7 +24,7 @@ from struct import pack, unpack
from typing import Any, Callable, Dict, Mapping, MutableMapping, Set, Tuple
from .exceptions import UnexpectedFrame
from .platform import SOL_TCP, TCP_USER_TIMEOUT, HAS_TCP_USER_TIMEOUT
-from .types import Frame
+from .types import Frame, TransportT
from .utils import AsyncToggle, set_cloexec, toggle_blocking
AMQP_PORT = 5672
@@ -86,7 +86,7 @@ def to_host_port(host: str, default: int = AMQP_PORT) -> Tuple[str, int]:
return host, port
-class Transport(AsyncToggle):
+class Transport(TransportT, AsyncToggle):
"""Network transport."""
def __init__(self, host: str,
diff --git a/amqp/types.py b/amqp/types.py
index d73bb52..219e21f 100644
--- a/amqp/types.py
+++ b/amqp/types.py
@@ -18,8 +18,6 @@ Int = TypeVar('Int', SupportsInt, str)
class Frame(NamedTuple):
- """Frame tuple."""
-
type: int
channel: int
data: bytes
@@ -137,6 +135,7 @@ class MessageT(ContentT, metaclass=abc.ABCMeta):
@abc.abstractmethod
def __init__(self,
body: bytes=b'',
+ *,
children: Any = None,
channel: 'ChannelT' = None) -> None:
...
@@ -185,6 +184,7 @@ class AbstractChannelT(metaclass=abc.ABCMeta):
self, sig: method_sig_t,
format: str = None,
args: Sequence = None,
+ *,
content: MessageT = None,
wait: WaitMethodT = None,
callback: Callable = None,
@@ -194,6 +194,7 @@ class AbstractChannelT(metaclass=abc.ABCMeta):
@abc.abstractmethod
async def close(
self,
+ *,
reply_code: int = 0,
reply_text: str = '',
method_sig: method_sig_t = method_sig_t(0, 0),
@@ -208,6 +209,7 @@ class AbstractChannelT(metaclass=abc.ABCMeta):
async def wait(
self,
method: WaitMethodT,
+ *,
callback: Callable = None,
timeout: float = None,
returns_tuple: bool = False) -> Any:
@@ -277,6 +279,7 @@ class ConnectionT(AbstractChannelT):
host: str = 'localhost:5672',
userid: str = 'guest',
password: str = 'guest',
+ *,
login_method: str = 'AMQPLAIN',
login_response: Any = None,
virtual_host: str = '/',
@@ -374,6 +377,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta):
@abc.abstractmethod
async def exchange_declare(
self, exchange: str, type: str,
+ *,
passive: bool = False,
durable: bool = False,
auto_delete: bool = True,
@@ -385,6 +389,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta):
@abc.abstractmethod
async def exchange_delete(
self, exchange: str,
+ *,
if_unused: bool = False,
nowait: bool = False,
argsig: str = 'Bsbb') -> None:
@@ -395,6 +400,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta):
self, destination: str,
source: str = '',
routing_key: str = '',
+ *,
nowait: bool = False,
arguments: Mapping[str, Any] = None,
argsig: str = 'BsssbF') -> None:
@@ -405,6 +411,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta):
self, destination: str,
source: str = '',
routing_key: str = '',
+ *,
nowait: bool = False,
arguments: Mapping[str, Any] = None,
argsig: str = 'BsssbF') -> None:
@@ -415,6 +422,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta):
self, queue: str,
exchange: str = '',
routing_key: str = '',
+ *,
nowait: bool = False,
arguments: Mapping[str, Any] = None,
argsig: str = 'BsssbF') -> None:
@@ -424,6 +432,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta):
async def queue_unbind(
self, queue: str, exchange: str,
routing_key: str = '',
+ *,
nowait: bool = False,
arguments: Mapping[str, Any] = None,
argsig: str = 'BsssF') -> None:
@@ -433,6 +442,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta):
async def queue_declare(
self,
queue: str = '',
+ *,
passive: bool = False,
durable: bool = False,
exclusive: bool = False,
@@ -446,6 +456,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta):
async def queue_delete(
self,
queue: str = '',
+ *,
if_unused: bool = False,
if_empty: bool = False,
nowait: bool = False,
@@ -456,6 +467,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta):
async def queue_purge(
self,
queue: str = '',
+ *,
nowait: bool = False,
argsig: str = 'Bsb') -> Optional[int]:
...
@@ -463,6 +475,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta):
@abc.abstractmethod
async def basic_ack(
self, delivery_tag: str,
+ *,
multiple: bool = False,
argsig: str = 'Lb') -> None:
...
@@ -470,6 +483,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta):
@abc.abstractmethod
async def basic_cancel(
self, consumer_tag: str,
+ *,
nowait: bool = False,
argsig: str = 'sb') -> None:
...
@@ -479,6 +493,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta):
self,
queue: str = '',
consumer_tag: str = '',
+ *,
no_local: bool = False,
no_ack: bool = False,
exclusive: bool = False,
@@ -493,6 +508,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta):
async def basic_get(
self,
queue: str = '',
+ *,
no_ack: bool = False,
argsig: str = 'Bsb') -> Optional[MessageT]:
...
@@ -502,6 +518,7 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta):
self, msg: MessageT,
exchange: str = '',
routing_key: str = '',
+ *,
mandatory: bool = False,
immediate: bool = False,
timeout: float = None,
@@ -518,15 +535,17 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta):
...
@abc.abstractmethod
- async def basic_recover(self, requeue: bool = False) -> None:
+ async def basic_recover(self, *, requeue: bool = False) -> None:
...
@abc.abstractmethod
- async def basic_recover_async(self, requeue: bool = False) -> None:
+ async def basic_recover_async(self, *, requeue: bool = False) -> None:
...
@abc.abstractmethod
- async def basic_reject(self, delivery_tag: str, requeue: bool,
+ async def basic_reject(self, delivery_tag: str,
+ *,
+ requeue: bool = False,
argsig: str = 'Lb') -> None:
...
@@ -543,5 +562,5 @@ class ChannelT(AbstractChannelT, metaclass=abc.ABCMeta):
...
@abc.abstractmethod
- async def confirm_select(self, nowait: bool = False) -> None:
+ async def confirm_select(self, *, nowait: bool = False) -> None:
...
diff --git a/requirements/test-ci.txt b/requirements/test-ci.txt
index 5f867d4..f0aa93a 100644
--- a/requirements/test-ci.txt
+++ b/requirements/test-ci.txt
@@ -1,3 +1 @@
-pytest-cov
-codecov
mypy
diff --git a/t/unit/__init__.py b/t/unit/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/t/unit/__init__.py
+++ /dev/null
diff --git a/t/unit/test_abstract_channel.py b/t/unit/test_abstract_channel.py
deleted file mode 100644
index 5c009aa..0000000
--- a/t/unit/test_abstract_channel.py
+++ /dev/null
@@ -1,118 +0,0 @@
-import pytest
-from case import Mock, patch
-from vine import promise
-from amqp.abstract_channel import ChannelBase
-from amqp.exceptions import AMQPNotImplementedError, RecoverableConnectionError
-from amqp.serialization import dumps
-
-
-class test_ChannelBase:
-
- class Channel(ChannelBase):
-
- def _setup_listeners(self):
- ...
-
- @pytest.fixture(autouse=True)
- def setup_conn(self):
- self.conn = Mock(name='connection')
- self.conn.channels = {}
- self.channel_id = 1
- self.c = self.Channel(self.conn, self.channel_id)
- self.method = Mock(name='method')
- self.content = Mock(name='content')
- self.content.content_encoding = 'utf-8'
- self.c._METHODS = {(50, 61): self.method}
-
- def test_enter_exit(self):
- self.c.close = Mock(name='close')
- with self.c:
- ...
- self.c.close.assert_called_with()
-
- def test_send_method(self):
- self.c.send_method((50, 60), 'iB', (30, 0))
- self.conn.frame_writer.assert_called_with(
- 1, self.channel_id, (50, 60), dumps('iB', (30, 0)), None,
- )
-
- def test_send_method__callback(self):
- callback = Mock(name='callback')
- p = promise(callback)
- self.c.send_method((50, 60), 'iB', (30, 0), callback=p)
- callback.assert_called_with()
-
- def test_send_method__wait(self):
- self.c.wait = Mock(name='wait')
- self.c.send_method((50, 60), 'iB', (30, 0), wait=(50, 61))
- self.c.wait.assert_called_with((50, 61), returns_tuple=False)
-
- def test_send_method__no_connection(self):
- self.c.connection = None
- with pytest.raises(RecoverableConnectionError):
- self.c.send_method((50, 60))
-
- def test_close(self):
- with pytest.raises(NotImplementedError):
- self.c.close()
-
- @patch('amqp.abstract_channel.ensure_promise')
- def test_wait(self, ensure_promise):
- p = ensure_promise.return_value
- p.ready = False
-
- def on_drain(*args, **kwargs):
- p.ready = True
- self.conn.drain_events.side_effect = on_drain
-
- p.value = (1,), {'arg': 2}
- self.c.wait((50, 61), timeout=1)
- self.conn.drain_events.assert_called_with(timeout=1)
-
- prev = self.c._pending[(50, 61)] = Mock(name='p2')
- p.value = None
- self.c.wait([(50, 61)])
- assert self.c._pending[(50, 61)] is prev
-
- def test_dispatch_method__content_encoding(self):
- self.c.auto_decode = True
- self.method.args = None
- self.c.dispatch_method((50, 61), 'payload', self.content)
- self.content.body.decode.side_effect = KeyError()
- self.c.dispatch_method((50, 61), 'payload', self.content)
-
- def test_dispatch_method__unknown_method(self):
- with pytest.raises(AMQPNotImplementedError):
- self.c.dispatch_method((100, 131), 'payload', self.content)
-
- def test_dispatch_method__one_shot(self):
- self.method.args = None
- p = self.c._pending[(50, 61)] = Mock(name='oneshot')
- self.c.dispatch_method((50, 61), 'payload', self.content)
- p.assert_called_with(self.content)
-
- def test_dispatch_method__one_shot_no_content(self):
- self.method.args = None
- self.method.content = None
- p = self.c._pending[(50, 61)] = Mock(name='oneshot')
- self.c.dispatch_method((50, 61), 'payload', self.content)
- p.assert_called_with()
- assert not self.c._pending
-
- @patch('amqp.abstract_channel.loads')
- def test_dispatch_method__listeners(self, loads):
- loads.return_value = [1, 2, 3], 'foo'
- p = self.c._callbacks[(50, 61)] = Mock(name='p')
- self.c.dispatch_method((50, 61), 'payload', self.content)
- p.assert_called_with(1, 2, 3, self.content)
-
- @patch('amqp.abstract_channel.loads')
- def test_dispatch_method__listeners_and_one_shot(self, loads):
- loads.return_value = [1, 2, 3], 'foo'
- p1 = self.c._callbacks[(50, 61)] = Mock(name='p')
- p2 = self.c._pending[(50, 61)] = Mock(name='oneshot')
- self.c.dispatch_method((50, 61), 'payload', self.content)
- p1.assert_called_with(1, 2, 3, self.content)
- p2.assert_called_with(1, 2, 3, self.content)
- assert not self.c._pending
- assert self.c._callbacks[(50, 61)]
diff --git a/t/unit/test_basic_message.py b/t/unit/test_basic_message.py
deleted file mode 100644
index 013c9cc..0000000
--- a/t/unit/test_basic_message.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from case import Mock
-from amqp.basic_message import Message
-
-
-class test_Message:
-
- def test_message(self):
- m = Message(
- 'foo',
- channel=Mock(name='channel'),
- application_headers={'h': 'v'},
- )
- m.delivery_info = {'delivery_tag': '1234'},
- assert m.body == 'foo'
- assert m.channel
- assert m.headers == {'h': 'v'}
diff --git a/t/unit/test_channel.py b/t/unit/test_channel.py
deleted file mode 100644
index 34d0dd9..0000000
--- a/t/unit/test_channel.py
+++ /dev/null
@@ -1,403 +0,0 @@
-import pytest
-from case import ContextMock, Mock, patch
-from amqp import spec
-from amqp.channel import Channel
-from amqp.exceptions import ConsumerCancelled, NotFound
-
-
-class test_Channel:
-
- @pytest.fixture(autouse=True)
- def setup_conn(self):
- self.conn = Mock(name='connection')
- self.conn.channels = {}
- self.conn._get_free_channel_id.return_value = 2
- self.c = Channel(self.conn, 1)
- self.c.send_method = Mock(name='send_method')
-
- def test_init_confirm_enabled(self):
- self.conn.confirm_publish = True
- c = Channel(self.conn, 2)
- assert c.basic_publish == c.basic_publish_confirm
-
- def test_init_confirm_disabled(self):
- self.conn.confirm_publish = False
- c = Channel(self.conn, 2)
- assert c.basic_publish == c._basic_publish
-
- def test_init_auto_channel(self):
- c = Channel(self.conn, None)
- self.conn._get_free_channel_id.assert_called_with()
- assert c.channel_id is self.conn._get_free_channel_id()
-
- def test_init_explicit_channel(self):
- Channel(self.conn, 3)
- self.conn._claim_channel_id.assert_called_with(3)
-
- def test_then(self):
- self.c.on_open = Mock(name='on_open')
- on_success = Mock(name='on_success')
- on_error = Mock(name='on_error')
- self.c.then(on_success, on_error)
- self.c.on_open.then.assert_called_with(on_success, on_error)
-
- def test_collect(self):
- self.c.callbacks[(50, 61)] = Mock()
- self.c.cancel_callbacks['foo'] = Mock()
- self.c.events['bar'].add(Mock())
- self.c.no_ack_consumers.add('foo')
- self.c.collect()
- assert not self.c.callbacks
- assert not self.c.cancel_callbacks
- assert not self.c.events
- assert not self.c.no_ack_consumers
- assert not self.c.is_open
- self.c.collect()
-
- def test_do_revive(self):
- self.c.open = Mock(name='open')
- self.c.is_open = True
- self.c._do_revive()
- assert not self.c.is_open
- self.c.open.assert_called_with()
-
- def test_close__not_open(self):
- self.c.is_open = False
- self.c.close()
-
- def test_close__no_connection(self):
- self.c.connection = None
- self.c.close()
-
- def test_close(self):
- self.c.is_open = True
- self.c.close(30, 'text', spec.Queue.Declare)
- self.c.send_method.assert_called_with(
- spec.Channel.Close, 'BsBB',
- (30, 'text', spec.Queue.Declare[0], spec.Queue.Declare[1]),
- wait=spec.Channel.CloseOk,
- )
- assert self.c.connection is None
-
- def test_on_close(self):
- self.c._do_revive = Mock(name='_do_revive')
- with pytest.raises(NotFound):
- self.c._on_close(404, 'text', 50, 61)
- self.c.send_method.assert_called_with(spec.Channel.CloseOk)
- self.c._do_revive.assert_called_with()
-
- def test_on_close_ok(self):
- self.c.collect = Mock(name='collect')
- self.c._on_close_ok()
- self.c.collect.assert_called_with()
-
- def test_flow(self):
- self.c.flow(0)
- self.c.send_method.assert_called_with(
- spec.Channel.Flow, 'b', (0,), wait=spec.Channel.FlowOk,
- )
-
- def test_on_flow(self):
- self.c._x_flow_ok = Mock(name='_x_flow_ok')
- self.c._on_flow(0)
- assert not self.c.active
- self.c._x_flow_ok.assert_called_with(0)
-
- def test_x_flow_ok(self):
- self.c._x_flow_ok(1)
- self.c.send_method.assert_called_with(spec.Channel.FlowOk, 'b', (1,))
-
- def test_open(self):
- self.c.is_open = True
- self.c.open()
- self.c.is_open = False
- self.c.open()
- self.c.send_method.assert_called_with(
- spec.Channel.Open, 's', ('',), wait=spec.Channel.OpenOk,
- )
-
- def test_on_open_ok(self):
- self.c.on_open = Mock(name='on_open')
- self.c.is_open = False
- self.c._on_open_ok()
- assert self.c.is_open
- self.c.on_open.assert_called_with(self.c)
-
- def test_exchange_declare(self):
- self.c.exchange_declare(
- 'foo', 'direct', False, True,
- auto_delete=False, nowait=False, arguments={'x': 1},
- )
- self.c.send_method.assert_called_with(
- spec.Exchange.Declare, 'BssbbbbbF',
- (0, 'foo', 'direct', False, True, False,
- False, False, {'x': 1}),
- wait=spec.Exchange.DeclareOk,
- )
-
- @patch('amqp.channel.warn')
- def test_exchange_declare__auto_delete(self, warn):
- self.c.exchange_declare(
- 'foo', 'direct', False, True,
- auto_delete=True, nowait=False, arguments={'x': 1},
- )
- warn.assert_called()
-
- def test_exchange_delete(self):
- self.c.exchange_delete('foo')
- self.c.send_method.assert_called_with(
- spec.Exchange.Delete, 'Bsbb',
- (0, 'foo', False, False),
- wait=spec.Exchange.DeleteOk,
- )
-
- def test_exchange_bind(self):
- self.c.exchange_bind('dest', 'source', 'rkey', arguments={'x': 1})
- self.c.send_method.assert_called_with(
- spec.Exchange.Bind, 'BsssbF',
- (0, 'dest', 'source', 'rkey', False, {'x': 1}),
- wait=spec.Exchange.BindOk,
- )
-
- def test_exchange_unbind(self):
- self.c.exchange_unbind('dest', 'source', 'rkey', arguments={'x': 1})
- self.c.send_method.assert_called_with(
- spec.Exchange.Unbind, 'BsssbF',
- (0, 'dest', 'source', 'rkey', False, {'x': 1}),
- wait=spec.Exchange.UnbindOk,
- )
-
- def test_queue_bind(self):
- self.c.queue_bind('q', 'ex', 'rkey', arguments={'x': 1})
- self.c.send_method.assert_called_with(
- spec.Queue.Bind, 'BsssbF',
- (0, 'q', 'ex', 'rkey', False, {'x': 1}),
- wait=spec.Queue.BindOk,
- )
-
- def test_queue_unbind(self):
- self.c.queue_unbind('q', 'ex', 'rkey', arguments={'x': 1})
- self.c.send_method.assert_called_with(
- spec.Queue.Unbind, 'BsssF',
- (0, 'q', 'ex', 'rkey', {'x': 1}),
- wait=spec.Queue.UnbindOk,
- )
-
- def test_queue_declare(self):
- self.c.queue_declare('q', False, True, False, False, True, {'x': 1})
- self.c.send_method.assert_called_with(
- spec.Queue.Declare, 'BsbbbbbF',
- (0, 'q', False, True, False, False, True, {'x': 1}),
- )
-
- def test_queue_declare__sync(self):
- self.c.wait = Mock(name='wait')
- self.c.wait.return_value = ('name', 123, 45)
- ret = self.c.queue_declare(
- 'q', False, True, False, False, False, {'x': 1},
- )
- self.c.send_method.assert_called_with(
- spec.Queue.Declare, 'BsbbbbbF',
- (0, 'q', False, True, False, False, False, {'x': 1}),
- )
- assert ret.queue == 'name'
- assert ret.message_count == 123
- assert ret.consumer_count == 45
- self.c.wait.assert_called_with(
- spec.Queue.DeclareOk, returns_tuple=True)
-
- def test_queue_delete(self):
- self.c.queue_delete('q')
- self.c.send_method.assert_called_with(
- spec.Queue.Delete, 'Bsbbb',
- (0, 'q', False, False, False),
- wait=spec.Queue.DeleteOk,
- )
-
- def test_queue_purge(self):
- self.c.queue_purge('q')
- self.c.send_method.assert_called_with(
- spec.Queue.Purge, 'Bsb', (0, 'q', False),
- wait=spec.Queue.PurgeOk,
- )
-
- def test_basic_ack(self):
- self.c.basic_ack(123, multiple=1)
- self.c.send_method.assert_called_with(
- spec.Basic.Ack, 'Lb', (123, 1),
- )
-
- def test_basic_cancel(self):
- self.c.basic_cancel(123)
- self.c.send_method.assert_called_with(
- spec.Basic.Cancel, 'sb', (123, False),
- wait=spec.Basic.CancelOk,
- )
- self.c.connection = None
- self.c.basic_cancel(123)
-
- def test_on_basic_cancel(self):
- self.c._remove_tag = Mock(name='_remove_tag')
- self.c._on_basic_cancel(123)
- self.c._remove_tag.return_value.assert_called_with(123)
- self.c._remove_tag.return_value = None
- with pytest.raises(ConsumerCancelled):
- self.c._on_basic_cancel(123)
-
- def test_on_basic_cancel_ok(self):
- self.c._remove_tag = Mock(name='remove_tag')
- self.c._on_basic_cancel_ok(123)
- self.c._remove_tag.assert_called_with(123)
-
- def test_remove_tag(self):
- self.c.callbacks[123] = Mock()
- p = self.c.cancel_callbacks[123] = Mock()
- assert self.c._remove_tag(123) is p
- assert 123 not in self.c.callbacks
- assert 123 not in self.c.cancel_callbacks
-
- def test_basic_consume(self):
- callback = Mock()
- on_cancel = Mock()
- self.c.basic_consume(
- 'q', 123, arguments={'x': 1},
- callback=callback,
- on_cancel=on_cancel,
- )
- self.c.send_method.assert_called_with(
- spec.Basic.Consume, 'BssbbbbF',
- (0, 'q', 123, False, False, False, False, {'x': 1}),
- wait=spec.Basic.ConsumeOk,
- )
- assert self.c.callbacks[123] is callback
- assert self.c.cancel_callbacks[123] is on_cancel
-
- def test_basic_consume__no_ack(self):
- self.c.basic_consume(
- 'q', 123, arguments={'x': 1}, no_ack=True,
- )
- assert 123 in self.c.no_ack_consumers
-
- def test_on_basic_deliver(self):
- msg = Mock()
- self.c._on_basic_deliver(123, '321', False, 'ex', 'rkey', msg)
- callback = self.c.callbacks[123] = Mock(name='cb')
- self.c._on_basic_deliver(123, '321', False, 'ex', 'rkey', msg)
- callback.assert_called_with(msg)
-
- def test_basic_get(self):
- self.c._on_get_empty = Mock()
- self.c._on_get_ok = Mock()
- self.c.send_method.return_value = ('cluster_id',)
- self.c.basic_get('q')
- self.c.send_method.assert_called_with(
- spec.Basic.Get, 'Bsb', (0, 'q', False),
- wait=[spec.Basic.GetOk, spec.Basic.GetEmpty], returns_tuple=True,
- )
- self.c._on_get_empty.assert_called_with('cluster_id')
- self.c.send_method.return_value = (
- 'dtag', 'redelivered', 'ex', 'rkey', 'mcount', 'msg',
- )
- self.c.basic_get('q')
- self.c._on_get_ok.assert_called_with(
- 'dtag', 'redelivered', 'ex', 'rkey', 'mcount', 'msg',
- )
-
- def test_on_get_empty(self):
- self.c._on_get_empty(1)
-
- def test_on_get_ok(self):
- msg = Mock()
- m = self.c._on_get_ok(
- 'dtag', 'redelivered', 'ex', 'rkey', 'mcount', msg,
- )
- assert m is msg
-
- def test_basic_publish(self):
- self.c._basic_publish('msg', 'ex', 'rkey')
- self.c.send_method.assert_called_with(
- spec.Basic.Publish, 'Bssbb',
- (0, 'ex', 'rkey', False, False), 'msg',
- )
-
- def test_basic_publish_confirm(self):
- self.c._confirm_selected = False
- self.c.confirm_select = Mock(name='confirm_select')
- self.c._basic_publish = Mock(name='_basic_publish')
- self.c.wait = Mock(name='wait')
- ret = self.c.basic_publish_confirm(1, 2, arg=1)
- self.c.confirm_select.assert_called_with()
- assert self.c._confirm_selected
- self.c._basic_publish.assert_called_with(1, 2, arg=1)
- assert ret is self.c._basic_publish()
- self.c.wait.assert_called_with(spec.Basic.Ack)
- self.c.basic_publish_confirm(1, 2, arg=1)
-
- def test_basic_qos(self):
- self.c.basic_qos(0, 123, False)
- self.c.send_method.assert_called_with(
- spec.Basic.Qos, 'lBb', (0, 123, False),
- wait=spec.Basic.QosOk,
- )
-
- def test_basic_recover(self):
- self.c.basic_recover(requeue=True)
- self.c.send_method.assert_called_with(
- spec.Basic.Recover, 'b', (True,),
- )
-
- def test_basic_recover_async(self):
- self.c.basic_recover_async(requeue=True)
- self.c.send_method.assert_called_with(
- spec.Basic.RecoverAsync, 'b', (True,),
- )
-
- def test_basic_reject(self):
- self.c.basic_reject(123, requeue=True)
- self.c.send_method.assert_called_with(
- spec.Basic.Reject, 'Lb', (123, True),
- )
-
- def test_on_basic_return(self):
- with pytest.raises(NotFound):
- self.c._on_basic_return(404, 'text', 'ex', 'rkey', 'msg')
-
- @patch('amqp.channel.error_for_code')
- def test_on_basic_return__handled(self, error_for_code):
- callback = Mock(name='callback')
- self.c.events['basic_return'].add(callback)
- self.c._on_basic_return(404, 'text', 'ex', 'rkey', 'msg')
- callback.assert_called_with(
- error_for_code(), 'ex', 'rkey', 'msg',
- )
-
- def test_tx_commit(self):
- self.c.tx_commit()
- self.c.send_method.assert_called_with(
- spec.Tx.Commit, wait=spec.Tx.CommitOk,
- )
-
- def test_tx_rollback(self):
- self.c.tx_rollback()
- self.c.send_method.assert_called_with(
- spec.Tx.Rollback, wait=spec.Tx.RollbackOk,
- )
-
- def test_tx_select(self):
- self.c.tx_select()
- self.c.send_method.assert_called_with(
- spec.Tx.Select, wait=spec.Tx.SelectOk,
- )
-
- def test_confirm_select(self):
- self.c.confirm_select()
- self.c.send_method.assert_called_with(
- spec.Confirm.Select, 'b', (False,),
- wait=spec.Confirm.SelectOk,
- )
-
- def test_on_basic_ack(self):
- callback = Mock(name='callback')
- self.c.events['basic_ack'].add(callback)
- self.c._on_basic_ack(123, True)
- callback.assert_called_with(123, True)
diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py
deleted file mode 100644
index 87d2126..0000000
--- a/t/unit/test_connection.py
+++ /dev/null
@@ -1,310 +0,0 @@
-import pytest
-import socket
-from case import ContextMock, Mock, call
-from amqp import Connection
-from amqp import spec
-from amqp.connection import SSLError
-from amqp.exceptions import ConnectionError, NotFound, ResourceError
-from amqp.transport import Transport
-
-
-class test_Connection:
-
- @pytest.fixture(autouse=True)
- def setup_conn(self):
- self.frame_handler = Mock(name='frame_handler')
- self.frame_writer = Mock(name='frame_writer_cls')
- self.conn = Connection(
- frame_handler=self.frame_handler,
- frame_writer=self.frame_writer,
- )
- self.conn.Channel = Mock(name='Channel')
- self.conn.Transport = Mock(name='Transport')
- self.conn.transport = self.conn.Transport.return_value
- self.conn.send_method = Mock(name='send_method')
- self.conn.frame_writer = Mock(name='frame_writer')
-
- def test_login_response(self):
- self.conn = Connection(login_response='foo')
- assert self.conn.login_response == 'foo'
-
- def test_enter_exit(self):
- self.conn.connect = Mock(name='connect')
- self.conn.close = Mock(name='close')
- with self.conn:
- self.conn.connect.assert_called_with()
- self.conn.close.assert_called_with()
-
- def test_then(self):
- self.conn.on_open = Mock(name='on_open')
- on_success = Mock(name='on_success')
- on_error = Mock(name='on_error')
- self.conn.then(on_success, on_error)
- self.conn.on_open.then.assert_called_with(on_success, on_error)
-
- def test_connect(self):
- self.conn.transport.connected = False
- self.conn.drain_events = Mock(name='drain_events')
-
- def on_drain(*args, **kwargs):
- self.conn._handshake_complete = True
- self.conn.drain_events.side_effect = on_drain
- self.conn.connect()
- self.conn.Transport.assert_called_with(
- self.conn.host, self.conn.connect_timeout, self.conn.ssl,
- self.conn.read_timeout, self.conn.write_timeout,
- socket_settings=self.conn.socket_settings,
- )
-
- def test_connect__already_connected(self):
- callback = Mock(name='callback')
- self.conn.transport.connected = True
- assert self.conn.connect(callback) == callback.return_value
- callback.assert_called_with()
-
- def test_on_start(self):
- self.conn._on_start(3, 4, {'foo': 'bar'}, 'x y z', 'en_US en_GB')
- assert self.conn.version_major == 3
- assert self.conn.version_minor == 4
- assert self.conn.server_properties == {'foo': 'bar'}
- assert self.conn.mechanisms == ['x', 'y', 'z']
- assert self.conn.locales == ['en_US', 'en_GB']
- self.conn.send_method.assert_called_with(
- spec.Connection.StartOk, 'FsSs', (
- self.conn.client_properties, self.conn.login_method,
- self.conn.login_response, self.conn.locale,
- ),
- )
-
- def test_on_start__consumer_cancel_notify(self):
- self.conn._on_start(
- 3, 4, {'capabilities': {'consumer_cancel_notify': 1}},
- '', '',
- )
- cap = self.conn.client_properties['capabilities']
- assert cap['consumer_cancel_notify']
-
- def test_on_start__connection_blocked(self):
- self.conn._on_start(
- 3, 4, {'capabilities': {'connection.blocked': 1}},
- '', '',
- )
- cap = self.conn.client_properties['capabilities']
- assert cap['connection.blocked']
-
- def test_on_start__authentication_failure_close(self):
- self.conn._on_start(
- 3, 4, {'capabilities': {'authentication_failure_close': 1}},
- '', '',
- )
- cap = self.conn.client_properties['capabilities']
- assert cap['authentication_failure_close']
-
- def test_on_start__authentication_failure_close__disabled(self):
- self.conn._on_start(
- 3, 4, {'capabilities': {}},
- '', '',
- )
- assert 'capabilities' not in self.conn.client_properties
-
- def test_on_secure(self):
- self.conn._on_secure('vfz')
-
- def test_on_tune(self):
- self.conn.client_heartbeat = 16
- self.conn._on_tune(345, 16, 10)
- assert self.conn.channel_max == 345
- assert self.conn.frame_max == 16
- assert self.conn.server_heartbeat == 10
- assert self.conn.heartbeat == 10
- self.conn.send_method.assert_called_with(
- spec.Connection.TuneOk, 'BlB', (
- self.conn.channel_max, self.conn.frame_max,
- self.conn.heartbeat,
- ),
- callback=self.conn._on_tune_sent,
- )
-
- def test_on_tune__client_heartbeat_disabled(self):
- self.conn.client_heartbeat = 0
- self.conn._on_tune(345, 16, 10)
- assert self.conn.heartbeat == 0
-
- def test_on_tune_sent(self):
- self.conn._on_tune_sent()
- self.conn.send_method.assert_called_with(
- spec.Connection.Open, 'ssb', (self.conn.virtual_host, '', False),
- )
-
- def test_on_open_ok(self):
- self.conn.on_open = Mock(name='on_open')
- self.conn._on_open_ok()
- assert self.conn._handshake_complete
- self.conn.on_open.assert_called_with(self.conn)
-
- def test_connected(self):
- self.conn.transport.connected = False
- assert not self.conn.connected
- self.conn.transport.connected = True
- assert self.conn.connected
- self.conn.transport = None
- assert not self.conn.connected
-
- def test_collect(self):
- channels = self.conn.channels = {
- 0: self.conn, 1: Mock(name='c1'), 2: Mock(name='c2'),
- }
- transport = self.conn.transport
- self.conn.collect()
- transport.close.assert_called_with()
- for i, channel in channels.items():
- if i:
- channel.collect.assert_called_with()
-
- def test_collect__channel_raises_socket_error(self):
- self.conn.channels = self.conn.channels = {1: Mock(name='c1')}
- self.conn.channels[1].collect.side_effect = socket.error()
- self.conn.collect()
-
- def test_get_free_channel_id__raises_IndexError(self):
- self.conn._avail_channel_ids = []
- with pytest.raises(ResourceError):
- self.conn._get_free_channel_id()
-
- def test_claim_channel_id(self):
- self.conn._claim_channel_id(30)
- with pytest.raises(ConnectionError):
- self.conn._claim_channel_id(30)
-
- def test_channel(self):
- callback = Mock(name='callback')
- c = self.conn.channel(3, callback)
- self.conn.Channel.assert_called_with(self.conn, 3, on_open=callback)
- c2 = self.conn.channel(3, callback)
- assert c2 is c
-
- def test_is_alive(self):
- with pytest.raises(NotImplementedError):
- self.conn.is_alive()
-
- def test_drain_events(self):
- self.conn.blocking_read = Mock(name='blocking_read')
- self.conn.drain_events(30)
- self.conn.blocking_read.assert_called_with(30)
-
- def test_blocking_read__no_timeout(self):
- self.conn.on_inbound_frame = Mock(name='on_inbound_frame')
- ret = self.conn.blocking_read(None)
- self.conn.transport.read_frame.assert_called_with()
- self.conn.on_inbound_frame.assert_called_with(
- self.conn.transport.read_frame(),
- )
- assert ret is self.conn.on_inbound_frame()
-
- def test_blocking_read__timeout(self):
- self.conn.transport = Transport('localhost:5672')
- sock = self.conn.transport.sock = Mock(name='sock')
- sock.gettimeout.return_value = 1
- self.conn.transport.read_frame = Mock(name='read_frame')
- self.conn.on_inbound_frame = Mock(name='on_inbound_frame')
- self.conn.blocking_read(3)
- sock.gettimeout.assert_called_with()
- sock.settimeout.assert_has_calls([call(3), call(1)])
- self.conn.transport.read_frame.assert_called_with()
- self.conn.on_inbound_frame.assert_called_with(
- self.conn.transport.read_frame(),
- )
- sock.gettimeout.return_value = 3
- self.conn.blocking_read(3)
-
- def test_blocking_read__SSLError(self):
- self.conn.on_inbound_frame = Mock(name='on_inbound_frame')
- self.conn.transport = Transport('localhost:5672')
- sock = self.conn.transport.sock = Mock(name='sock')
- sock.gettimeout.return_value = 1
- self.conn.transport.read_frame = Mock(name='read_frame')
- self.conn.transport.read_frame.side_effect = SSLError(
- 'operation timed out')
- with pytest.raises(socket.timeout):
- self.conn.blocking_read(3)
- self.conn.transport.read_frame.side_effect = SSLError(
- 'The operation did not complete foo bar')
- with pytest.raises(socket.timeout):
- self.conn.blocking_read(3)
- self.conn.transport.read_frame.side_effect = SSLError(
- 'oh noes')
- with pytest.raises(SSLError):
- self.conn.blocking_read(3)
-
- def test_on_inbound_method(self):
- self.conn.channels[1] = self.conn.channel(1)
- self.conn.on_inbound_method(1, (50, 60), 'payload', 'content')
- self.conn.channels[1].dispatch_method.assert_called_with(
- (50, 60), 'payload', 'content',
- )
-
- def test_close(self):
- self.conn.close(reply_text='foo', method_sig=spec.Channel.Open)
- self.conn.send_method.assert_called_with(
- spec.Connection.Close, 'BsBB',
- (0, 'foo', spec.Channel.Open[0], spec.Channel.Open[1]),
- wait=spec.Connection.CloseOk,
- )
-
- def test_close__already_closed(self):
- self.conn.transport = None
- self.conn.close()
-
- def test_on_close(self):
- self.conn._x_close_ok = Mock(name='_x_close_ok')
- with pytest.raises(NotFound):
- self.conn._on_close(404, 'bah not found', 50, 60)
-
- def test_x_close_ok(self):
- self.conn._x_close_ok()
- self.conn.send_method.assert_called_with(
- spec.Connection.CloseOk, callback=self.conn._on_close_ok,
- )
-
- def test_on_close_ok(self):
- self.conn.collect = Mock(name='collect')
- self.conn._on_close_ok()
- self.conn.collect.assert_called_with()
-
- def test_on_blocked(self):
- self.conn._on_blocked()
- self.conn.on_blocked = Mock(name='on_blocked')
- self.conn._on_blocked()
- self.conn.on_blocked.assert_called_with(
- 'connection blocked, see broker logs')
-
- def test_on_unblocked(self):
- self.conn._on_unblocked()
- self.conn.on_unblocked = Mock(name='on_unblocked')
- self.conn._on_unblocked()
- self.conn.on_unblocked.assert_called_with()
-
- def test_send_heartbeat(self):
- self.conn.send_heartbeat()
- self.conn.frame_writer.assert_called_with(
- 8, 0, None, None, None,
- )
-
- def test_heartbeat_tick__no_heartbeat(self):
- self.conn.heartbeat = 0
- self.conn.heartbeat_tick()
-
- def test_heartbeat_tick(self):
- self.conn.heartbeat = 3
- self.conn.heartbeat_tick()
- self.conn.bytes_sent = 3124
- self.conn.bytes_recv = 123
- self.conn.heartbeat_tick()
- self.conn.last_heartbeat_received -= 1000
- self.conn.last_heartbeat_sent -= 1000
- with pytest.raises(ConnectionError):
- self.conn.heartbeat_tick()
-
- def test_server_capabilities(self):
- self.conn.server_properties['capabilities'] = {'foo': 1}
- assert self.conn.server_capabilities == {'foo': 1}
diff --git a/t/unit/test_exceptions.py b/t/unit/test_exceptions.py
deleted file mode 100644
index ca16b9a..0000000
--- a/t/unit/test_exceptions.py
+++ /dev/null
@@ -1,19 +0,0 @@
-from case import Mock
-from amqp.exceptions import AMQPError, error_for_code
-
-
-class test_AMQPError:
-
- def test_str(self):
- assert str(AMQPError())
- x = AMQPError(method_sig=(50, 60))
- assert str(x)
-
-
-class test_error_for_code:
-
- def test_unknown_error(self):
- default = Mock(name='default')
- x = error_for_code(2134214314, 't', 'm', default)
- default.assert_called_with('t', 'm', reply_code=2134214314)
- assert x is default()
diff --git a/t/unit/test_method_framing.py b/t/unit/test_method_framing.py
deleted file mode 100644
index 3d6756b..0000000
--- a/t/unit/test_method_framing.py
+++ /dev/null
@@ -1,97 +0,0 @@
-import pytest
-from case import Mock
-from struct import pack
-from amqp import spec
-from amqp.basic_message import Message
-from amqp.exceptions import UnexpectedFrame
-from amqp.method_framing import frame_handler, frame_writer
-
-
-class test_frame_handler:
-
- @pytest.fixture(autouse=True)
- def setup_conn(self):
- self.conn = Mock(name='connection')
- self.conn.bytes_recv = 0
- self.callback = Mock(name='callback')
- self.g = frame_handler(self.conn, self.callback)
-
- def test_header(self):
- buf = pack('>HH', 60, 51)
- self.g((1, 1, buf))
- self.callback.assert_called_with(1, (60, 51), buf, None)
- assert self.conn.bytes_recv
-
- def test_header_message_empty_body(self):
- self.g((1, 1, pack('>HH', *spec.Basic.Deliver)))
- self.callback.assert_not_called()
-
- with pytest.raises(UnexpectedFrame):
- self.g((1, 1, pack('>HH', *spec.Basic.Deliver)))
-
- m = Message()
- m.properties = {}
- buf = pack('>HxxQ', m.CLASS_ID, 0)
- buf += m._serialize_properties()
- self.g((2, 1, buf))
-
- self.callback.assert_called()
- msg = self.callback.call_args[0][3]
- self.callback.assert_called_with(
- 1, msg.frame_method, msg.frame_args, msg,
- )
-
- def test_header_message_content(self):
- self.g((1, 1, pack('>HH', *spec.Basic.Deliver)))
- self.callback.assert_not_called()
-
- m = Message()
- m.properties = {}
- buf = pack('>HxxQ', m.CLASS_ID, 16)
- buf += m._serialize_properties()
- self.g((2, 1, buf))
- self.callback.assert_not_called()
-
- self.g((3, 1, b'thequick'))
- self.callback.assert_not_called()
-
- self.g((3, 1, b'brownfox'))
- self.callback.assert_called()
- msg = self.callback.call_args[0][3]
- self.callback.assert_called_with(
- 1, msg.frame_method, msg.frame_args, msg,
- )
- assert msg.body == b'thequickbrownfox'
-
- def test_heartbeat_frame(self):
- self.g((8, 1, ''))
- assert self.conn.bytes_recv
-
-
-class test_frame_writer:
-
- @pytest.fixture(autouse=True)
- def setup_conn(self):
- self.connection = Mock(name='connection')
- self.transport = self.connection.Transport()
- self.connection.frame_max = 512
- self.connection.bytes_sent = 0
- self.g = frame_writer(self.connection, self.transport)
- self.write = self.transport.write
-
- def test_write_fast_header(self):
- frame = 1, 1, spec.Queue.Declare, b'x' * 30, None
- self.g(*frame)
- self.write.assert_called()
-
- def test_write_fast_content(self):
- msg = Message(body=b'y' * 10, content_type='utf-8')
- frame = 2, 1, spec.Basic.Publish, b'x' * 10, msg
- self.g(*frame)
- self.write.assert_called()
-
- def test_write_slow_content(self):
- msg = Message(body=b'y' * 2048, content_type='utf-8')
- frame = 2, 1, spec.Basic.Publish, b'x' * 10, msg
- self.g(*frame)
- self.write.assert_called()
diff --git a/t/unit/test_platform.py b/t/unit/test_platform.py
deleted file mode 100644
index 0bbfc3a..0000000
--- a/t/unit/test_platform.py
+++ /dev/null
@@ -1,13 +0,0 @@
-import pytest
-from amqp.platform import _linux_version_to_tuple
-
-
-@pytest.mark.parametrize('s,expected', [
- ('3.13.0-46-generic', (3, 13, 0)),
- ('3.19.43-1-amd64', (3, 19, 43)),
- ('4.4.34+', (4, 4, 34)),
- ('4.4.what', (4, 4, 0)),
- ('4.what.what', (4, 0, 0)),
-])
-def test_linux_version_to_tuple(s, expected):
- assert _linux_version_to_tuple(s) == expected
diff --git a/t/unit/test_serialization.py b/t/unit/test_serialization.py
deleted file mode 100644
index 6d03bde..0000000
--- a/t/unit/test_serialization.py
+++ /dev/null
@@ -1,191 +0,0 @@
-import pytest
-from datetime import datetime
-from decimal import Decimal
-from math import ceil
-from struct import pack
-from amqp.basic_message import Message
-from amqp.exceptions import FrameSyntaxError
-from amqp.serialization import GenericContent, _read_item, dumps, loads
-
-
-class _ANY:
-
- def __eq__(self, other):
- return other is not None
-
- def __ne__(self, other):
- return other is None
-
-
-class test_serialization:
-
- @pytest.mark.parametrize('descr,frame,expected,cast', [
- ('S', b's8thequick', 'thequick', None),
- ('b', b'b' + pack('>B', True), True, None),
- ('B', b'B' + pack('>b', 123), 123, None),
- ('U', b'U' + pack('>h', -321), -321, None),
- ('u', b'u' + pack('>H', 321), 321, None),
- ('i', b'i' + pack('>I', 1234), 1234, None),
- ('L', b'L' + pack('>q', -32451), -32451, None),
- ('l', b'l' + pack('>Q', 32451), 32451, None),
- ('f', b'f' + pack('>f', 33.3), 34.0, ceil),
- ])
- def test_read_item(self, descr, frame, expected, cast):
- actual = _read_item(frame)[0]
- actual = cast(actual) if cast else actual
- assert actual == expected
-
- def test_read_item_V(self):
- assert _read_item(b'V')[0] is None
-
- def test_roundtrip(self):
- format = b'bobBlLbsbST'
- x = dumps(format, [
- True, 32, False, 3415, 4513134, 13241923419,
- True, b'thequickbrownfox', False, 'jumpsoverthelazydog',
- datetime(2015, 3, 13, 10, 23),
- ])
- y = loads(format, x)
- assert [
- True, 32, False, 3415, 4513134, 13241923419,
- True, 'thequickbrownfox', False, 'jumpsoverthelazydog',
- datetime(2015, 3, 13, 10, 23),
- ] == y[0]
-
- def test_int_boundaries(self):
- format = b'F'
- x = dumps(format, [
- {'a': -2147483649, 'b': 2147483648}, # celery/celery#3121
- ])
- y = loads(format, x)
- assert y[0] == [{
- 'a': -2147483649, 'b': 2147483648, # celery/celery#3121
- }]
-
- def test_loads_unknown_type(self):
- with pytest.raises(FrameSyntaxError):
- loads('x', 'asdsad')
-
- def test_float(self):
- assert (int(loads(b'fb', dumps(b'fb', [32.31, False]))[0][0] * 100) ==
- 3231)
-
- def test_table(self):
- table = {'foo': 32, 'bar': 'baz', 'nil': None}
- assert loads(b'F', dumps(b'F', [table]))[0][0] == table
-
- def test_array(self):
- array = [
- 'A', 1, True, 33.3,
- Decimal('55.5'), Decimal('-3.4'),
- datetime(2015, 3, 13, 10, 23),
- {'quick': 'fox', 'amount': 1},
- [3, 'hens'],
- None,
- ]
- expected = list(array)
- expected[6] = _ANY()
-
- assert expected == loads('A', dumps('A', [array]))[0][0]
-
- def test_array_unknown_type(self):
- with pytest.raises(FrameSyntaxError):
- dumps('A', [[object()]])
-
-
-class test_GenericContent:
-
- @pytest.fixture(autouse=True)
- def setup_content(self):
- self.g = GenericContent()
-
- def test_getattr(self):
- self.g.properties['foo'] = 30
- with pytest.raises(AttributeError):
- self.g.__setstate__
- assert self.g.foo == 30
- with pytest.raises(AttributeError):
- self.g.bar
-
- def test_load_properties(self):
- m = Message()
- m.properties = {
- 'content_type': 'application/json',
- 'content_encoding': 'utf-8',
- 'application_headers': {
- 'foo': 1,
- 'id': 'id#1',
- },
- 'delivery_mode': 1,
- 'priority': 255,
- 'correlation_id': 'df31-142f-34fd-g42d',
- 'reply_to': 'cosmo',
- 'expiration': '2015-12-23',
- 'message_id': '3312',
- 'timestamp': 3912491234,
- 'type': 'generic',
- 'user_id': 'george',
- 'app_id': 'vandelay',
- 'cluster_id': 'NYC',
- }
- s = m._serialize_properties()
- m2 = Message()
- m2._load_properties(m2.CLASS_ID, s)
- assert m2.properties == m.properties
-
- def test_load_properties__some_missing(self):
- m = Message()
- m.properties = {
- 'content_type': 'application/json',
- 'content_encoding': 'utf-8',
- 'delivery_mode': 1,
- 'correlation_id': 'df31-142f-34fd-g42d',
- 'reply_to': 'cosmo',
- 'expiration': '2015-12-23',
- 'message_id': '3312',
- 'type': None,
- 'app_id': None,
- 'cluster_id': None,
- }
- s = m._serialize_properties()
- m2 = Message()
- m2._load_properties(m2.CLASS_ID, s)
-
- def test_inbound_header(self):
- m = Message()
- m.properties = {
- 'content_type': 'application/json',
- 'content_encoding': 'utf-8',
- }
- body = 'the quick brown fox'
- buf = b'\0' * 30 + pack('>HxxQ', m.CLASS_ID, len(body))
- buf += m._serialize_properties()
- assert m.inbound_header(buf, offset=30) == 42
- assert m.body_size == len(body)
- assert m.properties['content_type'] == 'application/json'
- assert not m.ready
-
- def test_inbound_header__empty_body(self):
- m = Message()
- m.properties = {}
- buf = pack('>HxxQ', m.CLASS_ID, 0)
- buf += m._serialize_properties()
- assert m.inbound_header(buf, offset=0) == 12
- assert m.ready
-
- def test_inbound_body(self):
- m = Message()
- m.body_size = 16
- m.body_received = 8
- m._pending_chunks = [b'the', b'quick']
- m.inbound_body(b'brown')
- assert not m.ready
- m.inbound_body(b'fox')
- assert m.ready
- assert m.body == b'thequickbrownfox'
-
- def test_inbound_body__no_chunks(self):
- m = Message()
- m.body_size = 16
- m.inbound_body('thequickbrownfox')
- assert m.ready
diff --git a/t/unit/test_transport.py b/t/unit/test_transport.py
deleted file mode 100644
index 86014ae..0000000
--- a/t/unit/test_transport.py
+++ /dev/null
@@ -1,295 +0,0 @@
-import errno
-import socket
-import pytest
-from struct import pack
-from case import Mock
-from amqp import transport
-from amqp.exceptions import UnexpectedFrame
-
-
-class MockSocket:
- options = {}
-
- def setsockopt(self, family, key, value):
- if not isinstance(value, int):
- raise socket.error()
- self.options[key] = value
-
- def getsockopt(self, family, key):
- return self.options.get(key, 0)
-
-
-TCP_KEEPIDLE = 4
-TCP_KEEPINTVL = 5
-TCP_KEEPCNT = 6
-
-
-class test_socket_options:
-
- @pytest.fixture(autouse=True)
- def setup_self(self, patching):
- self.host = '127.0.0.1'
- self.connect_timeout = 3
- self.socket = MockSocket()
- try:
- import fcntl
- except ImportError:
- fcntl = None
- if fcntl is not None:
- patching('fcntl.fcntl')
- socket = patching('socket.socket')
- socket().getsockopt = self.socket.getsockopt
- socket().setsockopt = self.socket.setsockopt
-
- self.tcp_keepidle = 20
- self.tcp_keepintvl = 30
- self.tcp_keepcnt = 40
- self.socket.setsockopt(
- socket.SOL_TCP, socket.TCP_NODELAY, 1,
- )
- self.socket.setsockopt(
- socket.SOL_TCP, TCP_KEEPIDLE, self.tcp_keepidle,
- )
- self.socket.setsockopt(
- socket.SOL_TCP, TCP_KEEPINTVL, self.tcp_keepintvl,
- )
- self.socket.setsockopt(
- socket.SOL_TCP, TCP_KEEPCNT, self.tcp_keepcnt,
- )
-
- patching('amqp.transport.TCPTransport._write')
- patching('amqp.transport.TCPTransport._setup_transport')
- patching('amqp.transport.SSLTransport._write')
- patching('amqp.transport.SSLTransport._setup_transport')
-
- def test_backward_compatibility_tcp_transport(self):
- self.transp = transport.Transport(
- self.host, self.connect_timeout, ssl=False,
- )
- self.transp.connect()
- expected = 1
- result = self.socket.getsockopt(socket.SOL_TCP, socket.TCP_NODELAY)
- assert result == expected
-
- def test_backward_compatibility_SSL_transport(self):
- self.transp = transport.Transport(
- self.host, self.connect_timeout, ssl=True,
- )
- assert self.transp.sslopts is not None
- self.transp.connect()
- assert self.transp.sock is not None
-
- def test_use_default_sock_tcp_opts(self):
- self.transp = transport.Transport(
- self.host, self.connect_timeout, socket_settings={},
- )
- self.transp.connect()
- assert (socket.TCP_NODELAY in
- self.transp._get_tcp_socket_defaults(self.transp.sock))
-
- def test_set_single_sock_tcp_opt_tcp_transport(self):
- tcp_keepidle = self.tcp_keepidle + 5
- socket_settings = {TCP_KEEPIDLE: tcp_keepidle}
- self.transp = transport.Transport(
- self.host, self.connect_timeout,
- ssl=False, socket_settings=socket_settings,
- )
- self.transp.connect()
- expected = tcp_keepidle
- result = self.socket.getsockopt(socket.SOL_TCP, TCP_KEEPIDLE)
- assert result == expected
-
- def test_set_single_sock_tcp_opt_SSL_transport(self):
- self.tcp_keepidle += 5
- socket_settings = {TCP_KEEPIDLE: self.tcp_keepidle}
- self.transp = transport.Transport(
- self.host, self.connect_timeout,
- ssl=True, socket_settings=socket_settings,
- )
- self.transp.connect()
- expected = self.tcp_keepidle
- result = self.socket.getsockopt(socket.SOL_TCP, TCP_KEEPIDLE)
- assert result == expected
-
- def test_values_are_set(self):
- socket_settings = {
- TCP_KEEPIDLE: 10,
- TCP_KEEPINTVL: 4,
- TCP_KEEPCNT: 2
- }
-
- self.transp = transport.Transport(
- self.host, self.connect_timeout,
- socket_settings=socket_settings,
- )
- self.transp.connect()
- expected = socket_settings
- tcp_keepidle = self.socket.getsockopt(socket.SOL_TCP, TCP_KEEPIDLE)
- tcp_keepintvl = self.socket.getsockopt(socket.SOL_TCP, TCP_KEEPINTVL)
- tcp_keepcnt = self.socket.getsockopt(socket.SOL_TCP, TCP_KEEPCNT)
- result = {
- TCP_KEEPIDLE: tcp_keepidle,
- TCP_KEEPINTVL: tcp_keepintvl,
- TCP_KEEPCNT: tcp_keepcnt
- }
- assert result == expected
-
- def test_passing_wrong_options(self):
- socket_settings = object()
- self.transp = transport.Transport(
- self.host, self.connect_timeout,
- socket_settings=socket_settings,
- )
- with pytest.raises(TypeError):
- self.transp.connect()
-
- def test_passing_wrong_value_options(self):
- socket_settings = {TCP_KEEPINTVL: 'a'.encode()}
- self.transp = transport.Transport(
- self.host, self.connect_timeout,
- socket_settings=socket_settings,
- )
- with pytest.raises(socket.error):
- self.transp.connect()
-
- def test_passing_value_as_string(self):
- socket_settings = {TCP_KEEPIDLE: '5'.encode()}
- self.transp = transport.Transport(
- self.host, self.connect_timeout,
- socket_settings=socket_settings,
- )
- with pytest.raises(socket.error):
- self.transp.connect()
-
- def test_passing_tcp_nodelay(self):
- socket_settings = {socket.TCP_NODELAY: 0}
- self.transp = transport.Transport(
- self.host, self.connect_timeout,
- socket_settings=socket_settings,
- )
- self.transp.connect()
- expected = 0
- result = self.socket.getsockopt(socket.SOL_TCP, socket.TCP_NODELAY)
- assert result == expected
-
-
-class test_Transport:
-
- class Transport(transport.Transport):
-
- def _connect(self, *args):
- ...
-
- def _init_socket(self, *args):
- ...
-
- @pytest.fixture(autouse=True)
- def setup_transport(self):
- self.t = self.Transport('localhost:5672', 10)
- self.t.connect()
-
- def test_port(self):
- assert self.Transport('localhost').port == 5672
- assert self.Transport('localhost:5672').port == 5672
- assert self.Transport('[fe80::1]:5432').port == 5432
-
- def test_read(self):
- with pytest.raises(NotImplementedError):
- self.t._read(1024)
-
- def test_setup_transport(self):
- self.t._setup_transport()
-
- def test_shutdown_transport(self):
- self.t._shutdown_transport()
-
- def test_write(self):
- with pytest.raises(NotImplementedError):
- self.t._write('foo')
-
- def test_close(self):
- sock = self.t.sock = Mock()
- self.t.close()
- sock.shutdown.assert_called_with(socket.SHUT_RDWR)
- sock.close.assert_called_with()
- assert self.t.sock is None
- self.t.close()
-
- def test_read_frame__timeout(self):
- self.t._read = Mock()
- self.t._read.side_effect = socket.timeout()
- with pytest.raises(socket.timeout):
- self.t.read_frame()
-
- def test_read_frame__SSLError(self):
- self.t._read = Mock()
- self.t._read.side_effect = transport.SSLError('timed out')
- with pytest.raises(socket.timeout):
- self.t.read_frame()
-
- def test_read_frame__EINTR(self):
- self.t._read = Mock()
- self.t.connected = True
- exc = OSError()
- exc.errno = errno.EINTR
- self.t._read.side_effect = exc
- with pytest.raises(OSError):
- self.t.read_frame()
- assert self.t.connected
-
- def test_read_frame__EBADF(self):
- self.t._read = Mock()
- self.t.connected = True
- exc = OSError()
- exc.errno = errno.EBADF
- self.t._read.side_effect = exc
- with pytest.raises(OSError):
- self.t.read_frame()
- assert not self.t.connected
-
- def test_read_frame__simple(self):
- self.t._read = Mock()
- checksum = [b'\xce']
-
- def on_read2(size, *args):
- return checksum[0]
-
- def on_read1(size, *args):
- ret = self.t._read.return_value
- self.t._read.return_value = b'thequickbrownfox'
- self.t._read.side_effect = on_read2
- return ret
- self.t._read.return_value = pack('>BHI', 1, 1, 16)
- self.t._read.side_effect = on_read1
-
- self.t.read_frame()
- self.t._read.return_value = pack('>BHI', 1, 1, 16)
- self.t._read.side_effect = on_read1
- checksum[0] = b'\x13'
- with pytest.raises(UnexpectedFrame):
- self.t.read_frame()
-
- def test_write__success(self):
- self.t._write = Mock()
- self.t.write('foo')
- self.t._write.assert_called_with('foo')
-
- def test_write__socket_timeout(self):
- self.t._write = Mock()
- self.t._write.side_effect = socket.timeout
- with pytest.raises(socket.timeout):
- self.t.write('foo')
-
- def test_write__EINTR(self):
- self.t.connected = True
- self.t._write = Mock()
- exc = OSError()
- exc.errno = errno.EINTR
- self.t._write.side_effect = exc
- with pytest.raises(OSError):
- self.t.write('foo')
- assert self.t.connected
- exc.errno = errno.EBADF
- with pytest.raises(OSError):
- self.t.write('foo')
- assert not self.t.connected
diff --git a/t/unit/test_utils.py b/t/unit/test_utils.py
deleted file mode 100644
index 418e4df..0000000
--- a/t/unit/test_utils.py
+++ /dev/null
@@ -1,57 +0,0 @@
-from case import Mock, patch
-from amqp.utils import get_errno, want_bytes, want_str, get_logger
-
-
-class test_get_errno:
-
- def test_has_attr(self):
- exc = KeyError('foo')
- exc.errno = 23
- assert get_errno(exc) == 23
-
- def test_in_args(self):
- exc = KeyError(34, 'foo')
- exc.args = (34, 'foo')
- assert get_errno(exc) == 34
-
- def test_args_short(self):
- exc = KeyError(34)
- assert not get_errno(exc)
-
- def test_no_args(self):
- assert not get_errno(object())
-
-
-class test_want_bytes:
-
- def test_from_unicode(self):
- assert isinstance(want_bytes('foo'), bytes)
-
- def test_from_bytes(self):
- assert isinstance(want_bytes(b'foo'), bytes)
-
-
-class test_want_str:
-
- def test_from_unicode(self):
- assert isinstance(want_str('foo'), str)
-
- def test_from_bytes(self):
- assert want_str(b'foo')
-
-
-class test_get_logger:
-
- @patch('logging.getLogger')
- def test_as_str(self, getLogger):
- x = get_logger('foo.bar')
- getLogger.assert_called_with('foo.bar')
- assert x is getLogger()
-
- @patch('logging.NullHandler')
- def test_as_logger(self, _NullHandler):
- m = Mock(name='logger')
- m.handlers = None
- x = get_logger(m)
- assert x is m
- x.addHandler.assert_called_with(_NullHandler())