summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Bangert <ben@groovie.org>2012-08-28 15:43:42 -0700
committerBen Bangert <ben@groovie.org>2012-08-28 15:43:42 -0700
commit72c90e919096c852310325f7d16d1d8b78f1e411 (patch)
tree41be87b192de269ddfa7d8edcfbe3f8749468e2b
parent0c900ce5508be546d1f8efb1217f78a0b71cdec1 (diff)
downloadkazoo-72c90e919096c852310325f7d16d1d8b78f1e411.tar.gz
Most of the main refactor complete, basic connection and ping handling working with start/stop.
-rw-r--r--kazoo/client.py415
-rw-r--r--kazoo/exceptions.py120
-rw-r--r--kazoo/handlers/threading.py4
-rw-r--r--kazoo/hosts.py35
-rw-r--r--kazoo/protocol/__init__.py240
-rw-r--r--kazoo/protocol/paths.py52
-rw-r--r--kazoo/protocol/serialization.py73
-rw-r--r--kazoo/protocol/states.py235
-rw-r--r--kazoo/recipe/barrier.py14
-rw-r--r--kazoo/recipe/lock.py4
-rw-r--r--kazoo/recipe/party.py8
-rw-r--r--kazoo/recipe/watchers.py4
-rw-r--r--kazoo/retry.py17
13 files changed, 643 insertions, 578 deletions
diff --git a/kazoo/client.py b/kazoo/client.py
index 3b075b4..8f7fda5 100644
--- a/kazoo/client.py
+++ b/kazoo/client.py
@@ -1,190 +1,28 @@
"""Kazoo Zookeeper Client"""
import inspect
import logging
-import random
-from collections import namedtuple
+from collections import defaultdict
from functools import partial
from os.path import split
-from kazoo.exceptions import BadArgumentsException
+from kazoo.exceptions import NoNodeError
from kazoo.exceptions import ConfigurationError
-from kazoo.exceptions import ZookeeperStoppedError
-from kazoo.exceptions import NoNodeException
-from kazoo.exceptions import err_to_exception
from kazoo.handlers.threading import SequentialThreadingHandler
from kazoo.recipe.lock import Lock
from kazoo.recipe.party import Party
from kazoo.recipe.party import ShallowParty
from kazoo.recipe.election import Election
+from kazoo.protocol.states import KazooState
+from kazoo.protocol.states import KeeperState
+from kazoo.protocol import proto_writer
from kazoo.retry import KazooRetry
+from kazoo.hosts import collect_hosts
+from kazoo.protocol.serialization import Close
+from kazoo.protocol.paths import normpath
log = logging.getLogger(__name__)
-class WatchedEvent(namedtuple('WatchedEvent', ('type', 'state', 'path'))):
- """A change on ZooKeeper that a Watcher is able to respond to.
-
- The :class:`WatchedEvent` includes exactly what happened, the
- current state of ZooKeeper, and the path of the znode that was
- involved in the event. An instance of :class:`WatchedEvent` will be
- passed to registered watch functions.
-
- .. attribute:: type
-
- A :class:`EventType` attribute indicating the event type.
-
- .. attribute:: state
-
- A :class:`KeeperState` attribute indicating the Zookeeper
- state.
-
- .. attribute:: path
-
- The path of the node for the watch event.
-
- """
-
-
-class ZnodeStat(namedtuple('ZnodeStat', ('aversion', 'ctime', 'cversion',
- 'czxid', 'dataLength',
- 'ephemeralOwner', 'mtime', 'mzxid',
- 'numChildren', 'pzxid', 'version'))):
- """A ZnodeStat structure with convenience properties
-
- When getting the value of a node from Zookeeper, the properties for
- the node known as a "Stat structure" will be retrieved. The
- :class:`ZnodeStat` object provides access to the standard Stat
- properties and additional properties that are more readable and use
- Python time semantics (seconds since epoch instead of ms).
-
- .. note::
-
- The original Zookeeper Stat name is in parens next to the name
- when it differs from the convenience attribute. These are **not
- functions**, just attributes.
-
- .. attribute:: creation_transaction_id (czxid)
-
- The transaction id of the change that caused this znode to be
- created.
-
- .. attribute:: last_modified_transaction_id (mzxid)
-
- The transaction id of the change that last modified this znode.
-
- .. attribute:: created (ctime)
-
- The time in seconds from epoch when this node was created.
- (ctime is in milliseconds)
-
- .. attribute:: last_modified (mtime)
-
- The time in seconds from epoch when this znode was last
- modified. (mtime is in milliseconds)
-
- .. attribute:: version
-
- The number of changes to the data of this znode.
-
- .. attribute:: acl_version (aversion)
-
- The number of changes to the ACL of this znode.
-
- .. attribute:: owner_session_id (ephemeralOwner)
-
- The session id of the owner of this znode if the znode is an
- ephemeral node. If it is not an ephemeral node, it will be
- `None`. (ephemeralOwner will be 0 if it is not ephemeral)
-
- .. attribute:: data_length (dataLength)
-
- The length of the data field of this znode.
-
- .. attribute:: children_count (numChildren)
-
- The number of children of this znode.
-
- """
- @property
- def acl_version(self):
- return self.aversion
-
- @property
- def children_version(self):
- return self.cversion
-
- @property
- def created(self):
- return self.ctime / 1000.0
-
- @property
- def last_modified(self):
- return self.mtime / 1000.0
-
- @property
- def owner_session_id(self):
- return self.ephemeralOwner or None
-
- @property
- def creation_transaction_id(self):
- return self.czxid
-
- @property
- def last_modified_transaction_id(self):
- return self.mzxid
-
- @property
- def data_length(self):
- return self.dataLength
-
- @property
- def children_count(self):
- return self.numChildren
-
-
-class Callback(namedtuple('Callback', ('type', 'func', 'args'))):
- """A callback that is handed to a handler for dispatch
-
- :param type: Type of the callback, can be 'session' or 'watch'
- :param func: Callback function
- :param args: Argument list for the callback function
-
- """
-
-
-## Client Callbacks
-
-def _generic_callback(async_result, handle, code, *args):
- if code != zookeeper.OK:
- exc = err_to_exception(code)
- async_result.set_exception(exc)
- else:
- if not args:
- result = None
- elif len(args) == 1:
- if isinstance(args[0], dict):
- # It's a node struct, put it in a ZnodeStat
- result = ZnodeStat(**args[0])
- else:
- result = args[0]
- else:
- # if there's two, the second is a stat object
- args = list(args)
- if len(args) == 2 and isinstance(args[1], dict):
- args[1] = ZnodeStat(**args[1])
- result = tuple(args)
-
- async_result.set(result)
-
-
-def _exists_callback(async_result, handle, code, stat):
- if code not in (zookeeper.OK, zookeeper.NONODE):
- exc = err_to_exception(code)
- async_result.set_exception(exc)
- else:
- async_result.set(stat)
-
-
class KazooClient(object):
"""An Apache Zookeeper Python wrapper supporting alternate callback
handlers and high-level functionality
@@ -197,7 +35,7 @@ class KazooClient(object):
def __init__(self, hosts='127.0.0.1:2181', watcher=None,
timeout=10.0, client_id=None, max_retries=None, retry_delay=0.1,
retry_backoff=2, retry_jitter=0.8, handler=None,
- default_acl=None):
+ default_acl=None, read_only=None):
"""Create a KazooClient instance. All time arguments are in seconds.
:param hosts: Comma-separated list of hosts to connect to
@@ -226,46 +64,47 @@ class KazooClient(object):
and reconnects.
"""
- from kazoo.recipe.barrier import Barrier
- from kazoo.recipe.barrier import DoubleBarrier
- from kazoo.recipe.partitioner import SetPartitioner
- from kazoo.recipe.watchers import ChildrenWatch
- from kazoo.recipe.watchers import DataWatch
-
# Record the handler strategy used
- self._handler = handler if handler else SequentialThreadingHandler()
- self.handler = self._handler
+ self.handler = handler if handler else SequentialThreadingHandler()
+ if inspect.isclass(self.handler):
+ raise ConfigurationError("Handler must be an instance of a class, "
+ "not the class: %s" % self.handler)
- # Check for chroot
- chroot_check = hosts.split('/', 1)
- if len(chroot_check) == 2:
- self.chroot = '/' + chroot_check[1]
+ self.default_acl = default_acl
+ self.hosts, chroot = collect_hosts(hosts)
+ if chroot:
+ self.chroot = normpath(chroot)
+ if not self.chroot.startswith('/'):
+ raise ValueError('chroot not absolute')
else:
- self.chroot = None
+ self.chroot = ''
- # remove any trailing slashes
- self.default_acl = default_acl
+ self.last_zxid = 0
+ self.read_only = read_only
+ self._session_id = None
- self._hosts = hosts
- self._watcher = watcher
- self._provided_client_id = client_id
+ if client_id:
+ self._session_id = client_id[0]
+ self._session_passwd = client_id[1]
+ else:
+ self._session_id = None
+ self._session_passwd = str(bytearray([0] * 16))
# ZK uses milliseconds
- self._timeout = int(timeout * 1000)
-
- if inspect.isclass(self.handler):
- raise ConfigurationError("Handler must be an instance of a class, "
- "not the class: %s" % self.handler)
-
- self._handle = None
+ self._session_timeout = int(timeout * 1000)
# We use events like twitter's client to track current and desired
# state (connected, and whether to shutdown)
self._live = self.handler.event_object()
+ self._writer_stopped = self.handler.event_object()
self._stopped = self.handler.event_object()
self._stopped.set()
- self._connection_timed_out = False
+ self._queue = self.handler.peekable_queue()
+ self._pending = self.handler.peekable_queue()
+
+ self._child_watchers = defaultdict(set)
+ self._data_watchers = defaultdict(set)
self.retry = KazooRetry(
max_tries=max_retries,
@@ -274,19 +113,22 @@ class KazooClient(object):
max_jitter=retry_jitter,
sleep_func=self.handler.sleep_func
)
-
- self.max_retries = max_retries
- self.retry_delay = retry_delay
- self.retry_backoff = retry_backoff
- self.retry_jitter = int(retry_jitter * 100)
- self._connection_attempts = None
+ self.retry_sleeper = self.retry.retry_sleeper.copy()
# Curator like simplified state tracking, and listeners for state
# transitions
+ self._state_lock = self.handler.rlock_object()
+ self._state = KeeperState.CLOSED
self.state = KazooState.LOST
self.state_listeners = set()
# convenience API
+ from kazoo.recipe.barrier import Barrier
+ from kazoo.recipe.barrier import DoubleBarrier
+ from kazoo.recipe.partitioner import SetPartitioner
+ from kazoo.recipe.watchers import ChildrenWatch
+ from kazoo.recipe.watchers import DataWatch
+
self.Barrier = partial(Barrier, self)
self.DoubleBarrier = partial(DoubleBarrier, self)
self.ChildrenWatch = partial(ChildrenWatch, self)
@@ -297,48 +139,6 @@ class KazooClient(object):
self.SetPartitioner = partial(SetPartitioner, self)
self.ShallowParty = partial(ShallowParty, self)
- def _safe_call(self, func, async_result, *args, **kwargs):
- """Safely call a zookeeper function and handle errors related
- to a bad zhandle and state bugs
-
- In older zkpython bindings, a SystemError can arise if some
- functions are called when the session handle is expired. This
- client clears the old handle as soon as possible, but its
- possible a command may run against the old handle that is
- expired resulting in this error being thrown. See
- https://issues.apache.org/jira/browse/ZOOKEEPER-1318
-
- """
- try:
- return func(self._handle, *args)
- except BadArgumentsException:
- # Validate the path to throw a better except
- if isinstance(args[0], basestring) and not args[0].startswith('/'):
- raise ValueError("invalid path '%s'. must start with /" %
- args[0])
- else:
- raise
- except (TypeError, SystemError) as exc:
- # Handle was cleared or isn't set. If it was cleared and we are
- # supposed to be running, it means we had session expiration and
- # are attempting to reconnect. We treat it as a session expiration
- # in that case for appropriate behavior.
- if self._stopped.is_set():
- async_result.set_exception(
- ZookeeperStoppedError(
- "The Kazoo client is stopped, call connect before running "
- "commands that talk to Zookeeper"))
- elif isinstance(exc, SystemError):
- # Set this to the error it should be for appropriate handling
- async_result.set_exception(
- self.zookeeper.InvalidStateException(
- "invalid handle state"))
- elif self._handle is None or 'an integer is required' in exc:
- async_result.set_exception(
- self.zookeeper.SessionExpiredException("session expired"))
- else:
- raise
-
def add_listener(self, listener):
"""Add a function to be called for connection state changes
@@ -362,31 +162,10 @@ class KazooClient(object):
@property
def client_id(self):
"""Returns the client id for this Zookeeper session if connected"""
- if self._handle is not None:
- return self.zookeeper.client_id(self._handle)
+ if self._live.is_set():
+ return (self._session_id, self._session_passwd)
return None
- def _wrap_session_callback(self, func):
- def wrapper(handle, type, state, path):
- callback = Callback('session', func, (handle, type, state, path))
- self.handler.dispatch_callback(callback)
- return wrapper
-
- def _wrap_watch_callback(self, func):
- def func_wrapper(*args):
- try:
- func(*args)
- except Exception:
- log.exception("Exception in kazoo callback <func: %s>" % func)
-
- def wrapper(handle, type, state, path):
- # don't send session events to all watchers
- if type != self.zookeeper.SESSION_EVENT:
- event = WatchedEvent(type, state, path)
- callback = Callback('watch', func_wrapper, (event,))
- self.handler.dispatch_callback(callback)
- return wrapper
-
def _make_state_change(self, state):
# skip if state is current
if self.state == state:
@@ -403,68 +182,43 @@ class KazooClient(object):
except Exception:
log.exception("Error in connection state listener")
- def _session_callback(self, handle, type, state, path):
- log.debug("Client Instance: %s, Handle: %s, Type: %s, State: %s"
- ", Path: %s", id(self), handle, ZK_TYPES.get(type, type),
- ZK_STATES.get(state, state),
- path)
-
- if type != EventType.SESSION:
+ def _session_callback(self, state):
+ if self._stopped.is_set():
+ # Any events at this point can be ignored
return
- if self._handle != handle:
- try:
- # latent handle callback from previous connection
- self.zookeeper.close(handle)
- except Exception:
- pass
+ if state == self._state:
return
- if self._stopped.is_set():
- # Any events at this point can be ignored
- return
+ with self._state_lock:
+ self._state = state
if state == KeeperState.CONNECTED:
log.info("Zookeeper connection established")
self._live.set()
- self._connection_attempts = None
-
- # Clear the client id when we successfully connect
- self._provided_client_id = None
-
self._make_state_change(KazooState.CONNECTED)
elif state in (KeeperState.EXPIRED_SESSION,
KeeperState.AUTH_FAILED):
log.info("Zookeeper session lost, state: %s", state)
self._live.clear()
self._make_state_change(KazooState.LOST)
- self._handle = None
-
- # This session callback already runs in the handler so
- # its safe to spawn the start_async call so this function
- # can return immediately
- self.handler.spawn(self._reconnect)
else:
log.info("Zookeeper connection lost")
# Connection lost
self._live.clear()
self._make_state_change(KazooState.SUSPENDED)
- if self._watcher:
- self._watcher(WatchedEvent(type, state, path))
-
def _safe_close(self):
- if self._handle is not None:
- # Stop the handler
- self.handler.stop()
- zh, self._handle = self._handle, None
+ self.handler.stop()
+
+ if not self._writer_stopped.is_set():
try:
- self.zookeeper.close(zh)
- except self.zookeeper.ZooKeeperException:
- # Corrupt session or otherwise disconnected
- pass
- self._live.clear()
- self._make_state_change(KazooState.LOST)
+ self._writer_stopped.wait(10)
+ except self.handler.timeout_exception:
+ raise Exception("Writer still open from prior connection"
+ " and wouldn't close after 10 seconds")
+
+ self._make_state_change(KazooState.LOST)
def start_async(self):
"""Asynchronously initiate connection to ZK
@@ -481,36 +235,17 @@ class KazooClient(object):
# Make sure we're safely closed
self._safe_close()
- # We've been asked to connect, clear the stop
+ # We've been asked to connect, clear the stop and our writer
+ # thread indicator
self._stopped.clear()
+ self._writer_stopped.clear()
# Start the handler
self.handler.start()
- cb = self._wrap_session_callback(self._session_callback)
- self._connection_attempts = 1
- self._connection_delay = self.retry_delay
- if self._provided_client_id:
- self._handle = self.zookeeper.init(self._hosts, cb, self._timeout,
- self._provided_client_id)
- else:
- self._handle = self.zookeeper.init(self._hosts, cb, self._timeout)
+ # Start the connection writer to establish the connection
+ self.handler.spawn(proto_writer, self)
return self._live
- connect_async = start_async
-
- def _reconnect(self):
- """Reconnect to Zookeeper, staggering connection attempts"""
- if not self._connection_attempts:
- self._connection_attempts = 1
- self._connection_delay = self.retry_delay
- else:
- if self._connection_attempts == self.max_retries:
- raise self.handler.timeout_exception("Time out reconnecting")
- self._connection_attempts += 1
- jitter = random.randint(0, self.retry_jitter) / 100.0
- self.handler.sleep_func(self._connection_delay + jitter)
- self._connection_delay *= self.retry_delay
- self.start_async()
def start(self, timeout=15):
"""Initiate connection to ZK
@@ -528,17 +263,19 @@ class KazooClient(object):
# We time-out, ensure we are disconnected
self.stop()
raise self.handler.timeout_exception("Connection time-out")
- connect = start
def stop(self):
"""Gracefully stop this Zookeeper session"""
+ if self._stopped.is_set():
+ return
+
self._stopped.set()
+ self._queue.put((Close(), None))
self._safe_close()
def restart(self):
"""Stop and restart the Zookeeper session."""
- self._safe_close()
- self._stopped.clear()
+ self.stop()
self.start()
def add_auth_async(self, scheme, credential):
@@ -590,7 +327,7 @@ class KazooClient(object):
self._inner_ensure_path(parent, acl)
try:
self.create_async(path, "", acl=acl).get()
- except self.zookeeper.NodeExistsException:
+ except self.zookeeper.NodeExistsError:
# someone else created the node. how sweet!
pass
@@ -657,7 +394,7 @@ class KazooClient(object):
realpath = self.create_async(path, value, acl=acl,
ephemeral=ephemeral, sequence=sequence).get()
- except NoNodeException:
+ except NoNodeError:
# some or all of the parent path doesn't exist. if makepath is set
# we will create it and retry. If it fails again, someone must be
# actively deleting ZNodes and we'd best bail out.
@@ -833,7 +570,7 @@ class KazooClient(object):
def _delete_recursive(self, path):
try:
children = self.get_children(path)
- except self.zookeeper.NoNodeException:
+ except self.zookeeper.NoNodeError:
return
if children:
@@ -846,5 +583,5 @@ class KazooClient(object):
self._delete_recursive(child_path)
try:
self.delete(path)
- except self.zookeeper.NoNodeException:
+ except self.zookeeper.NoNodeError:
pass
diff --git a/kazoo/exceptions.py b/kazoo/exceptions.py
index 5e377c6..6709d9b 100644
--- a/kazoo/exceptions.py
+++ b/kazoo/exceptions.py
@@ -7,6 +7,11 @@ class KazooException(Exception):
from"""
+class ZookeeperError(KazooException):
+ """Base Zookeeper exception for errors originating from the Zookeeper
+ server"""
+
+
class CancelledError(KazooException):
"""Raised when a process is cancelled by another thread"""
@@ -23,12 +28,119 @@ class ConnectionDropped(KazooException):
""" Internal error for jumping out of loops """
-class AuthFailedError(KazooException):
- """Authentication failed when connected"""
-
-
def _invalid_error_code():
raise RuntimeError('Invalid error code')
EXCEPTIONS = defaultdict(_invalid_error_code)
+
+
+def _zookeeper_exception(code):
+ def decorator(klass):
+ def create(*args, **kwargs):
+ return klass(args, kwargs)
+
+ EXCEPTIONS[code] = create
+ return klass
+
+ return decorator
+
+
+@_zookeeper_exception(0)
+class RolledBackError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-1)
+class SystemZookeeperError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-2)
+class RuntimeInconsistency(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-3)
+class DataInconsistency(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-4)
+class ConnectionLoss(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-5)
+class MarshallingError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-6)
+class UnimplementedError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-7)
+class OperationTimeoutError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-8)
+class BadArgumentsError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-100)
+class APIError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-101)
+class NoNodeError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-102)
+class NoAuthError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-103)
+class BadVersionError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-108)
+class NoChildrenForEphemeralsError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-110)
+class NodeExistsError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-111)
+class NotEmptyError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-112)
+class SessionExpiredError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-113)
+class InvalidCallbackError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-114)
+class InvalidACLError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-115)
+class AuthFailedError(ZookeeperError):
+ pass
diff --git a/kazoo/handlers/threading.py b/kazoo/handlers/threading.py
index 5ab6c6f..66a2aa1 100644
--- a/kazoo/handlers/threading.py
+++ b/kazoo/handlers/threading.py
@@ -285,9 +285,9 @@ class SequentialThreadingHandler(object):
self.callback_queue.put(lambda: callback.func(*callback.args))
-class _PeekableQueue(Queue):
+class _PeekableQueue(Queue.Queue):
def __init__(self, maxsize=0):
- Queue.__init__(self, maxsize=0)
+ Queue.Queue.__init__(self, maxsize=maxsize)
def peek(self, block=True, timeout=None):
"""Return the first item in the queue but do not remove it from the queue.
diff --git a/kazoo/hosts.py b/kazoo/hosts.py
new file mode 100644
index 0000000..2c21f89
--- /dev/null
+++ b/kazoo/hosts.py
@@ -0,0 +1,35 @@
+import random
+
+
+class RandomHostIterator:
+ """ An iterator that returns a randomly selected host. A host is
+ guaranteed to not be selected twice unless there is only one
+ host in the collection.
+ """
+
+ def __init__(self, hosts):
+ self.last = 0
+ self.hosts = hosts
+
+ def __iter__(self):
+ hostslist = self.hosts[:]
+ random.shuffle(hostslist)
+ for host in hostslist:
+ yield host
+
+ def __len__(self):
+ return len(self.hosts)
+
+
+def collect_hosts(hosts):
+ """ Collect a set of hosts and an optional chroot from a string.
+ """
+ host_ports, chroot = hosts.partition("/")[::2]
+ chroot = "/" + chroot if chroot else None
+
+ result = []
+ for host_port in host_ports.split(","):
+ host, port = host_port.partition(":")[::2]
+ port = int(port.strip()) if port else 2181
+ result.append((host.strip(), port))
+ return (RandomHostIterator(result), chroot)
diff --git a/kazoo/protocol/__init__.py b/kazoo/protocol/__init__.py
index 81088b9..443339a 100644
--- a/kazoo/protocol/__init__.py
+++ b/kazoo/protocol/__init__.py
@@ -6,13 +6,18 @@ from kazoo.exceptions import ConnectionDropped
from kazoo.exceptions import EXCEPTIONS
from kazoo.protocol.serialization import int_struct
from kazoo.protocol.serialization import int_int_struct
-from kazoo.protocol.serialization import deserialize_watcher_event
from kazoo.protocol.serialization import deserialize_reply_header
+from kazoo.protocol.serialization import Close
from kazoo.protocol.serialization import Connect
+from kazoo.protocol.serialization import Ping
+from kazoo.protocol.serialization import Watch
from kazoo.protocol.states import KeeperState
+from kazoo.protocol.states import WatchedEvent
+from kazoo.protocol.states import Callback
+from kazoo.protocol.states import EVENT_TYPE_MAP
+from kazoo.protocol.paths import _prefix_root
-
-log = logging.getlog(__name__)
+log = logging.getLogger(__name__)
def proto_reader(client, s, reader_started, reader_done, read_timeout):
@@ -20,7 +25,7 @@ def proto_reader(client, s, reader_started, reader_done, read_timeout):
while True:
try:
- header, buffer, offset = _read_header(s, read_timeout)
+ header, buffer, offset = _read_header(client, s, read_timeout)
if header.xid == -2:
log.debug('Received PING')
continue
@@ -28,65 +33,60 @@ def proto_reader(client, s, reader_started, reader_done, read_timeout):
log.debug('Received AUTH')
continue
elif header.xid == -1:
- log.debug('Received EVENT')
- wtype, state, path, offset = deserialize_watcher_event(
- buffer, offset)
+ watch, offset = Watch.deserialize(buffer, offset)
+ path = watch.path
+ log.debug('Received EVENT: %s', watch)
watchers = set()
with client._state_lock:
- if wtype == 1:
- watchers |= client._data_watchers.pop(path, set())
- watchers |= client._exists_watchers.pop(path, set())
-
- event = lambda: map(lambda w: w.node_created(path),
- watchers)
- elif wtype == 2:
- watchers |= client._data_watchers.pop(path, set())
- watchers |= client._exists_watchers.pop(path, set())
- watchers |= client._child_watchers.pop(path, set())
+ # Ignore watches if we've been stopped
+ if client._stopped.is_set():
+ continue
- event = lambda: map(lambda w: w.node_deleted(path),
- watchers)
- elif wtype == 3:
+ if watch.type in (1, 2, 3):
watchers |= client._data_watchers.pop(path, set())
- watchers |= client._exists_watchers.pop(path, set())
-
- event = lambda: map(lambda w: w.data_changed(path),
- watchers)
- elif wtype == 4:
+ elif watch.type == 4:
watchers |= client._child_watchers.pop(path, set())
-
- event = lambda: map(lambda w: w.children_changed(path),
- watchers)
else:
- log.warn('Received unknown event %r', type)
+ log.warn('Received unknown event %r', watch.type)
continue
-
- client._events.put(event)
+ ev = WatchedEvent(EVENT_TYPE_MAP[watch.type],
+ client._state, path)
+
+ client.handler.dispatch_callback(
+ Callback('watch',
+ lambda: map(lambda w: w(ev), watchers),
+ ()
+ )
+ )
else:
log.debug('Reading for header %r', header)
-
- request, response, callback, xid = client._pending.get()
+ request, async_object, xid = client._pending.get()
if header.zxid and header.zxid > 0:
client.last_zxid = header.zxid
if header.xid != xid:
- raise RuntimeError('xids do not match, expected %r received %r', xid, header.xid)
+ raise RuntimeError('xids do not match, expected %r '
+ 'received %r', xid, header.xid)
- callback_exception = None
if header.err:
callback_exception = EXCEPTIONS[header.err]()
log.debug('Received error %r', callback_exception)
- elif response:
- response.deserialize(buffer, 'response')
+ if async_object:
+ async_object.set_exception(callback_exception)
+ elif request and async_object:
+ response = request.deserialize(buffer, offset)
log.debug('Received response: %r', response)
+ async_object.set(response)
- try:
- callback(callback_exception)
- except Exception as e:
- log.exception(e)
+ # Determine if watchers should be registered
+ with client._state_lock:
+ if (not client._stopped.is_set() and
+ hasattr(request, 'watcher')):
+ path = _prefix_root(client.chroot, request.path)
+ request.watch_dict[path].add(request.watcher)
- if isinstance(response, CloseResponse):
+ if isinstance(request, Close):
log.debug('Read close response')
s.close()
reader_done.set()
@@ -103,14 +103,30 @@ def proto_reader(client, s, reader_started, reader_done, read_timeout):
def proto_writer(client):
log.debug('Starting writer')
-
- writer_done = False
retry = client.retry_sleeper.copy()
+ while not client._stopped.is_set():
+ log.debug("Client stopped?: %s", client._stopped.is_set())
+
+ # If the connect_loop returns False, stop retrying
+ if connect_loop(client, retry) is False:
+ break
+ # Still going, increment our retry then go through the
+ # list of hosts again
+ if not client._stopped.is_set():
+ log.debug("Incrementing and waiting")
+ retry.increment()
+ log.debug('Writer stopped')
+ client._writer_stopped.set()
+
+
+def connect_loop(client, retry):
+ writer_done = False
for host, port in client.hosts:
s = client.handler.socket()
- client._state = KeeperState.CONNECTING
+ if client._state != KeeperState.CONNECTING:
+ client._session_callback(KeeperState.CONNECTING)
try:
read_timeout, connect_timeout = _connect(
@@ -129,24 +145,24 @@ def proto_writer(client):
xid = 0
while not writer_done:
try:
- request, response, callback = client._queue.peek(
+ request, async_object = client._queue.peek(
True, read_timeout / 2000.0)
log.debug('Sending %r', request)
xid += 1
log.debug('xid: %r', xid)
- _submit(s, request, connect_timeout, xid)
+ _submit(client, s, request, connect_timeout, xid)
- if isinstance(request, CloseRequest):
+ if isinstance(request, Close):
log.debug('Received close request, closing')
writer_done = True
client._queue.get()
- client._pending.put((request, response, callback, xid))
+ client._pending.put((request, async_object, xid))
except client.handler.empty:
log.debug('Queue timeout. Sending PING')
- _submit(s, PingRequest(), connect_timeout, -2)
+ _submit(client, s, Ping, connect_timeout, -2)
except Exception as e:
log.exception(e)
break
@@ -157,18 +173,17 @@ def proto_writer(client):
if writer_done:
client._close(KeeperState.CLOSED)
- break
+ return False
except ConnectionDropped:
log.warning('Connection dropped')
- client._events.put(lambda: client._default_watcher.connection_dropped())
- retry.increment()
+ client._session_callback(KeeperState.CONNECTING)
except AuthFailedError:
log.warning('AUTH_FAILED closing')
- client._close(KeeperState.AUTH_FAILED)
- break
+ client._session_callback(KeeperState.AUTH_FAILED)
+ return False
except Exception as e:
- log.warning(e)
- retry.increment()
+ log.exception(e)
+ raise
finally:
if not writer_done:
# The read thread will close the socket since there
@@ -176,13 +191,11 @@ def proto_writer(client):
# still needs to be read from the socket.
s.close()
- log.debug('Writer stopped')
-
def _connect(client, s, host, port):
log.info('Connecting to %s:%s', host, port)
log.debug(' Using session_id: %r session_passwd: 0x%s',
- client.session_id, _hex(client.session_passwd))
+ client._session_id, client._session_passwd.encode('hex'))
s.connect((host, port))
s.setblocking(0)
@@ -192,53 +205,54 @@ def _connect(client, s, host, port):
connect = Connect(
0,
client.last_zxid,
- int(client.session_timeout * 1000),
- client.session_passwd,
+ client._session_timeout,
+ client._session_id or 0,
+ client._session_passwd,
client.read_only)
- connect_result, zxid = _invoke(client, s, client.session_timeout, None,
- connect.serialize(), connect.deserialize)
+ connect_result, zxid = _invoke(client, s, client._session_timeout, connect)
if connect_result.time_out < 0:
log.error('Session expired')
- client._events.put(lambda: client._default_watcher.session_expired(client.session_id))
- client._state = KeeperState.EXPIRED_SESSION
+ client._session_callback(KeeperState.EXPIRED_SESSION)
raise RuntimeError('Session expired')
if zxid:
client.last_zxid = zxid
- client.session_id = connect_result.session_id
+
+ # Load return values
+ client._session_id = connect_result.session_id
negotiated_session_timeout = connect_result.time_out
connect_timeout = negotiated_session_timeout / len(client.hosts)
read_timeout = negotiated_session_timeout * 2.0 / 3.0
- client.session_passwd = connect_result.passwd
- log.debug('Session created, session_id: %r session_passwd: 0x%s', client.session_id, _hex(client.session_passwd))
- log.debug(' negotiated session timeout: %s', negotiated_session_timeout)
- log.debug(' connect timeout: %s', connect_timeout)
- log.debug(' read timeout: %s', read_timeout)
- client._events.put(lambda: client._default_watcher.session_connected(client.session_id, client.session_passwd, client.read_only))
- client._state = KeeperState.CONNECTED
-
- for scheme, auth in client.auth_data:
- ap = AuthPacket(0, scheme, auth)
- zxid = _invoke(s, connect_timeout, ap, xid=-4)
- if zxid:
- client.last_zxid = zxid
+ client._session_passwd = connect_result.passwd
+ log.debug('Session created, session_id: %r session_passwd: 0x%s\n'
+ ' negotiated session timeout: %s\n'
+ ' connect timeout: %s\n'
+ ' read timeout: %s', client._session_id,
+ client._session_passwd.encode('hex'), negotiated_session_timeout,
+ connect_timeout, read_timeout)
+ client._session_callback(KeeperState.CONNECTED)
+
+ # for scheme, auth in client.auth_data:
+ # ap = AuthPacket(0, scheme, auth)
+ # zxid = _invoke(s, connect_timeout, ap, xid=-4)
+ # if zxid:
+ # client.last_zxid = zxid
return read_timeout, connect_timeout
-def _invoke(client, socket, timeout, request_type, request_bytes,
- response_deserializer=None, xid=None):
+def _invoke(client, socket, timeout, request, xid=None):
b = bytearray()
- if xid and request_type:
- b.extend(int_int_struct.pack(xid, request_type))
- elif xid:
+ if xid:
b.extend(int_struct.pack(xid))
- elif request_type:
- b.extend(int_struct.pack(request_type))
- b.extend(request_bytes)
- b = int_struct.pack(len(b)) + b
- _write(client, socket, b, timeout)
+ if request.type:
+ b.extend(int_struct.pack(request.type))
+ b.extend(request.serialize())
+ buff = int_struct.pack(len(b)) + b
+
+ _write(client, socket, buff, timeout)
+ log.debug("Wrote out initial request")
zxid = None
if xid:
@@ -254,39 +268,39 @@ def _invoke(client, socket, timeout, request_type, request_bytes,
raise callback_exception
return zxid
- msg = _read(socket, 4, timeout)
- length = int_struct.unpack_from(msg, 0)[0]
+ msg = _read(client, socket, 4, timeout)
+ length = int_struct.unpack(msg)[0]
- msg = _read(socket, length, timeout)
+ log.debug("Reading full packet")
+ msg = _read(client, socket, length, timeout)
- if response_deserializer:
- log.debug('Read response %s', response_deserializer(msg))
- return response_deserializer(msg), zxid
+ if hasattr(request, 'deserialize'):
+ obj, _ = request.deserialize(msg, 0)
+ log.debug('Read response %s', obj)
+ return obj, zxid
return zxid
-def _hex(bindata):
- return bindata.encode('hex')
-
-
-def _submit(client, socket, request_type, request_buffer, timeout, xid=None):
+def _submit(client, socket, request, timeout, xid=None):
b = bytearray()
b.extend(int_struct.pack(xid))
- if request_type:
- b.extend(int_struct.pack(request_type))
- b += request_buffer
+ if request.type:
+ b.extend(int_struct.pack(request.type))
+ b += request.serialize()
b = int_struct.pack(len(b)) + b
_write(client, socket, b, timeout)
def _write(client, socket, msg, timeout):
sent = 0
+ msg_length = len(msg)
select = client.handler.select
- while sent < len(msg):
+ while sent < msg_length:
_, ready_to_write, _ = select([], [socket], [], timeout)
- bytes_sent = ready_to_write[0].send(msg[sent:])
- if not sent:
+ msg_slice = buffer(msg, sent)
+ bytes_sent = ready_to_write[0].send(msg_slice)
+ if not bytes_sent:
raise ConnectionDropped('socket connection broken')
sent += bytes_sent
@@ -300,12 +314,14 @@ def _read_header(client, socket, timeout):
def _read(client, socket, length, timeout):
- msg = ''
+ msgparts = []
+ remaining = length
select = client.handler.select
- while len(msg) < length:
+ while remaining > 0:
ready_to_read, _, _ = select([socket], [], [], timeout)
- chunk = ready_to_read[0].recv(length - len(msg))
+ chunk = ready_to_read[0].recv(remaining)
if chunk == '':
raise ConnectionDropped('socket connection broken')
- msg = msg + chunk
- return msg
+ msgparts.append(chunk)
+ remaining -= len(chunk)
+ return b"".join(msgparts)
diff --git a/kazoo/protocol/paths.py b/kazoo/protocol/paths.py
new file mode 100644
index 0000000..ba5d47d
--- /dev/null
+++ b/kazoo/protocol/paths.py
@@ -0,0 +1,52 @@
+def normpath(path):
+ """Normalize path, eliminating double slashes, etc.
+ """
+ comps = path.split('/')
+ new_comps = []
+ for comp in comps:
+ if comp == '':
+ continue
+ if comp in ('.', '..'):
+ raise ValueError('relative paths not allowed')
+ new_comps.append(comp)
+ slash = u'/' if isinstance(path, unicode) else '/'
+ new_path = slash.join(new_comps)
+ if path.startswith('/'):
+ return slash + new_path
+ return new_path
+
+
+def join(a, *p):
+ """Join two or more pathname components, inserting '/' as needed.
+ If any component is an absolute path, all previous path components
+ will be discarded.
+ """
+ path = a
+ for b in p:
+ if b.startswith('/'):
+ path = b
+ elif path == '' or path.endswith('/'):
+ path += b
+ else:
+ path += '/' + b
+ return path
+
+
+def isabs(s):
+ """Test whether a path is absolute. """
+ return s.startswith('/')
+
+
+def basename(p):
+ """Returns the final component of a pathname"""
+ i = p.rfind('/') + 1
+ return p[i:]
+
+
+def _prefix_root(root, path):
+ """ Prepend a root to a path. """
+ return normpath(join(_norm_root(root), path.lstrip('/')))
+
+
+def _norm_root(root):
+ return normpath(join('/', root))
diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py
index 759e2e6..0503a60 100644
--- a/kazoo/protocol/serialization.py
+++ b/kazoo/protocol/serialization.py
@@ -33,19 +33,67 @@ def write_buffer(bytes):
return int_struct.pack(len(bytes)) + bytes
-class Connect(namedtuple('Connect', 'protocol_version', 'last_zxid_seen',
- 'time_out', 'session_id', 'passwd', 'read_only')):
+def read_buffer(bytes, offset):
+ length = int_struct.unpack_from(bytes, offset)[0]
+ offset += int_struct.size
+ if length < 0:
+ return None, offset
+ else:
+ index = offset
+ offset += length
+ return bytes[index:index + length], offset
+
+
+class Connect(namedtuple('Connect', 'protocol_version last_zxid_seen'
+ ' time_out session_id passwd read_only')):
"""A connection request"""
+ type = None
+
def serialize(self):
- b = int_long_int_long_struct.pack(
+ b = bytearray()
+ b.extend(int_long_int_long_struct.pack(
self.protocol_version, self.last_zxid_seen, self.time_out,
- self.session_id)
- b += write_buffer(self.passwd)
- b += 1 if self.read_only else 0
+ self.session_id))
+ b.extend(write_buffer(self.passwd))
+ b.extend([1 if self.read_only else 0])
return b
- def deserialize(buffer, offset):
- pass
+ @classmethod
+ def deserialize(cls, bytes, offset):
+ proto_version, timeout, session_id = int_int_long_struct.unpack_from(
+ bytes, offset)
+ offset += int_int_long_struct.size
+ password, offset = read_buffer(bytes, offset)
+ return cls(proto_version, 0, timeout, session_id, password, 0), offset
+
+
+class Close(object):
+ __slots__ = ['type']
+ type = -11
+
+ @classmethod
+ def serialize(cls):
+ return ''
+
+
+class Ping(object):
+ __slots__ = ['type']
+ type = 11
+
+ @classmethod
+ def serialize(cls):
+ return ''
+
+
+class Watch(namedtuple('Watch', 'type state path')):
+ @classmethod
+ def deserialize(cls, buffer, offset):
+ """Given a buffer and the current buffer offset, return the type,
+ state, path, and new offset"""
+ type, state = int_int_struct.unpack_from(buffer, offset)
+ offset += int_int_struct.size
+ path, offset = read_string(buffer, offset)
+ return cls(type, state, path), offset
def deserialize_reply_header(buffer, offset):
@@ -54,12 +102,3 @@ def deserialize_reply_header(buffer, offset):
new_offset = offset + reply_header_struct.size
return ReplyHeader._make(
reply_header_struct.unpack_from(buffer, offset)), new_offset
-
-
-def deserialize_watcher_event(buffer, offset):
- """Given a buffer and the current buffer offset, return the type,
- state, path, and new offset"""
- type, state = int_int_struct.unpack_from(buffer, offset)
- offset += int_int_struct.size
- path, offset = read_string(buffer, offset)
- return type, state, path, offset
diff --git a/kazoo/protocol/states.py b/kazoo/protocol/states.py
index 75e60a7..059a5bc 100644
--- a/kazoo/protocol/states.py
+++ b/kazoo/protocol/states.py
@@ -1,4 +1,5 @@
"""Kazoo State and Event objects"""
+from collections import namedtuple
class KazooState(object):
@@ -26,62 +27,6 @@ class KazooState(object):
LOST = "LOST"
-class State(object):
- def __init__(self, code, description):
- self.code = code
- self.description = description
-
- def __eq__(self, other):
- return self.code == other.code
-
- def __hash__(self):
- return hash(self.code)
-
- def __str__(self):
- return self.code
-
- def __repr__(self):
- return '%s()' % self.__class__.__name__
-
-
-class Connecting(State):
- def __init__(self):
- super(Connecting, self).__init__('CONNECTING', 'Connecting')
-
-
-class Connected(State):
- def __init__(self):
- super(Connected, self).__init__('CONNECTED', 'Connected')
-
-
-class ConnectedRO(State):
- def __init__(self):
- super(ConnectedRO, self).__init__('CONNECTED_RO', 'Connected Read-Only')
-
-
-class AuthFailed(State):
- def __init__(self):
- super(AuthFailed, self).__init__('AUTH_FAILED', 'Authorization Failed')
-
-
-class Closed(State):
- def __init__(self):
- super(Closed, self).__init__('CLOSED', 'Closed')
-
-
-class ExpiredSession(State):
- def __init__(self):
- super(ExpiredSession, self).__init__('EXPIRED_SESSION', 'Expired Session')
-
-
-CONNECTING = Connecting()
-CONNECTED = Connected()
-CONNECTED_RO = ConnectedRO()
-AUTH_FAILED = AuthFailed()
-CLOSED = Closed()
-EXPIRED_SESSION = ExpiredSession()
-
-
class KeeperState(object):
"""Zookeeper State
@@ -100,6 +45,10 @@ class KeeperState(object):
Zookeeper is connected.
+ .. attribute:: CONNECTED_RO
+
+ Zookeeper is connected in read-only state.
+
.. attribute:: CONNECTING
Zookeeper is currently attempting to establish a connection.
@@ -110,11 +59,12 @@ class KeeperState(object):
gone.
"""
- AUTH_FAILED = AUTH_FAILED
- CONNECTED = CONNECTED
- CONNECTING = CONNECTING
- CLOSED = CLOSED
- EXPIRED_SESSION = EXPIRED_SESSION
+ AUTH_FAILED = 'AUTH_FAILED'
+ CONNECTED = 'CONNECTED'
+ CONNECTED_RO = 'CONNECTED_RO'
+ CONNECTING = 'CONNECTING'
+ CLOSED = 'CLOSED'
+ EXPIRED_SESSION = 'EXPIRED_SESSION'
class EventType(object):
@@ -124,21 +74,6 @@ class EventType(object):
will receive a :class:`EventType` attribute as their event
argument.
- .. attribute:: NOTWATCHING
-
- This event type was added to Zookeeper in the event that
- watches get overloaded. It's never been used though and will
- likely be removed in a future Zookeeper version. **This event
- will never actually be set, don't bother testing for it.**
-
- .. attribute:: SESSION
-
- A Zookeeper session event. Watch functions do not receive
- session events. A session event watch can be registered with
- :class:`KazooClient` during creation that can receive these
- events. It's recommended to add a listener for connection state
- changes instead.
-
.. attribute:: CREATED
A node has been created.
@@ -158,9 +93,145 @@ class EventType(object):
node has changed, which must have its own watch established.
"""
- NOTWATCHING = zookeeper.NOTWATCHING_EVENT
- SESSION = zookeeper.SESSION_EVENT
- CREATED = zookeeper.CREATED_EVENT
- DELETED = zookeeper.DELETED_EVENT
- CHANGED = zookeeper.CHANGED_EVENT
- CHILD = zookeeper.CHILD_EVENT
+ CREATED = 'CREATED'
+ DELETED = 'DELETED'
+ CHANGED = 'CHANGED'
+ CHILD = 'CHILD'
+
+EVENT_TYPE_MAP = {
+ 1: EventType.CREATED,
+ 2: EventType.DELETED,
+ 3: EventType.CHANGED,
+ 4: EventType.CHILD
+}
+
+
+class WatchedEvent(namedtuple('WatchedEvent', ('type', 'state', 'path'))):
+ """A change on ZooKeeper that a Watcher is able to respond to.
+
+ The :class:`WatchedEvent` includes exactly what happened, the
+ current state of ZooKeeper, and the path of the znode that was
+ involved in the event. An instance of :class:`WatchedEvent` will be
+ passed to registered watch functions.
+
+ .. attribute:: type
+
+ A :class:`EventType` attribute indicating the event type.
+
+ .. attribute:: state
+
+ A :class:`KeeperState` attribute indicating the Zookeeper
+ state.
+
+ .. attribute:: path
+
+ The path of the node for the watch event.
+
+ """
+
+
+class Callback(namedtuple('Callback', ('type', 'func', 'args'))):
+ """A callback that is handed to a handler for dispatch
+
+ :param type: Type of the callback, can be 'session' or 'watch'
+ :param func: Callback function
+ :param args: Argument list for the callback function
+
+ """
+
+
+class ZnodeStat(namedtuple('ZnodeStat', ('aversion', 'ctime', 'cversion',
+ 'czxid', 'dataLength',
+ 'ephemeralOwner', 'mtime', 'mzxid',
+ 'numChildren', 'pzxid', 'version'))):
+ """A ZnodeStat structure with convenience properties
+
+ When getting the value of a node from Zookeeper, the properties for
+ the node known as a "Stat structure" will be retrieved. The
+ :class:`ZnodeStat` object provides access to the standard Stat
+ properties and additional properties that are more readable and use
+ Python time semantics (seconds since epoch instead of ms).
+
+ .. note::
+
+ The original Zookeeper Stat name is in parens next to the name
+ when it differs from the convenience attribute. These are **not
+ functions**, just attributes.
+
+ .. attribute:: creation_transaction_id (czxid)
+
+ The transaction id of the change that caused this znode to be
+ created.
+
+ .. attribute:: last_modified_transaction_id (mzxid)
+
+ The transaction id of the change that last modified this znode.
+
+ .. attribute:: created (ctime)
+
+ The time in seconds from epoch when this node was created.
+ (ctime is in milliseconds)
+
+ .. attribute:: last_modified (mtime)
+
+ The time in seconds from epoch when this znode was last
+ modified. (mtime is in milliseconds)
+
+ .. attribute:: version
+
+ The number of changes to the data of this znode.
+
+ .. attribute:: acl_version (aversion)
+
+ The number of changes to the ACL of this znode.
+
+ .. attribute:: owner_session_id (ephemeralOwner)
+
+ The session id of the owner of this znode if the znode is an
+ ephemeral node. If it is not an ephemeral node, it will be
+ `None`. (ephemeralOwner will be 0 if it is not ephemeral)
+
+ .. attribute:: data_length (dataLength)
+
+ The length of the data field of this znode.
+
+ .. attribute:: children_count (numChildren)
+
+ The number of children of this znode.
+
+ """
+ @property
+ def acl_version(self):
+ return self.aversion
+
+ @property
+ def children_version(self):
+ return self.cversion
+
+ @property
+ def created(self):
+ return self.ctime / 1000.0
+
+ @property
+ def last_modified(self):
+ return self.mtime / 1000.0
+
+ @property
+ def owner_session_id(self):
+ return self.ephemeralOwner or None
+
+ @property
+ def creation_transaction_id(self):
+ return self.czxid
+
+ @property
+ def last_modified_transaction_id(self):
+ return self.mzxid
+
+ @property
+ def data_length(self):
+ return self.dataLength
+
+ @property
+ def children_count(self):
+ return self.numChildren
diff --git a/kazoo/recipe/barrier.py b/kazoo/recipe/barrier.py
index deb8698..e83c4cf 100644
--- a/kazoo/recipe/barrier.py
+++ b/kazoo/recipe/barrier.py
@@ -3,9 +3,9 @@ import os
import socket
import uuid
-from kazoo.client import EventType
-from kazoo.exceptions import NoNodeException
-from kazoo.exceptions import NodeExistsException
+from kazoo.protocol.states import EventType
+from kazoo.exceptions import NoNodeError
+from kazoo.exceptions import NodeExistsError
class Barrier(object):
@@ -46,7 +46,7 @@ class Barrier(object):
try:
self.client.retry(self.client.delete, self.path)
return True
- except NoNodeException:
+ except NoNodeError:
return False
def wait(self, timeout=None):
@@ -131,7 +131,7 @@ class DoubleBarrier(object):
try:
self.client.create(self.create_path, self._identifier,
ephemeral=True)
- except NodeExistsException:
+ except NodeExistsError:
pass
def created(event):
@@ -161,7 +161,7 @@ class DoubleBarrier(object):
# Delete the ready node if its around
try:
self.client.delete(self.path + '/ready')
- except NoNodeException:
+ except NoNodeError:
pass
while True:
@@ -204,5 +204,5 @@ class DoubleBarrier(object):
def _best_effort_cleanup(self):
try:
self.client.retry(self.client.delete, self.create_path)
- except NoNodeException:
+ except NoNodeError:
pass
diff --git a/kazoo/recipe/lock.py b/kazoo/recipe/lock.py
index 01bee19..c802767 100644
--- a/kazoo/recipe/lock.py
+++ b/kazoo/recipe/lock.py
@@ -15,7 +15,7 @@ import uuid
from kazoo.retry import ForceRetryError
from kazoo.exceptions import CancelledError
-from kazoo.exceptions import NoNodeException
+from kazoo.exceptions import NoNodeError
class Lock(object):
@@ -186,7 +186,7 @@ class Lock(object):
try:
data, stat = self.client.get(self.path + "/" + child)
contenders.append(data)
- except NoNodeException:
+ except NoNodeError:
pass
return contenders
diff --git a/kazoo/recipe/party.py b/kazoo/recipe/party.py
index 5bba798..00e828f 100644
--- a/kazoo/recipe/party.py
+++ b/kazoo/recipe/party.py
@@ -5,7 +5,7 @@ used for determining members of a party.
"""
import uuid
-from kazoo.exceptions import NodeExistsException, NoNodeException
+from kazoo.exceptions import NodeExistsError, NoNodeError
class BaseParty(object):
@@ -40,7 +40,7 @@ class BaseParty(object):
try:
self.client.create(self.create_path, self.data, ephemeral=True)
self.participating = True
- except NodeExistsException:
+ except NodeExistsError:
# node was already created, perhaps we are recovering from a
# suspended connection
self.participating = True
@@ -52,7 +52,7 @@ class BaseParty(object):
def _inner_leave(self):
try:
self.client.delete(self.create_path)
- except NoNodeException:
+ except NoNodeError:
return False
return True
@@ -83,7 +83,7 @@ class Party(BaseParty):
d, _ = self.client.retry(self.client.get, self.path +
"/" + child)
yield d
- except NoNodeException:
+ except NoNodeError:
pass
def _get_children(self):
diff --git a/kazoo/recipe/watchers.py b/kazoo/recipe/watchers.py
index c9bde92..f5e1275 100644
--- a/kazoo/recipe/watchers.py
+++ b/kazoo/recipe/watchers.py
@@ -5,7 +5,7 @@ import time
from functools import partial
from kazoo.client import KazooState
-from kazoo.exceptions import NoNodeException
+from kazoo.exceptions import NoNodeError
log = logging.getLogger(__name__)
@@ -103,7 +103,7 @@ class DataWatch(object):
try:
data, stat = self._client.retry(self._client.get,
self._path, self._watcher)
- except NoNodeException:
+ except NoNodeError:
self._stopped = True
self._func(None, None)
return
diff --git a/kazoo/retry.py b/kazoo/retry.py
index 73eb42c..dd03e0b 100644
--- a/kazoo/retry.py
+++ b/kazoo/retry.py
@@ -1,3 +1,4 @@
+import logging
import random
import time
@@ -9,6 +10,8 @@ from zookeeper import (
SessionExpiredException
)
+log = logging.getLogger(__name__)
+
class ForceRetryError(Exception):
"""Raised when some recipe logic wants to force a retry"""
@@ -46,16 +49,16 @@ class RetySleeper(object):
"""Increment the failed count, and sleep appropriately before
continuing"""
if self._attempts == self.max_tries:
- raise
+ raise Exception("Too many retry attempts")
self._attempts += 1
jitter = random.randint(0, self.max_jitter) / 100.0
- self.sleep_func(self.delay + jitter)
+ self.sleep_func(self._cur_delay + jitter)
self._cur_delay *= self.backoff
def copy(self):
"""Return a clone of this retry sleeper"""
return RetySleeper(self.max_tries, self.delay, self.backoff,
- self.max_jitter, self.sleep_func)
+ self.max_jitter / 100.0, self.sleep_func)
class KazooRetry(object):
@@ -90,8 +93,8 @@ class KazooRetry(object):
and treated as a retry-able command.
"""
- self.retry = RetySleeper(max_tries, delay, backoff, max_jitter,
- sleep_func)
+ self.retry_sleeper = RetySleeper(max_tries, delay, backoff, max_jitter,
+ sleep_func)
self.sleep_func = sleep_func
self.retry_exceptions = self.RETRY_EXCEPTIONS
if ignore_expire:
@@ -101,11 +104,11 @@ class KazooRetry(object):
self(func, *args, **kwargs)
def __call__(self, func, *args, **kwargs):
- self.retry.reset()
+ self.retry_sleeper.reset()
while True:
try:
return func(*args, **kwargs)
except self.retry_exceptions:
- self.retry.increment()
+ self.retry_sleeper.increment()