diff options
author | Jenkins <jenkins@review.openstack.org> | 2014-09-24 18:52:22 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2014-09-24 18:52:22 +0000 |
commit | 3fb1d26a50aac4cd2da4aa348fce934cab3c9790 (patch) | |
tree | ad34d713aee152714e73e6c1a1be057458760735 | |
parent | f076bc430241deecbfe403dd155be35ed45b2295 (diff) | |
parent | ccfc1ea16eb39515e34f8e751a5b9eca80f6c59c (diff) | |
download | tooz-3fb1d26a50aac4cd2da4aa348fce934cab3c9790.tar.gz |
Merge "Use futures to make parts of the memcached driver async"
-rw-r--r-- | requirements.txt | 1 | ||||
-rw-r--r-- | tooz/drivers/memcached.py | 61 |
2 files changed, 42 insertions, 20 deletions
diff --git a/requirements.txt b/requirements.txt index 64a171a..98d47f1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ zake>=0.1.6 sysv_ipc>=0.6.8 msgpack-python retrying +futures>=2.1.6 diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py index b6c4740..d3bdaba 100644 --- a/tooz/drivers/memcached.py +++ b/tooz/drivers/memcached.py @@ -19,6 +19,7 @@ import collections import logging +from concurrent import futures import msgpack import pymemcache.client import retrying @@ -98,6 +99,7 @@ class MemcachedDriver(coordination.CoordinationDriver): super(MemcachedDriver, self).__init__() self._member_id = member_id self._groups = set() + self._executor = None self.host = (parsed_url.hostname or "localhost", parsed_url.port or 11211) default_timeout = options.get('timeout', ['30']) @@ -135,9 +137,14 @@ class MemcachedDriver(coordination.CoordinationDriver): raise coordination.ToozConnectionError(utils.exception_message(e)) self._group_members = collections.defaultdict(set) self._acquired_locks = [] + self._executor = futures.ThreadPoolExecutor(max_workers=1) self.heartbeat() def stop(self): + if self._executor is not None: + self._executor.shutdown(wait=True) + self._executor = None + for lock in list(self._acquired_locks): lock.release() @@ -177,15 +184,18 @@ class MemcachedDriver(coordination.CoordinationDriver): def create_group(self, group_id): encoded_group = self._encode_group_id(group_id) - if not self.client.add(encoded_group, {}, noreply=False): - return MemcachedAsyncError( - coordination.GroupAlreadyExist(group_id)) - self._add_group_to_group_list(group_id) - return MemcachedAsyncResult(None) + + def _create_group(): + if not self.client.add(encoded_group, {}, noreply=False): + raise coordination.GroupAlreadyExist(group_id) + self._add_group_to_group_list(group_id) + + return MemcachedFutureResult(self._executor.submit(_create_group)) def get_groups(self): - return MemcachedAsyncResult( - self.client.get(self._GROUP_LIST_KEY) or []) + def _get_groups(): + return self.client.get(self._GROUP_LIST_KEY) or [] + return MemcachedFutureResult(self._executor.submit(_get_groups)) @retry def join_group(self, group_id, capabilities=b""): @@ -236,20 +246,18 @@ class MemcachedDriver(coordination.CoordinationDriver): if self.client.get(self._encode_member_id(m))) def get_members(self, group_id): - try: - return MemcachedAsyncResult(self._get_members(group_id).keys()) - except Exception as e: - return MemcachedAsyncError(e) + def _get_members(): + return self._get_members(group_id).keys() + return MemcachedFutureResult(self._executor.submit(_get_members)) def get_member_capabilities(self, group_id, member_id): - try: + def _get_member_capabilities(): group_members = self._get_members(group_id) - except Exception as e: - return MemcachedAsyncError(e) - if member_id not in group_members: - return MemcachedAsyncError( - coordination.MemberNotJoined(group_id, member_id)) - return MemcachedAsyncResult(group_members[member_id][b'capabilities']) + if member_id not in group_members: + raise coordination.MemberNotJoined(group_id, member_id) + return group_members[member_id][b'capabilities'] + return MemcachedFutureResult( + self._executor.submit(_get_member_capabilities)) @retry def update_capabilities(self, group_id, capabilities): @@ -268,8 +276,9 @@ class MemcachedDriver(coordination.CoordinationDriver): return MemcachedAsyncResult(None) def get_leader(self, group_id): - return MemcachedAsyncResult( - self._get_leader_lock(group_id).get_owner()) + def _get_leader(): + return self._get_leader_lock(group_id).get_owner() + return MemcachedFutureResult(self._executor.submit(_get_leader)) def heartbeat(self): self.client.set(self._encode_member_id(self._member_id), @@ -353,6 +362,18 @@ class MemcachedDriver(coordination.CoordinationDriver): return result +class MemcachedFutureResult(coordination.CoordAsyncResult): + """Memcached asynchronous result that references a future.""" + def __init__(self, fut): + self._fut = fut + + def get(self, timeout=10): + return self._fut.result(timeout=timeout) + + def done(self): + return self._fut.done() + + class MemcachedAsyncResult(coordination.CoordAsyncResult): """Memcached asynchronous result. |