summaryrefslogtreecommitdiff
path: root/kombu/pools.py
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/pools.py')
-rw-r--r--kombu/pools.py60
1 files changed, 35 insertions, 25 deletions
diff --git a/kombu/pools.py b/kombu/pools.py
index f9168886..88770212 100644
--- a/kombu/pools.py
+++ b/kombu/pools.py
@@ -1,23 +1,25 @@
"""Public resource pools."""
import os
-
from itertools import chain
-
+from typing import Any
from .connection import Resource
from .messaging import Producer
+from .types import ClientT, ProducerT, ResourceT
from .utils.collections import EqualityDict
from .utils.compat import register_after_fork
from .utils.functional import lazy
-__all__ = ['ProducerPool', 'PoolGroup', 'register_group',
- 'connections', 'producers', 'get_limit', 'set_limit', 'reset']
+__all__ = [
+ 'ProducerPool', 'PoolGroup', 'register_group',
+ 'connections', 'producers', 'get_limit', 'set_limit', 'reset',
+]
_limit = [10]
_groups = []
-use_global_limit = object()
+use_global_limit = 44444444444444444444444444
disable_limit_protection = os.environ.get('KOMBU_DISABLE_LIMIT_PROTECTION')
-def _after_fork_cleanup_group(group):
+def _after_fork_cleanup_group(group: 'PoolGroup') -> None:
group.clear()
@@ -27,15 +29,19 @@ class ProducerPool(Resource):
Producer = Producer
close_after_fork = True
- def __init__(self, connections, *args, Producer=None, **kwargs):
+ def __init__(self,
+ connections: ResourceT,
+ *args,
+ Producer: type = None,
+ **kwargs) -> None:
self.connections = connections
self.Producer = Producer or self.Producer
super().__init__(*args, **kwargs)
- def _acquire_connection(self):
+ def _acquire_connection(self) -> ClientT:
return self.connections.acquire(block=True)
- def create_producer(self):
+ def create_producer(self) -> ProducerT:
conn = self._acquire_connection()
try:
return self.Producer(conn)
@@ -43,18 +49,18 @@ class ProducerPool(Resource):
conn.release()
raise
- def new(self):
+ def new(self) -> lazy:
return lazy(self.create_producer)
- def setup(self):
+ def setup(self) -> None:
if self.limit:
for _ in range(self.limit):
self._resource.put_nowait(self.new())
- def close_resource(self, resource):
+ def close_resource(self, resource: ProducerT) -> None:
...
- def prepare(self, p):
+ def prepare(self, p: Any) -> None:
if callable(p):
p = p()
if p._channel is None:
@@ -66,7 +72,7 @@ class ProducerPool(Resource):
raise
return p
- def release(self, resource):
+ def release(self, resource: ProducerT) -> None:
if resource.__connection__:
resource.__connection__.release()
resource.channel = None
@@ -76,24 +82,25 @@ class ProducerPool(Resource):
class PoolGroup(EqualityDict):
"""Collection of resource pools."""
- def __init__(self, limit=None, close_after_fork=True):
+ def __init__(self, limit: int = None,
+ close_after_fork: bool = True) -> None:
self.limit = limit
self.close_after_fork = close_after_fork
if self.close_after_fork and register_after_fork is not None:
register_after_fork(self, _after_fork_cleanup_group)
- def create(self, resource, limit):
+ def create(self, connection: ClientT, limit: int) -> Any:
raise NotImplementedError('PoolGroups must define ``create``')
- def __missing__(self, resource):
+ def __missing__(self, resource: Any) -> Any:
limit = self.limit
- if limit is use_global_limit:
+ if limit == use_global_limit:
limit = get_limit()
k = self[resource] = self.create(resource, limit)
return k
-def register_group(group):
+def register_group(group: PoolGroup) -> PoolGroup:
"""Register group (can be used as decorator)."""
_groups.append(group)
return group
@@ -102,7 +109,7 @@ def register_group(group):
class Connections(PoolGroup):
"""Collection of connection pools."""
- def create(self, connection, limit):
+ def create(self, connection: ClientT, limit: int) -> Any:
return connection.Pool(limit=limit)
connections = register_group(Connections(limit=use_global_limit)) # noqa: E305
@@ -110,21 +117,24 @@ connections = register_group(Connections(limit=use_global_limit)) # noqa: E305
class Producers(PoolGroup):
"""Collection of producer pools."""
- def create(self, connection, limit):
+ def create(self, connection: ClientT, limit: int) -> Any:
return ProducerPool(connections[connection], limit=limit)
producers = register_group(Producers(limit=use_global_limit)) # noqa: E305
-def _all_pools():
+def _all_pools() -> chain[ResourceT]:
return chain(*[(g.values() if g else iter([])) for g in _groups])
-def get_limit():
+def get_limit() -> int:
"""Get current connection pool limit."""
return _limit[0]
-def set_limit(limit, force=False, reset_after=False, ignore_errors=False):
+def set_limit(limit: int,
+ force: bool = False,
+ reset_after: bool = False,
+ ignore_errors: bool = False) -> int:
"""Set new connection pool limit."""
limit = limit or 0
glimit = _limit[0] or 0
@@ -135,7 +145,7 @@ def set_limit(limit, force=False, reset_after=False, ignore_errors=False):
return limit
-def reset(*args, **kwargs):
+def reset(*args, **kwargs) -> None:
"""Reset all pools by closing open resources."""
for pool in _all_pools():
try: