diff options
author | Ben Bangert <ben@groovie.org> | 2012-08-28 15:43:42 -0700 |
---|---|---|
committer | Ben Bangert <ben@groovie.org> | 2012-08-28 15:43:42 -0700 |
commit | 72c90e919096c852310325f7d16d1d8b78f1e411 (patch) | |
tree | 41be87b192de269ddfa7d8edcfbe3f8749468e2b | |
parent | 0c900ce5508be546d1f8efb1217f78a0b71cdec1 (diff) | |
download | kazoo-72c90e919096c852310325f7d16d1d8b78f1e411.tar.gz |
Most of the main refactor complete, basic connection and ping handling working with start/stop.
-rw-r--r-- | kazoo/client.py | 415 | ||||
-rw-r--r-- | kazoo/exceptions.py | 120 | ||||
-rw-r--r-- | kazoo/handlers/threading.py | 4 | ||||
-rw-r--r-- | kazoo/hosts.py | 35 | ||||
-rw-r--r-- | kazoo/protocol/__init__.py | 240 | ||||
-rw-r--r-- | kazoo/protocol/paths.py | 52 | ||||
-rw-r--r-- | kazoo/protocol/serialization.py | 73 | ||||
-rw-r--r-- | kazoo/protocol/states.py | 235 | ||||
-rw-r--r-- | kazoo/recipe/barrier.py | 14 | ||||
-rw-r--r-- | kazoo/recipe/lock.py | 4 | ||||
-rw-r--r-- | kazoo/recipe/party.py | 8 | ||||
-rw-r--r-- | kazoo/recipe/watchers.py | 4 | ||||
-rw-r--r-- | kazoo/retry.py | 17 |
13 files changed, 643 insertions, 578 deletions
diff --git a/kazoo/client.py b/kazoo/client.py index 3b075b4..8f7fda5 100644 --- a/kazoo/client.py +++ b/kazoo/client.py @@ -1,190 +1,28 @@ """Kazoo Zookeeper Client""" import inspect import logging -import random -from collections import namedtuple +from collections import defaultdict from functools import partial from os.path import split -from kazoo.exceptions import BadArgumentsException +from kazoo.exceptions import NoNodeError from kazoo.exceptions import ConfigurationError -from kazoo.exceptions import ZookeeperStoppedError -from kazoo.exceptions import NoNodeException -from kazoo.exceptions import err_to_exception from kazoo.handlers.threading import SequentialThreadingHandler from kazoo.recipe.lock import Lock from kazoo.recipe.party import Party from kazoo.recipe.party import ShallowParty from kazoo.recipe.election import Election +from kazoo.protocol.states import KazooState +from kazoo.protocol.states import KeeperState +from kazoo.protocol import proto_writer from kazoo.retry import KazooRetry +from kazoo.hosts import collect_hosts +from kazoo.protocol.serialization import Close +from kazoo.protocol.paths import normpath log = logging.getLogger(__name__) -class WatchedEvent(namedtuple('WatchedEvent', ('type', 'state', 'path'))): - """A change on ZooKeeper that a Watcher is able to respond to. - - The :class:`WatchedEvent` includes exactly what happened, the - current state of ZooKeeper, and the path of the znode that was - involved in the event. An instance of :class:`WatchedEvent` will be - passed to registered watch functions. - - .. attribute:: type - - A :class:`EventType` attribute indicating the event type. - - .. attribute:: state - - A :class:`KeeperState` attribute indicating the Zookeeper - state. - - .. attribute:: path - - The path of the node for the watch event. - - """ - - -class ZnodeStat(namedtuple('ZnodeStat', ('aversion', 'ctime', 'cversion', - 'czxid', 'dataLength', - 'ephemeralOwner', 'mtime', 'mzxid', - 'numChildren', 'pzxid', 'version'))): - """A ZnodeStat structure with convenience properties - - When getting the value of a node from Zookeeper, the properties for - the node known as a "Stat structure" will be retrieved. The - :class:`ZnodeStat` object provides access to the standard Stat - properties and additional properties that are more readable and use - Python time semantics (seconds since epoch instead of ms). - - .. note:: - - The original Zookeeper Stat name is in parens next to the name - when it differs from the convenience attribute. These are **not - functions**, just attributes. - - .. attribute:: creation_transaction_id (czxid) - - The transaction id of the change that caused this znode to be - created. - - .. attribute:: last_modified_transaction_id (mzxid) - - The transaction id of the change that last modified this znode. - - .. attribute:: created (ctime) - - The time in seconds from epoch when this node was created. - (ctime is in milliseconds) - - .. attribute:: last_modified (mtime) - - The time in seconds from epoch when this znode was last - modified. (mtime is in milliseconds) - - .. attribute:: version - - The number of changes to the data of this znode. - - .. attribute:: acl_version (aversion) - - The number of changes to the ACL of this znode. - - .. attribute:: owner_session_id (ephemeralOwner) - - The session id of the owner of this znode if the znode is an - ephemeral node. If it is not an ephemeral node, it will be - `None`. (ephemeralOwner will be 0 if it is not ephemeral) - - .. attribute:: data_length (dataLength) - - The length of the data field of this znode. - - .. attribute:: children_count (numChildren) - - The number of children of this znode. - - """ - @property - def acl_version(self): - return self.aversion - - @property - def children_version(self): - return self.cversion - - @property - def created(self): - return self.ctime / 1000.0 - - @property - def last_modified(self): - return self.mtime / 1000.0 - - @property - def owner_session_id(self): - return self.ephemeralOwner or None - - @property - def creation_transaction_id(self): - return self.czxid - - @property - def last_modified_transaction_id(self): - return self.mzxid - - @property - def data_length(self): - return self.dataLength - - @property - def children_count(self): - return self.numChildren - - -class Callback(namedtuple('Callback', ('type', 'func', 'args'))): - """A callback that is handed to a handler for dispatch - - :param type: Type of the callback, can be 'session' or 'watch' - :param func: Callback function - :param args: Argument list for the callback function - - """ - - -## Client Callbacks - -def _generic_callback(async_result, handle, code, *args): - if code != zookeeper.OK: - exc = err_to_exception(code) - async_result.set_exception(exc) - else: - if not args: - result = None - elif len(args) == 1: - if isinstance(args[0], dict): - # It's a node struct, put it in a ZnodeStat - result = ZnodeStat(**args[0]) - else: - result = args[0] - else: - # if there's two, the second is a stat object - args = list(args) - if len(args) == 2 and isinstance(args[1], dict): - args[1] = ZnodeStat(**args[1]) - result = tuple(args) - - async_result.set(result) - - -def _exists_callback(async_result, handle, code, stat): - if code not in (zookeeper.OK, zookeeper.NONODE): - exc = err_to_exception(code) - async_result.set_exception(exc) - else: - async_result.set(stat) - - class KazooClient(object): """An Apache Zookeeper Python wrapper supporting alternate callback handlers and high-level functionality @@ -197,7 +35,7 @@ class KazooClient(object): def __init__(self, hosts='127.0.0.1:2181', watcher=None, timeout=10.0, client_id=None, max_retries=None, retry_delay=0.1, retry_backoff=2, retry_jitter=0.8, handler=None, - default_acl=None): + default_acl=None, read_only=None): """Create a KazooClient instance. All time arguments are in seconds. :param hosts: Comma-separated list of hosts to connect to @@ -226,46 +64,47 @@ class KazooClient(object): and reconnects. """ - from kazoo.recipe.barrier import Barrier - from kazoo.recipe.barrier import DoubleBarrier - from kazoo.recipe.partitioner import SetPartitioner - from kazoo.recipe.watchers import ChildrenWatch - from kazoo.recipe.watchers import DataWatch - # Record the handler strategy used - self._handler = handler if handler else SequentialThreadingHandler() - self.handler = self._handler + self.handler = handler if handler else SequentialThreadingHandler() + if inspect.isclass(self.handler): + raise ConfigurationError("Handler must be an instance of a class, " + "not the class: %s" % self.handler) - # Check for chroot - chroot_check = hosts.split('/', 1) - if len(chroot_check) == 2: - self.chroot = '/' + chroot_check[1] + self.default_acl = default_acl + self.hosts, chroot = collect_hosts(hosts) + if chroot: + self.chroot = normpath(chroot) + if not self.chroot.startswith('/'): + raise ValueError('chroot not absolute') else: - self.chroot = None + self.chroot = '' - # remove any trailing slashes - self.default_acl = default_acl + self.last_zxid = 0 + self.read_only = read_only + self._session_id = None - self._hosts = hosts - self._watcher = watcher - self._provided_client_id = client_id + if client_id: + self._session_id = client_id[0] + self._session_passwd = client_id[1] + else: + self._session_id = None + self._session_passwd = str(bytearray([0] * 16)) # ZK uses milliseconds - self._timeout = int(timeout * 1000) - - if inspect.isclass(self.handler): - raise ConfigurationError("Handler must be an instance of a class, " - "not the class: %s" % self.handler) - - self._handle = None + self._session_timeout = int(timeout * 1000) # We use events like twitter's client to track current and desired # state (connected, and whether to shutdown) self._live = self.handler.event_object() + self._writer_stopped = self.handler.event_object() self._stopped = self.handler.event_object() self._stopped.set() - self._connection_timed_out = False + self._queue = self.handler.peekable_queue() + self._pending = self.handler.peekable_queue() + + self._child_watchers = defaultdict(set) + self._data_watchers = defaultdict(set) self.retry = KazooRetry( max_tries=max_retries, @@ -274,19 +113,22 @@ class KazooClient(object): max_jitter=retry_jitter, sleep_func=self.handler.sleep_func ) - - self.max_retries = max_retries - self.retry_delay = retry_delay - self.retry_backoff = retry_backoff - self.retry_jitter = int(retry_jitter * 100) - self._connection_attempts = None + self.retry_sleeper = self.retry.retry_sleeper.copy() # Curator like simplified state tracking, and listeners for state # transitions + self._state_lock = self.handler.rlock_object() + self._state = KeeperState.CLOSED self.state = KazooState.LOST self.state_listeners = set() # convenience API + from kazoo.recipe.barrier import Barrier + from kazoo.recipe.barrier import DoubleBarrier + from kazoo.recipe.partitioner import SetPartitioner + from kazoo.recipe.watchers import ChildrenWatch + from kazoo.recipe.watchers import DataWatch + self.Barrier = partial(Barrier, self) self.DoubleBarrier = partial(DoubleBarrier, self) self.ChildrenWatch = partial(ChildrenWatch, self) @@ -297,48 +139,6 @@ class KazooClient(object): self.SetPartitioner = partial(SetPartitioner, self) self.ShallowParty = partial(ShallowParty, self) - def _safe_call(self, func, async_result, *args, **kwargs): - """Safely call a zookeeper function and handle errors related - to a bad zhandle and state bugs - - In older zkpython bindings, a SystemError can arise if some - functions are called when the session handle is expired. This - client clears the old handle as soon as possible, but its - possible a command may run against the old handle that is - expired resulting in this error being thrown. See - https://issues.apache.org/jira/browse/ZOOKEEPER-1318 - - """ - try: - return func(self._handle, *args) - except BadArgumentsException: - # Validate the path to throw a better except - if isinstance(args[0], basestring) and not args[0].startswith('/'): - raise ValueError("invalid path '%s'. must start with /" % - args[0]) - else: - raise - except (TypeError, SystemError) as exc: - # Handle was cleared or isn't set. If it was cleared and we are - # supposed to be running, it means we had session expiration and - # are attempting to reconnect. We treat it as a session expiration - # in that case for appropriate behavior. - if self._stopped.is_set(): - async_result.set_exception( - ZookeeperStoppedError( - "The Kazoo client is stopped, call connect before running " - "commands that talk to Zookeeper")) - elif isinstance(exc, SystemError): - # Set this to the error it should be for appropriate handling - async_result.set_exception( - self.zookeeper.InvalidStateException( - "invalid handle state")) - elif self._handle is None or 'an integer is required' in exc: - async_result.set_exception( - self.zookeeper.SessionExpiredException("session expired")) - else: - raise - def add_listener(self, listener): """Add a function to be called for connection state changes @@ -362,31 +162,10 @@ class KazooClient(object): @property def client_id(self): """Returns the client id for this Zookeeper session if connected""" - if self._handle is not None: - return self.zookeeper.client_id(self._handle) + if self._live.is_set(): + return (self._session_id, self._session_passwd) return None - def _wrap_session_callback(self, func): - def wrapper(handle, type, state, path): - callback = Callback('session', func, (handle, type, state, path)) - self.handler.dispatch_callback(callback) - return wrapper - - def _wrap_watch_callback(self, func): - def func_wrapper(*args): - try: - func(*args) - except Exception: - log.exception("Exception in kazoo callback <func: %s>" % func) - - def wrapper(handle, type, state, path): - # don't send session events to all watchers - if type != self.zookeeper.SESSION_EVENT: - event = WatchedEvent(type, state, path) - callback = Callback('watch', func_wrapper, (event,)) - self.handler.dispatch_callback(callback) - return wrapper - def _make_state_change(self, state): # skip if state is current if self.state == state: @@ -403,68 +182,43 @@ class KazooClient(object): except Exception: log.exception("Error in connection state listener") - def _session_callback(self, handle, type, state, path): - log.debug("Client Instance: %s, Handle: %s, Type: %s, State: %s" - ", Path: %s", id(self), handle, ZK_TYPES.get(type, type), - ZK_STATES.get(state, state), - path) - - if type != EventType.SESSION: + def _session_callback(self, state): + if self._stopped.is_set(): + # Any events at this point can be ignored return - if self._handle != handle: - try: - # latent handle callback from previous connection - self.zookeeper.close(handle) - except Exception: - pass + if state == self._state: return - if self._stopped.is_set(): - # Any events at this point can be ignored - return + with self._state_lock: + self._state = state if state == KeeperState.CONNECTED: log.info("Zookeeper connection established") self._live.set() - self._connection_attempts = None - - # Clear the client id when we successfully connect - self._provided_client_id = None - self._make_state_change(KazooState.CONNECTED) elif state in (KeeperState.EXPIRED_SESSION, KeeperState.AUTH_FAILED): log.info("Zookeeper session lost, state: %s", state) self._live.clear() self._make_state_change(KazooState.LOST) - self._handle = None - - # This session callback already runs in the handler so - # its safe to spawn the start_async call so this function - # can return immediately - self.handler.spawn(self._reconnect) else: log.info("Zookeeper connection lost") # Connection lost self._live.clear() self._make_state_change(KazooState.SUSPENDED) - if self._watcher: - self._watcher(WatchedEvent(type, state, path)) - def _safe_close(self): - if self._handle is not None: - # Stop the handler - self.handler.stop() - zh, self._handle = self._handle, None + self.handler.stop() + + if not self._writer_stopped.is_set(): try: - self.zookeeper.close(zh) - except self.zookeeper.ZooKeeperException: - # Corrupt session or otherwise disconnected - pass - self._live.clear() - self._make_state_change(KazooState.LOST) + self._writer_stopped.wait(10) + except self.handler.timeout_exception: + raise Exception("Writer still open from prior connection" + " and wouldn't close after 10 seconds") + + self._make_state_change(KazooState.LOST) def start_async(self): """Asynchronously initiate connection to ZK @@ -481,36 +235,17 @@ class KazooClient(object): # Make sure we're safely closed self._safe_close() - # We've been asked to connect, clear the stop + # We've been asked to connect, clear the stop and our writer + # thread indicator self._stopped.clear() + self._writer_stopped.clear() # Start the handler self.handler.start() - cb = self._wrap_session_callback(self._session_callback) - self._connection_attempts = 1 - self._connection_delay = self.retry_delay - if self._provided_client_id: - self._handle = self.zookeeper.init(self._hosts, cb, self._timeout, - self._provided_client_id) - else: - self._handle = self.zookeeper.init(self._hosts, cb, self._timeout) + # Start the connection writer to establish the connection + self.handler.spawn(proto_writer, self) return self._live - connect_async = start_async - - def _reconnect(self): - """Reconnect to Zookeeper, staggering connection attempts""" - if not self._connection_attempts: - self._connection_attempts = 1 - self._connection_delay = self.retry_delay - else: - if self._connection_attempts == self.max_retries: - raise self.handler.timeout_exception("Time out reconnecting") - self._connection_attempts += 1 - jitter = random.randint(0, self.retry_jitter) / 100.0 - self.handler.sleep_func(self._connection_delay + jitter) - self._connection_delay *= self.retry_delay - self.start_async() def start(self, timeout=15): """Initiate connection to ZK @@ -528,17 +263,19 @@ class KazooClient(object): # We time-out, ensure we are disconnected self.stop() raise self.handler.timeout_exception("Connection time-out") - connect = start def stop(self): """Gracefully stop this Zookeeper session""" + if self._stopped.is_set(): + return + self._stopped.set() + self._queue.put((Close(), None)) self._safe_close() def restart(self): """Stop and restart the Zookeeper session.""" - self._safe_close() - self._stopped.clear() + self.stop() self.start() def add_auth_async(self, scheme, credential): @@ -590,7 +327,7 @@ class KazooClient(object): self._inner_ensure_path(parent, acl) try: self.create_async(path, "", acl=acl).get() - except self.zookeeper.NodeExistsException: + except self.zookeeper.NodeExistsError: # someone else created the node. how sweet! pass @@ -657,7 +394,7 @@ class KazooClient(object): realpath = self.create_async(path, value, acl=acl, ephemeral=ephemeral, sequence=sequence).get() - except NoNodeException: + except NoNodeError: # some or all of the parent path doesn't exist. if makepath is set # we will create it and retry. If it fails again, someone must be # actively deleting ZNodes and we'd best bail out. @@ -833,7 +570,7 @@ class KazooClient(object): def _delete_recursive(self, path): try: children = self.get_children(path) - except self.zookeeper.NoNodeException: + except self.zookeeper.NoNodeError: return if children: @@ -846,5 +583,5 @@ class KazooClient(object): self._delete_recursive(child_path) try: self.delete(path) - except self.zookeeper.NoNodeException: + except self.zookeeper.NoNodeError: pass diff --git a/kazoo/exceptions.py b/kazoo/exceptions.py index 5e377c6..6709d9b 100644 --- a/kazoo/exceptions.py +++ b/kazoo/exceptions.py @@ -7,6 +7,11 @@ class KazooException(Exception): from""" +class ZookeeperError(KazooException): + """Base Zookeeper exception for errors originating from the Zookeeper + server""" + + class CancelledError(KazooException): """Raised when a process is cancelled by another thread""" @@ -23,12 +28,119 @@ class ConnectionDropped(KazooException): """ Internal error for jumping out of loops """ -class AuthFailedError(KazooException): - """Authentication failed when connected""" - - def _invalid_error_code(): raise RuntimeError('Invalid error code') EXCEPTIONS = defaultdict(_invalid_error_code) + + +def _zookeeper_exception(code): + def decorator(klass): + def create(*args, **kwargs): + return klass(args, kwargs) + + EXCEPTIONS[code] = create + return klass + + return decorator + + +@_zookeeper_exception(0) +class RolledBackError(ZookeeperError): + pass + + +@_zookeeper_exception(-1) +class SystemZookeeperError(ZookeeperError): + pass + + +@_zookeeper_exception(-2) +class RuntimeInconsistency(ZookeeperError): + pass + + +@_zookeeper_exception(-3) +class DataInconsistency(ZookeeperError): + pass + + +@_zookeeper_exception(-4) +class ConnectionLoss(ZookeeperError): + pass + + +@_zookeeper_exception(-5) +class MarshallingError(ZookeeperError): + pass + + +@_zookeeper_exception(-6) +class UnimplementedError(ZookeeperError): + pass + + +@_zookeeper_exception(-7) +class OperationTimeoutError(ZookeeperError): + pass + + +@_zookeeper_exception(-8) +class BadArgumentsError(ZookeeperError): + pass + + +@_zookeeper_exception(-100) +class APIError(ZookeeperError): + pass + + +@_zookeeper_exception(-101) +class NoNodeError(ZookeeperError): + pass + + +@_zookeeper_exception(-102) +class NoAuthError(ZookeeperError): + pass + + +@_zookeeper_exception(-103) +class BadVersionError(ZookeeperError): + pass + + +@_zookeeper_exception(-108) +class NoChildrenForEphemeralsError(ZookeeperError): + pass + + +@_zookeeper_exception(-110) +class NodeExistsError(ZookeeperError): + pass + + +@_zookeeper_exception(-111) +class NotEmptyError(ZookeeperError): + pass + + +@_zookeeper_exception(-112) +class SessionExpiredError(ZookeeperError): + pass + + +@_zookeeper_exception(-113) +class InvalidCallbackError(ZookeeperError): + pass + + +@_zookeeper_exception(-114) +class InvalidACLError(ZookeeperError): + pass + + +@_zookeeper_exception(-115) +class AuthFailedError(ZookeeperError): + pass diff --git a/kazoo/handlers/threading.py b/kazoo/handlers/threading.py index 5ab6c6f..66a2aa1 100644 --- a/kazoo/handlers/threading.py +++ b/kazoo/handlers/threading.py @@ -285,9 +285,9 @@ class SequentialThreadingHandler(object): self.callback_queue.put(lambda: callback.func(*callback.args)) -class _PeekableQueue(Queue): +class _PeekableQueue(Queue.Queue): def __init__(self, maxsize=0): - Queue.__init__(self, maxsize=0) + Queue.Queue.__init__(self, maxsize=maxsize) def peek(self, block=True, timeout=None): """Return the first item in the queue but do not remove it from the queue. diff --git a/kazoo/hosts.py b/kazoo/hosts.py new file mode 100644 index 0000000..2c21f89 --- /dev/null +++ b/kazoo/hosts.py @@ -0,0 +1,35 @@ +import random + + +class RandomHostIterator: + """ An iterator that returns a randomly selected host. A host is + guaranteed to not be selected twice unless there is only one + host in the collection. + """ + + def __init__(self, hosts): + self.last = 0 + self.hosts = hosts + + def __iter__(self): + hostslist = self.hosts[:] + random.shuffle(hostslist) + for host in hostslist: + yield host + + def __len__(self): + return len(self.hosts) + + +def collect_hosts(hosts): + """ Collect a set of hosts and an optional chroot from a string. + """ + host_ports, chroot = hosts.partition("/")[::2] + chroot = "/" + chroot if chroot else None + + result = [] + for host_port in host_ports.split(","): + host, port = host_port.partition(":")[::2] + port = int(port.strip()) if port else 2181 + result.append((host.strip(), port)) + return (RandomHostIterator(result), chroot) diff --git a/kazoo/protocol/__init__.py b/kazoo/protocol/__init__.py index 81088b9..443339a 100644 --- a/kazoo/protocol/__init__.py +++ b/kazoo/protocol/__init__.py @@ -6,13 +6,18 @@ from kazoo.exceptions import ConnectionDropped from kazoo.exceptions import EXCEPTIONS from kazoo.protocol.serialization import int_struct from kazoo.protocol.serialization import int_int_struct -from kazoo.protocol.serialization import deserialize_watcher_event from kazoo.protocol.serialization import deserialize_reply_header +from kazoo.protocol.serialization import Close from kazoo.protocol.serialization import Connect +from kazoo.protocol.serialization import Ping +from kazoo.protocol.serialization import Watch from kazoo.protocol.states import KeeperState +from kazoo.protocol.states import WatchedEvent +from kazoo.protocol.states import Callback +from kazoo.protocol.states import EVENT_TYPE_MAP +from kazoo.protocol.paths import _prefix_root - -log = logging.getlog(__name__) +log = logging.getLogger(__name__) def proto_reader(client, s, reader_started, reader_done, read_timeout): @@ -20,7 +25,7 @@ def proto_reader(client, s, reader_started, reader_done, read_timeout): while True: try: - header, buffer, offset = _read_header(s, read_timeout) + header, buffer, offset = _read_header(client, s, read_timeout) if header.xid == -2: log.debug('Received PING') continue @@ -28,65 +33,60 @@ def proto_reader(client, s, reader_started, reader_done, read_timeout): log.debug('Received AUTH') continue elif header.xid == -1: - log.debug('Received EVENT') - wtype, state, path, offset = deserialize_watcher_event( - buffer, offset) + watch, offset = Watch.deserialize(buffer, offset) + path = watch.path + log.debug('Received EVENT: %s', watch) watchers = set() with client._state_lock: - if wtype == 1: - watchers |= client._data_watchers.pop(path, set()) - watchers |= client._exists_watchers.pop(path, set()) - - event = lambda: map(lambda w: w.node_created(path), - watchers) - elif wtype == 2: - watchers |= client._data_watchers.pop(path, set()) - watchers |= client._exists_watchers.pop(path, set()) - watchers |= client._child_watchers.pop(path, set()) + # Ignore watches if we've been stopped + if client._stopped.is_set(): + continue - event = lambda: map(lambda w: w.node_deleted(path), - watchers) - elif wtype == 3: + if watch.type in (1, 2, 3): watchers |= client._data_watchers.pop(path, set()) - watchers |= client._exists_watchers.pop(path, set()) - - event = lambda: map(lambda w: w.data_changed(path), - watchers) - elif wtype == 4: + elif watch.type == 4: watchers |= client._child_watchers.pop(path, set()) - - event = lambda: map(lambda w: w.children_changed(path), - watchers) else: - log.warn('Received unknown event %r', type) + log.warn('Received unknown event %r', watch.type) continue - - client._events.put(event) + ev = WatchedEvent(EVENT_TYPE_MAP[watch.type], + client._state, path) + + client.handler.dispatch_callback( + Callback('watch', + lambda: map(lambda w: w(ev), watchers), + () + ) + ) else: log.debug('Reading for header %r', header) - - request, response, callback, xid = client._pending.get() + request, async_object, xid = client._pending.get() if header.zxid and header.zxid > 0: client.last_zxid = header.zxid if header.xid != xid: - raise RuntimeError('xids do not match, expected %r received %r', xid, header.xid) + raise RuntimeError('xids do not match, expected %r ' + 'received %r', xid, header.xid) - callback_exception = None if header.err: callback_exception = EXCEPTIONS[header.err]() log.debug('Received error %r', callback_exception) - elif response: - response.deserialize(buffer, 'response') + if async_object: + async_object.set_exception(callback_exception) + elif request and async_object: + response = request.deserialize(buffer, offset) log.debug('Received response: %r', response) + async_object.set(response) - try: - callback(callback_exception) - except Exception as e: - log.exception(e) + # Determine if watchers should be registered + with client._state_lock: + if (not client._stopped.is_set() and + hasattr(request, 'watcher')): + path = _prefix_root(client.chroot, request.path) + request.watch_dict[path].add(request.watcher) - if isinstance(response, CloseResponse): + if isinstance(request, Close): log.debug('Read close response') s.close() reader_done.set() @@ -103,14 +103,30 @@ def proto_reader(client, s, reader_started, reader_done, read_timeout): def proto_writer(client): log.debug('Starting writer') - - writer_done = False retry = client.retry_sleeper.copy() + while not client._stopped.is_set(): + log.debug("Client stopped?: %s", client._stopped.is_set()) + + # If the connect_loop returns False, stop retrying + if connect_loop(client, retry) is False: + break + # Still going, increment our retry then go through the + # list of hosts again + if not client._stopped.is_set(): + log.debug("Incrementing and waiting") + retry.increment() + log.debug('Writer stopped') + client._writer_stopped.set() + + +def connect_loop(client, retry): + writer_done = False for host, port in client.hosts: s = client.handler.socket() - client._state = KeeperState.CONNECTING + if client._state != KeeperState.CONNECTING: + client._session_callback(KeeperState.CONNECTING) try: read_timeout, connect_timeout = _connect( @@ -129,24 +145,24 @@ def proto_writer(client): xid = 0 while not writer_done: try: - request, response, callback = client._queue.peek( + request, async_object = client._queue.peek( True, read_timeout / 2000.0) log.debug('Sending %r', request) xid += 1 log.debug('xid: %r', xid) - _submit(s, request, connect_timeout, xid) + _submit(client, s, request, connect_timeout, xid) - if isinstance(request, CloseRequest): + if isinstance(request, Close): log.debug('Received close request, closing') writer_done = True client._queue.get() - client._pending.put((request, response, callback, xid)) + client._pending.put((request, async_object, xid)) except client.handler.empty: log.debug('Queue timeout. Sending PING') - _submit(s, PingRequest(), connect_timeout, -2) + _submit(client, s, Ping, connect_timeout, -2) except Exception as e: log.exception(e) break @@ -157,18 +173,17 @@ def proto_writer(client): if writer_done: client._close(KeeperState.CLOSED) - break + return False except ConnectionDropped: log.warning('Connection dropped') - client._events.put(lambda: client._default_watcher.connection_dropped()) - retry.increment() + client._session_callback(KeeperState.CONNECTING) except AuthFailedError: log.warning('AUTH_FAILED closing') - client._close(KeeperState.AUTH_FAILED) - break + client._session_callback(KeeperState.AUTH_FAILED) + return False except Exception as e: - log.warning(e) - retry.increment() + log.exception(e) + raise finally: if not writer_done: # The read thread will close the socket since there @@ -176,13 +191,11 @@ def proto_writer(client): # still needs to be read from the socket. s.close() - log.debug('Writer stopped') - def _connect(client, s, host, port): log.info('Connecting to %s:%s', host, port) log.debug(' Using session_id: %r session_passwd: 0x%s', - client.session_id, _hex(client.session_passwd)) + client._session_id, client._session_passwd.encode('hex')) s.connect((host, port)) s.setblocking(0) @@ -192,53 +205,54 @@ def _connect(client, s, host, port): connect = Connect( 0, client.last_zxid, - int(client.session_timeout * 1000), - client.session_passwd, + client._session_timeout, + client._session_id or 0, + client._session_passwd, client.read_only) - connect_result, zxid = _invoke(client, s, client.session_timeout, None, - connect.serialize(), connect.deserialize) + connect_result, zxid = _invoke(client, s, client._session_timeout, connect) if connect_result.time_out < 0: log.error('Session expired') - client._events.put(lambda: client._default_watcher.session_expired(client.session_id)) - client._state = KeeperState.EXPIRED_SESSION + client._session_callback(KeeperState.EXPIRED_SESSION) raise RuntimeError('Session expired') if zxid: client.last_zxid = zxid - client.session_id = connect_result.session_id + + # Load return values + client._session_id = connect_result.session_id negotiated_session_timeout = connect_result.time_out connect_timeout = negotiated_session_timeout / len(client.hosts) read_timeout = negotiated_session_timeout * 2.0 / 3.0 - client.session_passwd = connect_result.passwd - log.debug('Session created, session_id: %r session_passwd: 0x%s', client.session_id, _hex(client.session_passwd)) - log.debug(' negotiated session timeout: %s', negotiated_session_timeout) - log.debug(' connect timeout: %s', connect_timeout) - log.debug(' read timeout: %s', read_timeout) - client._events.put(lambda: client._default_watcher.session_connected(client.session_id, client.session_passwd, client.read_only)) - client._state = KeeperState.CONNECTED - - for scheme, auth in client.auth_data: - ap = AuthPacket(0, scheme, auth) - zxid = _invoke(s, connect_timeout, ap, xid=-4) - if zxid: - client.last_zxid = zxid + client._session_passwd = connect_result.passwd + log.debug('Session created, session_id: %r session_passwd: 0x%s\n' + ' negotiated session timeout: %s\n' + ' connect timeout: %s\n' + ' read timeout: %s', client._session_id, + client._session_passwd.encode('hex'), negotiated_session_timeout, + connect_timeout, read_timeout) + client._session_callback(KeeperState.CONNECTED) + + # for scheme, auth in client.auth_data: + # ap = AuthPacket(0, scheme, auth) + # zxid = _invoke(s, connect_timeout, ap, xid=-4) + # if zxid: + # client.last_zxid = zxid return read_timeout, connect_timeout -def _invoke(client, socket, timeout, request_type, request_bytes, - response_deserializer=None, xid=None): +def _invoke(client, socket, timeout, request, xid=None): b = bytearray() - if xid and request_type: - b.extend(int_int_struct.pack(xid, request_type)) - elif xid: + if xid: b.extend(int_struct.pack(xid)) - elif request_type: - b.extend(int_struct.pack(request_type)) - b.extend(request_bytes) - b = int_struct.pack(len(b)) + b - _write(client, socket, b, timeout) + if request.type: + b.extend(int_struct.pack(request.type)) + b.extend(request.serialize()) + buff = int_struct.pack(len(b)) + b + + _write(client, socket, buff, timeout) + log.debug("Wrote out initial request") zxid = None if xid: @@ -254,39 +268,39 @@ def _invoke(client, socket, timeout, request_type, request_bytes, raise callback_exception return zxid - msg = _read(socket, 4, timeout) - length = int_struct.unpack_from(msg, 0)[0] + msg = _read(client, socket, 4, timeout) + length = int_struct.unpack(msg)[0] - msg = _read(socket, length, timeout) + log.debug("Reading full packet") + msg = _read(client, socket, length, timeout) - if response_deserializer: - log.debug('Read response %s', response_deserializer(msg)) - return response_deserializer(msg), zxid + if hasattr(request, 'deserialize'): + obj, _ = request.deserialize(msg, 0) + log.debug('Read response %s', obj) + return obj, zxid return zxid -def _hex(bindata): - return bindata.encode('hex') - - -def _submit(client, socket, request_type, request_buffer, timeout, xid=None): +def _submit(client, socket, request, timeout, xid=None): b = bytearray() b.extend(int_struct.pack(xid)) - if request_type: - b.extend(int_struct.pack(request_type)) - b += request_buffer + if request.type: + b.extend(int_struct.pack(request.type)) + b += request.serialize() b = int_struct.pack(len(b)) + b _write(client, socket, b, timeout) def _write(client, socket, msg, timeout): sent = 0 + msg_length = len(msg) select = client.handler.select - while sent < len(msg): + while sent < msg_length: _, ready_to_write, _ = select([], [socket], [], timeout) - bytes_sent = ready_to_write[0].send(msg[sent:]) - if not sent: + msg_slice = buffer(msg, sent) + bytes_sent = ready_to_write[0].send(msg_slice) + if not bytes_sent: raise ConnectionDropped('socket connection broken') sent += bytes_sent @@ -300,12 +314,14 @@ def _read_header(client, socket, timeout): def _read(client, socket, length, timeout): - msg = '' + msgparts = [] + remaining = length select = client.handler.select - while len(msg) < length: + while remaining > 0: ready_to_read, _, _ = select([socket], [], [], timeout) - chunk = ready_to_read[0].recv(length - len(msg)) + chunk = ready_to_read[0].recv(remaining) if chunk == '': raise ConnectionDropped('socket connection broken') - msg = msg + chunk - return msg + msgparts.append(chunk) + remaining -= len(chunk) + return b"".join(msgparts) diff --git a/kazoo/protocol/paths.py b/kazoo/protocol/paths.py new file mode 100644 index 0000000..ba5d47d --- /dev/null +++ b/kazoo/protocol/paths.py @@ -0,0 +1,52 @@ +def normpath(path): + """Normalize path, eliminating double slashes, etc. + """ + comps = path.split('/') + new_comps = [] + for comp in comps: + if comp == '': + continue + if comp in ('.', '..'): + raise ValueError('relative paths not allowed') + new_comps.append(comp) + slash = u'/' if isinstance(path, unicode) else '/' + new_path = slash.join(new_comps) + if path.startswith('/'): + return slash + new_path + return new_path + + +def join(a, *p): + """Join two or more pathname components, inserting '/' as needed. + If any component is an absolute path, all previous path components + will be discarded. + """ + path = a + for b in p: + if b.startswith('/'): + path = b + elif path == '' or path.endswith('/'): + path += b + else: + path += '/' + b + return path + + +def isabs(s): + """Test whether a path is absolute. """ + return s.startswith('/') + + +def basename(p): + """Returns the final component of a pathname""" + i = p.rfind('/') + 1 + return p[i:] + + +def _prefix_root(root, path): + """ Prepend a root to a path. """ + return normpath(join(_norm_root(root), path.lstrip('/'))) + + +def _norm_root(root): + return normpath(join('/', root)) diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py index 759e2e6..0503a60 100644 --- a/kazoo/protocol/serialization.py +++ b/kazoo/protocol/serialization.py @@ -33,19 +33,67 @@ def write_buffer(bytes): return int_struct.pack(len(bytes)) + bytes -class Connect(namedtuple('Connect', 'protocol_version', 'last_zxid_seen', - 'time_out', 'session_id', 'passwd', 'read_only')): +def read_buffer(bytes, offset): + length = int_struct.unpack_from(bytes, offset)[0] + offset += int_struct.size + if length < 0: + return None, offset + else: + index = offset + offset += length + return bytes[index:index + length], offset + + +class Connect(namedtuple('Connect', 'protocol_version last_zxid_seen' + ' time_out session_id passwd read_only')): """A connection request""" + type = None + def serialize(self): - b = int_long_int_long_struct.pack( + b = bytearray() + b.extend(int_long_int_long_struct.pack( self.protocol_version, self.last_zxid_seen, self.time_out, - self.session_id) - b += write_buffer(self.passwd) - b += 1 if self.read_only else 0 + self.session_id)) + b.extend(write_buffer(self.passwd)) + b.extend([1 if self.read_only else 0]) return b - def deserialize(buffer, offset): - pass + @classmethod + def deserialize(cls, bytes, offset): + proto_version, timeout, session_id = int_int_long_struct.unpack_from( + bytes, offset) + offset += int_int_long_struct.size + password, offset = read_buffer(bytes, offset) + return cls(proto_version, 0, timeout, session_id, password, 0), offset + + +class Close(object): + __slots__ = ['type'] + type = -11 + + @classmethod + def serialize(cls): + return '' + + +class Ping(object): + __slots__ = ['type'] + type = 11 + + @classmethod + def serialize(cls): + return '' + + +class Watch(namedtuple('Watch', 'type state path')): + @classmethod + def deserialize(cls, buffer, offset): + """Given a buffer and the current buffer offset, return the type, + state, path, and new offset""" + type, state = int_int_struct.unpack_from(buffer, offset) + offset += int_int_struct.size + path, offset = read_string(buffer, offset) + return cls(type, state, path), offset def deserialize_reply_header(buffer, offset): @@ -54,12 +102,3 @@ def deserialize_reply_header(buffer, offset): new_offset = offset + reply_header_struct.size return ReplyHeader._make( reply_header_struct.unpack_from(buffer, offset)), new_offset - - -def deserialize_watcher_event(buffer, offset): - """Given a buffer and the current buffer offset, return the type, - state, path, and new offset""" - type, state = int_int_struct.unpack_from(buffer, offset) - offset += int_int_struct.size - path, offset = read_string(buffer, offset) - return type, state, path, offset diff --git a/kazoo/protocol/states.py b/kazoo/protocol/states.py index 75e60a7..059a5bc 100644 --- a/kazoo/protocol/states.py +++ b/kazoo/protocol/states.py @@ -1,4 +1,5 @@ """Kazoo State and Event objects""" +from collections import namedtuple class KazooState(object): @@ -26,62 +27,6 @@ class KazooState(object): LOST = "LOST" -class State(object): - def __init__(self, code, description): - self.code = code - self.description = description - - def __eq__(self, other): - return self.code == other.code - - def __hash__(self): - return hash(self.code) - - def __str__(self): - return self.code - - def __repr__(self): - return '%s()' % self.__class__.__name__ - - -class Connecting(State): - def __init__(self): - super(Connecting, self).__init__('CONNECTING', 'Connecting') - - -class Connected(State): - def __init__(self): - super(Connected, self).__init__('CONNECTED', 'Connected') - - -class ConnectedRO(State): - def __init__(self): - super(ConnectedRO, self).__init__('CONNECTED_RO', 'Connected Read-Only') - - -class AuthFailed(State): - def __init__(self): - super(AuthFailed, self).__init__('AUTH_FAILED', 'Authorization Failed') - - -class Closed(State): - def __init__(self): - super(Closed, self).__init__('CLOSED', 'Closed') - - -class ExpiredSession(State): - def __init__(self): - super(ExpiredSession, self).__init__('EXPIRED_SESSION', 'Expired Session') - - -CONNECTING = Connecting() -CONNECTED = Connected() -CONNECTED_RO = ConnectedRO() -AUTH_FAILED = AuthFailed() -CLOSED = Closed() -EXPIRED_SESSION = ExpiredSession() - - class KeeperState(object): """Zookeeper State @@ -100,6 +45,10 @@ class KeeperState(object): Zookeeper is connected. + .. attribute:: CONNECTED_RO + + Zookeeper is connected in read-only state. + .. attribute:: CONNECTING Zookeeper is currently attempting to establish a connection. @@ -110,11 +59,12 @@ class KeeperState(object): gone. """ - AUTH_FAILED = AUTH_FAILED - CONNECTED = CONNECTED - CONNECTING = CONNECTING - CLOSED = CLOSED - EXPIRED_SESSION = EXPIRED_SESSION + AUTH_FAILED = 'AUTH_FAILED' + CONNECTED = 'CONNECTED' + CONNECTED_RO = 'CONNECTED_RO' + CONNECTING = 'CONNECTING' + CLOSED = 'CLOSED' + EXPIRED_SESSION = 'EXPIRED_SESSION' class EventType(object): @@ -124,21 +74,6 @@ class EventType(object): will receive a :class:`EventType` attribute as their event argument. - .. attribute:: NOTWATCHING - - This event type was added to Zookeeper in the event that - watches get overloaded. It's never been used though and will - likely be removed in a future Zookeeper version. **This event - will never actually be set, don't bother testing for it.** - - .. attribute:: SESSION - - A Zookeeper session event. Watch functions do not receive - session events. A session event watch can be registered with - :class:`KazooClient` during creation that can receive these - events. It's recommended to add a listener for connection state - changes instead. - .. attribute:: CREATED A node has been created. @@ -158,9 +93,145 @@ class EventType(object): node has changed, which must have its own watch established. """ - NOTWATCHING = zookeeper.NOTWATCHING_EVENT - SESSION = zookeeper.SESSION_EVENT - CREATED = zookeeper.CREATED_EVENT - DELETED = zookeeper.DELETED_EVENT - CHANGED = zookeeper.CHANGED_EVENT - CHILD = zookeeper.CHILD_EVENT + CREATED = 'CREATED' + DELETED = 'DELETED' + CHANGED = 'CHANGED' + CHILD = 'CHILD' + +EVENT_TYPE_MAP = { + 1: EventType.CREATED, + 2: EventType.DELETED, + 3: EventType.CHANGED, + 4: EventType.CHILD +} + + +class WatchedEvent(namedtuple('WatchedEvent', ('type', 'state', 'path'))): + """A change on ZooKeeper that a Watcher is able to respond to. + + The :class:`WatchedEvent` includes exactly what happened, the + current state of ZooKeeper, and the path of the znode that was + involved in the event. An instance of :class:`WatchedEvent` will be + passed to registered watch functions. + + .. attribute:: type + + A :class:`EventType` attribute indicating the event type. + + .. attribute:: state + + A :class:`KeeperState` attribute indicating the Zookeeper + state. + + .. attribute:: path + + The path of the node for the watch event. + + """ + + +class Callback(namedtuple('Callback', ('type', 'func', 'args'))): + """A callback that is handed to a handler for dispatch + + :param type: Type of the callback, can be 'session' or 'watch' + :param func: Callback function + :param args: Argument list for the callback function + + """ + + +class ZnodeStat(namedtuple('ZnodeStat', ('aversion', 'ctime', 'cversion', + 'czxid', 'dataLength', + 'ephemeralOwner', 'mtime', 'mzxid', + 'numChildren', 'pzxid', 'version'))): + """A ZnodeStat structure with convenience properties + + When getting the value of a node from Zookeeper, the properties for + the node known as a "Stat structure" will be retrieved. The + :class:`ZnodeStat` object provides access to the standard Stat + properties and additional properties that are more readable and use + Python time semantics (seconds since epoch instead of ms). + + .. note:: + + The original Zookeeper Stat name is in parens next to the name + when it differs from the convenience attribute. These are **not + functions**, just attributes. + + .. attribute:: creation_transaction_id (czxid) + + The transaction id of the change that caused this znode to be + created. + + .. attribute:: last_modified_transaction_id (mzxid) + + The transaction id of the change that last modified this znode. + + .. attribute:: created (ctime) + + The time in seconds from epoch when this node was created. + (ctime is in milliseconds) + + .. attribute:: last_modified (mtime) + + The time in seconds from epoch when this znode was last + modified. (mtime is in milliseconds) + + .. attribute:: version + + The number of changes to the data of this znode. + + .. attribute:: acl_version (aversion) + + The number of changes to the ACL of this znode. + + .. attribute:: owner_session_id (ephemeralOwner) + + The session id of the owner of this znode if the znode is an + ephemeral node. If it is not an ephemeral node, it will be + `None`. (ephemeralOwner will be 0 if it is not ephemeral) + + .. attribute:: data_length (dataLength) + + The length of the data field of this znode. + + .. attribute:: children_count (numChildren) + + The number of children of this znode. + + """ + @property + def acl_version(self): + return self.aversion + + @property + def children_version(self): + return self.cversion + + @property + def created(self): + return self.ctime / 1000.0 + + @property + def last_modified(self): + return self.mtime / 1000.0 + + @property + def owner_session_id(self): + return self.ephemeralOwner or None + + @property + def creation_transaction_id(self): + return self.czxid + + @property + def last_modified_transaction_id(self): + return self.mzxid + + @property + def data_length(self): + return self.dataLength + + @property + def children_count(self): + return self.numChildren diff --git a/kazoo/recipe/barrier.py b/kazoo/recipe/barrier.py index deb8698..e83c4cf 100644 --- a/kazoo/recipe/barrier.py +++ b/kazoo/recipe/barrier.py @@ -3,9 +3,9 @@ import os import socket import uuid -from kazoo.client import EventType -from kazoo.exceptions import NoNodeException -from kazoo.exceptions import NodeExistsException +from kazoo.protocol.states import EventType +from kazoo.exceptions import NoNodeError +from kazoo.exceptions import NodeExistsError class Barrier(object): @@ -46,7 +46,7 @@ class Barrier(object): try: self.client.retry(self.client.delete, self.path) return True - except NoNodeException: + except NoNodeError: return False def wait(self, timeout=None): @@ -131,7 +131,7 @@ class DoubleBarrier(object): try: self.client.create(self.create_path, self._identifier, ephemeral=True) - except NodeExistsException: + except NodeExistsError: pass def created(event): @@ -161,7 +161,7 @@ class DoubleBarrier(object): # Delete the ready node if its around try: self.client.delete(self.path + '/ready') - except NoNodeException: + except NoNodeError: pass while True: @@ -204,5 +204,5 @@ class DoubleBarrier(object): def _best_effort_cleanup(self): try: self.client.retry(self.client.delete, self.create_path) - except NoNodeException: + except NoNodeError: pass diff --git a/kazoo/recipe/lock.py b/kazoo/recipe/lock.py index 01bee19..c802767 100644 --- a/kazoo/recipe/lock.py +++ b/kazoo/recipe/lock.py @@ -15,7 +15,7 @@ import uuid from kazoo.retry import ForceRetryError from kazoo.exceptions import CancelledError -from kazoo.exceptions import NoNodeException +from kazoo.exceptions import NoNodeError class Lock(object): @@ -186,7 +186,7 @@ class Lock(object): try: data, stat = self.client.get(self.path + "/" + child) contenders.append(data) - except NoNodeException: + except NoNodeError: pass return contenders diff --git a/kazoo/recipe/party.py b/kazoo/recipe/party.py index 5bba798..00e828f 100644 --- a/kazoo/recipe/party.py +++ b/kazoo/recipe/party.py @@ -5,7 +5,7 @@ used for determining members of a party. """ import uuid -from kazoo.exceptions import NodeExistsException, NoNodeException +from kazoo.exceptions import NodeExistsError, NoNodeError class BaseParty(object): @@ -40,7 +40,7 @@ class BaseParty(object): try: self.client.create(self.create_path, self.data, ephemeral=True) self.participating = True - except NodeExistsException: + except NodeExistsError: # node was already created, perhaps we are recovering from a # suspended connection self.participating = True @@ -52,7 +52,7 @@ class BaseParty(object): def _inner_leave(self): try: self.client.delete(self.create_path) - except NoNodeException: + except NoNodeError: return False return True @@ -83,7 +83,7 @@ class Party(BaseParty): d, _ = self.client.retry(self.client.get, self.path + "/" + child) yield d - except NoNodeException: + except NoNodeError: pass def _get_children(self): diff --git a/kazoo/recipe/watchers.py b/kazoo/recipe/watchers.py index c9bde92..f5e1275 100644 --- a/kazoo/recipe/watchers.py +++ b/kazoo/recipe/watchers.py @@ -5,7 +5,7 @@ import time from functools import partial from kazoo.client import KazooState -from kazoo.exceptions import NoNodeException +from kazoo.exceptions import NoNodeError log = logging.getLogger(__name__) @@ -103,7 +103,7 @@ class DataWatch(object): try: data, stat = self._client.retry(self._client.get, self._path, self._watcher) - except NoNodeException: + except NoNodeError: self._stopped = True self._func(None, None) return diff --git a/kazoo/retry.py b/kazoo/retry.py index 73eb42c..dd03e0b 100644 --- a/kazoo/retry.py +++ b/kazoo/retry.py @@ -1,3 +1,4 @@ +import logging import random import time @@ -9,6 +10,8 @@ from zookeeper import ( SessionExpiredException ) +log = logging.getLogger(__name__) + class ForceRetryError(Exception): """Raised when some recipe logic wants to force a retry""" @@ -46,16 +49,16 @@ class RetySleeper(object): """Increment the failed count, and sleep appropriately before continuing""" if self._attempts == self.max_tries: - raise + raise Exception("Too many retry attempts") self._attempts += 1 jitter = random.randint(0, self.max_jitter) / 100.0 - self.sleep_func(self.delay + jitter) + self.sleep_func(self._cur_delay + jitter) self._cur_delay *= self.backoff def copy(self): """Return a clone of this retry sleeper""" return RetySleeper(self.max_tries, self.delay, self.backoff, - self.max_jitter, self.sleep_func) + self.max_jitter / 100.0, self.sleep_func) class KazooRetry(object): @@ -90,8 +93,8 @@ class KazooRetry(object): and treated as a retry-able command. """ - self.retry = RetySleeper(max_tries, delay, backoff, max_jitter, - sleep_func) + self.retry_sleeper = RetySleeper(max_tries, delay, backoff, max_jitter, + sleep_func) self.sleep_func = sleep_func self.retry_exceptions = self.RETRY_EXCEPTIONS if ignore_expire: @@ -101,11 +104,11 @@ class KazooRetry(object): self(func, *args, **kwargs) def __call__(self, func, *args, **kwargs): - self.retry.reset() + self.retry_sleeper.reset() while True: try: return func(*args, **kwargs) except self.retry_exceptions: - self.retry.increment() + self.retry_sleeper.increment() |