diff options
author | Ask Solem <ask@celeryproject.org> | 2017-02-16 10:44:48 -0800 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2017-02-16 10:44:48 -0800 |
commit | e6fab2f68b562cf1400bd8167e9b755f0482aafe (patch) | |
tree | b4d6be32dc8c62fa032e3c1a1a74636ac8360a38 /kombu/resource.py | |
parent | f2f7c67651106e77fb2db60ded134404ccc0a626 (diff) | |
download | kombu-e6fab2f68b562cf1400bd8167e9b755f0482aafe.tar.gz |
WIP5.0-devel
Diffstat (limited to 'kombu/resource.py')
-rw-r--r-- | kombu/resource.py | 55 |
1 files changed, 30 insertions, 25 deletions
diff --git a/kombu/resource.py b/kombu/resource.py index d98f49e7..1829b466 100644 --- a/kombu/resource.py +++ b/kombu/resource.py @@ -1,16 +1,15 @@ """Generic resource pool implementation.""" import os - from collections import deque from queue import Empty, LifoQueue as _LifoQueue - +from typing import Any from . import exceptions -from .utils import abstract +from .types import ResourceT from .utils.compat import register_after_fork from .utils.functional import lazy -def _after_fork_cleanup_resource(resource): +def _after_fork_cleanup_resource(resource: ResourceT) -> None: try: resource.force_close_all() except Exception: @@ -20,19 +19,21 @@ def _after_fork_cleanup_resource(resource): class LifoQueue(_LifoQueue): """Last in first out version of Queue.""" - def _init(self, maxsize): + def _init(self, maxsize: int) -> None: self.queue = deque() -@abstract.Resource.register -class Resource: +class Resource(ResourceT): """Pool of resources.""" LimitExceeded = exceptions.LimitExceeded close_after_fork = False - def __init__(self, limit=None, preload=None, close_after_fork=None): + def __init__(self, + limit: int = None, + preload: int = None, + close_after_fork: bool = None): self._limit = limit self.preload = preload or 0 self._closed = False @@ -47,10 +48,10 @@ class Resource: register_after_fork(self, _after_fork_cleanup_resource) self.setup() - def setup(self): + def setup(self) -> None: raise NotImplementedError('subclass responsibility') - def _add_when_empty(self): + def _add_when_empty(self) -> None: if self.limit and len(self._dirty) >= self.limit: raise self.LimitExceeded(self.limit) # All taken, put new on the queue and @@ -58,7 +59,7 @@ class Resource: # will get the resource. self._resource.put_nowait(self.new()) - def acquire(self, block=False, timeout=None): + def acquire(self, block: bool = False, timeout: int = None): """Acquire resource. Arguments: @@ -94,7 +95,7 @@ class Resource: else: R = self.prepare(self.new()) - def release(): + def release() -> None: """Release resource so it can be used by another thread. Warnings: @@ -107,16 +108,16 @@ class Resource: return R - def prepare(self, resource): + def prepare(self, resource: Any) -> Any: return resource - def close_resource(self, resource): + def close_resource(self, resource: Any) -> None: resource.close() - def release_resource(self, resource): + def release_resource(self, resource: Any) -> None: ... - def replace(self, resource): + def replace(self, resource: Any) -> None: """Replace existing resource with a new instance. This can be used in case of defective resources. @@ -125,7 +126,7 @@ class Resource: self._dirty.discard(resource) self.close_resource(resource) - def release(self, resource): + def release(self, resource: Any) -> None: if self.limit: self._dirty.discard(resource) self._resource.put_nowait(resource) @@ -133,10 +134,10 @@ class Resource: else: self.close_resource(resource) - def collect_resource(self, resource): + def collect_resource(self, resource: Any) -> None: ... - def force_close_all(self): + def force_close_all(self) -> None: """Close and remove all resources in the pool (also those in use). Used to close resources from parent processes after fork @@ -169,7 +170,11 @@ class Resource: except AttributeError: pass # Issue #78 - def resize(self, limit, force=False, ignore_errors=False, reset=False): + def resize( + self, limit: int, + force: bool = False, + ignore_errors: bool = False, + reset: bool = False) -> None: prev_limit = self._limit if (self._dirty and limit < self._limit) and not ignore_errors: if not force: @@ -187,7 +192,7 @@ class Resource: if limit < prev_limit: self._shrink_down() - def _shrink_down(self): + def _shrink_down(self) -> None: resource = self._resource # Items to the left are last recently used, so we remove those first. with resource.mutex: @@ -195,11 +200,11 @@ class Resource: self.collect_resource(resource.queue.popleft()) @property - def limit(self): + def limit(self) -> int: return self._limit @limit.setter - def limit(self, limit): + def limit(self, limit: int) -> None: self.resize(limit) if os.environ.get('KOMBU_DEBUG_POOL'): # pragma: no cover @@ -208,7 +213,7 @@ class Resource: _next_resource_id = 0 - def acquire(self, *args, **kwargs): # noqa + def acquire(self, *args, **kwargs) -> Any: # noqa import traceback id = self._next_resource_id = self._next_resource_id + 1 print('+{0} ACQUIRE {1}'.format(id, self.__class__.__name__)) @@ -220,7 +225,7 @@ class Resource: r.acquired_by.append(traceback.format_stack()) return r - def release(self, resource): # noqa + def release(self, resource: Any) -> None: # noqa id = resource._resource_id print('+{0} RELEASE {1}'.format(id, self.__class__.__name__)) r = self._orig_release(resource) |