summaryrefslogtreecommitdiff
path: root/kombu/resource.py
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2017-02-16 10:44:48 -0800
committerAsk Solem <ask@celeryproject.org>2017-02-16 10:44:48 -0800
commite6fab2f68b562cf1400bd8167e9b755f0482aafe (patch)
treeb4d6be32dc8c62fa032e3c1a1a74636ac8360a38 /kombu/resource.py
parentf2f7c67651106e77fb2db60ded134404ccc0a626 (diff)
downloadkombu-e6fab2f68b562cf1400bd8167e9b755f0482aafe.tar.gz
Diffstat (limited to 'kombu/resource.py')
-rw-r--r--kombu/resource.py55
1 files changed, 30 insertions, 25 deletions
diff --git a/kombu/resource.py b/kombu/resource.py
index d98f49e7..1829b466 100644
--- a/kombu/resource.py
+++ b/kombu/resource.py
@@ -1,16 +1,15 @@
"""Generic resource pool implementation."""
import os
-
from collections import deque
from queue import Empty, LifoQueue as _LifoQueue
-
+from typing import Any
from . import exceptions
-from .utils import abstract
+from .types import ResourceT
from .utils.compat import register_after_fork
from .utils.functional import lazy
-def _after_fork_cleanup_resource(resource):
+def _after_fork_cleanup_resource(resource: ResourceT) -> None:
try:
resource.force_close_all()
except Exception:
@@ -20,19 +19,21 @@ def _after_fork_cleanup_resource(resource):
class LifoQueue(_LifoQueue):
"""Last in first out version of Queue."""
- def _init(self, maxsize):
+ def _init(self, maxsize: int) -> None:
self.queue = deque()
-@abstract.Resource.register
-class Resource:
+class Resource(ResourceT):
"""Pool of resources."""
LimitExceeded = exceptions.LimitExceeded
close_after_fork = False
- def __init__(self, limit=None, preload=None, close_after_fork=None):
+ def __init__(self,
+ limit: int = None,
+ preload: int = None,
+ close_after_fork: bool = None):
self._limit = limit
self.preload = preload or 0
self._closed = False
@@ -47,10 +48,10 @@ class Resource:
register_after_fork(self, _after_fork_cleanup_resource)
self.setup()
- def setup(self):
+ def setup(self) -> None:
raise NotImplementedError('subclass responsibility')
- def _add_when_empty(self):
+ def _add_when_empty(self) -> None:
if self.limit and len(self._dirty) >= self.limit:
raise self.LimitExceeded(self.limit)
# All taken, put new on the queue and
@@ -58,7 +59,7 @@ class Resource:
# will get the resource.
self._resource.put_nowait(self.new())
- def acquire(self, block=False, timeout=None):
+ def acquire(self, block: bool = False, timeout: int = None):
"""Acquire resource.
Arguments:
@@ -94,7 +95,7 @@ class Resource:
else:
R = self.prepare(self.new())
- def release():
+ def release() -> None:
"""Release resource so it can be used by another thread.
Warnings:
@@ -107,16 +108,16 @@ class Resource:
return R
- def prepare(self, resource):
+ def prepare(self, resource: Any) -> Any:
return resource
- def close_resource(self, resource):
+ def close_resource(self, resource: Any) -> None:
resource.close()
- def release_resource(self, resource):
+ def release_resource(self, resource: Any) -> None:
...
- def replace(self, resource):
+ def replace(self, resource: Any) -> None:
"""Replace existing resource with a new instance.
This can be used in case of defective resources.
@@ -125,7 +126,7 @@ class Resource:
self._dirty.discard(resource)
self.close_resource(resource)
- def release(self, resource):
+ def release(self, resource: Any) -> None:
if self.limit:
self._dirty.discard(resource)
self._resource.put_nowait(resource)
@@ -133,10 +134,10 @@ class Resource:
else:
self.close_resource(resource)
- def collect_resource(self, resource):
+ def collect_resource(self, resource: Any) -> None:
...
- def force_close_all(self):
+ def force_close_all(self) -> None:
"""Close and remove all resources in the pool (also those in use).
Used to close resources from parent processes after fork
@@ -169,7 +170,11 @@ class Resource:
except AttributeError:
pass # Issue #78
- def resize(self, limit, force=False, ignore_errors=False, reset=False):
+ def resize(
+ self, limit: int,
+ force: bool = False,
+ ignore_errors: bool = False,
+ reset: bool = False) -> None:
prev_limit = self._limit
if (self._dirty and limit < self._limit) and not ignore_errors:
if not force:
@@ -187,7 +192,7 @@ class Resource:
if limit < prev_limit:
self._shrink_down()
- def _shrink_down(self):
+ def _shrink_down(self) -> None:
resource = self._resource
# Items to the left are last recently used, so we remove those first.
with resource.mutex:
@@ -195,11 +200,11 @@ class Resource:
self.collect_resource(resource.queue.popleft())
@property
- def limit(self):
+ def limit(self) -> int:
return self._limit
@limit.setter
- def limit(self, limit):
+ def limit(self, limit: int) -> None:
self.resize(limit)
if os.environ.get('KOMBU_DEBUG_POOL'): # pragma: no cover
@@ -208,7 +213,7 @@ class Resource:
_next_resource_id = 0
- def acquire(self, *args, **kwargs): # noqa
+ def acquire(self, *args, **kwargs) -> Any: # noqa
import traceback
id = self._next_resource_id = self._next_resource_id + 1
print('+{0} ACQUIRE {1}'.format(id, self.__class__.__name__))
@@ -220,7 +225,7 @@ class Resource:
r.acquired_by.append(traceback.format_stack())
return r
- def release(self, resource): # noqa
+ def release(self, resource: Any) -> None: # noqa
id = resource._resource_id
print('+{0} RELEASE {1}'.format(id, self.__class__.__name__))
r = self._orig_release(resource)