diff options
author | Joshua Harlow <harlowja@gmail.com> | 2014-09-19 23:21:46 -0700 |
---|---|---|
committer | Joshua Harlow <harlowja@gmail.com> | 2014-09-19 23:24:38 -0700 |
commit | ccfc1ea16eb39515e34f8e751a5b9eca80f6c59c (patch) | |
tree | 677ce935bdc43c9b53c4875474b73fe494c73d46 | |
parent | 031f8146122a0b11f63b9b70ad6a590ea45431e8 (diff) | |
download | tooz-ccfc1ea16eb39515e34f8e751a5b9eca80f6c59c.tar.gz |
Use futures to make parts of the memcached driver async
Instead of having a sync api exposing async results in a
synchronous manner we can provide a asynchronous api by
just using a simple executor and using the futures that it
returns to map into the tooz async result concept.
This moves the simplistic methods that are not using the
retry decorator to being actually async, in a future change
the retry decorated ones can be altered as well...
Change-Id: I39d86da24b3f559890df12095ebde837e2820c63
-rw-r--r-- | requirements.txt | 1 | ||||
-rw-r--r-- | tooz/drivers/memcached.py | 61 |
2 files changed, 42 insertions, 20 deletions
diff --git a/requirements.txt b/requirements.txt index 64a171a..98d47f1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ zake>=0.1.6 sysv_ipc>=0.6.8 msgpack-python retrying +futures>=2.1.6 diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py index b6c4740..d3bdaba 100644 --- a/tooz/drivers/memcached.py +++ b/tooz/drivers/memcached.py @@ -19,6 +19,7 @@ import collections import logging +from concurrent import futures import msgpack import pymemcache.client import retrying @@ -98,6 +99,7 @@ class MemcachedDriver(coordination.CoordinationDriver): super(MemcachedDriver, self).__init__() self._member_id = member_id self._groups = set() + self._executor = None self.host = (parsed_url.hostname or "localhost", parsed_url.port or 11211) default_timeout = options.get('timeout', ['30']) @@ -135,9 +137,14 @@ class MemcachedDriver(coordination.CoordinationDriver): raise coordination.ToozConnectionError(utils.exception_message(e)) self._group_members = collections.defaultdict(set) self._acquired_locks = [] + self._executor = futures.ThreadPoolExecutor(max_workers=1) self.heartbeat() def stop(self): + if self._executor is not None: + self._executor.shutdown(wait=True) + self._executor = None + for lock in list(self._acquired_locks): lock.release() @@ -177,15 +184,18 @@ class MemcachedDriver(coordination.CoordinationDriver): def create_group(self, group_id): encoded_group = self._encode_group_id(group_id) - if not self.client.add(encoded_group, {}, noreply=False): - return MemcachedAsyncError( - coordination.GroupAlreadyExist(group_id)) - self._add_group_to_group_list(group_id) - return MemcachedAsyncResult(None) + + def _create_group(): + if not self.client.add(encoded_group, {}, noreply=False): + raise coordination.GroupAlreadyExist(group_id) + self._add_group_to_group_list(group_id) + + return MemcachedFutureResult(self._executor.submit(_create_group)) def get_groups(self): - return MemcachedAsyncResult( - self.client.get(self._GROUP_LIST_KEY) or []) + def _get_groups(): + return self.client.get(self._GROUP_LIST_KEY) or [] + return MemcachedFutureResult(self._executor.submit(_get_groups)) @retry def join_group(self, group_id, capabilities=b""): @@ -236,20 +246,18 @@ class MemcachedDriver(coordination.CoordinationDriver): if self.client.get(self._encode_member_id(m))) def get_members(self, group_id): - try: - return MemcachedAsyncResult(self._get_members(group_id).keys()) - except Exception as e: - return MemcachedAsyncError(e) + def _get_members(): + return self._get_members(group_id).keys() + return MemcachedFutureResult(self._executor.submit(_get_members)) def get_member_capabilities(self, group_id, member_id): - try: + def _get_member_capabilities(): group_members = self._get_members(group_id) - except Exception as e: - return MemcachedAsyncError(e) - if member_id not in group_members: - return MemcachedAsyncError( - coordination.MemberNotJoined(group_id, member_id)) - return MemcachedAsyncResult(group_members[member_id][b'capabilities']) + if member_id not in group_members: + raise coordination.MemberNotJoined(group_id, member_id) + return group_members[member_id][b'capabilities'] + return MemcachedFutureResult( + self._executor.submit(_get_member_capabilities)) @retry def update_capabilities(self, group_id, capabilities): @@ -268,8 +276,9 @@ class MemcachedDriver(coordination.CoordinationDriver): return MemcachedAsyncResult(None) def get_leader(self, group_id): - return MemcachedAsyncResult( - self._get_leader_lock(group_id).get_owner()) + def _get_leader(): + return self._get_leader_lock(group_id).get_owner() + return MemcachedFutureResult(self._executor.submit(_get_leader)) def heartbeat(self): self.client.set(self._encode_member_id(self._member_id), @@ -353,6 +362,18 @@ class MemcachedDriver(coordination.CoordinationDriver): return result +class MemcachedFutureResult(coordination.CoordAsyncResult): + """Memcached asynchronous result that references a future.""" + def __init__(self, fut): + self._fut = fut + + def get(self, timeout=10): + return self._fut.result(timeout=timeout) + + def done(self): + return self._fut.done() + + class MemcachedAsyncResult(coordination.CoordAsyncResult): """Memcached asynchronous result. |