summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kombu/asynchronous/http/base.py11
-rw-r--r--kombu/asynchronous/timer.py11
-rw-r--r--kombu/compat.py18
-rw-r--r--kombu/connection.py12
-rw-r--r--kombu/messaging.py18
-rw-r--r--kombu/resource.py11
-rw-r--r--kombu/simple.py11
-rw-r--r--kombu/transport/base.py11
-rw-r--r--kombu/transport/virtual/base.py16
-rw-r--r--t/mocks.py11
-rw-r--r--t/unit/asynchronous/test_semaphore.py6
-rw-r--r--t/unit/test_common.py11
-rw-r--r--t/unit/test_compat.py27
-rw-r--r--t/unit/test_connection.py14
-rw-r--r--t/unit/test_messaging.py5
-rw-r--r--t/unit/test_simple.py5
-rw-r--r--t/unit/transport/test_redis.py11
-rw-r--r--t/unit/transport/virtual/test_base.py5
18 files changed, 168 insertions, 46 deletions
diff --git a/kombu/asynchronous/http/base.py b/kombu/asynchronous/http/base.py
index 2bd28076..5feb8f3a 100644
--- a/kombu/asynchronous/http/base.py
+++ b/kombu/asynchronous/http/base.py
@@ -2,6 +2,7 @@
import sys
from http.client import responses
+from typing import TYPE_CHECKING, Optional, Type
from vine import Thenable, maybe_promise, promise
@@ -10,6 +11,9 @@ from kombu.utils.compat import coro
from kombu.utils.encoding import bytes_to_str
from kombu.utils.functional import maybe_list, memoize
+if TYPE_CHECKING:
+ from types import TracebackType
+
__all__ = ('Headers', 'Response', 'Request')
PYPY = hasattr(sys, 'pypy_version_info')
@@ -253,5 +257,10 @@ class BaseClient:
def __enter__(self):
return self
- def __exit__(self, *exc_info):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional['TracebackType']
+ ) -> None:
self.close()
diff --git a/kombu/asynchronous/timer.py b/kombu/asynchronous/timer.py
index 21ad37c1..2ee26c61 100644
--- a/kombu/asynchronous/timer.py
+++ b/kombu/asynchronous/timer.py
@@ -7,6 +7,7 @@ from datetime import datetime
from functools import total_ordering
from time import monotonic
from time import time as _time
+from typing import TYPE_CHECKING, Optional, Type
from weakref import proxy as weakrefproxy
from vine.utils import wraps
@@ -18,6 +19,9 @@ try:
except ImportError: # pragma: no cover
utc = None
+if TYPE_CHECKING:
+ from types import TracebackType
+
__all__ = ('Entry', 'Timer', 'to_timestamp')
logger = get_logger(__name__)
@@ -101,7 +105,12 @@ class Timer:
def __enter__(self):
return self
- def __exit__(self, *exc_info):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional['TracebackType']
+ ) -> None:
self.stop()
def call_at(self, eta, fun, args=(), kwargs=None, priority=0):
diff --git a/kombu/compat.py b/kombu/compat.py
index 1fa3f631..df37c3b1 100644
--- a/kombu/compat.py
+++ b/kombu/compat.py
@@ -4,10 +4,14 @@ See https://pypi.org/project/carrot/ for documentation.
"""
from itertools import count
+from typing import TYPE_CHECKING, Optional, Type
from . import messaging
from .entity import Exchange, Queue
+if TYPE_CHECKING:
+ from types import TracebackType
+
__all__ = ('Publisher', 'Consumer')
# XXX compat attribute
@@ -65,7 +69,12 @@ class Publisher(messaging.Producer):
def __enter__(self):
return self
- def __exit__(self, *exc_info):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional['TracebackType']
+ ) -> None:
self.close()
@property
@@ -127,7 +136,12 @@ class Consumer(messaging.Consumer):
def __enter__(self):
return self
- def __exit__(self, *exc_info):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional['TracebackType']
+ ) -> None:
self.close()
def __iter__(self):
diff --git a/kombu/connection.py b/kombu/connection.py
index e2dd69b6..7cdef505 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -5,6 +5,7 @@ import socket
from contextlib import contextmanager
from itertools import count, cycle
from operator import itemgetter
+from typing import TYPE_CHECKING, Optional, Type
try:
from ssl import CERT_NONE
@@ -13,6 +14,7 @@ except ImportError: # pragma: no cover
CERT_NONE = None
ssl_available = False
+
# jython breaks on relative import for .exceptions for some reason
# (Issue #112)
from kombu import exceptions
@@ -25,6 +27,9 @@ from .utils.functional import dictfilter, lazy, retry_over_time, shufflecycle
from .utils.objects import cached_property
from .utils.url import as_url, maybe_sanitize_url, parse_url, quote, urlparse
+if TYPE_CHECKING:
+ from types import TracebackType
+
__all__ = ('Connection', 'ConnectionPool', 'ChannelPool')
logger = get_logger(__name__)
@@ -828,7 +833,12 @@ class Connection:
def __enter__(self):
return self
- def __exit__(self, *args):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional['TracebackType']
+ ) -> None:
self.release()
@property
diff --git a/kombu/messaging.py b/kombu/messaging.py
index 0bed52c5..ebba43ee 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -1,6 +1,7 @@
"""Sending and receiving messages."""
from itertools import count
+from typing import TYPE_CHECKING, Optional, Type
from .common import maybe_declare
from .compression import compress
@@ -10,6 +11,9 @@ from .exceptions import ContentDisallowed
from .serialization import dumps, prepare_accept_content
from .utils.functional import ChannelPromise, maybe_list
+if TYPE_CHECKING:
+ from types import TracebackType
+
__all__ = ('Exchange', 'Queue', 'Producer', 'Consumer')
@@ -236,7 +240,12 @@ class Producer:
def __enter__(self):
return self
- def __exit__(self, *exc_info):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional['TracebackType']
+ ) -> None:
self.release()
def release(self):
@@ -435,7 +444,12 @@ class Consumer:
self.consume()
return self
- def __exit__(self, exc_type, exc_val, exc_tb):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional['TracebackType']
+ ) -> None:
if self.channel and self.channel.connection:
conn_errors = self.channel.connection.client.connection_errors
if not isinstance(exc_val, conn_errors):
diff --git a/kombu/resource.py b/kombu/resource.py
index e3617dc4..004eb7a9 100644
--- a/kombu/resource.py
+++ b/kombu/resource.py
@@ -4,11 +4,15 @@ import os
from collections import deque
from queue import Empty
from queue import LifoQueue as _LifoQueue
+from typing import TYPE_CHECKING
from . import exceptions
from .utils.compat import register_after_fork
from .utils.functional import lazy
+if TYPE_CHECKING:
+ from types import TracebackType
+
def _after_fork_cleanup_resource(resource):
try:
@@ -191,7 +195,12 @@ class Resource:
def __enter__(self):
pass
- def __exit__(self, type, value, traceback):
+ def __exit__(
+ self,
+ exc_type: type,
+ exc_val: Exception,
+ exc_tb: 'TracebackType'
+ ) -> None:
pass
resource = self._resource
diff --git a/kombu/simple.py b/kombu/simple.py
index eee037be..5925ceec 100644
--- a/kombu/simple.py
+++ b/kombu/simple.py
@@ -4,10 +4,14 @@ import socket
from collections import deque
from queue import Empty
from time import monotonic
+from typing import TYPE_CHECKING, Optional, Type
from . import entity, messaging
from .connection import maybe_channel
+if TYPE_CHECKING:
+ from types import TracebackType
+
__all__ = ('SimpleQueue', 'SimpleBuffer')
@@ -18,7 +22,12 @@ class SimpleBase:
def __enter__(self):
return self
- def __exit__(self, *exc_info):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional['TracebackType']
+ ) -> None:
self.close()
def __init__(self, channel, producer, consumer, no_ack=False):
diff --git a/kombu/transport/base.py b/kombu/transport/base.py
index 3083acf4..62a84fda 100644
--- a/kombu/transport/base.py
+++ b/kombu/transport/base.py
@@ -4,6 +4,7 @@
import errno
import socket
+from typing import TYPE_CHECKING, Optional, Type
from amqp.exceptions import RecoverableConnectionError
@@ -13,6 +14,9 @@ from kombu.utils.functional import dictfilter
from kombu.utils.objects import cached_property
from kombu.utils.time import maybe_s_to_ms
+if TYPE_CHECKING:
+ from types import TracebackType
+
__all__ = ('Message', 'StdChannel', 'Management', 'Transport')
RABBITMQ_QUEUE_ARGUMENTS = {
@@ -100,7 +104,12 @@ class StdChannel:
def __enter__(self):
return self
- def __exit__(self, *exc_info):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional['TracebackType']
+ ) -> None:
self.close()
diff --git a/kombu/transport/virtual/base.py b/kombu/transport/virtual/base.py
index 1e18caa6..6aef17da 100644
--- a/kombu/transport/virtual/base.py
+++ b/kombu/transport/virtual/base.py
@@ -13,6 +13,7 @@ from itertools import count
from multiprocessing.util import Finalize
from queue import Empty
from time import monotonic, sleep
+from typing import TYPE_CHECKING, Optional, Type
from amqp.protocol import queue_declare_ok_t
@@ -26,6 +27,9 @@ from kombu.utils.uuid import uuid
from .exchange import STANDARD_EXCHANGE_TYPES
+if TYPE_CHECKING:
+ from types import TracebackType
+
ARRAY_TYPE_H = 'H'
UNDELIVERABLE_FMT = """\
@@ -722,7 +726,8 @@ class Channel(AbstractChannel, base.StdChannel):
message = message.serializable()
message['redelivered'] = True
for queue in self._lookup(
- delivery_info['exchange'], delivery_info['routing_key']):
+ delivery_info['exchange'],
+ delivery_info['routing_key']):
self._put(queue, message)
def _restore_at_beginning(self, message):
@@ -799,7 +804,12 @@ class Channel(AbstractChannel, base.StdChannel):
def __enter__(self):
return self
- def __exit__(self, *exc_info):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional['TracebackType']
+ ) -> None:
self.close()
@property
@@ -947,7 +957,7 @@ class Transport(base.Transport):
# this channel is then used as the next requested channel.
# (returned by ``create_channel``).
self._avail_channels.append(self.create_channel(self))
- return self # for drain events
+ return self # for drain events
def close_connection(self, connection):
self.cycle.close()
diff --git a/t/mocks.py b/t/mocks.py
index b02a34d6..51ef32fb 100644
--- a/t/mocks.py
+++ b/t/mocks.py
@@ -1,9 +1,13 @@
from itertools import count
+from typing import TYPE_CHECKING, Optional, Type
from unittest.mock import Mock
from kombu.transport import base
from kombu.utils import json
+if TYPE_CHECKING:
+ from types import TracebackType
+
class _ContextMock(Mock):
"""Dummy class implementing __enter__ and __exit__
@@ -13,7 +17,12 @@ class _ContextMock(Mock):
def __enter__(self):
return self
- def __exit__(self, *exc_info):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional['TracebackType']
+ ) -> None:
pass
diff --git a/t/unit/asynchronous/test_semaphore.py b/t/unit/asynchronous/test_semaphore.py
index 8767ca91..485d0800 100644
--- a/t/unit/asynchronous/test_semaphore.py
+++ b/t/unit/asynchronous/test_semaphore.py
@@ -1,11 +1,13 @@
+from typing import List
+
from kombu.asynchronous.semaphore import LaxBoundedSemaphore
class test_LaxBoundedSemaphore:
- def test_over_release(self):
+ def test_over_release(self) -> None:
x = LaxBoundedSemaphore(2)
- calls = []
+ calls: List[int] = []
for i in range(1, 21):
x.acquire(calls.append, i)
x.release()
diff --git a/t/unit/test_common.py b/t/unit/test_common.py
index 0f669b7d..4780c0a4 100644
--- a/t/unit/test_common.py
+++ b/t/unit/test_common.py
@@ -1,4 +1,5 @@
import socket
+from typing import TYPE_CHECKING, Optional, Type
from unittest.mock import Mock, patch
import pytest
@@ -10,6 +11,9 @@ from kombu.common import (PREFETCH_COUNT_MAX, Broadcast, QoS, collect_replies,
maybe_declare, send_reply)
from t.mocks import ContextMock, MockPool
+if TYPE_CHECKING:
+ from types import TracebackType
+
def test_generate_oid():
from uuid import NAMESPACE_OID
@@ -338,7 +342,12 @@ class MockConsumer:
self.consumers.add(self)
return self
- def __exit__(self, *exc_info):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional['TracebackType']
+ ) -> None:
self.consumers.discard(self)
diff --git a/t/unit/test_compat.py b/t/unit/test_compat.py
index d75ce5df..31eb97ea 100644
--- a/t/unit/test_compat.py
+++ b/t/unit/test_compat.py
@@ -115,12 +115,14 @@ class test_Publisher:
pub.close()
def test__enter__exit__(self):
- pub = compat.Publisher(self.connection,
- exchange='test_Publisher_send',
- routing_key='rkey')
- x = pub.__enter__()
- assert x is pub
- x.__exit__()
+ pub = compat.Publisher(
+ self.connection,
+ exchange='test_Publisher_send',
+ routing_key='rkey'
+ )
+ with pub as x:
+ assert x is pub
+
assert pub._closed
@@ -158,11 +160,14 @@ class test_Consumer:
assert q2.exchange.auto_delete
def test__enter__exit__(self, n='test__enter__exit__'):
- c = compat.Consumer(self.connection, queue=n, exchange=n,
- routing_key='rkey')
- x = c.__enter__()
- assert x is c
- x.__exit__()
+ c = compat.Consumer(
+ self.connection,
+ queue=n,
+ exchange=n,
+ routing_key='rkey'
+ )
+ with c as x:
+ assert x is c
assert c._closed
def test_revive(self, n='test_revive'):
diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py
index 17ea7b34..703e237d 100644
--- a/t/unit/test_connection.py
+++ b/t/unit/test_connection.py
@@ -398,14 +398,12 @@ class test_Connection:
qsms.assert_called_with(self.conn.connection)
def test__enter____exit__(self):
- conn = self.conn
- context = conn.__enter__()
- assert context is conn
- conn.connect()
- assert conn.connection.connected
- conn.__exit__()
- assert conn.connection is None
- conn.close() # again
+ with self.conn as context:
+ assert context is self.conn
+ self.conn.connect()
+ assert self.conn.connection.connected
+ assert self.conn.connection is None
+ self.conn.close() # again
def test_close_survives_connerror(self):
diff --git a/t/unit/test_messaging.py b/t/unit/test_messaging.py
index f8ed437c..68c85b5f 100644
--- a/t/unit/test_messaging.py
+++ b/t/unit/test_messaging.py
@@ -188,9 +188,8 @@ class test_Producer:
def test_enter_exit(self):
p = self.connection.Producer()
p.release = Mock()
-
- assert p.__enter__() is p
- p.__exit__()
+ with p as x:
+ assert x is p
p.release.assert_called_with()
def test_connection_property_handles_AttributeError(self):
diff --git a/t/unit/test_simple.py b/t/unit/test_simple.py
index a5cd899a..6a9a9b09 100644
--- a/t/unit/test_simple.py
+++ b/t/unit/test_simple.py
@@ -91,9 +91,8 @@ class SimpleBase:
def test_enter_exit(self):
q = self.Queue('test_enter_exit')
q.close = Mock()
-
- assert q.__enter__() is q
- q.__exit__()
+ with q as x:
+ assert x is q
q.close.assert_called_with()
def test_qsize(self):
diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py
index 029f7901..9905f6ee 100644
--- a/t/unit/transport/test_redis.py
+++ b/t/unit/transport/test_redis.py
@@ -6,6 +6,7 @@ from collections import defaultdict
from itertools import count
from queue import Empty
from queue import Queue as _Queue
+from typing import TYPE_CHECKING, Optional, Type
from unittest.mock import ANY, Mock, call, patch
import pytest
@@ -16,6 +17,9 @@ from kombu.transport import virtual
from kombu.utils import eventio # patch poll
from kombu.utils.json import dumps
+if TYPE_CHECKING:
+ from types import TracebackType
+
def _redis_modules():
@@ -231,7 +235,12 @@ class Pipeline:
def __enter__(self):
return self
- def __exit__(self, *exc_info):
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional['TracebackType']
+ ) -> None:
pass
def __getattr__(self, key):
diff --git a/t/unit/transport/virtual/test_base.py b/t/unit/transport/virtual/test_base.py
index a5685ab9..97df370a 100644
--- a/t/unit/transport/virtual/test_base.py
+++ b/t/unit/transport/virtual/test_base.py
@@ -462,9 +462,8 @@ class test_Channel:
assert 'could not be delivered' in log[0].message.args[0]
def test_context(self):
- x = self.channel.__enter__()
- assert x is self.channel
- x.__exit__()
+ with self.channel as x:
+ assert x is self.channel
assert x.closed
def test_cycle_property(self):