diff options
Diffstat (limited to 'pymemcache/client.py')
-rw-r--r-- | pymemcache/client.py | 219 |
1 files changed, 201 insertions, 18 deletions
diff --git a/pymemcache/client.py b/pymemcache/client.py index fd31589..18a6b4a 100644 --- a/pymemcache/client.py +++ b/pymemcache/client.py @@ -73,6 +73,8 @@ __author__ = "Charles Gordon" import socket import six +from pymemcache import pool + RECV_SIZE = 4096 VALID_STORE_RESULTS = { @@ -152,6 +154,23 @@ class MemcacheUnexpectedCloseError(MemcacheServerError): pass +# Common helper functions. + +def _check_key(key, key_prefix=b''): + """Checks key and add key_prefix.""" + if isinstance(key, six.text_type): + try: + key = key.encode('ascii') + except UnicodeEncodeError: + raise MemcacheIllegalInputError("No ascii key: %r" % (key,)) + key = key_prefix + key + if b' ' in key: + raise MemcacheIllegalInputError("Key contains spaces: %r" % (key,)) + if len(key) > 250: + raise MemcacheIllegalInputError("Key is too long: %r" % (key,)) + return key + + class Client(object): """ A client for a single memcached server. @@ -266,7 +285,6 @@ class Client(object): self.ignore_exc = ignore_exc self.socket_module = socket_module self.sock = None - self.buf = b'' if isinstance(key_prefix, six.text_type): key_prefix = key_prefix.encode('ascii') if not isinstance(key_prefix, bytes): @@ -275,17 +293,7 @@ class Client(object): def check_key(self, key): """Checks key and add key_prefix.""" - if isinstance(key, six.text_type): - try: - key = key.encode('ascii') - except UnicodeEncodeError as e: - raise MemcacheIllegalInputError("No ascii key: %r" % (key,)) - key = self.key_prefix + key - if b' ' in key: - raise MemcacheIllegalInputError("Key contains spaces: %r" % (key,)) - if len(key) > 250: - raise MemcacheIllegalInputError("Key is too long: %r" % (key,)) - return key + return _check_key(key, key_prefix=self.key_prefix) def _connect(self): sock = self.socket_module.socket(self.socket_module.AF_INET, @@ -307,7 +315,6 @@ class Client(object): except Exception: pass self.sock = None - self.buf = b'' def set(self, key, value, expire=0, noreply=True): """ @@ -694,9 +701,10 @@ class Client(object): self.sock.sendall(cmd) + buf = b'' result = {} while True: - self.buf, line = _readline(self.sock, self.buf) + buf, line = _readline(self.sock, buf) self._raise_errors(line, name) if line == b'END': @@ -711,9 +719,7 @@ class Client(object): raise ValueError("Unable to parse line %s: %s" % (line, str(e))) - self.buf, value = _readvalue(self.sock, - self.buf, - int(size)) + buf, value = _readvalue(self.sock, buf, int(size)) key = checked_keys[key] if self.deserializer: @@ -767,7 +773,8 @@ class Client(object): if noreply: return True - self.buf, line = _readline(self.sock, self.buf) + buf = b'' + buf, line = _readline(self.sock, buf) self._raise_errors(line, name) if line in VALID_STORE_RESULTS[name]: @@ -816,6 +823,182 @@ class Client(object): self.delete(key, noreply=True) +class PooledClient(object): + """A thread-safe pool of clients (with the same client api).""" + + def __init__(self, + server, + serializer=None, + deserializer=None, + connect_timeout=None, + timeout=None, + no_delay=False, + ignore_exc=False, + socket_module=socket, + key_prefix=b'', + max_pool_size=None): + self.server = server + self.serializer = serializer + self.deserializer = deserializer + self.connect_timeout = connect_timeout + self.timeout = timeout + self.no_delay = no_delay + self.ignore_exc = ignore_exc + self.socket_module = socket_module + if isinstance(key_prefix, six.text_type): + key_prefix = key_prefix.encode('ascii') + if not isinstance(key_prefix, bytes): + raise TypeError("key_prefix should be bytes.") + self.key_prefix = key_prefix + self.client_pool = pool.ObjectPool( + self._create_client, + after_remove=lambda client: client.close(), + max_size=max_pool_size) + + def check_key(self, key): + """Checks key and add key_prefix.""" + return _check_key(key, key_prefix=self.key_prefix) + + def _create_client(self): + client = Client(self.server, + serializer=self.serializer, + deserializer=self.deserializer, + connect_timeout=self.connect_timeout, + timeout=self.timeout, + no_delay=self.no_delay, + # We need to know when it fails *always* so that we + # can remove/destroy it from the pool... + ignore_exc=False, + socket_module=self.socket_module, + key_prefix=self.key_prefix) + return client + + def close(self): + self.client_pool.clear() + + def set(self, key, value, expire=0, noreply=True): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + return client.set(key, value, expire=expire, noreply=noreply) + + def set_many(self, values, expire=0, noreply=True): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + return client.set_many(values, expire=expire, noreply=noreply) + + def replace(self, key, value, expire=0, noreply=True): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + return client.replace(key, value, expire=expire, noreply=noreply) + + def append(self, key, value, expire=0, noreply=True): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + return client.append(key, value, expire=expire, noreply=noreply) + + def prepend(self, key, value, expire=0, noreply=True): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + return client.prepend(key, value, expire=expire, noreply=noreply) + + def cas(self, key, value, cas, expire=0, noreply=False): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + return client.cas(key, value, cas, + expire=expire, noreply=noreply) + + def get(self, key): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + try: + return client.get(key) + except Exception: + if self.ignore_exc: + return None + else: + raise + + def get_many(self, keys): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + try: + return client.get_many(keys) + except Exception: + if self.ignore_exc: + return {} + else: + raise + + def gets(self, key): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + try: + return client.gets(key) + except Exception: + if self.ignore_exc: + return (None, None) + else: + raise + + def gets_many(self, keys): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + try: + return client.gets_many(keys) + except Exception: + if self.ignore_exc: + return {} + else: + raise + + def delete(self, key, noreply=True): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + return client.delete(key, noreply=noreply) + + def delete_many(self, keys, noreply=True): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + return client.delete_many(keys, noreply=noreply) + + def add(self, key, value, expire=0, noreply=True): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + return client.add(key, value, expire=expire, noreply=noreply) + + def incr(self, key, value, noreply=False): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + return client.incr(key, value, noreply=noreply) + + def decr(self, key, value, noreply=False): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + return client.decr(key, value, noreply=noreply) + + def touch(self, key, expire=0, noreply=True): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + return client.touch(key, expire=expire, noreply=noreply) + + def stats(self, *args): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + try: + return client.stats(*args) + except Exception: + if self.ignore_exc: + return {} + else: + raise + + def flush_all(self, delay=0, noreply=True): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + return client.flush_all(delay=delay, noreply=noreply) + + def quit(self): + with self.client_pool.get_and_release(destroy_on_fail=True) as client: + try: + client.quit() + finally: + self.client_pool.destroy(client) + + def __setitem__(self, key, value): + self.set(key, value, noreply=True) + + def __getitem__(self, key): + value = self.get(key) + if value is None: + raise KeyError + return value + + def __delitem__(self, key): + self.delete(key, noreply=True) + + def _readline(sock, buf): """Read line of text from the socket. |