summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-02-07 13:06:30 +0000
committerGerrit Code Review <review@openstack.org>2015-02-07 13:06:30 +0000
commit858187510dc07869eb1ef4519e04cfe235f5bd2f (patch)
tree4eabdb7204de68609a3b2571dfed380115ce2abd
parent5b77b967618bab41d0ede93d5684db05aea82fc7 (diff)
parent9afaefd56fa117941e1b801803a7d796a8a96551 (diff)
downloadtooz-858187510dc07869eb1ef4519e04cfe235f5bd2f.tar.gz
Merge "Since we use msgpack this can be more than a str"
-rw-r--r--tooz/coordination.py4
-rw-r--r--tooz/drivers/memcached.py9
-rw-r--r--tooz/drivers/redis.py11
-rw-r--r--tooz/drivers/zookeeper.py16
-rw-r--r--tooz/tests/test_coordination.py11
-rw-r--r--tooz/utils.py28
6 files changed, 60 insertions, 19 deletions
diff --git a/tooz/coordination.py b/tooz/coordination.py
index 0cdbef1..cbeec4c 100644
--- a/tooz/coordination.py
+++ b/tooz/coordination.py
@@ -226,7 +226,7 @@ class CoordinationDriver(object):
:param group_id: the id of the group to join
:type group_id: str
:param capabilities: the capabilities of the joined member
- :type capabilities: str
+ :type capabilities: object (typically str)
:returns: None
:rtype: CoordAsyncResult
"""
@@ -285,7 +285,7 @@ class CoordinationDriver(object):
:param group_id: the id of the group of the current member
:type group_id: str
:param capabilities: the capabilities of the updated member
- :type capabilities: str
+ :type capabilities: object (typically str)
:returns: None
:rtype: CoordAsyncResult
"""
diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py
index f14c5bc..4c7b8e6 100644
--- a/tooz/drivers/memcached.py
+++ b/tooz/drivers/memcached.py
@@ -18,7 +18,6 @@ import collections
import logging
from concurrent import futures
-import msgpack
import pymemcache.client
import six
@@ -103,15 +102,15 @@ class MemcachedDriver(coordination.CoordinationDriver):
def _msgpack_serializer(key, value):
if isinstance(value, six.binary_type):
return value, 1
- return msgpack.dumps(value), 2
+ return utils.dumps(value), 2
@staticmethod
def _msgpack_deserializer(key, value, flags):
if flags == 1:
return value
if flags == 2:
- return msgpack.loads(value)
- raise Exception("Unknown serialization format")
+ return utils.loads(value)
+ raise Exception("Unknown serialization format '%s'" % flags)
def _start(self):
try:
@@ -213,7 +212,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
raise coordination.MemberAlreadyExist(group_id,
self._member_id)
group_members[self._member_id] = {
- "capabilities": capabilities,
+ b"capabilities": capabilities,
}
if not self.client.cas(encoded_group, group_members, cas):
# It changed, let's try again
diff --git a/tooz/drivers/redis.py b/tooz/drivers/redis.py
index e6a8df0..64b8667 100644
--- a/tooz/drivers/redis.py
+++ b/tooz/drivers/redis.py
@@ -21,7 +21,6 @@ from distutils import version
import logging
from concurrent import futures
-import msgpack
from oslo.utils import strutils
import redis
from redis import exceptions
@@ -291,17 +290,11 @@ class RedisDriver(coordination.CoordinationDriver):
@staticmethod
def _dumps(data):
- try:
- return msgpack.dumps(data)
- except (msgpack.PackException, ValueError) as e:
- raise coordination.ToozError(utils.exception_message(e))
+ return utils.dumps(data)
@staticmethod
def _loads(blob):
- try:
- return msgpack.loads(blob)
- except (msgpack.UnpackException, ValueError) as e:
- raise coordination.ToozError(utils.exception_message(e))
+ return utils.loads(blob)
@classmethod
def _make_client(cls, parsed_url, options, default_socket_timeout):
diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py
index bd97ab6..df2df94 100644
--- a/tooz/drivers/zookeeper.py
+++ b/tooz/drivers/zookeeper.py
@@ -73,6 +73,14 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
self._coord.stop()
@staticmethod
+ def _dumps(data):
+ return utils.dumps(data)
+
+ @staticmethod
+ def _loads(blob):
+ return utils.loads(blob)
+
+ @staticmethod
def _create_group_handler(async_result, timeout,
timeout_exception, group_id):
try:
@@ -130,6 +138,7 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
def join_group(self, group_id, capabilities=b""):
member_path = self._path_member(group_id, self._member_id)
+ capabilities = self._dumps(capabilities)
async_result = self._coord.create_async(member_path,
value=capabilities,
ephemeral=True)
@@ -191,13 +200,14 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
def update_capabilities(self, group_id, capabilities):
member_path = self._path_member(group_id, self._member_id)
+ capabilities = self._dumps(capabilities)
async_result = self._coord.set_async(member_path, capabilities)
return ZooAsyncResult(async_result, self._update_capabilities_handler,
timeout_exception=self._timeout_exception,
group_id=group_id, member_id=self._member_id)
- @staticmethod
- def _get_member_capabilities_handler(async_result, timeout,
+ @classmethod
+ def _get_member_capabilities_handler(cls, async_result, timeout,
timeout_exception, group_id,
member_id):
try:
@@ -209,7 +219,7 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
except exceptions.ZookeeperError as e:
raise coordination.ToozError(utils.exception_message(e))
else:
- return capabilities
+ return cls._loads(capabilities)
def get_member_capabilities(self, group_id, member_id):
member_path = self._path_member(group_id, member_id)
diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py
index 381b591..8de2f58 100644
--- a/tooz/tests/test_coordination.py
+++ b/tooz/tests/test_coordination.py
@@ -199,6 +199,17 @@ class TestAPI(testscenarios.TestWithScenarios,
self.member_id).get()
self.assertEqual(capa, b"test_capabilities")
+ def test_get_member_capabilities_complex(self):
+ self._coord.create_group(self.group_id).get()
+ caps = {
+ 'type': 'warrior',
+ 'abilities': ['fight', 'flight', 'double-hit-damage'],
+ }
+ self._coord.join_group(self.group_id, caps)
+ capa = self._coord.get_member_capabilities(self.group_id,
+ self.member_id).get()
+ self.assertEqual(capa, caps)
+
def test_get_member_capabilities_nonexistent_group(self):
capa = self._coord.get_member_capabilities(self.group_id,
self.member_id)
diff --git a/tooz/utils.py b/tooz/utils.py
index 3cc11d4..ffb9641 100644
--- a/tooz/utils.py
+++ b/tooz/utils.py
@@ -16,6 +16,10 @@
import six
+import msgpack
+
+from tooz import coordination
+
def exception_message(exc):
"""Return the string representation of exception."""
@@ -30,3 +34,27 @@ def to_binary(text, encoding='ascii'):
if not isinstance(text, six.binary_type):
text = text.encode(encoding)
return text
+
+
+def dumps(data, excp_cls=coordination.ToozError):
+ """Serializes provided data using msgpack into a byte string.
+
+ TODO(harlowja): use oslo.serialization 'msgpackutils.py' when we can since
+ that handles more native types better than the default does...
+ """
+ try:
+ return msgpack.packb(data, use_bin_type=True)
+ except (msgpack.PackException, ValueError) as e:
+ raise excp_cls(exception_message(e))
+
+
+def loads(blob, excp_cls=coordination.ToozError):
+ """Deserializes provided data using msgpack (from a prior byte string).
+
+ TODO(harlowja): use oslo.serialization 'msgpackutils.py' when we can since
+ that handles more native types better than the default does...
+ """
+ try:
+ return msgpack.unpackb(blob, encoding='utf-8')
+ except (msgpack.UnpackException, ValueError) as e:
+ raise excp_cls(exception_message(e))