summaryrefslogtreecommitdiff
path: root/t
diff options
context:
space:
mode:
authorMarcelo Trylesinski <marcelotryle@gmail.com>2022-05-15 09:17:49 +0200
committerGitHub <noreply@github.com>2022-05-15 13:17:49 +0600
commit0a2f54eac2d57925a448cbb307a74b92f9f370b2 (patch)
tree2acecccf70dbfa5fdd4332be9a86b21be83649ff /t
parent2f62ea49db8316c7c777ae329f8865563cf47f6a (diff)
downloadkombu-0a2f54eac2d57925a448cbb307a74b92f9f370b2.tar.gz
Annotate `abstract.py` (#1522)
* Annotate `abstract.py` * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * apply pre-commit * Use quotes * Add typing_extensions as requirement * Add quotes * Add quotes Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Diffstat (limited to 't')
-rw-r--r--t/unit/test_entity.py120
1 files changed, 60 insertions, 60 deletions
diff --git a/t/unit/test_entity.py b/t/unit/test_entity.py
index a98f889d..fcb0afb9 100644
--- a/t/unit/test_entity.py
+++ b/t/unit/test_entity.py
@@ -12,13 +12,13 @@ from kombu.serialization import registry
from t.mocks import Transport
-def get_conn():
+def get_conn() -> Connection:
return Connection(transport=Transport)
class test_binding:
- def test_constructor(self):
+ def test_constructor(self) -> None:
x = binding(
Exchange('foo'), 'rkey',
arguments={'barg': 'bval'},
@@ -29,31 +29,31 @@ class test_binding:
assert x.arguments == {'barg': 'bval'}
assert x.unbind_arguments == {'uarg': 'uval'}
- def test_declare(self):
+ def test_declare(self) -> None:
chan = get_conn().channel()
x = binding(Exchange('foo'), 'rkey')
x.declare(chan)
assert 'exchange_declare' in chan
- def test_declare_no_exchange(self):
+ def test_declare_no_exchange(self) -> None:
chan = get_conn().channel()
x = binding()
x.declare(chan)
assert 'exchange_declare' not in chan
- def test_bind(self):
+ def test_bind(self) -> None:
chan = get_conn().channel()
x = binding(Exchange('foo'))
x.bind(Exchange('bar')(chan))
assert 'exchange_bind' in chan
- def test_unbind(self):
+ def test_unbind(self) -> None:
chan = get_conn().channel()
x = binding(Exchange('foo'))
x.unbind(Exchange('bar')(chan))
assert 'exchange_unbind' in chan
- def test_repr(self):
+ def test_repr(self) -> None:
b = binding(Exchange('foo'), 'rkey')
assert 'foo' in repr(b)
assert 'rkey' in repr(b)
@@ -61,7 +61,7 @@ class test_binding:
class test_Exchange:
- def test_bound(self):
+ def test_bound(self) -> None:
exchange = Exchange('foo', 'direct')
assert not exchange.is_bound
assert '<unbound' in repr(exchange)
@@ -72,11 +72,11 @@ class test_Exchange:
assert bound.channel is chan
assert f'bound to chan:{chan.channel_id!r}' in repr(bound)
- def test_hash(self):
+ def test_hash(self) -> None:
assert hash(Exchange('a')) == hash(Exchange('a'))
assert hash(Exchange('a')) != hash(Exchange('b'))
- def test_can_cache_declaration(self):
+ def test_can_cache_declaration(self) -> None:
assert Exchange('a', durable=True).can_cache_declaration
assert Exchange('a', durable=False).can_cache_declaration
assert not Exchange('a', auto_delete=True).can_cache_declaration
@@ -84,12 +84,12 @@ class test_Exchange:
'a', durable=True, auto_delete=True,
).can_cache_declaration
- def test_pickle(self):
+ def test_pickle(self) -> None:
e1 = Exchange('foo', 'direct')
e2 = pickle.loads(pickle.dumps(e1))
assert e1 == e2
- def test_eq(self):
+ def test_eq(self) -> None:
e1 = Exchange('foo', 'direct')
e2 = Exchange('foo', 'direct')
assert e1 == e2
@@ -99,7 +99,7 @@ class test_Exchange:
assert e1.__eq__(True) == NotImplemented
- def test_revive(self):
+ def test_revive(self) -> None:
exchange = Exchange('foo', 'direct')
conn = get_conn()
chan = conn.channel()
@@ -118,7 +118,7 @@ class test_Exchange:
assert bound.is_bound
assert bound._channel is chan2
- def test_assert_is_bound(self):
+ def test_assert_is_bound(self) -> None:
exchange = Exchange('foo', 'direct')
with pytest.raises(NotBoundError):
exchange.declare()
@@ -128,80 +128,80 @@ class test_Exchange:
exchange.bind(chan).declare()
assert 'exchange_declare' in chan
- def test_set_transient_delivery_mode(self):
+ def test_set_transient_delivery_mode(self) -> None:
exc = Exchange('foo', 'direct', delivery_mode='transient')
assert exc.delivery_mode == Exchange.TRANSIENT_DELIVERY_MODE
- def test_set_passive_mode(self):
+ def test_set_passive_mode(self) -> None:
exc = Exchange('foo', 'direct', passive=True)
assert exc.passive
- def test_set_persistent_delivery_mode(self):
+ def test_set_persistent_delivery_mode(self) -> None:
exc = Exchange('foo', 'direct', delivery_mode='persistent')
assert exc.delivery_mode == Exchange.PERSISTENT_DELIVERY_MODE
- def test_bind_at_instantiation(self):
+ def test_bind_at_instantiation(self) -> None:
assert Exchange('foo', channel=get_conn().channel()).is_bound
- def test_create_message(self):
+ def test_create_message(self) -> None:
chan = get_conn().channel()
Exchange('foo', channel=chan).Message({'foo': 'bar'})
assert 'prepare_message' in chan
- def test_publish(self):
+ def test_publish(self) -> None:
chan = get_conn().channel()
Exchange('foo', channel=chan).publish('the quick brown fox')
assert 'basic_publish' in chan
- def test_delete(self):
+ def test_delete(self) -> None:
chan = get_conn().channel()
Exchange('foo', channel=chan).delete()
assert 'exchange_delete' in chan
- def test__repr__(self):
+ def test__repr__(self) -> None:
b = Exchange('foo', 'topic')
assert 'foo(topic)' in repr(b)
assert 'Exchange' in repr(b)
- def test_bind_to(self):
+ def test_bind_to(self) -> None:
chan = get_conn().channel()
foo = Exchange('foo', 'topic')
bar = Exchange('bar', 'topic')
foo(chan).bind_to(bar)
assert 'exchange_bind' in chan
- def test_bind_to_by_name(self):
+ def test_bind_to_by_name(self) -> None:
chan = get_conn().channel()
foo = Exchange('foo', 'topic')
foo(chan).bind_to('bar')
assert 'exchange_bind' in chan
- def test_unbind_from(self):
+ def test_unbind_from(self) -> None:
chan = get_conn().channel()
foo = Exchange('foo', 'topic')
bar = Exchange('bar', 'topic')
foo(chan).unbind_from(bar)
assert 'exchange_unbind' in chan
- def test_unbind_from_by_name(self):
+ def test_unbind_from_by_name(self) -> None:
chan = get_conn().channel()
foo = Exchange('foo', 'topic')
foo(chan).unbind_from('bar')
assert 'exchange_unbind' in chan
- def test_declare__no_declare(self):
+ def test_declare__no_declare(self) -> None:
chan = get_conn().channel()
foo = Exchange('foo', 'topic', no_declare=True)
foo(chan).declare()
assert 'exchange_declare' not in chan
- def test_declare__internal_exchange(self):
+ def test_declare__internal_exchange(self) -> None:
chan = get_conn().channel()
foo = Exchange('amq.rabbitmq.trace', 'topic')
foo(chan).declare()
assert 'exchange_declare' not in chan
- def test_declare(self):
+ def test_declare(self) -> None:
chan = get_conn().channel()
foo = Exchange('foo', 'topic', no_declare=False)
foo(chan).declare()
@@ -210,33 +210,33 @@ class test_Exchange:
class test_Queue:
- def setup(self):
+ def setup(self) -> None:
self.exchange = Exchange('foo', 'direct')
- def test_constructor_with_actual_exchange(self):
+ def test_constructor_with_actual_exchange(self) -> None:
exchange = Exchange('exchange_name', 'direct')
queue = Queue(name='queue_name', exchange=exchange)
assert queue.exchange == exchange
- def test_constructor_with_string_exchange(self):
+ def test_constructor_with_string_exchange(self) -> None:
exchange_name = 'exchange_name'
queue = Queue(name='queue_name', exchange=exchange_name)
assert queue.exchange == Exchange(exchange_name)
- def test_constructor_with_default_exchange(self):
+ def test_constructor_with_default_exchange(self) -> None:
queue = Queue(name='queue_name')
assert queue.exchange == Exchange('')
- def test_hash(self):
+ def test_hash(self) -> None:
assert hash(Queue('a')) == hash(Queue('a'))
assert hash(Queue('a')) != hash(Queue('b'))
- def test_repr_with_bindings(self):
+ def test_repr_with_bindings(self) -> None:
ex = Exchange('foo')
x = Queue('foo', bindings=[ex.binding('A'), ex.binding('B')])
assert repr(x)
- def test_anonymous(self):
+ def test_anonymous(self) -> None:
chan = Mock()
x = Queue(bindings=[binding(Exchange('foo'), 'rkey')])
chan.queue_declare.return_value = 'generated', 0, 0
@@ -244,7 +244,7 @@ class test_Queue:
xx.declare()
assert xx.name == 'generated'
- def test_basic_get__accept_disallowed(self):
+ def test_basic_get__accept_disallowed(self) -> None:
conn = Connection('memory://')
q = Queue('foo', exchange=self.exchange)
p = Producer(conn)
@@ -259,7 +259,7 @@ class test_Queue:
with pytest.raises(q.ContentDisallowed):
message.decode()
- def test_basic_get__accept_allowed(self):
+ def test_basic_get__accept_allowed(self) -> None:
conn = Connection('memory://')
q = Queue('foo', exchange=self.exchange)
p = Producer(conn)
@@ -274,12 +274,12 @@ class test_Queue:
payload = message.decode()
assert payload['complex']
- def test_when_bound_but_no_exchange(self):
+ def test_when_bound_but_no_exchange(self) -> None:
q = Queue('a')
q.exchange = None
assert q.when_bound() is None
- def test_declare_but_no_exchange(self):
+ def test_declare_but_no_exchange(self) -> None:
q = Queue('a')
q.queue_declare = Mock()
q.queue_bind = Mock()
@@ -289,7 +289,7 @@ class test_Queue:
q.queue_declare.assert_called_with(
channel=None, nowait=False, passive=False)
- def test_declare__no_declare(self):
+ def test_declare__no_declare(self) -> None:
q = Queue('a', no_declare=True)
q.queue_declare = Mock()
q.queue_bind = Mock()
@@ -299,19 +299,19 @@ class test_Queue:
q.queue_declare.assert_not_called()
q.queue_bind.assert_not_called()
- def test_bind_to_when_name(self):
+ def test_bind_to_when_name(self) -> None:
chan = Mock()
q = Queue('a')
q(chan).bind_to('ex')
chan.queue_bind.assert_called()
- def test_get_when_no_m2p(self):
+ def test_get_when_no_m2p(self) -> None:
chan = Mock()
q = Queue('a')(chan)
chan.message_to_python = None
assert q.get()
- def test_multiple_bindings(self):
+ def test_multiple_bindings(self) -> None:
chan = Mock()
q = Queue('mul', [
binding(Exchange('mul1'), 'rkey1'),
@@ -329,14 +329,14 @@ class test_Queue:
durable=True,
) in chan.exchange_declare.call_args_list
- def test_can_cache_declaration(self):
+ def test_can_cache_declaration(self) -> None:
assert Queue('a', durable=True).can_cache_declaration
assert Queue('a', durable=False).can_cache_declaration
assert not Queue(
'a', queue_arguments={'x-expires': 100}
).can_cache_declaration
- def test_eq(self):
+ def test_eq(self) -> None:
q1 = Queue('xxx', Exchange('xxx', 'direct'), 'xxx')
q2 = Queue('xxx', Exchange('xxx', 'direct'), 'xxx')
assert q1 == q2
@@ -345,14 +345,14 @@ class test_Queue:
q3 = Queue('yyy', Exchange('xxx', 'direct'), 'xxx')
assert q1 != q3
- def test_exclusive_implies_auto_delete(self):
+ def test_exclusive_implies_auto_delete(self) -> None:
assert Queue('foo', self.exchange, exclusive=True).auto_delete
- def test_binds_at_instantiation(self):
+ def test_binds_at_instantiation(self) -> None:
assert Queue('foo', self.exchange,
channel=get_conn().channel()).is_bound
- def test_also_binds_exchange(self):
+ def test_also_binds_exchange(self) -> None:
chan = get_conn().channel()
b = Queue('foo', self.exchange)
assert not b.is_bound
@@ -363,7 +363,7 @@ class test_Queue:
assert b.channel is b.exchange.channel
assert b.exchange is not self.exchange
- def test_declare(self):
+ def test_declare(self) -> None:
chan = get_conn().channel()
b = Queue('foo', self.exchange, 'foo', channel=chan)
assert b.is_bound
@@ -372,49 +372,49 @@ class test_Queue:
assert 'queue_declare' in chan
assert 'queue_bind' in chan
- def test_get(self):
+ def test_get(self) -> None:
b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel())
b.get()
assert 'basic_get' in b.channel
- def test_purge(self):
+ def test_purge(self) -> None:
b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel())
b.purge()
assert 'queue_purge' in b.channel
- def test_consume(self):
+ def test_consume(self) -> None:
b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel())
b.consume('fifafo', None)
assert 'basic_consume' in b.channel
- def test_cancel(self):
+ def test_cancel(self) -> None:
b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel())
b.cancel('fifafo')
assert 'basic_cancel' in b.channel
- def test_delete(self):
+ def test_delete(self) -> None:
b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel())
b.delete()
assert 'queue_delete' in b.channel
- def test_queue_unbind(self):
+ def test_queue_unbind(self) -> None:
b = Queue('foo', self.exchange, 'foo', channel=get_conn().channel())
b.queue_unbind()
assert 'queue_unbind' in b.channel
- def test_as_dict(self):
+ def test_as_dict(self) -> None:
q = Queue('foo', self.exchange, 'rk')
d = q.as_dict(recurse=True)
assert d['exchange']['name'] == self.exchange.name
- def test_queue_dump(self):
+ def test_queue_dump(self) -> None:
b = binding(self.exchange, 'rk')
q = Queue('foo', self.exchange, 'rk', bindings=[b])
d = q.as_dict(recurse=True)
assert d['bindings'][0]['routing_key'] == 'rk'
registry.dumps(d)
- def test__repr__(self):
+ def test__repr__(self) -> None:
b = Queue('foo', self.exchange, 'foo')
assert 'foo' in repr(b)
assert 'Queue' in repr(b)
@@ -422,5 +422,5 @@ class test_Queue:
class test_MaybeChannelBound:
- def test_repr(self):
+ def test_repr(self) -> None:
assert repr(MaybeChannelBound())