summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYassine Lamgarchal <yassine.lamgarchal@enovance.com>2014-01-17 15:12:40 +0100
committerYassine Lamgarchal <yassine.lamgarchal@enovance.com>2014-01-28 14:57:41 +0100
commit5a11223c4656b7de9906c117aebd85abf68de99f (patch)
tree00b2846ba15e6ae55e5d54ade91816feefab167b
parentd39dedd0ef0df018a189381236ec72b4e3035af9 (diff)
downloadtooz-5a11223c4656b7de9906c117aebd85abf68de99f.tar.gz
Add asynchronous API
The Tooz asynchronous API rely on zookeeper client asynchronous API. Each asynchronous call will return a future like object. Change-Id: Ibee9bf8ae0d1c1f62318f46b58ee25631e9e319a
-rw-r--r--test-requirements.txt2
-rw-r--r--tooz/coordination.py51
-rw-r--r--tooz/drivers/zookeeper.py130
-rw-r--r--tooz/tests/test_coordination.py50
4 files changed, 163 insertions, 70 deletions
diff --git a/test-requirements.txt b/test-requirements.txt
index 59adbef..0e5799f 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -7,4 +7,4 @@ python-subunit
testrepository>=0.0.17
testtools>=0.9.32
testscenarios>=0.4
-zake>=0.0.6
+zake>=0.0.9
diff --git a/tooz/coordination.py b/tooz/coordination.py
index 98db447..b6e2d78 100644
--- a/tooz/coordination.py
+++ b/tooz/coordination.py
@@ -46,68 +46,97 @@ class CoordinationDriver(object):
@abc.abstractmethod
def create_group(self, group_id):
- """Request the creation of a group.
+ """Request the creation of a group asynchronously.
:param group_id: the id of the group to create
:type group_id: str
+ :returns: None
+ :rtype: CoordAsyncResult
"""
@abc.abstractmethod
def get_groups(self):
- """Return the list composed by all groups ids.
+ """Return the list composed by all groups ids asynchronously.
- :returns: list of all created group ids
- :rtype: list
+ :returns: the list of all created group ids
+ :rtype: CoordAsyncResult
"""
@abc.abstractmethod
def join_group(self, group_id, capabilities):
- """Join a group and establish group membership.
+ """Join a group and establish group membership asynchronously.
:param group_id: the id of the group to join
:type group_id: str
:param capabilities: the capabilities of the joined member
:type capabilities: str
+ :returns: None
+ :rtype: CoordAsyncResult
"""
@abc.abstractmethod
def leave_group(self, group_id):
- """Leave a group.
+ """Leave a group asynchronously.
:param group_id: the id of the group to leave
:type group_id: str
+ :returns: None
+ :rtype: CoordAsyncResult
"""
@abc.abstractmethod
def get_members(self, group_id):
- """Return the list of all members ids of the specified group.
+ """Return the list of all members ids of the specified group
+ asynchronously.
:returns: list of all created group ids
- :rtype: list
+ :rtype: CoordAsyncResult
"""
@abc.abstractmethod
def get_member_capabilities(self, group_id, member_id):
- """Return the capabilities of a member.
+ """Return the capabilities of a member asynchronously.
:param group_id: the id of the group of the member
:type group_id: str
:param member_id: the id of the member
:type member_id: str
:returns: capabilities of a member
- :rtype: str
+ :rtype: CoordAsyncResult
"""
@abc.abstractmethod
def update_capabilities(self, group_id, capabilities):
- """Update capabilities of the caller in the specified group.
+ """Update capabilities of the caller in the specified group
+ asynchronously.
:param group_id: the id of the group of the current member
:type group_id: str
:param capabilities: the capabilities of the updated member
:type capabilities: str
+ :returns: None
+ :rtype: CoordAsyncResult
+ """
+
+
+@six.add_metaclass(abc.ABCMeta)
+class CoordAsyncResult(object):
+ """Representation of an asynchronous task, every call API
+ returns an CoordAsyncResult object on which the result or
+ the status of the task can be requested.
+ """
+
+ @abc.abstractmethod
+ def get(self, timeout=10):
+ """Retrieve the result of the corresponding asynchronous call.
+ :param timeout: block until the timeout expire.
+ :type timeout: float
"""
+ @abc.abstractmethod
+ def done(self):
+ """Returns True if the task is done, False otherwise."""
+
#TODO(yassine)
#Replace kwargs by something more simple.
diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py
index 6e65572..bdafbdc 100644
--- a/tooz/drivers/zookeeper.py
+++ b/tooz/drivers/zookeeper.py
@@ -40,85 +40,131 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
def stop(self):
self._coord.stop()
- def create_group(self, group_id):
+ @staticmethod
+ def _create_group_handler(async_result, timeout, group_id):
try:
- group_path = "/%s/%s" % (_TOOZ_NAMESPACE, group_id)
- self._wrap_kazoo_call(self._coord.create, group_path)
+ async_result.get(block=True, timeout=timeout)
except exceptions.NodeExistsError:
raise coordination.GroupAlreadyExist("group_id=%s" % group_id)
except exceptions.NoNodeError:
raise coordination.ToozError("tooz namespace has not been created")
- def join_group(self, group_id, capabilities=b""):
+ def create_group(self, group_id):
+ group_path = "/%s/%s" % (_TOOZ_NAMESPACE, group_id)
+ async_result = self._wrap_kazoo_call(self._coord.create_async,
+ group_path)
+ return ZooAsyncResult(async_result, self._create_group_handler,
+ group_id=group_id)
+
+ @staticmethod
+ def _join_group_handler(async_result, timeout, group_id, member_id):
try:
- member_path = self._path_member(group_id, self._member_id)
- self._wrap_kazoo_call(self._coord.create,
- member_path,
- value=capabilities,
- ephemeral=True)
+ async_result.get(block=True, timeout=timeout)
except exceptions.NodeExistsError:
- raise coordination.MemberAlreadyExist(str(self._member_id))
+ raise coordination.MemberAlreadyExist(str(member_id))
except exceptions.NoNodeError:
raise coordination.GroupNotCreated("group '%s' has not been "
- "created" % _TOOZ_NAMESPACE)
+ "created" % group_id)
- def leave_group(self, group_id):
+ def join_group(self, group_id, capabilities=b""):
+ member_path = self._path_member(group_id, self._member_id)
+ async_result = self._wrap_kazoo_call(self._coord.create_async,
+ member_path,
+ value=capabilities,
+ ephemeral=True)
+ return ZooAsyncResult(async_result, self._join_group_handler,
+ group_id=group_id, member_id=self._member_id)
+
+ @staticmethod
+ def _leave_group_handler(async_result, timeout, group_id, member_id):
try:
- member_path = self._path_member(group_id, self._member_id)
- self._wrap_kazoo_call(self._coord.delete, member_path)
+ async_result.get(block=True, timeout=timeout)
except exceptions.NoNodeError:
raise coordination.MemberNotJoined("member '%s' has not joined "
"the group '%s' or the group "
"has not been created" %
- (self._member_id, group_id))
+ member_id, group_id)
except exceptions.ZookeeperError as e:
raise coordination.ToozError(str(e))
- def get_members(self, group_id):
- member_ids = []
+ def leave_group(self, group_id):
+ member_path = self._path_member(group_id, self._member_id)
+ async_result = self._wrap_kazoo_call(self._coord.delete_async,
+ member_path)
+ return ZooAsyncResult(async_result, self._leave_group_handler,
+ group_id=group_id, member_id=self._member_id)
+
+ @staticmethod
+ def _get_members_handler(async_result, timeout, group_id):
+ members_ids = None
try:
- group_path = paths.join("/", _TOOZ_NAMESPACE, group_id)
- member_ids = self._wrap_kazoo_call(self._coord.get_children,
- group_path)
+ members_ids = async_result.get(block=True, timeout=timeout)
except exceptions.NoNodeError:
raise coordination.GroupNotCreated("group '%s' does not exist" %
group_id)
+ return members_ids
- return member_ids
+ def get_members(self, group_id):
+ group_path = paths.join("/", _TOOZ_NAMESPACE, group_id)
+ async_result = self._wrap_kazoo_call(self._coord.get_children_async,
+ group_path)
+ return ZooAsyncResult(async_result, self._get_members_handler,
+ group_id=group_id)
- def update_capabilities(self, group_id, capabilities):
+ @staticmethod
+ def _update_capabilities_handler(async_result, timeout, group_id,
+ member_id):
try:
- member_path = self._path_member(group_id, self._member_id)
- self._wrap_kazoo_call(self._coord.set, member_path, capabilities)
+ async_result.get(block=True, timeout=timeout)
except exceptions.NoNodeError:
raise coordination.MemberNotJoined("member '%s' has not joined "
"the group '%s' or the group "
"has not been created" %
- (self._member_id, group_id))
+ (member_id, group_id))
- def get_member_capabilities(self, group_id, member_id):
+ def update_capabilities(self, group_id, capabilities):
+ member_path = self._path_member(group_id, self._member_id)
+ async_result = self._wrap_kazoo_call(self._coord.set_async,
+ member_path, capabilities)
+ return ZooAsyncResult(async_result, self._update_capabilities_handler,
+ group_id=group_id, member_id=self._member_id)
+
+ @staticmethod
+ def _get_member_capabilities_handler(async_result, timeout, group_id,
+ member_id):
capabilities = ""
try:
- member_path = self._path_member(group_id, member_id)
- capabilities = self._wrap_kazoo_call(self._coord.get,
- member_path)[0]
+ capabilities = async_result.get(block=True, timeout=timeout)[0]
except exceptions.NoNodeError:
raise coordination.MemberNotJoined("member '%s' has not joined "
"the group '%s' or the group "
"has not been created" %
- (self._member_id, group_id))
+ (member_id, group_id))
return capabilities
- def get_groups(self):
+ def get_member_capabilities(self, group_id, member_id):
+ member_path = self._path_member(group_id, member_id)
+ async_result = self._wrap_kazoo_call(self._coord.get_async,
+ member_path)
+ return ZooAsyncResult(async_result,
+ self._get_member_capabilities_handler,
+ group_id=group_id, member_id=self._member_id)
+
+ @staticmethod
+ def _get_groups_handler(async_result, timeout):
group_ids = []
try:
- group_ids = self._wrap_kazoo_call(self._coord.get_children,
- paths.join("/", _TOOZ_NAMESPACE))
+ group_ids = async_result.get(block=True, timeout=timeout)
except exceptions.NoNodeError:
- raise coordination.ToozError("tooz namespace has "
- "not been created")
+ raise coordination.ToozError("tooz namespace has not been created")
return group_ids
+ def get_groups(self):
+ tooz_namespace = paths.join("/", _TOOZ_NAMESPACE)
+ async_result = self._wrap_kazoo_call(self._coord.get_children_async,
+ tooz_namespace)
+ return ZooAsyncResult(async_result, self._get_groups_handler)
+
@staticmethod
def _path_member(group_id, member_id):
return paths.join("/", _TOOZ_NAMESPACE, group_id, member_id)
@@ -163,3 +209,17 @@ class ZakeDriver(BaseZooKeeperDriver):
self._member_id = member_id
self._coord = fake_client.FakeClient(storage=storage)
super(ZakeDriver, self).__init__()
+
+
+class ZooAsyncResult(coordination.CoordAsyncResult):
+
+ def __init__(self, kazooAsyncResult, handler, **kwargs):
+ self.kazooAsyncResult = kazooAsyncResult
+ self.handler = handler
+ self.kwargs = kwargs
+
+ def get(self, timeout=15):
+ return self.handler(self.kazooAsyncResult, timeout, **self.kwargs)
+
+ def done(self):
+ return self.kazooAsyncResult.ready()
diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py
index 7bba5a4..b8f6145 100644
--- a/tooz/tests/test_coordination.py
+++ b/tooz/tests/test_coordination.py
@@ -55,31 +55,35 @@ class TestAPI(testscenarios.TestWithScenarios, testcase.TestCase):
super(TestAPI, self).tearDown()
def test_create_group(self):
- self._coord.create_group(self.group_id)
- all_group_ids = self._coord.get_groups()
+ self._coord.create_group(self.group_id).get()
+ all_group_ids = self._coord.get_groups().get()
self.assertTrue(self.group_id in all_group_ids)
def test_get_groups(self):
groups_ids = [self._get_random_uuid() for _ in range(0, 5)]
for group_id in groups_ids:
- self._coord.create_group(group_id)
- created_groups = self._coord.get_groups()
+ self._coord.create_group(group_id).get()
+ created_groups = self._coord.get_groups().get()
for group_id in groups_ids:
self.assertTrue(group_id in created_groups)
def test_join_group(self):
- self._coord.create_group(self.group_id)
- self._coord.join_group(self.group_id)
- member_list = self._coord.get_members(self.group_id)
+ self._coord.create_group(self.group_id).get()
+ self._coord.join_group(self.group_id).get()
+ member_list = self._coord.get_members(self.group_id).get()
self.assertTrue(self.member_id in member_list)
def test_leave_group(self):
- self._coord.create_group(self.group_id)
- self._coord.join_group(self.group_id)
- member_ids = self._coord.get_members(self.group_id)
+ self._coord.create_group(self.group_id).get()
+ all_group_ids = self._coord.get_groups().get()
+ self.assertTrue(self.group_id in all_group_ids)
+ self._coord.join_group(self.group_id).get()
+ member_list = self._coord.get_members(self.group_id).get()
+ self.assertTrue(self.member_id in member_list)
+ member_ids = self._coord.get_members(self.group_id).get()
self.assertTrue(self.member_id in member_ids)
- self._coord.leave_group(self.group_id)
- new_member_objects = self._coord.get_members(self.group_id)
+ self._coord.leave_group(self.group_id).get()
+ new_member_objects = self._coord.get_members(self.group_id).get()
new_member_list = [member.member_id for member in new_member_objects]
self.assertTrue(self.member_id not in new_member_list)
@@ -91,33 +95,33 @@ class TestAPI(testscenarios.TestWithScenarios, testcase.TestCase):
**self.kwargs)
client2.start()
- self._coord.create_group(group_id_test2)
- self._coord.join_group(group_id_test2)
- client2.join_group(group_id_test2)
- members_ids = self._coord.get_members(group_id_test2)
+ self._coord.create_group(group_id_test2).get()
+ self._coord.join_group(group_id_test2).get()
+ client2.join_group(group_id_test2).get()
+ members_ids = self._coord.get_members(group_id_test2).get()
self.assertTrue(self.member_id in members_ids)
self.assertTrue(member_id_test2 in members_ids)
def test_get_member_capabilities(self):
- self._coord.create_group(self.group_id)
+ self._coord.create_group(self.group_id).get()
self._coord.join_group(self.group_id, b"test_capabilities")
capa = self._coord.get_member_capabilities(self.group_id,
- self.member_id)
+ self.member_id).get()
self.assertEqual(capa, b"test_capabilities")
def test_update_capabilities(self):
- self._coord.create_group(self.group_id)
- self._coord.join_group(self.group_id, b"test_capabilities1")
+ self._coord.create_group(self.group_id).get()
+ self._coord.join_group(self.group_id, b"test_capabilities1").get()
capa = self._coord.get_member_capabilities(self.group_id,
- self.member_id)
+ self.member_id).get()
self.assertEqual(capa, b"test_capabilities1")
self._coord.update_capabilities(self.group_id,
- b"test_capabilities2")
+ b"test_capabilities2").get()
capa2 = self._coord.get_member_capabilities(self.group_id,
- self.member_id)
+ self.member_id).get()
self.assertEqual(capa2, b"test_capabilities2")
def _get_random_uuid(self):