summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-02-04 18:36:00 -0800
committerJoshua Harlow <harlowja@yahoo-inc.com>2015-02-06 16:23:20 -0800
commit9afaefd56fa117941e1b801803a7d796a8a96551 (patch)
tree82f4683f22ecf565d9ba98ebd47b25f310eb81f1
parent5edf2b3db33a42b646fe923580debff6b58a1122 (diff)
downloadtooz-9afaefd56fa117941e1b801803a7d796a8a96551.tar.gz
Since we use msgpack this can be more than a str
It is often nice to have capabilities that are dicts to allow for more complex capabilities that; and since we use msgpack this is already enabled for usage (since msgpack will serialize/deserialize capabilities provided as dicts already). This updates the zookeeper driver to ensure that capabilites sent are first formatted to msgpack format (before being sent) so that all drivers work in the same manner and accept the same kind of items. Change-Id: I2e19e806d38f70175706610cd92d9e22f606b41b
-rw-r--r--tooz/coordination.py4
-rw-r--r--tooz/drivers/memcached.py9
-rw-r--r--tooz/drivers/redis.py11
-rw-r--r--tooz/drivers/zookeeper.py16
-rw-r--r--tooz/tests/test_coordination.py11
-rw-r--r--tooz/utils.py28
6 files changed, 60 insertions, 19 deletions
diff --git a/tooz/coordination.py b/tooz/coordination.py
index 0cdbef1..cbeec4c 100644
--- a/tooz/coordination.py
+++ b/tooz/coordination.py
@@ -226,7 +226,7 @@ class CoordinationDriver(object):
:param group_id: the id of the group to join
:type group_id: str
:param capabilities: the capabilities of the joined member
- :type capabilities: str
+ :type capabilities: object (typically str)
:returns: None
:rtype: CoordAsyncResult
"""
@@ -285,7 +285,7 @@ class CoordinationDriver(object):
:param group_id: the id of the group of the current member
:type group_id: str
:param capabilities: the capabilities of the updated member
- :type capabilities: str
+ :type capabilities: object (typically str)
:returns: None
:rtype: CoordAsyncResult
"""
diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py
index f14c5bc..4c7b8e6 100644
--- a/tooz/drivers/memcached.py
+++ b/tooz/drivers/memcached.py
@@ -18,7 +18,6 @@ import collections
import logging
from concurrent import futures
-import msgpack
import pymemcache.client
import six
@@ -103,15 +102,15 @@ class MemcachedDriver(coordination.CoordinationDriver):
def _msgpack_serializer(key, value):
if isinstance(value, six.binary_type):
return value, 1
- return msgpack.dumps(value), 2
+ return utils.dumps(value), 2
@staticmethod
def _msgpack_deserializer(key, value, flags):
if flags == 1:
return value
if flags == 2:
- return msgpack.loads(value)
- raise Exception("Unknown serialization format")
+ return utils.loads(value)
+ raise Exception("Unknown serialization format '%s'" % flags)
def _start(self):
try:
@@ -213,7 +212,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
raise coordination.MemberAlreadyExist(group_id,
self._member_id)
group_members[self._member_id] = {
- "capabilities": capabilities,
+ b"capabilities": capabilities,
}
if not self.client.cas(encoded_group, group_members, cas):
# It changed, let's try again
diff --git a/tooz/drivers/redis.py b/tooz/drivers/redis.py
index e6a8df0..64b8667 100644
--- a/tooz/drivers/redis.py
+++ b/tooz/drivers/redis.py
@@ -21,7 +21,6 @@ from distutils import version
import logging
from concurrent import futures
-import msgpack
from oslo.utils import strutils
import redis
from redis import exceptions
@@ -291,17 +290,11 @@ class RedisDriver(coordination.CoordinationDriver):
@staticmethod
def _dumps(data):
- try:
- return msgpack.dumps(data)
- except (msgpack.PackException, ValueError) as e:
- raise coordination.ToozError(utils.exception_message(e))
+ return utils.dumps(data)
@staticmethod
def _loads(blob):
- try:
- return msgpack.loads(blob)
- except (msgpack.UnpackException, ValueError) as e:
- raise coordination.ToozError(utils.exception_message(e))
+ return utils.loads(blob)
@classmethod
def _make_client(cls, parsed_url, options, default_socket_timeout):
diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py
index bd97ab6..df2df94 100644
--- a/tooz/drivers/zookeeper.py
+++ b/tooz/drivers/zookeeper.py
@@ -73,6 +73,14 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
self._coord.stop()
@staticmethod
+ def _dumps(data):
+ return utils.dumps(data)
+
+ @staticmethod
+ def _loads(blob):
+ return utils.loads(blob)
+
+ @staticmethod
def _create_group_handler(async_result, timeout,
timeout_exception, group_id):
try:
@@ -130,6 +138,7 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
def join_group(self, group_id, capabilities=b""):
member_path = self._path_member(group_id, self._member_id)
+ capabilities = self._dumps(capabilities)
async_result = self._coord.create_async(member_path,
value=capabilities,
ephemeral=True)
@@ -191,13 +200,14 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
def update_capabilities(self, group_id, capabilities):
member_path = self._path_member(group_id, self._member_id)
+ capabilities = self._dumps(capabilities)
async_result = self._coord.set_async(member_path, capabilities)
return ZooAsyncResult(async_result, self._update_capabilities_handler,
timeout_exception=self._timeout_exception,
group_id=group_id, member_id=self._member_id)
- @staticmethod
- def _get_member_capabilities_handler(async_result, timeout,
+ @classmethod
+ def _get_member_capabilities_handler(cls, async_result, timeout,
timeout_exception, group_id,
member_id):
try:
@@ -209,7 +219,7 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
except exceptions.ZookeeperError as e:
raise coordination.ToozError(utils.exception_message(e))
else:
- return capabilities
+ return cls._loads(capabilities)
def get_member_capabilities(self, group_id, member_id):
member_path = self._path_member(group_id, member_id)
diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py
index 381b591..8de2f58 100644
--- a/tooz/tests/test_coordination.py
+++ b/tooz/tests/test_coordination.py
@@ -199,6 +199,17 @@ class TestAPI(testscenarios.TestWithScenarios,
self.member_id).get()
self.assertEqual(capa, b"test_capabilities")
+ def test_get_member_capabilities_complex(self):
+ self._coord.create_group(self.group_id).get()
+ caps = {
+ 'type': 'warrior',
+ 'abilities': ['fight', 'flight', 'double-hit-damage'],
+ }
+ self._coord.join_group(self.group_id, caps)
+ capa = self._coord.get_member_capabilities(self.group_id,
+ self.member_id).get()
+ self.assertEqual(capa, caps)
+
def test_get_member_capabilities_nonexistent_group(self):
capa = self._coord.get_member_capabilities(self.group_id,
self.member_id)
diff --git a/tooz/utils.py b/tooz/utils.py
index 3cc11d4..ffb9641 100644
--- a/tooz/utils.py
+++ b/tooz/utils.py
@@ -16,6 +16,10 @@
import six
+import msgpack
+
+from tooz import coordination
+
def exception_message(exc):
"""Return the string representation of exception."""
@@ -30,3 +34,27 @@ def to_binary(text, encoding='ascii'):
if not isinstance(text, six.binary_type):
text = text.encode(encoding)
return text
+
+
+def dumps(data, excp_cls=coordination.ToozError):
+ """Serializes provided data using msgpack into a byte string.
+
+ TODO(harlowja): use oslo.serialization 'msgpackutils.py' when we can since
+ that handles more native types better than the default does...
+ """
+ try:
+ return msgpack.packb(data, use_bin_type=True)
+ except (msgpack.PackException, ValueError) as e:
+ raise excp_cls(exception_message(e))
+
+
+def loads(blob, excp_cls=coordination.ToozError):
+ """Deserializes provided data using msgpack (from a prior byte string).
+
+ TODO(harlowja): use oslo.serialization 'msgpackutils.py' when we can since
+ that handles more native types better than the default does...
+ """
+ try:
+ return msgpack.unpackb(blob, encoding='utf-8')
+ except (msgpack.UnpackException, ValueError) as e:
+ raise excp_cls(exception_message(e))