summaryrefslogtreecommitdiff
path: root/zake
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@gmail.com>2014-08-23 21:08:22 -0700
committerJoshua Harlow <harlowja@gmail.com>2014-08-23 21:08:22 -0700
commite6a4182749300a81f1dde9f46eca116e1b3d2b2b (patch)
treeeb7bf966ba29213e4eb4ec42cec7d30604a6d038 /zake
parent11219172fc31b8324c5195d32ff826059ee03464 (diff)
downloadzake-e6a4182749300a81f1dde9f46eca116e1b3d2b2b.tar.gz
Move the partial client below the fake client code
Diffstat (limited to 'zake')
-rw-r--r--zake/fake_client.py216
1 files changed, 109 insertions, 107 deletions
diff --git a/zake/fake_client.py b/zake/fake_client.py
index 4af50ba..719ef8a 100644
--- a/zake/fake_client.py
+++ b/zake/fake_client.py
@@ -58,113 +58,6 @@ SERVER_VERSION = (3, 4, 0)
_NO_ACL_MSG = "ACLs not currently supported"
-class _PartialClient(object):
- def __init__(self, storage):
- self.storage = storage
- self.session_id = None
-
- def delete(self, path, version=-1, recursive=False):
- if not isinstance(path, six.string_types):
- raise TypeError("path must be a string")
- data_watches = []
- child_watches = []
- path = utils.normpath(path)
- with self.storage.lock:
- if path not in self.storage:
- raise k_exceptions.NoNodeError("No path %s" % (path))
- path_version = self.storage[path]['version']
- if version != -1 and path_version != version:
- raise k_exceptions.BadVersionError("Version mismatch"
- " (%s != %s)"
- % (version, path_version))
-
- if recursive:
- paths = [path]
- children = self.storage.get_children(path, only_direct=False)
- for p in six.iterkeys(children):
- paths.append(p)
- else:
- children = self.storage.get_children(path, only_direct=False)
- if children:
- raise k_exceptions.NotEmptyError("Path %s is not-empty"
- " (%s children exist)"
- % (path, len(children)))
- paths = [path]
- paths = list(reversed(sorted(set(paths))))
- for p in paths:
- self.storage.pop(p)
- parents = []
- for p in paths:
- parents.extend(self.storage.get_parents(p))
- parents = list(reversed(sorted(set(parents))))
- for p in parents:
- event = k_states.WatchedEvent(
- type=k_states.EventType.DELETED,
- state=k_states.KeeperState.CONNECTED,
- path=p)
- child_watches.append(([p], event))
- for p in paths:
- event = k_states.WatchedEvent(
- type=k_states.EventType.DELETED,
- state=k_states.KeeperState.CONNECTED,
- path=p)
- data_watches.append(([p], event))
- return (True, data_watches, child_watches)
-
- def set(self, path, value, version=-1):
- if not isinstance(path, six.string_types):
- raise TypeError("path must be a string")
- if not isinstance(value, six.binary_type):
- raise TypeError("value must be a byte string")
- if not isinstance(version, int):
- raise TypeError("version must be an int")
- path = utils.normpath(path)
- try:
- stat = self.storage.set(path, value, version=version)
- except KeyError:
- raise k_exceptions.NoNodeError("No path %s" % (path))
- data_watches = []
- child_watches = []
- event = k_states.WatchedEvent(type=k_states.EventType.CHANGED,
- state=k_states.KeeperState.CONNECTED,
- path=path)
- data_watches.append(([path], event))
- return (stat, data_watches, child_watches)
-
- def create(self, path, value=b"", acl=None,
- ephemeral=False, sequence=False, makepath=False):
- if not isinstance(path, six.string_types):
- raise TypeError("path must be a string")
- if not isinstance(value, six.binary_type):
- raise TypeError("value must be a byte string")
- if acl:
- raise NotImplementedError(_NO_ACL_MSG)
- data_watches = []
- child_watches = []
- with self.storage.lock:
- path = utils.normpath(path)
- if makepath:
- for parent_path in utils.partition_path(path)[0:-1]:
- if parent_path not in self.storage:
- result = self.create(parent_path)
- data_watches.extend(result[1])
- child_watches.extend(result[2])
- created, parents, path = self.storage.create(
- path, value=value, sequence=sequence,
- ephemeral=ephemeral, session_id=self.session_id)
- if parents:
- event = k_states.WatchedEvent(type=k_states.EventType.CHILD,
- state=k_states.KeeperState.CONNECTED,
- path=path)
- child_watches.append((parents, event))
- if created:
- event = k_states.WatchedEvent(type=k_states.EventType.CREATED,
- state=k_states.KeeperState.CONNECTED,
- path=path)
- data_watches.append(([path], event))
- return (path, data_watches, child_watches)
-
-
class FakeClient(object):
"""A fake mostly functional/good enough kazoo compat. client
@@ -507,6 +400,115 @@ class FakeClient(object):
self._partial_client.session_id = None
+class _PartialClient(object):
+ """An internal *only* client that returns the watches to be triggered."""
+
+ def __init__(self, storage):
+ self.storage = storage
+ self.session_id = None
+
+ def delete(self, path, version=-1, recursive=False):
+ if not isinstance(path, six.string_types):
+ raise TypeError("path must be a string")
+ data_watches = []
+ child_watches = []
+ path = utils.normpath(path)
+ with self.storage.lock:
+ if path not in self.storage:
+ raise k_exceptions.NoNodeError("No path %s" % (path))
+ path_version = self.storage[path]['version']
+ if version != -1 and path_version != version:
+ raise k_exceptions.BadVersionError("Version mismatch"
+ " (%s != %s)"
+ % (version, path_version))
+
+ if recursive:
+ paths = [path]
+ children = self.storage.get_children(path, only_direct=False)
+ for p in six.iterkeys(children):
+ paths.append(p)
+ else:
+ children = self.storage.get_children(path, only_direct=False)
+ if children:
+ raise k_exceptions.NotEmptyError("Path %s is not-empty"
+ " (%s children exist)"
+ % (path, len(children)))
+ paths = [path]
+ paths = list(reversed(sorted(set(paths))))
+ for p in paths:
+ self.storage.pop(p)
+ parents = []
+ for p in paths:
+ parents.extend(self.storage.get_parents(p))
+ parents = list(reversed(sorted(set(parents))))
+ for p in parents:
+ event = k_states.WatchedEvent(
+ type=k_states.EventType.DELETED,
+ state=k_states.KeeperState.CONNECTED,
+ path=p)
+ child_watches.append(([p], event))
+ for p in paths:
+ event = k_states.WatchedEvent(
+ type=k_states.EventType.DELETED,
+ state=k_states.KeeperState.CONNECTED,
+ path=p)
+ data_watches.append(([p], event))
+ return (True, data_watches, child_watches)
+
+ def set(self, path, value, version=-1):
+ if not isinstance(path, six.string_types):
+ raise TypeError("path must be a string")
+ if not isinstance(value, six.binary_type):
+ raise TypeError("value must be a byte string")
+ if not isinstance(version, int):
+ raise TypeError("version must be an int")
+ path = utils.normpath(path)
+ try:
+ stat = self.storage.set(path, value, version=version)
+ except KeyError:
+ raise k_exceptions.NoNodeError("No path %s" % (path))
+ data_watches = []
+ child_watches = []
+ event = k_states.WatchedEvent(type=k_states.EventType.CHANGED,
+ state=k_states.KeeperState.CONNECTED,
+ path=path)
+ data_watches.append(([path], event))
+ return (stat, data_watches, child_watches)
+
+ def create(self, path, value=b"", acl=None,
+ ephemeral=False, sequence=False, makepath=False):
+ if not isinstance(path, six.string_types):
+ raise TypeError("path must be a string")
+ if not isinstance(value, six.binary_type):
+ raise TypeError("value must be a byte string")
+ if acl:
+ raise NotImplementedError(_NO_ACL_MSG)
+ data_watches = []
+ child_watches = []
+ with self.storage.lock:
+ path = utils.normpath(path)
+ if makepath:
+ for parent_path in utils.partition_path(path)[0:-1]:
+ if parent_path not in self.storage:
+ result = self.create(parent_path)
+ data_watches.extend(result[1])
+ child_watches.extend(result[2])
+ created, parents, path = self.storage.create(
+ path, value=value, sequence=sequence,
+ ephemeral=ephemeral, session_id=self.session_id)
+ if parents:
+ event = k_states.WatchedEvent(type=k_states.EventType.CHILD,
+ state=k_states.KeeperState.CONNECTED,
+ path=path)
+ child_watches.append((parents, event))
+ if created:
+ event = k_states.WatchedEvent(type=k_states.EventType.CREATED,
+ state=k_states.KeeperState.CONNECTED,
+ path=path)
+ data_watches.append(([path], event))
+ return (path, data_watches, child_watches)
+
+
class StopTransaction(Exception):
pass