summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharles-Henri de Boysson <ceache@users.noreply.github.com>2020-02-13 00:40:14 -0500
committerCharles-Henri de Boysson <ceache@users.noreply.github.com>2020-02-18 00:36:59 -0500
commit1d81f96a77a6fd7ae2476cd798a5cb4bf1c1d9a2 (patch)
tree3ba38b227ad33300b86b9302c13ad09b2771ac45
parent5764da0b3c25556cb6760f68c51b387f2fd8307b (diff)
downloadkazoo-1d81f96a77a6fd7ae2476cd798a5cb4bf1c1d9a2.tar.gz
feat(core): Add create2 support
-rw-r--r--kazoo/client.py55
-rw-r--r--kazoo/protocol/serialization.py21
-rw-r--r--kazoo/tests/test_client.py15
3 files changed, 78 insertions, 13 deletions
diff --git a/kazoo/client.py b/kazoo/client.py
index 6c066a0..a129fc5 100644
--- a/kazoo/client.py
+++ b/kazoo/client.py
@@ -31,6 +31,7 @@ from kazoo.protocol.serialization import (
CheckVersion,
CloseInstance,
Create,
+ Create2,
Delete,
Exists,
GetChildren,
@@ -68,8 +69,11 @@ from kazoo.recipe.watchers import ChildrenWatch, DataWatch
string_types = six.string_types
bytes_types = (six.binary_type,)
-CLOSED_STATES = (KeeperState.EXPIRED_SESSION, KeeperState.AUTH_FAILED,
- KeeperState.CLOSED)
+CLOSED_STATES = (
+ KeeperState.EXPIRED_SESSION,
+ KeeperState.AUTH_FAILED,
+ KeeperState.CLOSED
+)
ENVI_VERSION = re.compile(r'([\d\.]*).*', re.DOTALL)
ENVI_VERSION_KEY = 'zookeeper.version'
log = logging.getLogger(__name__)
@@ -856,7 +860,7 @@ class KazooClient(object):
return self.sync_async(path).get()
def create(self, path, value=b"", acl=None, ephemeral=False,
- sequence=False, makepath=False):
+ sequence=False, makepath=False, include_data=False):
"""Create a node with the given value as its data. Optionally
set an ACL on the node.
@@ -904,7 +908,13 @@ class KazooClient(object):
with a unique index.
:param makepath: Whether the path should be created if it
doesn't exist.
- :returns: Real path of the new node.
+ :param include_data:
+ Include the :class:`~kazoo.protocol.states.ZnodeStat` of
+ the node in addition to its real path. This option changes
+ the return value to be a tuple of (path, stat).
+
+ :returns: Real path of the new node, or tuple if `include_data`
+ is `True`
:rtype: str
:raises:
@@ -923,13 +933,19 @@ class KazooClient(object):
:exc:`~kazoo.exceptions.ZookeeperError` if the server
returns a non-zero error code.
+ .. versionadded:: 1.1
+ The `makepath` option.
+ .. versionadded:: 2.7
+ The `include_data` option.
"""
acl = acl or self.default_acl
- return self.create_async(path, value, acl=acl, ephemeral=ephemeral,
- sequence=sequence, makepath=makepath).get()
+ return self.create_async(
+ path, value, acl=acl, ephemeral=ephemeral,
+ sequence=sequence, makepath=makepath, include_data=include_data
+ ).get()
def create_async(self, path, value=b"", acl=None, ephemeral=False,
- sequence=False, makepath=False):
+ sequence=False, makepath=False, include_data=False):
"""Asynchronously create a ZNode. Takes the same arguments as
:meth:`create`.
@@ -937,7 +953,8 @@ class KazooClient(object):
.. versionadded:: 1.1
The makepath option.
-
+ .. versionadded:: 2.7
+ The `include_data` option.
"""
if acl is None and self.default_acl:
acl = self.default_acl
@@ -956,6 +973,8 @@ class KazooClient(object):
raise TypeError("Invalid type for 'sequence' (bool expected)")
if not isinstance(makepath, bool):
raise TypeError("Invalid type for 'makepath' (bool expected)")
+ if not isinstance(include_data, bool):
+ raise TypeError("Invalid type for 'include_data' (bool expected)")
flags = 0
if ephemeral:
@@ -970,7 +989,9 @@ class KazooClient(object):
@capture_exceptions(async_result)
def do_create():
result = self._create_async_inner(
- path, value, acl, flags, trailing=sequence)
+ path, value, acl, flags,
+ trailing=sequence, include_data=include_data
+ )
result.rawlink(create_completion)
@capture_exceptions(async_result)
@@ -981,7 +1002,11 @@ class KazooClient(object):
@wrap(async_result)
def create_completion(result):
try:
- return self.unchroot(result.get())
+ if include_data:
+ new_path, stat = result.get()
+ return self.unchroot(new_path), stat
+ else:
+ return self.unchroot(result.get())
except NoNodeError:
if not makepath:
raise
@@ -994,10 +1019,16 @@ class KazooClient(object):
do_create()
return async_result
- def _create_async_inner(self, path, value, acl, flags, trailing=False):
+ def _create_async_inner(self, path, value, acl, flags,
+ trailing=False, include_data=False):
async_result = self.handler.async_result()
+ if include_data:
+ opcode = Create2
+ else:
+ opcode = Create
+
call_result = self._call(
- Create(_prefix_root(self.chroot, path, trailing=trailing),
+ opcode(_prefix_root(self.chroot, path, trailing=trailing),
value, acl, flags), async_result)
if call_result is False:
# We hit a short-circuit exit on the _call. Because we are
diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py
index fa5c67a..14ad71a 100644
--- a/kazoo/protocol/serialization.py
+++ b/kazoo/protocol/serialization.py
@@ -351,6 +351,27 @@ class Transaction(namedtuple('Transaction', 'operations')):
return resp
+class Create2(namedtuple('Create2', 'path data acl flags')):
+ type = 15
+
+ def serialize(self):
+ b = bytearray()
+ b.extend(write_string(self.path))
+ b.extend(write_buffer(self.data))
+ b.extend(int_struct.pack(len(self.acl)))
+ for acl in self.acl:
+ b.extend(int_struct.pack(acl.perms) +
+ write_string(acl.id.scheme) + write_string(acl.id.id))
+ b.extend(int_struct.pack(self.flags))
+ return b
+
+ @classmethod
+ def deserialize(cls, bytes, offset):
+ path, offset = read_string(bytes, offset)
+ stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
+ return path, stat
+
+
class Reconfig(namedtuple('Reconfig',
'joining leaving new_members config_id')):
type = 16
diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py
index b081542..719b24f 100644
--- a/kazoo/tests/test_client.py
+++ b/kazoo/tests/test_client.py
@@ -775,6 +775,19 @@ class TestClient(KazooTestCase):
path = client.create("/1")
self.assertRaises(NodeExistsError, client.create, path)
+ def test_create_stat(self):
+ if TRAVIS_ZK_VERSION:
+ version = TRAVIS_ZK_VERSION
+ else:
+ version = self.client.server_version()
+ if not version or version < (3, 5):
+ raise SkipTest("Must use Zookeeper 3.5 or above")
+ client = self.client
+ path, stat1 = client.create("/1", b"bytes", include_data=True)
+ data, stat2 = client.get("/1")
+ eq_(data, b"bytes")
+ eq_(stat1, stat2)
+
def test_create_get_set(self):
nodepath = "/" + uuid.uuid4().hex
@@ -1099,8 +1112,8 @@ class TestClientTransactions(KazooTestCase):
t.create('/fred', ephemeral=True)
t.create('/smith', sequence=True)
results = t.commit()
- eq_(results[0], '/freddy')
eq_(len(results), 3)
+ eq_(results[0], '/freddy')
self.assertTrue(results[2].startswith('/smith0'))
def test_bad_creates(self):