diff options
Diffstat (limited to 'kombu/pools.py')
-rw-r--r-- | kombu/pools.py | 60 |
1 files changed, 35 insertions, 25 deletions
diff --git a/kombu/pools.py b/kombu/pools.py index f9168886..88770212 100644 --- a/kombu/pools.py +++ b/kombu/pools.py @@ -1,23 +1,25 @@ """Public resource pools.""" import os - from itertools import chain - +from typing import Any from .connection import Resource from .messaging import Producer +from .types import ClientT, ProducerT, ResourceT from .utils.collections import EqualityDict from .utils.compat import register_after_fork from .utils.functional import lazy -__all__ = ['ProducerPool', 'PoolGroup', 'register_group', - 'connections', 'producers', 'get_limit', 'set_limit', 'reset'] +__all__ = [ + 'ProducerPool', 'PoolGroup', 'register_group', + 'connections', 'producers', 'get_limit', 'set_limit', 'reset', +] _limit = [10] _groups = [] -use_global_limit = object() +use_global_limit = 44444444444444444444444444 disable_limit_protection = os.environ.get('KOMBU_DISABLE_LIMIT_PROTECTION') -def _after_fork_cleanup_group(group): +def _after_fork_cleanup_group(group: 'PoolGroup') -> None: group.clear() @@ -27,15 +29,19 @@ class ProducerPool(Resource): Producer = Producer close_after_fork = True - def __init__(self, connections, *args, Producer=None, **kwargs): + def __init__(self, + connections: ResourceT, + *args, + Producer: type = None, + **kwargs) -> None: self.connections = connections self.Producer = Producer or self.Producer super().__init__(*args, **kwargs) - def _acquire_connection(self): + def _acquire_connection(self) -> ClientT: return self.connections.acquire(block=True) - def create_producer(self): + def create_producer(self) -> ProducerT: conn = self._acquire_connection() try: return self.Producer(conn) @@ -43,18 +49,18 @@ class ProducerPool(Resource): conn.release() raise - def new(self): + def new(self) -> lazy: return lazy(self.create_producer) - def setup(self): + def setup(self) -> None: if self.limit: for _ in range(self.limit): self._resource.put_nowait(self.new()) - def close_resource(self, resource): + def close_resource(self, resource: ProducerT) -> None: ... - def prepare(self, p): + def prepare(self, p: Any) -> None: if callable(p): p = p() if p._channel is None: @@ -66,7 +72,7 @@ class ProducerPool(Resource): raise return p - def release(self, resource): + def release(self, resource: ProducerT) -> None: if resource.__connection__: resource.__connection__.release() resource.channel = None @@ -76,24 +82,25 @@ class ProducerPool(Resource): class PoolGroup(EqualityDict): """Collection of resource pools.""" - def __init__(self, limit=None, close_after_fork=True): + def __init__(self, limit: int = None, + close_after_fork: bool = True) -> None: self.limit = limit self.close_after_fork = close_after_fork if self.close_after_fork and register_after_fork is not None: register_after_fork(self, _after_fork_cleanup_group) - def create(self, resource, limit): + def create(self, connection: ClientT, limit: int) -> Any: raise NotImplementedError('PoolGroups must define ``create``') - def __missing__(self, resource): + def __missing__(self, resource: Any) -> Any: limit = self.limit - if limit is use_global_limit: + if limit == use_global_limit: limit = get_limit() k = self[resource] = self.create(resource, limit) return k -def register_group(group): +def register_group(group: PoolGroup) -> PoolGroup: """Register group (can be used as decorator).""" _groups.append(group) return group @@ -102,7 +109,7 @@ def register_group(group): class Connections(PoolGroup): """Collection of connection pools.""" - def create(self, connection, limit): + def create(self, connection: ClientT, limit: int) -> Any: return connection.Pool(limit=limit) connections = register_group(Connections(limit=use_global_limit)) # noqa: E305 @@ -110,21 +117,24 @@ connections = register_group(Connections(limit=use_global_limit)) # noqa: E305 class Producers(PoolGroup): """Collection of producer pools.""" - def create(self, connection, limit): + def create(self, connection: ClientT, limit: int) -> Any: return ProducerPool(connections[connection], limit=limit) producers = register_group(Producers(limit=use_global_limit)) # noqa: E305 -def _all_pools(): +def _all_pools() -> chain[ResourceT]: return chain(*[(g.values() if g else iter([])) for g in _groups]) -def get_limit(): +def get_limit() -> int: """Get current connection pool limit.""" return _limit[0] -def set_limit(limit, force=False, reset_after=False, ignore_errors=False): +def set_limit(limit: int, + force: bool = False, + reset_after: bool = False, + ignore_errors: bool = False) -> int: """Set new connection pool limit.""" limit = limit or 0 glimit = _limit[0] or 0 @@ -135,7 +145,7 @@ def set_limit(limit, force=False, reset_after=False, ignore_errors=False): return limit -def reset(*args, **kwargs): +def reset(*args, **kwargs) -> None: """Reset all pools by closing open resources.""" for pool in _all_pools(): try: |