summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@gmail.com>2014-09-19 23:21:46 -0700
committerJoshua Harlow <harlowja@gmail.com>2014-09-19 23:24:38 -0700
commitccfc1ea16eb39515e34f8e751a5b9eca80f6c59c (patch)
tree677ce935bdc43c9b53c4875474b73fe494c73d46
parent031f8146122a0b11f63b9b70ad6a590ea45431e8 (diff)
downloadtooz-ccfc1ea16eb39515e34f8e751a5b9eca80f6c59c.tar.gz
Use futures to make parts of the memcached driver async
Instead of having a sync api exposing async results in a synchronous manner we can provide a asynchronous api by just using a simple executor and using the futures that it returns to map into the tooz async result concept. This moves the simplistic methods that are not using the retry decorator to being actually async, in a future change the retry decorated ones can be altered as well... Change-Id: I39d86da24b3f559890df12095ebde837e2820c63
-rw-r--r--requirements.txt1
-rw-r--r--tooz/drivers/memcached.py61
2 files changed, 42 insertions, 20 deletions
diff --git a/requirements.txt b/requirements.txt
index 64a171a..98d47f1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,3 +9,4 @@ zake>=0.1.6
sysv_ipc>=0.6.8
msgpack-python
retrying
+futures>=2.1.6
diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py
index b6c4740..d3bdaba 100644
--- a/tooz/drivers/memcached.py
+++ b/tooz/drivers/memcached.py
@@ -19,6 +19,7 @@
import collections
import logging
+from concurrent import futures
import msgpack
import pymemcache.client
import retrying
@@ -98,6 +99,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
super(MemcachedDriver, self).__init__()
self._member_id = member_id
self._groups = set()
+ self._executor = None
self.host = (parsed_url.hostname or "localhost",
parsed_url.port or 11211)
default_timeout = options.get('timeout', ['30'])
@@ -135,9 +137,14 @@ class MemcachedDriver(coordination.CoordinationDriver):
raise coordination.ToozConnectionError(utils.exception_message(e))
self._group_members = collections.defaultdict(set)
self._acquired_locks = []
+ self._executor = futures.ThreadPoolExecutor(max_workers=1)
self.heartbeat()
def stop(self):
+ if self._executor is not None:
+ self._executor.shutdown(wait=True)
+ self._executor = None
+
for lock in list(self._acquired_locks):
lock.release()
@@ -177,15 +184,18 @@ class MemcachedDriver(coordination.CoordinationDriver):
def create_group(self, group_id):
encoded_group = self._encode_group_id(group_id)
- if not self.client.add(encoded_group, {}, noreply=False):
- return MemcachedAsyncError(
- coordination.GroupAlreadyExist(group_id))
- self._add_group_to_group_list(group_id)
- return MemcachedAsyncResult(None)
+
+ def _create_group():
+ if not self.client.add(encoded_group, {}, noreply=False):
+ raise coordination.GroupAlreadyExist(group_id)
+ self._add_group_to_group_list(group_id)
+
+ return MemcachedFutureResult(self._executor.submit(_create_group))
def get_groups(self):
- return MemcachedAsyncResult(
- self.client.get(self._GROUP_LIST_KEY) or [])
+ def _get_groups():
+ return self.client.get(self._GROUP_LIST_KEY) or []
+ return MemcachedFutureResult(self._executor.submit(_get_groups))
@retry
def join_group(self, group_id, capabilities=b""):
@@ -236,20 +246,18 @@ class MemcachedDriver(coordination.CoordinationDriver):
if self.client.get(self._encode_member_id(m)))
def get_members(self, group_id):
- try:
- return MemcachedAsyncResult(self._get_members(group_id).keys())
- except Exception as e:
- return MemcachedAsyncError(e)
+ def _get_members():
+ return self._get_members(group_id).keys()
+ return MemcachedFutureResult(self._executor.submit(_get_members))
def get_member_capabilities(self, group_id, member_id):
- try:
+ def _get_member_capabilities():
group_members = self._get_members(group_id)
- except Exception as e:
- return MemcachedAsyncError(e)
- if member_id not in group_members:
- return MemcachedAsyncError(
- coordination.MemberNotJoined(group_id, member_id))
- return MemcachedAsyncResult(group_members[member_id][b'capabilities'])
+ if member_id not in group_members:
+ raise coordination.MemberNotJoined(group_id, member_id)
+ return group_members[member_id][b'capabilities']
+ return MemcachedFutureResult(
+ self._executor.submit(_get_member_capabilities))
@retry
def update_capabilities(self, group_id, capabilities):
@@ -268,8 +276,9 @@ class MemcachedDriver(coordination.CoordinationDriver):
return MemcachedAsyncResult(None)
def get_leader(self, group_id):
- return MemcachedAsyncResult(
- self._get_leader_lock(group_id).get_owner())
+ def _get_leader():
+ return self._get_leader_lock(group_id).get_owner()
+ return MemcachedFutureResult(self._executor.submit(_get_leader))
def heartbeat(self):
self.client.set(self._encode_member_id(self._member_id),
@@ -353,6 +362,18 @@ class MemcachedDriver(coordination.CoordinationDriver):
return result
+class MemcachedFutureResult(coordination.CoordAsyncResult):
+ """Memcached asynchronous result that references a future."""
+ def __init__(self, fut):
+ self._fut = fut
+
+ def get(self, timeout=10):
+ return self._fut.result(timeout=timeout)
+
+ def done(self):
+ return self._fut.done()
+
+
class MemcachedAsyncResult(coordination.CoordAsyncResult):
"""Memcached asynchronous result.