diff options
author | Joshua Harlow <harlowja@yahoo-inc.com> | 2015-02-04 18:36:00 -0800 |
---|---|---|
committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2015-02-06 16:23:20 -0800 |
commit | 9afaefd56fa117941e1b801803a7d796a8a96551 (patch) | |
tree | 82f4683f22ecf565d9ba98ebd47b25f310eb81f1 | |
parent | 5edf2b3db33a42b646fe923580debff6b58a1122 (diff) | |
download | tooz-9afaefd56fa117941e1b801803a7d796a8a96551.tar.gz |
Since we use msgpack this can be more than a str
It is often nice to have capabilities that are dicts
to allow for more complex capabilities that; and since
we use msgpack this is already enabled for usage (since
msgpack will serialize/deserialize capabilities provided
as dicts already).
This updates the zookeeper driver to ensure that capabilites
sent are first formatted to msgpack format (before being
sent) so that all drivers work in the same manner and accept
the same kind of items.
Change-Id: I2e19e806d38f70175706610cd92d9e22f606b41b
-rw-r--r-- | tooz/coordination.py | 4 | ||||
-rw-r--r-- | tooz/drivers/memcached.py | 9 | ||||
-rw-r--r-- | tooz/drivers/redis.py | 11 | ||||
-rw-r--r-- | tooz/drivers/zookeeper.py | 16 | ||||
-rw-r--r-- | tooz/tests/test_coordination.py | 11 | ||||
-rw-r--r-- | tooz/utils.py | 28 |
6 files changed, 60 insertions, 19 deletions
diff --git a/tooz/coordination.py b/tooz/coordination.py index 0cdbef1..cbeec4c 100644 --- a/tooz/coordination.py +++ b/tooz/coordination.py @@ -226,7 +226,7 @@ class CoordinationDriver(object): :param group_id: the id of the group to join :type group_id: str :param capabilities: the capabilities of the joined member - :type capabilities: str + :type capabilities: object (typically str) :returns: None :rtype: CoordAsyncResult """ @@ -285,7 +285,7 @@ class CoordinationDriver(object): :param group_id: the id of the group of the current member :type group_id: str :param capabilities: the capabilities of the updated member - :type capabilities: str + :type capabilities: object (typically str) :returns: None :rtype: CoordAsyncResult """ diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py index f14c5bc..4c7b8e6 100644 --- a/tooz/drivers/memcached.py +++ b/tooz/drivers/memcached.py @@ -18,7 +18,6 @@ import collections import logging from concurrent import futures -import msgpack import pymemcache.client import six @@ -103,15 +102,15 @@ class MemcachedDriver(coordination.CoordinationDriver): def _msgpack_serializer(key, value): if isinstance(value, six.binary_type): return value, 1 - return msgpack.dumps(value), 2 + return utils.dumps(value), 2 @staticmethod def _msgpack_deserializer(key, value, flags): if flags == 1: return value if flags == 2: - return msgpack.loads(value) - raise Exception("Unknown serialization format") + return utils.loads(value) + raise Exception("Unknown serialization format '%s'" % flags) def _start(self): try: @@ -213,7 +212,7 @@ class MemcachedDriver(coordination.CoordinationDriver): raise coordination.MemberAlreadyExist(group_id, self._member_id) group_members[self._member_id] = { - "capabilities": capabilities, + b"capabilities": capabilities, } if not self.client.cas(encoded_group, group_members, cas): # It changed, let's try again diff --git a/tooz/drivers/redis.py b/tooz/drivers/redis.py index e6a8df0..64b8667 100644 --- a/tooz/drivers/redis.py +++ b/tooz/drivers/redis.py @@ -21,7 +21,6 @@ from distutils import version import logging from concurrent import futures -import msgpack from oslo.utils import strutils import redis from redis import exceptions @@ -291,17 +290,11 @@ class RedisDriver(coordination.CoordinationDriver): @staticmethod def _dumps(data): - try: - return msgpack.dumps(data) - except (msgpack.PackException, ValueError) as e: - raise coordination.ToozError(utils.exception_message(e)) + return utils.dumps(data) @staticmethod def _loads(blob): - try: - return msgpack.loads(blob) - except (msgpack.UnpackException, ValueError) as e: - raise coordination.ToozError(utils.exception_message(e)) + return utils.loads(blob) @classmethod def _make_client(cls, parsed_url, options, default_socket_timeout): diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py index bd97ab6..df2df94 100644 --- a/tooz/drivers/zookeeper.py +++ b/tooz/drivers/zookeeper.py @@ -73,6 +73,14 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver): self._coord.stop() @staticmethod + def _dumps(data): + return utils.dumps(data) + + @staticmethod + def _loads(blob): + return utils.loads(blob) + + @staticmethod def _create_group_handler(async_result, timeout, timeout_exception, group_id): try: @@ -130,6 +138,7 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver): def join_group(self, group_id, capabilities=b""): member_path = self._path_member(group_id, self._member_id) + capabilities = self._dumps(capabilities) async_result = self._coord.create_async(member_path, value=capabilities, ephemeral=True) @@ -191,13 +200,14 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver): def update_capabilities(self, group_id, capabilities): member_path = self._path_member(group_id, self._member_id) + capabilities = self._dumps(capabilities) async_result = self._coord.set_async(member_path, capabilities) return ZooAsyncResult(async_result, self._update_capabilities_handler, timeout_exception=self._timeout_exception, group_id=group_id, member_id=self._member_id) - @staticmethod - def _get_member_capabilities_handler(async_result, timeout, + @classmethod + def _get_member_capabilities_handler(cls, async_result, timeout, timeout_exception, group_id, member_id): try: @@ -209,7 +219,7 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver): except exceptions.ZookeeperError as e: raise coordination.ToozError(utils.exception_message(e)) else: - return capabilities + return cls._loads(capabilities) def get_member_capabilities(self, group_id, member_id): member_path = self._path_member(group_id, member_id) diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py index 381b591..8de2f58 100644 --- a/tooz/tests/test_coordination.py +++ b/tooz/tests/test_coordination.py @@ -199,6 +199,17 @@ class TestAPI(testscenarios.TestWithScenarios, self.member_id).get() self.assertEqual(capa, b"test_capabilities") + def test_get_member_capabilities_complex(self): + self._coord.create_group(self.group_id).get() + caps = { + 'type': 'warrior', + 'abilities': ['fight', 'flight', 'double-hit-damage'], + } + self._coord.join_group(self.group_id, caps) + capa = self._coord.get_member_capabilities(self.group_id, + self.member_id).get() + self.assertEqual(capa, caps) + def test_get_member_capabilities_nonexistent_group(self): capa = self._coord.get_member_capabilities(self.group_id, self.member_id) diff --git a/tooz/utils.py b/tooz/utils.py index 3cc11d4..ffb9641 100644 --- a/tooz/utils.py +++ b/tooz/utils.py @@ -16,6 +16,10 @@ import six +import msgpack + +from tooz import coordination + def exception_message(exc): """Return the string representation of exception.""" @@ -30,3 +34,27 @@ def to_binary(text, encoding='ascii'): if not isinstance(text, six.binary_type): text = text.encode(encoding) return text + + +def dumps(data, excp_cls=coordination.ToozError): + """Serializes provided data using msgpack into a byte string. + + TODO(harlowja): use oslo.serialization 'msgpackutils.py' when we can since + that handles more native types better than the default does... + """ + try: + return msgpack.packb(data, use_bin_type=True) + except (msgpack.PackException, ValueError) as e: + raise excp_cls(exception_message(e)) + + +def loads(blob, excp_cls=coordination.ToozError): + """Deserializes provided data using msgpack (from a prior byte string). + + TODO(harlowja): use oslo.serialization 'msgpackutils.py' when we can since + that handles more native types better than the default does... + """ + try: + return msgpack.unpackb(blob, encoding='utf-8') + except (msgpack.UnpackException, ValueError) as e: + raise excp_cls(exception_message(e)) |