summaryrefslogtreecommitdiff
path: root/pymemcache/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'pymemcache/client.py')
-rw-r--r--pymemcache/client.py219
1 files changed, 201 insertions, 18 deletions
diff --git a/pymemcache/client.py b/pymemcache/client.py
index fd31589..18a6b4a 100644
--- a/pymemcache/client.py
+++ b/pymemcache/client.py
@@ -73,6 +73,8 @@ __author__ = "Charles Gordon"
import socket
import six
+from pymemcache import pool
+
RECV_SIZE = 4096
VALID_STORE_RESULTS = {
@@ -152,6 +154,23 @@ class MemcacheUnexpectedCloseError(MemcacheServerError):
pass
+# Common helper functions.
+
+def _check_key(key, key_prefix=b''):
+ """Checks key and add key_prefix."""
+ if isinstance(key, six.text_type):
+ try:
+ key = key.encode('ascii')
+ except UnicodeEncodeError:
+ raise MemcacheIllegalInputError("No ascii key: %r" % (key,))
+ key = key_prefix + key
+ if b' ' in key:
+ raise MemcacheIllegalInputError("Key contains spaces: %r" % (key,))
+ if len(key) > 250:
+ raise MemcacheIllegalInputError("Key is too long: %r" % (key,))
+ return key
+
+
class Client(object):
"""
A client for a single memcached server.
@@ -266,7 +285,6 @@ class Client(object):
self.ignore_exc = ignore_exc
self.socket_module = socket_module
self.sock = None
- self.buf = b''
if isinstance(key_prefix, six.text_type):
key_prefix = key_prefix.encode('ascii')
if not isinstance(key_prefix, bytes):
@@ -275,17 +293,7 @@ class Client(object):
def check_key(self, key):
"""Checks key and add key_prefix."""
- if isinstance(key, six.text_type):
- try:
- key = key.encode('ascii')
- except UnicodeEncodeError as e:
- raise MemcacheIllegalInputError("No ascii key: %r" % (key,))
- key = self.key_prefix + key
- if b' ' in key:
- raise MemcacheIllegalInputError("Key contains spaces: %r" % (key,))
- if len(key) > 250:
- raise MemcacheIllegalInputError("Key is too long: %r" % (key,))
- return key
+ return _check_key(key, key_prefix=self.key_prefix)
def _connect(self):
sock = self.socket_module.socket(self.socket_module.AF_INET,
@@ -307,7 +315,6 @@ class Client(object):
except Exception:
pass
self.sock = None
- self.buf = b''
def set(self, key, value, expire=0, noreply=True):
"""
@@ -694,9 +701,10 @@ class Client(object):
self.sock.sendall(cmd)
+ buf = b''
result = {}
while True:
- self.buf, line = _readline(self.sock, self.buf)
+ buf, line = _readline(self.sock, buf)
self._raise_errors(line, name)
if line == b'END':
@@ -711,9 +719,7 @@ class Client(object):
raise ValueError("Unable to parse line %s: %s"
% (line, str(e)))
- self.buf, value = _readvalue(self.sock,
- self.buf,
- int(size))
+ buf, value = _readvalue(self.sock, buf, int(size))
key = checked_keys[key]
if self.deserializer:
@@ -767,7 +773,8 @@ class Client(object):
if noreply:
return True
- self.buf, line = _readline(self.sock, self.buf)
+ buf = b''
+ buf, line = _readline(self.sock, buf)
self._raise_errors(line, name)
if line in VALID_STORE_RESULTS[name]:
@@ -816,6 +823,182 @@ class Client(object):
self.delete(key, noreply=True)
+class PooledClient(object):
+ """A thread-safe pool of clients (with the same client api)."""
+
+ def __init__(self,
+ server,
+ serializer=None,
+ deserializer=None,
+ connect_timeout=None,
+ timeout=None,
+ no_delay=False,
+ ignore_exc=False,
+ socket_module=socket,
+ key_prefix=b'',
+ max_pool_size=None):
+ self.server = server
+ self.serializer = serializer
+ self.deserializer = deserializer
+ self.connect_timeout = connect_timeout
+ self.timeout = timeout
+ self.no_delay = no_delay
+ self.ignore_exc = ignore_exc
+ self.socket_module = socket_module
+ if isinstance(key_prefix, six.text_type):
+ key_prefix = key_prefix.encode('ascii')
+ if not isinstance(key_prefix, bytes):
+ raise TypeError("key_prefix should be bytes.")
+ self.key_prefix = key_prefix
+ self.client_pool = pool.ObjectPool(
+ self._create_client,
+ after_remove=lambda client: client.close(),
+ max_size=max_pool_size)
+
+ def check_key(self, key):
+ """Checks key and add key_prefix."""
+ return _check_key(key, key_prefix=self.key_prefix)
+
+ def _create_client(self):
+ client = Client(self.server,
+ serializer=self.serializer,
+ deserializer=self.deserializer,
+ connect_timeout=self.connect_timeout,
+ timeout=self.timeout,
+ no_delay=self.no_delay,
+ # We need to know when it fails *always* so that we
+ # can remove/destroy it from the pool...
+ ignore_exc=False,
+ socket_module=self.socket_module,
+ key_prefix=self.key_prefix)
+ return client
+
+ def close(self):
+ self.client_pool.clear()
+
+ def set(self, key, value, expire=0, noreply=True):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ return client.set(key, value, expire=expire, noreply=noreply)
+
+ def set_many(self, values, expire=0, noreply=True):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ return client.set_many(values, expire=expire, noreply=noreply)
+
+ def replace(self, key, value, expire=0, noreply=True):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ return client.replace(key, value, expire=expire, noreply=noreply)
+
+ def append(self, key, value, expire=0, noreply=True):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ return client.append(key, value, expire=expire, noreply=noreply)
+
+ def prepend(self, key, value, expire=0, noreply=True):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ return client.prepend(key, value, expire=expire, noreply=noreply)
+
+ def cas(self, key, value, cas, expire=0, noreply=False):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ return client.cas(key, value, cas,
+ expire=expire, noreply=noreply)
+
+ def get(self, key):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ try:
+ return client.get(key)
+ except Exception:
+ if self.ignore_exc:
+ return None
+ else:
+ raise
+
+ def get_many(self, keys):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ try:
+ return client.get_many(keys)
+ except Exception:
+ if self.ignore_exc:
+ return {}
+ else:
+ raise
+
+ def gets(self, key):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ try:
+ return client.gets(key)
+ except Exception:
+ if self.ignore_exc:
+ return (None, None)
+ else:
+ raise
+
+ def gets_many(self, keys):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ try:
+ return client.gets_many(keys)
+ except Exception:
+ if self.ignore_exc:
+ return {}
+ else:
+ raise
+
+ def delete(self, key, noreply=True):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ return client.delete(key, noreply=noreply)
+
+ def delete_many(self, keys, noreply=True):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ return client.delete_many(keys, noreply=noreply)
+
+ def add(self, key, value, expire=0, noreply=True):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ return client.add(key, value, expire=expire, noreply=noreply)
+
+ def incr(self, key, value, noreply=False):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ return client.incr(key, value, noreply=noreply)
+
+ def decr(self, key, value, noreply=False):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ return client.decr(key, value, noreply=noreply)
+
+ def touch(self, key, expire=0, noreply=True):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ return client.touch(key, expire=expire, noreply=noreply)
+
+ def stats(self, *args):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ try:
+ return client.stats(*args)
+ except Exception:
+ if self.ignore_exc:
+ return {}
+ else:
+ raise
+
+ def flush_all(self, delay=0, noreply=True):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ return client.flush_all(delay=delay, noreply=noreply)
+
+ def quit(self):
+ with self.client_pool.get_and_release(destroy_on_fail=True) as client:
+ try:
+ client.quit()
+ finally:
+ self.client_pool.destroy(client)
+
+ def __setitem__(self, key, value):
+ self.set(key, value, noreply=True)
+
+ def __getitem__(self, key):
+ value = self.get(key)
+ if value is None:
+ raise KeyError
+ return value
+
+ def __delitem__(self, key):
+ self.delete(key, noreply=True)
+
+
def _readline(sock, buf):
"""Read line of text from the socket.