summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJordan Carlson <jwgcarlson@gmail.com>2021-06-01 20:39:14 -0700
committerGitHub <noreply@github.com>2021-06-01 20:39:14 -0700
commitb164be9bf3c053b76d58f19eacfe7ff7cac1ca9b (patch)
tree99d68f3aadee86f4526cf5d54cc0405bb23beeee
parentdd61d9f0878adba8692b49167f9a80af4de2f8c2 (diff)
downloadpymemcache-b164be9bf3c053b76d58f19eacfe7ff7cac1ca9b.tar.gz
Remove idle connections from pool (#309)
The Memcached server can be configured to drop idle connections (via the idle_timeout option). When this occurs, pymemcache may still try to use the connection, resulting in MemcacheUnexpectedCloseError. Hence the need for client-side idle timeout logic.
-rw-r--r--pymemcache/client/base.py5
-rw-r--r--pymemcache/client/hash.py2
-rw-r--r--pymemcache/pool.py27
-rw-r--r--pymemcache/test/test_client.py32
4 files changed, 59 insertions, 7 deletions
diff --git a/pymemcache/client/base.py b/pymemcache/client/base.py
index 4db03b8..a0816cf 100644
--- a/pymemcache/client/base.py
+++ b/pymemcache/client/base.py
@@ -1039,6 +1039,9 @@ class PooledClient(object):
max_pool_size: maximum pool size to use (going above this amount
triggers a runtime error), by default this is 2147483648L
when not provided (or none).
+ pool_idle_timeout: pooled connections are discarded if they have been
+ unused for this many seconds. A value of 0 indicates
+ that pooled connections are never discarded.
lock_generator: a callback/type that takes no arguments that will
be called to create a lock or semaphore that can
protect the pool from concurrent access (for example a
@@ -1065,6 +1068,7 @@ class PooledClient(object):
socket_module=socket,
key_prefix=b'',
max_pool_size=None,
+ pool_idle_timeout=0,
lock_generator=None,
default_noreply=True,
allow_unicode_keys=False,
@@ -1088,6 +1092,7 @@ class PooledClient(object):
self._create_client,
after_remove=lambda client: client.close(),
max_size=max_pool_size,
+ idle_timeout=pool_idle_timeout,
lock_generator=lock_generator)
self.encoding = encoding
self.tls_context = tls_context
diff --git a/pymemcache/client/hash.py b/pymemcache/client/hash.py
index c2adb89..09b1e57 100644
--- a/pymemcache/client/hash.py
+++ b/pymemcache/client/hash.py
@@ -36,6 +36,7 @@ class HashClient(object):
socket_module=socket,
key_prefix=b'',
max_pool_size=None,
+ pool_idle_timeout=0,
lock_generator=None,
retry_attempts=2,
retry_timeout=1,
@@ -104,6 +105,7 @@ class HashClient(object):
if use_pooling is True:
self.default_kwargs.update({
'max_pool_size': max_pool_size,
+ 'pool_idle_timeout': pool_idle_timeout,
'lock_generator': lock_generator
})
diff --git a/pymemcache/pool.py b/pymemcache/pool.py
index f800f90..ddb8825 100644
--- a/pymemcache/pool.py
+++ b/pymemcache/pool.py
@@ -16,6 +16,7 @@ import collections
import contextlib
import sys
import threading
+import time
import six
@@ -25,6 +26,7 @@ class ObjectPool(object):
def __init__(self, obj_creator,
after_remove=None, max_size=None,
+ idle_timeout=0,
lock_generator=None):
self._used_objs = collections.deque()
self._free_objs = collections.deque()
@@ -38,6 +40,8 @@ class ObjectPool(object):
if not isinstance(max_size, six.integer_types) or max_size < 0:
raise ValueError('"max_size" must be a positive integer')
self.max_size = max_size
+ self.idle_timeout = idle_timeout
+ self._idle_clock = time.time if idle_timeout else int
@property
def used(self):
@@ -63,19 +67,27 @@ class ObjectPool(object):
def get(self):
with self._lock:
- if not self._free_objs:
+ # Find a free object, removing any that have idled for too long.
+ now = self._idle_clock()
+ while self._free_objs:
+ obj = self._free_objs.popleft()
+ if now - obj._last_used <= self.idle_timeout:
+ break
+
+ if self._after_remove is not None:
+ self._after_remove(obj)
+ else:
+ # No free objects, create a new one.
curr_count = len(self._used_objs)
if curr_count >= self.max_size:
raise RuntimeError("Too many objects,"
" %s >= %s" % (curr_count,
self.max_size))
obj = self._obj_creator()
- self._used_objs.append(obj)
- return obj
- else:
- obj = self._free_objs.pop()
- self._used_objs.append(obj)
- return obj
+
+ self._used_objs.append(obj)
+ obj._last_used = now
+ return obj
def destroy(self, obj, silent=True):
was_dropped = False
@@ -94,6 +106,7 @@ class ObjectPool(object):
try:
self._used_objs.remove(obj)
self._free_objs.append(obj)
+ obj._last_used = self._idle_clock()
except ValueError:
if not silent:
raise
diff --git a/pymemcache/test/test_client.py b/pymemcache/test/test_client.py
index bdaf233..24ecab2 100644
--- a/pymemcache/test/test_client.py
+++ b/pymemcache/test/test_client.py
@@ -1255,6 +1255,38 @@ class TestPooledClient(ClientTestMixin, unittest.TestCase):
assert isinstance(client.client_pool.get(), MyClient)
+class TestPooledClientIdleTimeout(ClientTestMixin, unittest.TestCase):
+ def make_client(self, mock_socket_values, **kwargs):
+ mock_client = Client(None, **kwargs)
+ mock_client.sock = MockSocket(list(mock_socket_values))
+ client = PooledClient(None, pool_idle_timeout=60, **kwargs)
+ client.client_pool = pool.ObjectPool(lambda: mock_client)
+ return client
+
+ def test_free_idle(self):
+ class Counter(object):
+ count = 0
+
+ def increment(self, obj):
+ self.count += 1
+
+ removed = Counter()
+
+ client = self.make_client([b'VALUE key 0 5\r\nvalue\r\nEND\r\n']*2)
+ client.client_pool._after_remove = removed.increment
+ client.client_pool._idle_clock = lambda: 0
+
+ client.set(b'key', b'value')
+ assert removed.count == 0
+ client.get(b'key')
+ assert removed.count == 0
+
+ # Advance clock to beyond the idle timeout.
+ client.client_pool._idle_clock = lambda: 61
+ client.get(b'key')
+ assert removed.count == 1
+
+
class TestMockClient(ClientTestMixin, unittest.TestCase):
def make_client(self, mock_socket_values, **kwargs):
client = MockMemcacheClient(None, **kwargs)