summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-09-24 18:52:22 +0000
committerGerrit Code Review <review@openstack.org>2014-09-24 18:52:22 +0000
commit3fb1d26a50aac4cd2da4aa348fce934cab3c9790 (patch)
treead34d713aee152714e73e6c1a1be057458760735
parentf076bc430241deecbfe403dd155be35ed45b2295 (diff)
parentccfc1ea16eb39515e34f8e751a5b9eca80f6c59c (diff)
downloadtooz-3fb1d26a50aac4cd2da4aa348fce934cab3c9790.tar.gz
Merge "Use futures to make parts of the memcached driver async"
-rw-r--r--requirements.txt1
-rw-r--r--tooz/drivers/memcached.py61
2 files changed, 42 insertions, 20 deletions
diff --git a/requirements.txt b/requirements.txt
index 64a171a..98d47f1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,3 +9,4 @@ zake>=0.1.6
sysv_ipc>=0.6.8
msgpack-python
retrying
+futures>=2.1.6
diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py
index b6c4740..d3bdaba 100644
--- a/tooz/drivers/memcached.py
+++ b/tooz/drivers/memcached.py
@@ -19,6 +19,7 @@
import collections
import logging
+from concurrent import futures
import msgpack
import pymemcache.client
import retrying
@@ -98,6 +99,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
super(MemcachedDriver, self).__init__()
self._member_id = member_id
self._groups = set()
+ self._executor = None
self.host = (parsed_url.hostname or "localhost",
parsed_url.port or 11211)
default_timeout = options.get('timeout', ['30'])
@@ -135,9 +137,14 @@ class MemcachedDriver(coordination.CoordinationDriver):
raise coordination.ToozConnectionError(utils.exception_message(e))
self._group_members = collections.defaultdict(set)
self._acquired_locks = []
+ self._executor = futures.ThreadPoolExecutor(max_workers=1)
self.heartbeat()
def stop(self):
+ if self._executor is not None:
+ self._executor.shutdown(wait=True)
+ self._executor = None
+
for lock in list(self._acquired_locks):
lock.release()
@@ -177,15 +184,18 @@ class MemcachedDriver(coordination.CoordinationDriver):
def create_group(self, group_id):
encoded_group = self._encode_group_id(group_id)
- if not self.client.add(encoded_group, {}, noreply=False):
- return MemcachedAsyncError(
- coordination.GroupAlreadyExist(group_id))
- self._add_group_to_group_list(group_id)
- return MemcachedAsyncResult(None)
+
+ def _create_group():
+ if not self.client.add(encoded_group, {}, noreply=False):
+ raise coordination.GroupAlreadyExist(group_id)
+ self._add_group_to_group_list(group_id)
+
+ return MemcachedFutureResult(self._executor.submit(_create_group))
def get_groups(self):
- return MemcachedAsyncResult(
- self.client.get(self._GROUP_LIST_KEY) or [])
+ def _get_groups():
+ return self.client.get(self._GROUP_LIST_KEY) or []
+ return MemcachedFutureResult(self._executor.submit(_get_groups))
@retry
def join_group(self, group_id, capabilities=b""):
@@ -236,20 +246,18 @@ class MemcachedDriver(coordination.CoordinationDriver):
if self.client.get(self._encode_member_id(m)))
def get_members(self, group_id):
- try:
- return MemcachedAsyncResult(self._get_members(group_id).keys())
- except Exception as e:
- return MemcachedAsyncError(e)
+ def _get_members():
+ return self._get_members(group_id).keys()
+ return MemcachedFutureResult(self._executor.submit(_get_members))
def get_member_capabilities(self, group_id, member_id):
- try:
+ def _get_member_capabilities():
group_members = self._get_members(group_id)
- except Exception as e:
- return MemcachedAsyncError(e)
- if member_id not in group_members:
- return MemcachedAsyncError(
- coordination.MemberNotJoined(group_id, member_id))
- return MemcachedAsyncResult(group_members[member_id][b'capabilities'])
+ if member_id not in group_members:
+ raise coordination.MemberNotJoined(group_id, member_id)
+ return group_members[member_id][b'capabilities']
+ return MemcachedFutureResult(
+ self._executor.submit(_get_member_capabilities))
@retry
def update_capabilities(self, group_id, capabilities):
@@ -268,8 +276,9 @@ class MemcachedDriver(coordination.CoordinationDriver):
return MemcachedAsyncResult(None)
def get_leader(self, group_id):
- return MemcachedAsyncResult(
- self._get_leader_lock(group_id).get_owner())
+ def _get_leader():
+ return self._get_leader_lock(group_id).get_owner()
+ return MemcachedFutureResult(self._executor.submit(_get_leader))
def heartbeat(self):
self.client.set(self._encode_member_id(self._member_id),
@@ -353,6 +362,18 @@ class MemcachedDriver(coordination.CoordinationDriver):
return result
+class MemcachedFutureResult(coordination.CoordAsyncResult):
+ """Memcached asynchronous result that references a future."""
+ def __init__(self, fut):
+ self._fut = fut
+
+ def get(self, timeout=10):
+ return self._fut.result(timeout=timeout)
+
+ def done(self):
+ return self._fut.done()
+
+
class MemcachedAsyncResult(coordination.CoordAsyncResult):
"""Memcached asynchronous result.