diff options
author | Charles-Henri de Boysson <ceache@users.noreply.github.com> | 2020-02-13 00:40:14 -0500 |
---|---|---|
committer | Charles-Henri de Boysson <ceache@users.noreply.github.com> | 2020-02-18 00:36:59 -0500 |
commit | 1d81f96a77a6fd7ae2476cd798a5cb4bf1c1d9a2 (patch) | |
tree | 3ba38b227ad33300b86b9302c13ad09b2771ac45 | |
parent | 5764da0b3c25556cb6760f68c51b387f2fd8307b (diff) | |
download | kazoo-1d81f96a77a6fd7ae2476cd798a5cb4bf1c1d9a2.tar.gz |
feat(core): Add create2 support
-rw-r--r-- | kazoo/client.py | 55 | ||||
-rw-r--r-- | kazoo/protocol/serialization.py | 21 | ||||
-rw-r--r-- | kazoo/tests/test_client.py | 15 |
3 files changed, 78 insertions, 13 deletions
diff --git a/kazoo/client.py b/kazoo/client.py index 6c066a0..a129fc5 100644 --- a/kazoo/client.py +++ b/kazoo/client.py @@ -31,6 +31,7 @@ from kazoo.protocol.serialization import ( CheckVersion, CloseInstance, Create, + Create2, Delete, Exists, GetChildren, @@ -68,8 +69,11 @@ from kazoo.recipe.watchers import ChildrenWatch, DataWatch string_types = six.string_types bytes_types = (six.binary_type,) -CLOSED_STATES = (KeeperState.EXPIRED_SESSION, KeeperState.AUTH_FAILED, - KeeperState.CLOSED) +CLOSED_STATES = ( + KeeperState.EXPIRED_SESSION, + KeeperState.AUTH_FAILED, + KeeperState.CLOSED +) ENVI_VERSION = re.compile(r'([\d\.]*).*', re.DOTALL) ENVI_VERSION_KEY = 'zookeeper.version' log = logging.getLogger(__name__) @@ -856,7 +860,7 @@ class KazooClient(object): return self.sync_async(path).get() def create(self, path, value=b"", acl=None, ephemeral=False, - sequence=False, makepath=False): + sequence=False, makepath=False, include_data=False): """Create a node with the given value as its data. Optionally set an ACL on the node. @@ -904,7 +908,13 @@ class KazooClient(object): with a unique index. :param makepath: Whether the path should be created if it doesn't exist. - :returns: Real path of the new node. + :param include_data: + Include the :class:`~kazoo.protocol.states.ZnodeStat` of + the node in addition to its real path. This option changes + the return value to be a tuple of (path, stat). + + :returns: Real path of the new node, or tuple if `include_data` + is `True` :rtype: str :raises: @@ -923,13 +933,19 @@ class KazooClient(object): :exc:`~kazoo.exceptions.ZookeeperError` if the server returns a non-zero error code. + .. versionadded:: 1.1 + The `makepath` option. + .. versionadded:: 2.7 + The `include_data` option. """ acl = acl or self.default_acl - return self.create_async(path, value, acl=acl, ephemeral=ephemeral, - sequence=sequence, makepath=makepath).get() + return self.create_async( + path, value, acl=acl, ephemeral=ephemeral, + sequence=sequence, makepath=makepath, include_data=include_data + ).get() def create_async(self, path, value=b"", acl=None, ephemeral=False, - sequence=False, makepath=False): + sequence=False, makepath=False, include_data=False): """Asynchronously create a ZNode. Takes the same arguments as :meth:`create`. @@ -937,7 +953,8 @@ class KazooClient(object): .. versionadded:: 1.1 The makepath option. - + .. versionadded:: 2.7 + The `include_data` option. """ if acl is None and self.default_acl: acl = self.default_acl @@ -956,6 +973,8 @@ class KazooClient(object): raise TypeError("Invalid type for 'sequence' (bool expected)") if not isinstance(makepath, bool): raise TypeError("Invalid type for 'makepath' (bool expected)") + if not isinstance(include_data, bool): + raise TypeError("Invalid type for 'include_data' (bool expected)") flags = 0 if ephemeral: @@ -970,7 +989,9 @@ class KazooClient(object): @capture_exceptions(async_result) def do_create(): result = self._create_async_inner( - path, value, acl, flags, trailing=sequence) + path, value, acl, flags, + trailing=sequence, include_data=include_data + ) result.rawlink(create_completion) @capture_exceptions(async_result) @@ -981,7 +1002,11 @@ class KazooClient(object): @wrap(async_result) def create_completion(result): try: - return self.unchroot(result.get()) + if include_data: + new_path, stat = result.get() + return self.unchroot(new_path), stat + else: + return self.unchroot(result.get()) except NoNodeError: if not makepath: raise @@ -994,10 +1019,16 @@ class KazooClient(object): do_create() return async_result - def _create_async_inner(self, path, value, acl, flags, trailing=False): + def _create_async_inner(self, path, value, acl, flags, + trailing=False, include_data=False): async_result = self.handler.async_result() + if include_data: + opcode = Create2 + else: + opcode = Create + call_result = self._call( - Create(_prefix_root(self.chroot, path, trailing=trailing), + opcode(_prefix_root(self.chroot, path, trailing=trailing), value, acl, flags), async_result) if call_result is False: # We hit a short-circuit exit on the _call. Because we are diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py index fa5c67a..14ad71a 100644 --- a/kazoo/protocol/serialization.py +++ b/kazoo/protocol/serialization.py @@ -351,6 +351,27 @@ class Transaction(namedtuple('Transaction', 'operations')): return resp +class Create2(namedtuple('Create2', 'path data acl flags')): + type = 15 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.path)) + b.extend(write_buffer(self.data)) + b.extend(int_struct.pack(len(self.acl))) + for acl in self.acl: + b.extend(int_struct.pack(acl.perms) + + write_string(acl.id.scheme) + write_string(acl.id.id)) + b.extend(int_struct.pack(self.flags)) + return b + + @classmethod + def deserialize(cls, bytes, offset): + path, offset = read_string(bytes, offset) + stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) + return path, stat + + class Reconfig(namedtuple('Reconfig', 'joining leaving new_members config_id')): type = 16 diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py index b081542..719b24f 100644 --- a/kazoo/tests/test_client.py +++ b/kazoo/tests/test_client.py @@ -775,6 +775,19 @@ class TestClient(KazooTestCase): path = client.create("/1") self.assertRaises(NodeExistsError, client.create, path) + def test_create_stat(self): + if TRAVIS_ZK_VERSION: + version = TRAVIS_ZK_VERSION + else: + version = self.client.server_version() + if not version or version < (3, 5): + raise SkipTest("Must use Zookeeper 3.5 or above") + client = self.client + path, stat1 = client.create("/1", b"bytes", include_data=True) + data, stat2 = client.get("/1") + eq_(data, b"bytes") + eq_(stat1, stat2) + def test_create_get_set(self): nodepath = "/" + uuid.uuid4().hex @@ -1099,8 +1112,8 @@ class TestClientTransactions(KazooTestCase): t.create('/fred', ephemeral=True) t.create('/smith', sequence=True) results = t.commit() - eq_(results[0], '/freddy') eq_(len(results), 3) + eq_(results[0], '/freddy') self.assertTrue(results[2].startswith('/smith0')) def test_bad_creates(self): |