diff options
author | Yassine Lamgarchal <yassine.lamgarchal@enovance.com> | 2014-01-17 15:12:40 +0100 |
---|---|---|
committer | Yassine Lamgarchal <yassine.lamgarchal@enovance.com> | 2014-01-28 14:57:41 +0100 |
commit | 5a11223c4656b7de9906c117aebd85abf68de99f (patch) | |
tree | 00b2846ba15e6ae55e5d54ade91816feefab167b | |
parent | d39dedd0ef0df018a189381236ec72b4e3035af9 (diff) | |
download | tooz-5a11223c4656b7de9906c117aebd85abf68de99f.tar.gz |
Add asynchronous API
The Tooz asynchronous API rely on zookeeper client asynchronous
API. Each asynchronous call will return a future like object.
Change-Id: Ibee9bf8ae0d1c1f62318f46b58ee25631e9e319a
-rw-r--r-- | test-requirements.txt | 2 | ||||
-rw-r--r-- | tooz/coordination.py | 51 | ||||
-rw-r--r-- | tooz/drivers/zookeeper.py | 130 | ||||
-rw-r--r-- | tooz/tests/test_coordination.py | 50 |
4 files changed, 163 insertions, 70 deletions
diff --git a/test-requirements.txt b/test-requirements.txt index 59adbef..0e5799f 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -7,4 +7,4 @@ python-subunit testrepository>=0.0.17 testtools>=0.9.32 testscenarios>=0.4 -zake>=0.0.6 +zake>=0.0.9 diff --git a/tooz/coordination.py b/tooz/coordination.py index 98db447..b6e2d78 100644 --- a/tooz/coordination.py +++ b/tooz/coordination.py @@ -46,68 +46,97 @@ class CoordinationDriver(object): @abc.abstractmethod def create_group(self, group_id): - """Request the creation of a group. + """Request the creation of a group asynchronously. :param group_id: the id of the group to create :type group_id: str + :returns: None + :rtype: CoordAsyncResult """ @abc.abstractmethod def get_groups(self): - """Return the list composed by all groups ids. + """Return the list composed by all groups ids asynchronously. - :returns: list of all created group ids - :rtype: list + :returns: the list of all created group ids + :rtype: CoordAsyncResult """ @abc.abstractmethod def join_group(self, group_id, capabilities): - """Join a group and establish group membership. + """Join a group and establish group membership asynchronously. :param group_id: the id of the group to join :type group_id: str :param capabilities: the capabilities of the joined member :type capabilities: str + :returns: None + :rtype: CoordAsyncResult """ @abc.abstractmethod def leave_group(self, group_id): - """Leave a group. + """Leave a group asynchronously. :param group_id: the id of the group to leave :type group_id: str + :returns: None + :rtype: CoordAsyncResult """ @abc.abstractmethod def get_members(self, group_id): - """Return the list of all members ids of the specified group. + """Return the list of all members ids of the specified group + asynchronously. :returns: list of all created group ids - :rtype: list + :rtype: CoordAsyncResult """ @abc.abstractmethod def get_member_capabilities(self, group_id, member_id): - """Return the capabilities of a member. + """Return the capabilities of a member asynchronously. :param group_id: the id of the group of the member :type group_id: str :param member_id: the id of the member :type member_id: str :returns: capabilities of a member - :rtype: str + :rtype: CoordAsyncResult """ @abc.abstractmethod def update_capabilities(self, group_id, capabilities): - """Update capabilities of the caller in the specified group. + """Update capabilities of the caller in the specified group + asynchronously. :param group_id: the id of the group of the current member :type group_id: str :param capabilities: the capabilities of the updated member :type capabilities: str + :returns: None + :rtype: CoordAsyncResult + """ + + +@six.add_metaclass(abc.ABCMeta) +class CoordAsyncResult(object): + """Representation of an asynchronous task, every call API + returns an CoordAsyncResult object on which the result or + the status of the task can be requested. + """ + + @abc.abstractmethod + def get(self, timeout=10): + """Retrieve the result of the corresponding asynchronous call. + :param timeout: block until the timeout expire. + :type timeout: float """ + @abc.abstractmethod + def done(self): + """Returns True if the task is done, False otherwise.""" + #TODO(yassine) #Replace kwargs by something more simple. diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py index 6e65572..bdafbdc 100644 --- a/tooz/drivers/zookeeper.py +++ b/tooz/drivers/zookeeper.py @@ -40,85 +40,131 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver): def stop(self): self._coord.stop() - def create_group(self, group_id): + @staticmethod + def _create_group_handler(async_result, timeout, group_id): try: - group_path = "/%s/%s" % (_TOOZ_NAMESPACE, group_id) - self._wrap_kazoo_call(self._coord.create, group_path) + async_result.get(block=True, timeout=timeout) except exceptions.NodeExistsError: raise coordination.GroupAlreadyExist("group_id=%s" % group_id) except exceptions.NoNodeError: raise coordination.ToozError("tooz namespace has not been created") - def join_group(self, group_id, capabilities=b""): + def create_group(self, group_id): + group_path = "/%s/%s" % (_TOOZ_NAMESPACE, group_id) + async_result = self._wrap_kazoo_call(self._coord.create_async, + group_path) + return ZooAsyncResult(async_result, self._create_group_handler, + group_id=group_id) + + @staticmethod + def _join_group_handler(async_result, timeout, group_id, member_id): try: - member_path = self._path_member(group_id, self._member_id) - self._wrap_kazoo_call(self._coord.create, - member_path, - value=capabilities, - ephemeral=True) + async_result.get(block=True, timeout=timeout) except exceptions.NodeExistsError: - raise coordination.MemberAlreadyExist(str(self._member_id)) + raise coordination.MemberAlreadyExist(str(member_id)) except exceptions.NoNodeError: raise coordination.GroupNotCreated("group '%s' has not been " - "created" % _TOOZ_NAMESPACE) + "created" % group_id) - def leave_group(self, group_id): + def join_group(self, group_id, capabilities=b""): + member_path = self._path_member(group_id, self._member_id) + async_result = self._wrap_kazoo_call(self._coord.create_async, + member_path, + value=capabilities, + ephemeral=True) + return ZooAsyncResult(async_result, self._join_group_handler, + group_id=group_id, member_id=self._member_id) + + @staticmethod + def _leave_group_handler(async_result, timeout, group_id, member_id): try: - member_path = self._path_member(group_id, self._member_id) - self._wrap_kazoo_call(self._coord.delete, member_path) + async_result.get(block=True, timeout=timeout) except exceptions.NoNodeError: raise coordination.MemberNotJoined("member '%s' has not joined " "the group '%s' or the group " "has not been created" % - (self._member_id, group_id)) + member_id, group_id) except exceptions.ZookeeperError as e: raise coordination.ToozError(str(e)) - def get_members(self, group_id): - member_ids = [] + def leave_group(self, group_id): + member_path = self._path_member(group_id, self._member_id) + async_result = self._wrap_kazoo_call(self._coord.delete_async, + member_path) + return ZooAsyncResult(async_result, self._leave_group_handler, + group_id=group_id, member_id=self._member_id) + + @staticmethod + def _get_members_handler(async_result, timeout, group_id): + members_ids = None try: - group_path = paths.join("/", _TOOZ_NAMESPACE, group_id) - member_ids = self._wrap_kazoo_call(self._coord.get_children, - group_path) + members_ids = async_result.get(block=True, timeout=timeout) except exceptions.NoNodeError: raise coordination.GroupNotCreated("group '%s' does not exist" % group_id) + return members_ids - return member_ids + def get_members(self, group_id): + group_path = paths.join("/", _TOOZ_NAMESPACE, group_id) + async_result = self._wrap_kazoo_call(self._coord.get_children_async, + group_path) + return ZooAsyncResult(async_result, self._get_members_handler, + group_id=group_id) - def update_capabilities(self, group_id, capabilities): + @staticmethod + def _update_capabilities_handler(async_result, timeout, group_id, + member_id): try: - member_path = self._path_member(group_id, self._member_id) - self._wrap_kazoo_call(self._coord.set, member_path, capabilities) + async_result.get(block=True, timeout=timeout) except exceptions.NoNodeError: raise coordination.MemberNotJoined("member '%s' has not joined " "the group '%s' or the group " "has not been created" % - (self._member_id, group_id)) + (member_id, group_id)) - def get_member_capabilities(self, group_id, member_id): + def update_capabilities(self, group_id, capabilities): + member_path = self._path_member(group_id, self._member_id) + async_result = self._wrap_kazoo_call(self._coord.set_async, + member_path, capabilities) + return ZooAsyncResult(async_result, self._update_capabilities_handler, + group_id=group_id, member_id=self._member_id) + + @staticmethod + def _get_member_capabilities_handler(async_result, timeout, group_id, + member_id): capabilities = "" try: - member_path = self._path_member(group_id, member_id) - capabilities = self._wrap_kazoo_call(self._coord.get, - member_path)[0] + capabilities = async_result.get(block=True, timeout=timeout)[0] except exceptions.NoNodeError: raise coordination.MemberNotJoined("member '%s' has not joined " "the group '%s' or the group " "has not been created" % - (self._member_id, group_id)) + (member_id, group_id)) return capabilities - def get_groups(self): + def get_member_capabilities(self, group_id, member_id): + member_path = self._path_member(group_id, member_id) + async_result = self._wrap_kazoo_call(self._coord.get_async, + member_path) + return ZooAsyncResult(async_result, + self._get_member_capabilities_handler, + group_id=group_id, member_id=self._member_id) + + @staticmethod + def _get_groups_handler(async_result, timeout): group_ids = [] try: - group_ids = self._wrap_kazoo_call(self._coord.get_children, - paths.join("/", _TOOZ_NAMESPACE)) + group_ids = async_result.get(block=True, timeout=timeout) except exceptions.NoNodeError: - raise coordination.ToozError("tooz namespace has " - "not been created") + raise coordination.ToozError("tooz namespace has not been created") return group_ids + def get_groups(self): + tooz_namespace = paths.join("/", _TOOZ_NAMESPACE) + async_result = self._wrap_kazoo_call(self._coord.get_children_async, + tooz_namespace) + return ZooAsyncResult(async_result, self._get_groups_handler) + @staticmethod def _path_member(group_id, member_id): return paths.join("/", _TOOZ_NAMESPACE, group_id, member_id) @@ -163,3 +209,17 @@ class ZakeDriver(BaseZooKeeperDriver): self._member_id = member_id self._coord = fake_client.FakeClient(storage=storage) super(ZakeDriver, self).__init__() + + +class ZooAsyncResult(coordination.CoordAsyncResult): + + def __init__(self, kazooAsyncResult, handler, **kwargs): + self.kazooAsyncResult = kazooAsyncResult + self.handler = handler + self.kwargs = kwargs + + def get(self, timeout=15): + return self.handler(self.kazooAsyncResult, timeout, **self.kwargs) + + def done(self): + return self.kazooAsyncResult.ready() diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py index 7bba5a4..b8f6145 100644 --- a/tooz/tests/test_coordination.py +++ b/tooz/tests/test_coordination.py @@ -55,31 +55,35 @@ class TestAPI(testscenarios.TestWithScenarios, testcase.TestCase): super(TestAPI, self).tearDown() def test_create_group(self): - self._coord.create_group(self.group_id) - all_group_ids = self._coord.get_groups() + self._coord.create_group(self.group_id).get() + all_group_ids = self._coord.get_groups().get() self.assertTrue(self.group_id in all_group_ids) def test_get_groups(self): groups_ids = [self._get_random_uuid() for _ in range(0, 5)] for group_id in groups_ids: - self._coord.create_group(group_id) - created_groups = self._coord.get_groups() + self._coord.create_group(group_id).get() + created_groups = self._coord.get_groups().get() for group_id in groups_ids: self.assertTrue(group_id in created_groups) def test_join_group(self): - self._coord.create_group(self.group_id) - self._coord.join_group(self.group_id) - member_list = self._coord.get_members(self.group_id) + self._coord.create_group(self.group_id).get() + self._coord.join_group(self.group_id).get() + member_list = self._coord.get_members(self.group_id).get() self.assertTrue(self.member_id in member_list) def test_leave_group(self): - self._coord.create_group(self.group_id) - self._coord.join_group(self.group_id) - member_ids = self._coord.get_members(self.group_id) + self._coord.create_group(self.group_id).get() + all_group_ids = self._coord.get_groups().get() + self.assertTrue(self.group_id in all_group_ids) + self._coord.join_group(self.group_id).get() + member_list = self._coord.get_members(self.group_id).get() + self.assertTrue(self.member_id in member_list) + member_ids = self._coord.get_members(self.group_id).get() self.assertTrue(self.member_id in member_ids) - self._coord.leave_group(self.group_id) - new_member_objects = self._coord.get_members(self.group_id) + self._coord.leave_group(self.group_id).get() + new_member_objects = self._coord.get_members(self.group_id).get() new_member_list = [member.member_id for member in new_member_objects] self.assertTrue(self.member_id not in new_member_list) @@ -91,33 +95,33 @@ class TestAPI(testscenarios.TestWithScenarios, testcase.TestCase): **self.kwargs) client2.start() - self._coord.create_group(group_id_test2) - self._coord.join_group(group_id_test2) - client2.join_group(group_id_test2) - members_ids = self._coord.get_members(group_id_test2) + self._coord.create_group(group_id_test2).get() + self._coord.join_group(group_id_test2).get() + client2.join_group(group_id_test2).get() + members_ids = self._coord.get_members(group_id_test2).get() self.assertTrue(self.member_id in members_ids) self.assertTrue(member_id_test2 in members_ids) def test_get_member_capabilities(self): - self._coord.create_group(self.group_id) + self._coord.create_group(self.group_id).get() self._coord.join_group(self.group_id, b"test_capabilities") capa = self._coord.get_member_capabilities(self.group_id, - self.member_id) + self.member_id).get() self.assertEqual(capa, b"test_capabilities") def test_update_capabilities(self): - self._coord.create_group(self.group_id) - self._coord.join_group(self.group_id, b"test_capabilities1") + self._coord.create_group(self.group_id).get() + self._coord.join_group(self.group_id, b"test_capabilities1").get() capa = self._coord.get_member_capabilities(self.group_id, - self.member_id) + self.member_id).get() self.assertEqual(capa, b"test_capabilities1") self._coord.update_capabilities(self.group_id, - b"test_capabilities2") + b"test_capabilities2").get() capa2 = self._coord.get_member_capabilities(self.group_id, - self.member_id) + self.member_id).get() self.assertEqual(capa2, b"test_capabilities2") def _get_random_uuid(self): |