summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharles-Henri de Boysson <ceache@users.noreply.github.com>2022-10-14 23:55:32 -0400
committerCharles-Henri de Boysson <ceache@users.noreply.github.com>2022-10-17 00:46:22 -0400
commit686717629f71c66d39ab0352c605c73eace5bd1f (patch)
tree55f51a9d33976295aecdb8fa9bbf115161ca214e
parent951f744272ad69875827689b3f15ca43c79aae11 (diff)
downloadkazoo-686717629f71c66d39ab0352c605c73eace5bd1f.tar.gz
style: reformat all code with black
-rw-r--r--docs/conf.py141
-rw-r--r--kazoo/client.py356
-rw-r--r--kazoo/exceptions.py2
-rw-r--r--kazoo/handlers/eventlet.py24
-rw-r--r--kazoo/handlers/gevent.py8
-rw-r--r--kazoo/handlers/threading.py7
-rw-r--r--kazoo/handlers/utils.py73
-rw-r--r--kazoo/hosts.py6
-rw-r--r--kazoo/protocol/connection.py261
-rw-r--r--kazoo/protocol/paths.py37
-rw-r--r--kazoo/protocol/serialization.py146
-rw-r--r--kazoo/protocol/states.py44
-rw-r--r--kazoo/python2atexit.py2
-rw-r--r--kazoo/recipe/barrier.py28
-rw-r--r--kazoo/recipe/cache.py76
-rw-r--r--kazoo/recipe/counter.py16
-rw-r--r--kazoo/recipe/election.py1
-rw-r--r--kazoo/recipe/lease.py48
-rw-r--r--kazoo/recipe/lock.py4
-rw-r--r--kazoo/recipe/partitioner.py48
-rw-r--r--kazoo/recipe/party.py16
-rw-r--r--kazoo/recipe/queue.py73
-rw-r--r--kazoo/recipe/watchers.py78
-rw-r--r--kazoo/retry.py62
-rw-r--r--kazoo/security.py72
-rw-r--r--kazoo/testing/__init__.py5
-rw-r--r--kazoo/testing/common.py226
-rw-r--r--kazoo/testing/harness.py69
-rw-r--r--kazoo/tests/conftest.py2
-rw-r--r--kazoo/tests/test__connection.py30
-rw-r--r--kazoo/tests/test_barrier.py6
-rw-r--r--kazoo/tests/test_build.py12
-rw-r--r--kazoo/tests/test_cache.py169
-rw-r--r--kazoo/tests/test_client.py369
-rw-r--r--kazoo/tests/test_election.py16
-rw-r--r--kazoo/tests/test_eventlet_handler.py2
-rw-r--r--kazoo/tests/test_exceptions.py5
-rw-r--r--kazoo/tests/test_gevent_handler.py2
-rw-r--r--kazoo/tests/test_hosts.py53
-rw-r--r--kazoo/tests/test_interrupt.py10
-rw-r--r--kazoo/tests/test_lease.py280
-rw-r--r--kazoo/tests/test_lock.py11
-rw-r--r--kazoo/tests/test_partitioner.py27
-rw-r--r--kazoo/tests/test_party.py16
-rw-r--r--kazoo/tests/test_paths.py89
-rw-r--r--kazoo/tests/test_retry.py7
-rw-r--r--kazoo/tests/test_sasl.py6
-rw-r--r--kazoo/tests/test_security.py40
-rw-r--r--kazoo/tests/test_selectors_select.py23
-rw-r--r--kazoo/tests/test_threading_handler.py48
-rw-r--r--kazoo/tests/test_utils.py38
-rw-r--r--kazoo/tests/test_watchers.py130
-rw-r--r--kazoo/tests/util.py62
-rw-r--r--kazoo/version.py2
54 files changed, 1976 insertions, 1408 deletions
diff --git a/docs/conf.py b/docs/conf.py
index 715ebd7..fb1d598 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -21,43 +21,44 @@ class Mock(object):
def __getattr__(self, name):
return Mock
-MOCK_MODULES = ['zookeeper']
+
+MOCK_MODULES = ["zookeeper"]
for mod_name in MOCK_MODULES:
sys.modules[mod_name] = Mock()
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
-sys.path.insert(0, os.path.abspath('..'))
+sys.path.insert(0, os.path.abspath(".."))
# -- General configuration -----------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
-#needs_sphinx = '1.0'
+# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = [
- 'sphinx.ext.autodoc',
- 'sphinx.ext.doctest',
- 'sphinx.ext.viewcode',
+ "sphinx.ext.autodoc",
+ "sphinx.ext.doctest",
+ "sphinx.ext.viewcode",
]
# Add any paths that contain templates here, relative to this directory.
-templates_path = ['_templates']
+templates_path = ["_templates"]
# The suffix of source filenames.
-source_suffix = '.rst'
+source_suffix = ".rst"
# The encoding of source files.
-#source_encoding = 'utf-8-sig'
+# source_encoding = 'utf-8-sig'
# The master toctree document.
-master_doc = 'index'
+master_doc = "index"
# General information about the project.
-project = u'kazoo'
-copyright = u'2011-2014, Kazoo team'
+project = "kazoo"
+copyright = "2011-2014, Kazoo team"
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
@@ -65,75 +66,76 @@ copyright = u'2011-2014, Kazoo team'
#
# The full version, including alpha/beta/rc tags.
from kazoo import version
+
release = version.__version__
# The short X.Y version.
-version = '.'.join(release.split('.')[:-1])
+version = ".".join(release.split(".")[:-1])
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
-#language = None
+# language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
-#today = ''
+# today = ''
# Else, today_fmt is used as the format for a strftime call.
-#today_fmt = '%B %d, %Y'
+# today_fmt = '%B %d, %Y'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
-exclude_patterns = ['_build']
+exclude_patterns = ["_build"]
# The reST default role (used for this markup: `text`) to use for all documents.
-#default_role = None
+# default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
-#add_function_parentheses = True
+# add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
-#add_module_names = True
+# add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
-#show_authors = False
+# show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
-pygments_style = 'sphinx'
+pygments_style = "sphinx"
# A list of ignored prefixes for module index sorting.
-#modindex_common_prefix = []
+# modindex_common_prefix = []
# -- Options for HTML output ---------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
-html_theme = 'default'
+html_theme = "default"
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
-#html_theme_options = {}
+# html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
-#html_theme_path = []
+# html_theme_path = []
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
-#html_title = None
+# html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
-#html_short_title = None
+# html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
-#html_logo = None
+# html_logo = None
# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
-#html_favicon = None
+# html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
@@ -142,101 +144,95 @@ html_static_path = []
# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
-#html_last_updated_fmt = '%b %d, %Y'
+# html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
-#html_use_smartypants = True
+# html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
-#html_sidebars = {}
+# html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
-#html_additional_pages = {}
+# html_additional_pages = {}
# If false, no module index is generated.
-#html_domain_indices = True
+# html_domain_indices = True
# If false, no index is generated.
-#html_use_index = True
+# html_use_index = True
# If true, the index is split into individual pages for each letter.
-#html_split_index = False
+# html_split_index = False
# If true, links to the reST sources are added to the pages.
-#html_show_sourcelink = True
+# html_show_sourcelink = True
# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
-#html_show_sphinx = True
+# html_show_sphinx = True
# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
-#html_show_copyright = True
+# html_show_copyright = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
-#html_use_opensearch = ''
+# html_use_opensearch = ''
# This is the file name suffix for HTML files (e.g. ".xhtml").
-#html_file_suffix = None
+# html_file_suffix = None
# Output file base name for HTML help builder.
-htmlhelp_basename = 'kazoodoc'
+htmlhelp_basename = "kazoodoc"
# -- Options for LaTeX output --------------------------------------------------
latex_elements = {
-# The paper size ('letterpaper' or 'a4paper').
-#'papersize': 'letterpaper',
-
-# The font size ('10pt', '11pt' or '12pt').
-#'pointsize': '10pt',
-
-# Additional stuff for the LaTeX preamble.
-#'preamble': '',
+ # The paper size ('letterpaper' or 'a4paper').
+ #'papersize': 'letterpaper',
+ # The font size ('10pt', '11pt' or '12pt').
+ #'pointsize': '10pt',
+ # Additional stuff for the LaTeX preamble.
+ #'preamble': '',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass [howto/manual]).
latex_documents = [
- ('index', 'kazoo.tex', u'kazoo Documentation',
- u'Various Authors', 'manual'),
+ ("index", "kazoo.tex", "kazoo Documentation", "Various Authors", "manual"),
]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
-#latex_logo = None
+# latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
-#latex_use_parts = False
+# latex_use_parts = False
# If true, show page references after internal links.
-#latex_show_pagerefs = False
+# latex_show_pagerefs = False
# If true, show URL addresses after external links.
-#latex_show_urls = False
+# latex_show_urls = False
# Documents to append as an appendix to all manuals.
-#latex_appendices = []
+# latex_appendices = []
# If false, no module index is generated.
-#latex_domain_indices = True
+# latex_domain_indices = True
# -- Options for manual page output --------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
-man_pages = [
- ('index', 'kazoo', u'kazoo Documentation',
- [u'Various Authors'], 1)
-]
+man_pages = [("index", "kazoo", "kazoo Documentation", ["Various Authors"], 1)]
# If true, show URL addresses after external links.
-#man_show_urls = False
+# man_show_urls = False
# -- Options for Texinfo output ------------------------------------------------
@@ -245,15 +241,22 @@ man_pages = [
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
- ('index', 'kazoo', u'kazoo Documentation', u'Various Authors',
- 'kazoo', 'One line description of project.', 'Miscellaneous'),
+ (
+ "index",
+ "kazoo",
+ "kazoo Documentation",
+ "Various Authors",
+ "kazoo",
+ "One line description of project.",
+ "Miscellaneous",
+ ),
]
# Documents to append as an appendix to all manuals.
-#texinfo_appendices = []
+# texinfo_appendices = []
# If false, no module index is generated.
-#texinfo_domain_indices = True
+# texinfo_domain_indices = True
# How to display URL addresses: 'footnote', 'no', or 'inline'.
-#texinfo_show_urls = 'footnote'
+# texinfo_show_urls = 'footnote'
diff --git a/kazoo/client.py b/kazoo/client.py
index 25baa68..bf1ec8f 100644
--- a/kazoo/client.py
+++ b/kazoo/client.py
@@ -42,14 +42,14 @@ from kazoo.protocol.serialization import (
Reconfig,
SetData,
Sync,
- Transaction
+ Transaction,
)
from kazoo.protocol.states import (
Callback,
EventType,
KazooState,
KeeperState,
- WatchedEvent
+ WatchedEvent,
)
from kazoo.retry import KazooRetry
from kazoo.security import ACL, OPEN_ACL_UNSAFE
@@ -72,10 +72,10 @@ bytes_types = (six.binary_type,)
CLOSED_STATES = (
KeeperState.EXPIRED_SESSION,
KeeperState.AUTH_FAILED,
- KeeperState.CLOSED
+ KeeperState.CLOSED,
)
-ENVI_VERSION = re.compile(r'([\d\.]*).*', re.DOTALL)
-ENVI_VERSION_KEY = 'zookeeper.version'
+ENVI_VERSION = re.compile(r"([\d\.]*).*", re.DOTALL)
+ENVI_VERSION_KEY = "zookeeper.version"
log = logging.getLogger(__name__)
@@ -87,10 +87,10 @@ _RETRY_COMPAT_DEFAULTS = dict(
)
_RETRY_COMPAT_MAPPING = dict(
- max_retries='max_tries',
- retry_delay='delay',
- retry_backoff='backoff',
- retry_max_delay='max_delay',
+ max_retries="max_tries",
+ retry_delay="delay",
+ retry_backoff="backoff",
+ retry_max_delay="max_delay",
)
@@ -104,13 +104,29 @@ class KazooClient(object):
:class:`~kazoo.protocol.states.WatchedEvent` instance.
"""
- def __init__(self, hosts='127.0.0.1:2181',
- timeout=10.0, client_id=None, handler=None,
- default_acl=None, auth_data=None, sasl_options=None,
- read_only=None, randomize_hosts=True, connection_retry=None,
- command_retry=None, logger=None, keyfile=None,
- keyfile_password=None, certfile=None, ca=None,
- use_ssl=False, verify_certs=True, **kwargs):
+
+ def __init__(
+ self,
+ hosts="127.0.0.1:2181",
+ timeout=10.0,
+ client_id=None,
+ handler=None,
+ default_acl=None,
+ auth_data=None,
+ sasl_options=None,
+ read_only=None,
+ randomize_hosts=True,
+ connection_retry=None,
+ command_retry=None,
+ logger=None,
+ keyfile=None,
+ keyfile_password=None,
+ certfile=None,
+ ca=None,
+ use_ssl=False,
+ verify_certs=True,
+ **kwargs,
+ ):
"""Create a :class:`KazooClient` instance. All time arguments
are in seconds.
@@ -212,8 +228,10 @@ class KazooClient(object):
# Record the handler strategy used
self.handler = handler if handler else SequentialThreadingHandler()
if inspect.isclass(self.handler):
- raise ConfigurationError("Handler must be an instance of a class, "
- "not the class: %s" % self.handler)
+ raise ConfigurationError(
+ "Handler must be an instance of a class, "
+ "not the class: %s" % self.handler
+ )
self.auth_data = auth_data if auth_data else set([])
self.default_acl = default_acl
@@ -269,14 +287,17 @@ class KazooClient(object):
if type(self._conn_retry) is KazooRetry:
if self.handler.sleep_func != self._conn_retry.sleep_func:
- raise ConfigurationError("Retry handler and event handler "
- " must use the same sleep func")
+ raise ConfigurationError(
+ "Retry handler and event handler "
+ " must use the same sleep func"
+ )
if type(self.retry) is KazooRetry:
if self.handler.sleep_func != self.retry.sleep_func:
raise ConfigurationError(
"Command retry handler and event handler "
- "must use the same sleep func")
+ "must use the same sleep func"
+ )
if self.retry is None or self._conn_retry is None:
old_retry_keys = dict(_RETRY_COMPAT_DEFAULTS)
@@ -284,11 +305,13 @@ class KazooClient(object):
try:
old_retry_keys[key] = kwargs.pop(key)
warnings.warn(
- 'Passing retry configuration param %s to the '
- 'client directly is deprecated, please pass a '
- 'configured retry object (using param %s)' % (
- key, _RETRY_COMPAT_MAPPING[key]),
- DeprecationWarning, stacklevel=2)
+ "Passing retry configuration param %s to the "
+ "client directly is deprecated, please pass a "
+ "configured retry object (using param %s)"
+ % (key, _RETRY_COMPAT_MAPPING[key]),
+ DeprecationWarning,
+ stacklevel=2,
+ )
except KeyError:
pass
@@ -298,46 +321,53 @@ class KazooClient(object):
if self._conn_retry is None:
self._conn_retry = KazooRetry(
- sleep_func=self.handler.sleep_func,
- **retry_keys)
+ sleep_func=self.handler.sleep_func, **retry_keys
+ )
if self.retry is None:
self.retry = KazooRetry(
- sleep_func=self.handler.sleep_func,
- **retry_keys)
+ sleep_func=self.handler.sleep_func, **retry_keys
+ )
# Managing legacy SASL options
for scheme, auth in self.auth_data:
- if scheme != 'sasl':
+ if scheme != "sasl":
continue
if sasl_options:
raise ConfigurationError(
- 'Multiple SASL configurations provided'
+ "Multiple SASL configurations provided"
)
warnings.warn(
- 'Passing SASL configuration as part of the auth_data is '
- 'deprecated, please use the sasl_options configuration '
- 'instead', DeprecationWarning, stacklevel=2
+ "Passing SASL configuration as part of the auth_data is "
+ "deprecated, please use the sasl_options configuration "
+ "instead",
+ DeprecationWarning,
+ stacklevel=2,
)
- username, password = auth.split(':')
+ username, password = auth.split(":")
# Generate an equivalent SASL configuration
sasl_options = {
- 'username': username,
- 'password': password,
- 'mechanism': 'DIGEST-MD5',
- 'service': 'zookeeper',
- 'principal': 'zk-sasl-md5',
+ "username": username,
+ "password": password,
+ "mechanism": "DIGEST-MD5",
+ "service": "zookeeper",
+ "principal": "zk-sasl-md5",
}
# Cleanup
- self.auth_data = set([
- (scheme, auth)
- for scheme, auth in self.auth_data
- if scheme != 'sasl'
- ])
+ self.auth_data = set(
+ [
+ (scheme, auth)
+ for scheme, auth in self.auth_data
+ if scheme != "sasl"
+ ]
+ )
self._conn_retry.interrupt = lambda: self._stopped.is_set()
self._connection = ConnectionHandler(
- self, self._conn_retry.copy(), logger=self.logger,
- sasl_options=sasl_options)
+ self,
+ self._conn_retry.copy(),
+ logger=self.logger,
+ sasl_options=sasl_options,
+ )
# Every retry call should have its own copy of the retry helper
# to avoid shared retry counts
@@ -345,6 +375,7 @@ class KazooClient(object):
def _retry(*args, **kwargs):
return self._retry.copy()(*args, **kwargs)
+
self.retry = _retry
self.Barrier = partial(Barrier, self)
@@ -367,8 +398,10 @@ class KazooClient(object):
# If we got any unhandled keywords, complain like Python would
if kwargs:
- raise TypeError('__init__() got unexpected keyword arguments: %s'
- % (kwargs.keys(),))
+ raise TypeError(
+ "__init__() got unexpected keyword arguments: %s"
+ % (kwargs.keys(),)
+ )
def _reset(self):
"""Resets a variety of client states for a new connection."""
@@ -397,7 +430,7 @@ class KazooClient(object):
def _reset_session(self):
self._session_id = None
- self._session_passwd = b'\x00' * 16
+ self._session_passwd = b"\x00" * 16
@property
def client_state(self):
@@ -429,7 +462,7 @@ class KazooClient(object):
return self._live.is_set()
def set_hosts(self, hosts, randomize_hosts=None):
- """ sets the list of hosts used by this client.
+ """sets the list of hosts used by this client.
This function accepts the same format hosts parameter as the init
function and sets the client to use the new hosts the next time it
@@ -463,11 +496,12 @@ class KazooClient(object):
if chroot:
new_chroot = normpath(chroot)
else:
- new_chroot = ''
+ new_chroot = ""
if self.chroot is not None and new_chroot != self.chroot:
- raise ConfigurationError("Changing chroot at runtime is not "
- "currently supported")
+ raise ConfigurationError(
+ "Changing chroot at runtime is not " "currently supported"
+ )
self.chroot = new_chroot
@@ -529,8 +563,9 @@ class KazooClient(object):
return
if state in (KeeperState.CONNECTED, KeeperState.CONNECTED_RO):
- self.logger.info("Zookeeper connection established, "
- "state: %s", state)
+ self.logger.info(
+ "Zookeeper connection established, " "state: %s", state
+ )
self._live.set()
self._make_state_change(KazooState.CONNECTED)
elif state in CLOSED_STATES:
@@ -581,7 +616,8 @@ class KazooClient(object):
if not self._connection.stop(timeout):
raise WriterNotClosedException(
"Writer still open from prior connection "
- "and wouldn't close after %s seconds" % timeout)
+ "and wouldn't close after %s seconds" % timeout
+ )
def _call(self, request, async_object):
"""Ensure the client is in CONNECTED or SUSPENDED state and put the
@@ -596,8 +632,9 @@ class KazooClient(object):
async_object.set_exception(AuthFailedError())
return False
elif self._state == KeeperState.CLOSED:
- async_object.set_exception(ConnectionClosedError(
- "Connection has been closed"))
+ async_object.set_exception(
+ ConnectionClosedError("Connection has been closed")
+ )
return False
elif self._state == KeeperState.EXPIRED_SESSION:
async_object.set_exception(SessionExpiredError())
@@ -608,13 +645,15 @@ class KazooClient(object):
# wake the connection, guarding against a race with close()
write_sock = self._connection._write_sock
if write_sock is None:
- async_object.set_exception(ConnectionClosedError(
- "Connection has been closed"))
+ async_object.set_exception(
+ ConnectionClosedError("Connection has been closed")
+ )
try:
- write_sock.send(b'\0')
+ write_sock.send(b"\0")
except: # NOQA
- async_object.set_exception(ConnectionClosedError(
- "Connection has been closed"))
+ async_object.set_exception(
+ ConnectionClosedError("Connection has been closed")
+ )
def start(self, timeout=15):
"""Initiate connection to ZK.
@@ -635,8 +674,10 @@ class KazooClient(object):
raise self.handler.timeout_exception("Connection time-out")
if self.chroot and not self.exists("/"):
- warnings.warn("No chroot path exists, the chroot path "
- "should be created before normal use.")
+ warnings.warn(
+ "No chroot path exists, the chroot path "
+ "should be created before normal use."
+ )
def start_async(self):
"""Asynchronously initiate connection to ZK.
@@ -683,7 +724,7 @@ class KazooClient(object):
self._stopped.set()
self._queue.append((CloseInstance, None))
try:
- self._connection._write_sock.send(b'\0')
+ self._connection._write_sock.send(b"\0")
finally:
self._safe_close()
@@ -702,7 +743,7 @@ class KazooClient(object):
"""
self._connection.close()
- def command(self, cmd=b'ruok'):
+ def command(self, cmd=b"ruok"):
"""Sent a management command to the current ZK server.
Examples are `ruok`, `envi` or `stat`.
@@ -723,7 +764,8 @@ class KazooClient(object):
peer = self._connection._socket.getpeername()[:2]
sock = self.handler.create_connection(
- peer, timeout=self._session_timeout / 1000.0,
+ peer,
+ timeout=self._session_timeout / 1000.0,
use_ssl=self.use_ssl,
ca=self.ca,
certfile=self.certfile,
@@ -734,7 +776,7 @@ class KazooClient(object):
sock.sendall(cmd)
result = sock.recv(8192)
sock.close()
- return result.decode('utf-8', 'replace')
+ return result.decode("utf-8", "replace")
def server_version(self, retries=3):
"""Get the version of the currently connected ZK server.
@@ -745,8 +787,9 @@ class KazooClient(object):
.. versionadded:: 0.5
"""
+
def _try_fetch():
- data = self.command(b'envi')
+ data = self.command(b"envi")
data_parsed = {}
for line in data.splitlines():
try:
@@ -758,10 +801,10 @@ class KazooClient(object):
else:
if k:
data_parsed[k] = v
- version = data_parsed.get(ENVI_VERSION_KEY, '')
+ version = data_parsed.get(ENVI_VERSION_KEY, "")
version_digits = ENVI_VERSION.match(version).group(1)
try:
- return tuple([int(d) for d in version_digits.split('.')])
+ return tuple([int(d) for d in version_digits.split(".")])
except ValueError:
return None
@@ -782,9 +825,10 @@ class KazooClient(object):
version = _try_fetch()
if _is_valid(version):
return version
- raise KazooException("Unable to fetch useable server"
- " version after trying %s times"
- % (1 + max(0, retries)))
+ raise KazooException(
+ "Unable to fetch useable server"
+ " version after trying %s times" % (1 + max(0, retries))
+ )
def add_auth(self, scheme, credential):
"""Send credentials to server.
@@ -829,7 +873,7 @@ class KazooClient(object):
if self.chroot == path:
return "/"
if path.startswith(self.chroot):
- return path[len(self.chroot):]
+ return path[len(self.chroot) :]
else:
return path
@@ -847,10 +891,7 @@ class KazooClient(object):
def _do_sync():
result = self.handler.async_result()
- self._call(
- Sync(_prefix_root(self.chroot, path)),
- result
- )
+ self._call(Sync(_prefix_root(self.chroot, path)), result)
result.rawlink(_sync_completion)
_do_sync()
@@ -872,8 +913,16 @@ class KazooClient(object):
"""
return self.sync_async(path).get()
- def create(self, path, value=b"", acl=None, ephemeral=False,
- sequence=False, makepath=False, include_data=False):
+ def create(
+ self,
+ path,
+ value=b"",
+ acl=None,
+ ephemeral=False,
+ sequence=False,
+ makepath=False,
+ include_data=False,
+ ):
"""Create a node with the given value as its data. Optionally
set an ACL on the node.
@@ -953,12 +1002,25 @@ class KazooClient(object):
"""
acl = acl or self.default_acl
return self.create_async(
- path, value, acl=acl, ephemeral=ephemeral,
- sequence=sequence, makepath=makepath, include_data=include_data
+ path,
+ value,
+ acl=acl,
+ ephemeral=ephemeral,
+ sequence=sequence,
+ makepath=makepath,
+ include_data=include_data,
).get()
- def create_async(self, path, value=b"", acl=None, ephemeral=False,
- sequence=False, makepath=False, include_data=False):
+ def create_async(
+ self,
+ path,
+ value=b"",
+ acl=None,
+ ephemeral=False,
+ sequence=False,
+ makepath=False,
+ include_data=False,
+ ):
"""Asynchronously create a ZNode. Takes the same arguments as
:meth:`create`.
@@ -974,10 +1036,12 @@ class KazooClient(object):
if not isinstance(path, string_types):
raise TypeError("Invalid type for 'path' (string expected)")
- if acl and (isinstance(acl, ACL) or
- not isinstance(acl, (tuple, list))):
- raise TypeError("Invalid type for 'acl' (acl must be a tuple/list"
- " of ACL's")
+ if acl and (
+ isinstance(acl, ACL) or not isinstance(acl, (tuple, list))
+ ):
+ raise TypeError(
+ "Invalid type for 'acl' (acl must be a tuple/list" " of ACL's"
+ )
if value is not None and not isinstance(value, bytes_types):
raise TypeError("Invalid type for 'value' (must be a byte string)")
if not isinstance(ephemeral, bool):
@@ -1002,8 +1066,12 @@ class KazooClient(object):
@capture_exceptions(async_result)
def do_create():
result = self._create_async_inner(
- path, value, acl, flags,
- trailing=sequence, include_data=include_data
+ path,
+ value,
+ acl,
+ flags,
+ trailing=sequence,
+ include_data=include_data,
)
result.rawlink(create_completion)
@@ -1023,8 +1091,8 @@ class KazooClient(object):
except NoNodeError:
if not makepath:
raise
- if sequence and path.endswith('/'):
- parent = path.rstrip('/')
+ if sequence and path.endswith("/"):
+ parent = path.rstrip("/")
else:
parent, _ = split(path)
self.ensure_path_async(parent, acl).rawlink(retry_completion)
@@ -1032,8 +1100,9 @@ class KazooClient(object):
do_create()
return async_result
- def _create_async_inner(self, path, value, acl, flags,
- trailing=False, include_data=False):
+ def _create_async_inner(
+ self, path, value, acl, flags, trailing=False, include_data=False
+ ):
async_result = self.handler.async_result()
if include_data:
opcode = Create2
@@ -1041,8 +1110,14 @@ class KazooClient(object):
opcode = Create
call_result = self._call(
- opcode(_prefix_root(self.chroot, path, trailing=trailing),
- value, acl, flags), async_result)
+ opcode(
+ _prefix_root(self.chroot, path, trailing=trailing),
+ value,
+ acl,
+ flags,
+ ),
+ async_result,
+ )
if call_result is False:
# We hit a short-circuit exit on the _call. Because we are
# not using the original async_result here, we bubble the
@@ -1092,7 +1167,8 @@ class KazooClient(object):
parent, node = split(path)
if node:
self.ensure_path_async(parent, acl=acl).rawlink(
- partial(prepare_completion, path))
+ partial(prepare_completion, path)
+ )
else:
self.create_async(path, acl=acl).rawlink(create_completion)
@@ -1135,8 +1211,9 @@ class KazooClient(object):
raise TypeError("Invalid type for 'watch' (must be a callable)")
async_result = self.handler.async_result()
- self._call(Exists(_prefix_root(self.chroot, path), watch),
- async_result)
+ self._call(
+ Exists(_prefix_root(self.chroot, path), watch), async_result
+ )
return async_result
def get(self, path, watch=None):
@@ -1177,8 +1254,9 @@ class KazooClient(object):
raise TypeError("Invalid type for 'watch' (must be a callable)")
async_result = self.handler.async_result()
- self._call(GetData(_prefix_root(self.chroot, path), watch),
- async_result)
+ self._call(
+ GetData(_prefix_root(self.chroot, path), watch), async_result
+ )
return async_result
def get_children(self, path, watch=None, include_data=False):
@@ -1215,8 +1293,9 @@ class KazooClient(object):
The `include_data` option.
"""
- return self.get_children_async(path, watch=watch,
- include_data=include_data).get()
+ return self.get_children_async(
+ path, watch=watch, include_data=include_data
+ ).get()
def get_children_async(self, path, watch=None, include_data=False):
"""Asynchronously get a list of child nodes of a path. Takes
@@ -1313,14 +1392,17 @@ class KazooClient(object):
if not isinstance(path, string_types):
raise TypeError("Invalid type for 'path' (string expected)")
if isinstance(acls, ACL) or not isinstance(acls, (tuple, list)):
- raise TypeError("Invalid type for 'acl' (acl must be a tuple/list"
- " of ACL's)")
+ raise TypeError(
+ "Invalid type for 'acl' (acl must be a tuple/list" " of ACL's)"
+ )
if not isinstance(version, int):
raise TypeError("Invalid type for 'version' (int expected)")
async_result = self.handler.async_result()
- self._call(SetACL(_prefix_root(self.chroot, path), acls, version),
- async_result)
+ self._call(
+ SetACL(_prefix_root(self.chroot, path), acls, version),
+ async_result,
+ )
return async_result
def set(self, path, value, version=-1):
@@ -1373,8 +1455,10 @@ class KazooClient(object):
raise TypeError("Invalid type for 'version' (int expected)")
async_result = self.handler.async_result()
- self._call(SetData(_prefix_root(self.chroot, path), value, version),
- async_result)
+ self._call(
+ SetData(_prefix_root(self.chroot, path), value, version),
+ async_result,
+ )
return async_result
def transaction(self):
@@ -1444,8 +1528,9 @@ class KazooClient(object):
if not isinstance(version, int):
raise TypeError("Invalid type for 'version' (int expected)")
async_result = self.handler.async_result()
- self._call(Delete(_prefix_root(self.chroot, path), version),
- async_result)
+ self._call(
+ Delete(_prefix_root(self.chroot, path), version), async_result
+ )
return async_result
def _delete_recursive(self, path):
@@ -1535,8 +1620,9 @@ class KazooClient(object):
returns a non-zero error code.
"""
- result = self.reconfig_async(joining, leaving, new_members,
- from_config)
+ result = self.reconfig_async(
+ joining, leaving, new_members, from_config
+ )
return result.get()
def reconfig_async(self, joining, leaving, new_members, from_config):
@@ -1551,8 +1637,9 @@ class KazooClient(object):
if leaving and not isinstance(leaving, string_types):
raise TypeError("Invalid type for 'leaving' (string expected)")
if new_members and not isinstance(new_members, string_types):
- raise TypeError("Invalid type for 'new_members' (string "
- "expected)")
+ raise TypeError(
+ "Invalid type for 'new_members' (string " "expected)"
+ )
if not isinstance(from_config, int):
raise TypeError("Invalid type for 'from_config' (int expected)")
@@ -1584,13 +1671,15 @@ class TransactionRequest(object):
Requires Zookeeper 3.4+
"""
+
def __init__(self, client):
self.client = client
self.operations = []
self.committed = False
- def create(self, path, value=b"", acl=None, ephemeral=False,
- sequence=False):
+ def create(
+ self, path, value=b"", acl=None, ephemeral=False, sequence=False
+ ):
"""Add a create ZNode to the transaction. Takes the same
arguments as :meth:`KazooClient.create`, with the exception
of `makepath`.
@@ -1604,8 +1693,9 @@ class TransactionRequest(object):
if not isinstance(path, string_types):
raise TypeError("Invalid type for 'path' (string expected)")
if acl and not isinstance(acl, (tuple, list)):
- raise TypeError("Invalid type for 'acl' (acl must be a tuple/list"
- " of ACL's")
+ raise TypeError(
+ "Invalid type for 'acl' (acl must be a tuple/list" " of ACL's"
+ )
if not isinstance(value, bytes_types):
raise TypeError("Invalid type for 'value' (must be a byte string)")
if not isinstance(ephemeral, bool):
@@ -1621,8 +1711,10 @@ class TransactionRequest(object):
if acl is None:
acl = OPEN_ACL_UNSAFE
- self._add(Create(_prefix_root(self.client.chroot, path), value, acl,
- flags), None)
+ self._add(
+ Create(_prefix_root(self.client.chroot, path), value, acl, flags),
+ None,
+ )
def delete(self, path, version=-1):
"""Add a delete ZNode to the transaction. Takes the same
@@ -1647,8 +1739,9 @@ class TransactionRequest(object):
raise TypeError("Invalid type for 'value' (must be a byte string)")
if not isinstance(version, int):
raise TypeError("Invalid type for 'version' (int expected)")
- self._add(SetData(_prefix_root(self.client.chroot, path), value,
- version))
+ self._add(
+ SetData(_prefix_root(self.client.chroot, path), value, version)
+ )
def check(self, path, version):
"""Add a Check Version to the transaction.
@@ -1661,8 +1754,9 @@ class TransactionRequest(object):
raise TypeError("Invalid type for 'path' (string expected)")
if not isinstance(version, int):
raise TypeError("Invalid type for 'version' (int expected)")
- self._add(CheckVersion(_prefix_root(self.client.chroot, path),
- version))
+ self._add(
+ CheckVersion(_prefix_root(self.client.chroot, path), version)
+ )
def commit_async(self):
"""Commit the transaction asynchronously.
@@ -1695,9 +1789,9 @@ class TransactionRequest(object):
def _check_tx_state(self):
if self.committed:
- raise ValueError('Transaction already committed')
+ raise ValueError("Transaction already committed")
def _add(self, request, post_processor=None):
self._check_tx_state()
- self.client.logger.log(BLATHER, 'Added %r to %r', request, self)
+ self.client.logger.log(BLATHER, "Added %r to %r", request, self)
self.operations.append(request)
diff --git a/kazoo/exceptions.py b/kazoo/exceptions.py
index 69f959e..1307463 100644
--- a/kazoo/exceptions.py
+++ b/kazoo/exceptions.py
@@ -51,7 +51,7 @@ class SASLException(KazooException):
def _invalid_error_code():
- raise RuntimeError('Invalid error code')
+ raise RuntimeError("Invalid error code")
EXCEPTIONS = defaultdict(_invalid_error_code)
diff --git a/kazoo/handlers/eventlet.py b/kazoo/handlers/eventlet.py
index 3f67ccd..ebf9898 100644
--- a/kazoo/handlers/eventlet.py
+++ b/kazoo/handlers/eventlet.py
@@ -43,9 +43,9 @@ class AsyncResult(utils.AsyncResult):
"""A one-time event that stores a value or an exception"""
def __init__(self, handler):
- super(AsyncResult, self).__init__(handler,
- green_threading.Condition,
- TimeoutError)
+ super(AsyncResult, self).__init__(
+ handler, green_threading.Condition, TimeoutError
+ )
class SequentialEventletHandler(object):
@@ -76,6 +76,7 @@ class SequentialEventletHandler(object):
returns.
"""
+
name = "sequential_eventlet_handler"
queue_impl = green_queue.LightQueue
queue_empty = green_queue.Empty
@@ -106,8 +107,10 @@ class SequentialEventletHandler(object):
with _yield_before_after():
cb()
except Exception:
- LOG.warning("Exception in worker completion queue greenlet",
- exc_info=True)
+ LOG.warning(
+ "Exception in worker completion queue greenlet",
+ exc_info=True,
+ )
finally:
del cb # release before possible idle
@@ -120,8 +123,10 @@ class SequentialEventletHandler(object):
with _yield_before_after():
cb()
except Exception:
- LOG.warning("Exception in worker callback queue greenlet",
- exc_info=True)
+ LOG.warning(
+ "Exception in worker callback queue greenlet",
+ exc_info=True,
+ )
finally:
del cb # release before possible idle
@@ -165,8 +170,9 @@ class SequentialEventletHandler(object):
def select(self, *args, **kwargs):
with _yield_before_after():
- return selector_select(*args, selectors_module=green_selectors,
- **kwargs)
+ return selector_select(
+ *args, selectors_module=green_selectors, **kwargs
+ )
def async_result(self):
return AsyncResult(self)
diff --git a/kazoo/handlers/gevent.py b/kazoo/handlers/gevent.py
index 7f2948e..1c141bc 100644
--- a/kazoo/handlers/gevent.py
+++ b/kazoo/handlers/gevent.py
@@ -21,7 +21,7 @@ except ImportError:
from kazoo.handlers import utils
from kazoo import python2atexit
-_using_libevent = gevent.__version__.startswith('0.')
+_using_libevent = gevent.__version__.startswith("0.")
log = logging.getLogger(__name__)
@@ -50,6 +50,7 @@ class SequentialGeventHandler(object):
proceed.
"""
+
name = "sequential_gevent_handler"
queue_impl = gevent.queue.Queue
queue_empty = gevent.queue.Empty
@@ -126,8 +127,9 @@ class SequentialGeventHandler(object):
python2atexit.unregister(self.stop)
def select(self, *args, **kwargs):
- return selector_select(*args, selectors_module=gevent.selectors,
- **kwargs)
+ return selector_select(
+ *args, selectors_module=gevent.selectors, **kwargs
+ )
def socket(self, *args, **kwargs):
return utils.create_tcp_socket(socket)
diff --git a/kazoo/handlers/threading.py b/kazoo/handlers/threading.py
index 2389f33..210c31e 100644
--- a/kazoo/handlers/threading.py
+++ b/kazoo/handlers/threading.py
@@ -61,9 +61,9 @@ class AsyncResult(utils.AsyncResult):
"""A one-time event that stores a value or an exception"""
def __init__(self, handler):
- super(AsyncResult, self).__init__(handler,
- threading.Condition,
- KazooTimeoutError)
+ super(AsyncResult, self).__init__(
+ handler, threading.Condition, KazooTimeoutError
+ )
class SequentialThreadingHandler(object):
@@ -94,6 +94,7 @@ class SequentialThreadingHandler(object):
returns.
"""
+
name = "sequential_threading_handler"
timeout_exception = KazooTimeoutError
sleep_func = staticmethod(time.sleep)
diff --git a/kazoo/handlers/utils.py b/kazoo/handlers/utils.py
index 9647a24..2717059 100644
--- a/kazoo/handlers/utils.py
+++ b/kazoo/handlers/utils.py
@@ -133,7 +133,8 @@ class AsyncResult(object):
for callback in self._callbacks:
if self._handler.running:
self._handler.completion_queue.put(
- functools.partial(callback, self))
+ functools.partial(callback, self)
+ )
else:
functools.partial(callback, self)()
@@ -156,7 +157,7 @@ def create_socket_pair(module, port=0):
If socket.socketpair isn't available, we emulate it.
"""
# See if socketpair() is available.
- have_socketpair = hasattr(module, 'socketpair')
+ have_socketpair = hasattr(module, "socketpair")
if have_socketpair:
client_sock, srv_sock = module.socketpair()
return client_sock, srv_sock
@@ -164,7 +165,7 @@ def create_socket_pair(module, port=0):
# Create a non-blocking temporary server socket
temp_srv_sock = module.socket()
temp_srv_sock.setblocking(False)
- temp_srv_sock.bind(('', port))
+ temp_srv_sock.bind(("", port))
port = temp_srv_sock.getsockname()[1]
temp_srv_sock.listen(1)
@@ -172,7 +173,7 @@ def create_socket_pair(module, port=0):
client_sock = module.socket()
client_sock.setblocking(False)
try:
- client_sock.connect(('localhost', port))
+ client_sock.connect(("localhost", port))
except module.error as err:
# EWOULDBLOCK is not an error, as the socket is non-blocking
if err.errno != errno.EWOULDBLOCK:
@@ -182,17 +183,17 @@ def create_socket_pair(module, port=0):
timeout = 1
readable = select.select([temp_srv_sock], [], [], timeout)[0]
if temp_srv_sock not in readable:
- raise Exception('Client socket not connected in %s'
- ' second(s)' % (timeout))
+ raise Exception(
+ "Client socket not connected in %s" " second(s)" % (timeout)
+ )
srv_sock, _ = temp_srv_sock.accept()
return client_sock, srv_sock
def create_tcp_socket(module):
- """Create a TCP socket with the CLOEXEC flag set.
- """
+ """Create a TCP socket with the CLOEXEC flag set."""
type_ = module.SOCK_STREAM
- if hasattr(module, 'SOCK_CLOEXEC'): # pragma: nocover
+ if hasattr(module, "SOCK_CLOEXEC"): # pragma: nocover
# if available, set cloexec flag during socket creation
type_ |= module.SOCK_CLOEXEC
sock = module.socket(module.AF_INET, type_)
@@ -200,10 +201,19 @@ def create_tcp_socket(module):
return sock
-def create_tcp_connection(module, address, timeout=None,
- use_ssl=False, ca=None, certfile=None,
- keyfile=None, keyfile_password=None,
- verify_certs=True, options=None, ciphers=None):
+def create_tcp_connection(
+ module,
+ address,
+ timeout=None,
+ use_ssl=False,
+ ca=None,
+ certfile=None,
+ keyfile=None,
+ keyfile_password=None,
+ verify_certs=True,
+ options=None,
+ ciphers=None,
+):
end = None
if timeout is None:
# thanks to create_connection() developers for
@@ -244,13 +254,16 @@ def create_tcp_connection(module, address, timeout=None,
context.verify_mode = (
ssl.CERT_REQUIRED if verify_certs else ssl.CERT_NONE
)
- context.load_cert_chain(certfile=certfile,
- keyfile=keyfile,
- password=keyfile_password)
+ context.load_cert_chain(
+ certfile=certfile,
+ keyfile=keyfile,
+ password=keyfile_password,
+ )
try:
# Query the address to get back it's address family
- addrs = socket.getaddrinfo(address[0], address[1], 0,
- socket.SOCK_STREAM)
+ addrs = socket.getaddrinfo(
+ address[0], address[1], 0, socket.SOCK_STREAM
+ )
conn = context.wrap_socket(module.socket(addrs[0][0]))
conn.settimeout(timeout_at)
conn.connect(address)
@@ -338,30 +351,34 @@ def fileobj_to_fd(fileobj):
try:
fd = int(fileobj.fileno())
except (AttributeError, TypeError, ValueError):
- raise TypeError("Invalid file object: "
- "{!r}".format(fileobj))
+ raise TypeError("Invalid file object: " "{!r}".format(fileobj))
if fd < 0:
raise TypeError("Invalid file descriptor: {}".format(fd))
os.fstat(fd)
return fd
-def selector_select(rlist, wlist, xlist, timeout=None,
- selectors_module=selectors):
+def selector_select(
+ rlist, wlist, xlist, timeout=None, selectors_module=selectors
+):
"""Selector-based drop-in replacement for select to overcome select
limitation on a maximum filehandle value.
Need backport selectors2 package in python 2.
"""
if timeout is not None:
- if not (isinstance(timeout, six.integer_types) or isinstance(
- timeout, float)):
- raise TypeError('timeout must be a number')
+ if not (
+ isinstance(timeout, six.integer_types)
+ or isinstance(timeout, float)
+ ):
+ raise TypeError("timeout must be a number")
if timeout < 0:
- raise ValueError('timeout must be non-negative')
+ raise ValueError("timeout must be non-negative")
- events_mapping = {selectors_module.EVENT_READ: rlist,
- selectors_module.EVENT_WRITE: wlist}
+ events_mapping = {
+ selectors_module.EVENT_READ: rlist,
+ selectors_module.EVENT_WRITE: wlist,
+ }
fd_events = defaultdict(int)
fd_fileobjs = defaultdict(list)
diff --git a/kazoo/hosts.py b/kazoo/hosts.py
index 04643dc..aa18805 100644
--- a/kazoo/hosts.py
+++ b/kazoo/hosts.py
@@ -3,11 +3,11 @@ from six.moves import urllib_parse
def collect_hosts(hosts):
"""
- Collect a set of hosts and an optional chroot from
- a string or a list of strings.
+ Collect a set of hosts and an optional chroot from
+ a string or a list of strings.
"""
if isinstance(hosts, list):
- if hosts[-1].strip().startswith('/'):
+ if hosts[-1].strip().startswith("/"):
host_ports, chroot = hosts[:-1], hosts[-1]
else:
host_ports, chroot = hosts, None
diff --git a/kazoo/protocol/connection.py b/kazoo/protocol/connection.py
index 726f645..e1bd996 100644
--- a/kazoo/protocol/connection.py
+++ b/kazoo/protocol/connection.py
@@ -50,6 +50,7 @@ from kazoo.retry import (
try:
import puresasl
import puresasl.client
+
PURESASL_AVAILABLE = True
except ImportError:
PURESASL_AVAILABLE = False
@@ -76,12 +77,14 @@ AUTH_XID = -4
CLOSE_RESPONSE = Close.type
-if sys.version_info > (3, ): # pragma: nocover
+if sys.version_info > (3,): # pragma: nocover
+
def buffer(obj, offset=0):
return memoryview(obj)[offset:]
advance_iterator = next
else: # pragma: nocover
+
def advance_iterator(it):
return it.next()
@@ -99,6 +102,7 @@ class RWPinger(object):
the iterator will yield False if called too soon.
"""
+
def __init__(self, hosts, connection_func, socket_handling):
self.hosts = hosts
self.connection = connection_func
@@ -126,7 +130,7 @@ class RWPinger(object):
sock.sendall(b"isro")
result = sock.recv(8192)
sock.close()
- if result == b'rw':
+ if result == b"rw":
return (host, port)
else:
return False
@@ -145,6 +149,7 @@ class RWServerAvailable(Exception):
class ConnectionHandler(object):
"""Zookeeper connection handler"""
+
def __init__(self, client, retry_sleeper, logger=None, sasl_options=None):
self.client = client
self.handler = client.handler
@@ -178,7 +183,7 @@ class ConnectionHandler(object):
try:
yield
except (socket.error, select.error) as e:
- err = getattr(e, 'strerror', e)
+ err = getattr(e, "strerror", e)
raise ConnectionDropped("socket connection error: %s" % (err,))
def start(self):
@@ -188,8 +193,9 @@ class ConnectionHandler(object):
self._read_sock, self._write_sock = rw_sockets
self.connection_closed.clear()
if self._connection_routine:
- raise Exception("Unable to start, connection routine already "
- "active.")
+ raise Exception(
+ "Unable to start, connection routine already " "active."
+ )
self._connection_routine = self.handler.spawn(self.zk_loop)
def stop(self, timeout=None):
@@ -218,8 +224,11 @@ class ConnectionHandler(object):
def _server_pinger(self):
"""Returns a server pinger iterable, that will ping the next
server in the list, and apply a back-off between attempts."""
- return RWPinger(self.client.hosts, self.handler.create_connection,
- self._socket_error_handling)
+ return RWPinger(
+ self.client.hosts,
+ self.handler.create_connection,
+ self._socket_error_handling,
+ )
def _read_header(self, timeout):
b = self._read(4, timeout)
@@ -238,8 +247,10 @@ class ConnectionHandler(object):
# have anything to select, but the wrapped object may still
# have something to read as it has previously gotten enough
# data from the underlying socket.
- if (hasattr(self._socket, "pending") and
- self._socket.pending() > 0):
+ if (
+ hasattr(self._socket, "pending")
+ and self._socket.pending() > 0
+ ):
pass
else:
s = self.handler.select([self._socket], [], [], timeout)[0]
@@ -247,17 +258,20 @@ class ConnectionHandler(object):
# If the read list is empty, we got a timeout. We don't
# have to check wlist and xlist as we don't set any
raise self.handler.timeout_exception(
- "socket time-out during read")
+ "socket time-out during read"
+ )
try:
chunk = self._socket.recv(remaining)
except ssl.SSLError as e:
- if e.errno in (ssl.SSL_ERROR_WANT_READ,
- ssl.SSL_ERROR_WANT_WRITE):
+ if e.errno in (
+ ssl.SSL_ERROR_WANT_READ,
+ ssl.SSL_ERROR_WANT_WRITE,
+ ):
continue
else:
raise
- if chunk == b'':
- raise ConnectionDropped('socket connection broken')
+ if chunk == b"":
+ raise ConnectionDropped("socket connection broken")
msgparts.append(chunk)
remaining -= len(chunk)
return b"".join(msgparts)
@@ -270,14 +284,18 @@ class ConnectionHandler(object):
if xid:
header, buffer, offset = self._read_header(timeout)
if header.xid != xid:
- raise RuntimeError('xids do not match, expected %r '
- 'received %r', xid, header.xid)
+ raise RuntimeError(
+ "xids do not match, expected %r " "received %r",
+ xid,
+ header.xid,
+ )
if header.zxid > 0:
zxid = header.zxid
if header.err:
callback_exception = EXCEPTIONS[header.err]()
self.logger.debug(
- 'Received error(xid=%s) %r', xid, callback_exception)
+ "Received error(xid=%s) %r", xid, callback_exception
+ )
raise callback_exception
return zxid
@@ -285,17 +303,19 @@ class ConnectionHandler(object):
length = int_struct.unpack(msg)[0]
msg = self._read(length, timeout)
- if hasattr(request, 'deserialize'):
+ if hasattr(request, "deserialize"):
try:
obj, _ = request.deserialize(msg, 0)
except Exception:
self.logger.exception(
"Exception raised during deserialization "
- "of request: %s", request)
+ "of request: %s",
+ request,
+ )
# raise ConnectionDropped so connect loop will retry
- raise ConnectionDropped('invalid server response')
- self.logger.log(BLATHER, 'Read response %s', obj)
+ raise ConnectionDropped("invalid server response")
+ self.logger.log(BLATHER, "Read response %s", obj)
return obj, zxid
return zxid
@@ -311,7 +331,10 @@ class ConnectionHandler(object):
b += request.serialize()
self.logger.log(
(BLATHER if isinstance(request, Ping) else logging.DEBUG),
- "Sending request(xid=%s): %s", xid, request)
+ "Sending request(xid=%s): %s",
+ xid,
+ request,
+ )
self._write(int_struct.pack(len(b)) + b, timeout)
def _write(self, msg, timeout):
@@ -324,19 +347,22 @@ class ConnectionHandler(object):
if not s: # pragma: nocover
# If the write list is empty, we got a timeout. We don't
# have to check rlist and xlist as we don't set any
- raise self.handler.timeout_exception("socket time-out"
- " during write")
+ raise self.handler.timeout_exception(
+ "socket time-out" " during write"
+ )
msg_slice = buffer(msg, sent)
try:
bytes_sent = self._socket.send(msg_slice)
except ssl.SSLError as e:
- if e.errno in (ssl.SSL_ERROR_WANT_READ,
- ssl.SSL_ERROR_WANT_WRITE):
+ if e.errno in (
+ ssl.SSL_ERROR_WANT_READ,
+ ssl.SSL_ERROR_WANT_WRITE,
+ ):
continue
else:
raise
if not bytes_sent:
- raise ConnectionDropped('socket connection broken')
+ raise ConnectionDropped("socket connection broken")
sent += bytes_sent
def _read_watch_event(self, buffer, offset):
@@ -344,7 +370,7 @@ class ConnectionHandler(object):
watch, offset = Watch.deserialize(buffer, offset)
path = watch.path
- self.logger.debug('Received EVENT: %s', watch)
+ self.logger.debug("Received EVENT: %s", watch)
watchers = []
@@ -356,7 +382,7 @@ class ConnectionHandler(object):
elif watch.type == CHILD_EVENT:
watchers.extend(client._child_watchers.pop(path, []))
else:
- self.logger.warn('Received unknown event %r', watch.type)
+ self.logger.warn("Received unknown event %r", watch.type)
return
# Strip the chroot if needed
@@ -369,7 +395,7 @@ class ConnectionHandler(object):
# Dump the watchers to the watch thread
for watch in watchers:
- client.handler.dispatch_callback(Callback('watch', watch, (ev,)))
+ client.handler.dispatch_callback(Callback("watch", watch, (ev,)))
def _read_response(self, header, buffer, offset):
client = self.client
@@ -377,20 +403,25 @@ class ConnectionHandler(object):
if header.zxid and header.zxid > 0:
client.last_zxid = header.zxid
if header.xid != xid:
- exc = RuntimeError('xids do not match, expected %r '
- 'received %r', xid, header.xid)
+ exc = RuntimeError(
+ "xids do not match, expected %r " "received %r",
+ xid,
+ header.xid,
+ )
async_object.set_exception(exc)
raise exc
# Determine if its an exists request and a no node error
- exists_error = (header.err == NoNodeError.code and
- request.type == Exists.type)
+ exists_error = (
+ header.err == NoNodeError.code and request.type == Exists.type
+ )
# Set the exception if its not an exists error
if header.err and not exists_error:
callback_exception = EXCEPTIONS[header.err]()
self.logger.debug(
- 'Received error(xid=%s) %r', xid, callback_exception)
+ "Received error(xid=%s) %r", xid, callback_exception
+ )
if async_object:
async_object.set_exception(callback_exception)
elif request and async_object:
@@ -404,11 +435,14 @@ class ConnectionHandler(object):
except Exception as exc:
self.logger.exception(
"Exception raised during deserialization "
- "of request: %s", request)
+ "of request: %s",
+ request,
+ )
async_object.set_exception(exc)
return
self.logger.debug(
- 'Received response(xid=%s): %r', xid, response)
+ "Received response(xid=%s): %r", xid, response
+ )
# We special case a Transaction as we have to unchroot things
if request.type == Transaction.type:
@@ -417,7 +451,7 @@ class ConnectionHandler(object):
async_object.set(response)
# Determine if watchers should be registered
- watcher = getattr(request, 'watcher', None)
+ watcher = getattr(request, "watcher", None)
if not client._stopped.is_set() and watcher:
if isinstance(request, (GetChildren, GetChildren2)):
client._child_watchers[request.path].add(watcher)
@@ -425,7 +459,7 @@ class ConnectionHandler(object):
client._data_watchers[request.path].add(watcher)
if isinstance(request, Close):
- self.logger.log(BLATHER, 'Read close response')
+ self.logger.log(BLATHER, "Read close response")
return CLOSE_RESPONSE
def _read_socket(self, read_timeout):
@@ -434,10 +468,10 @@ class ConnectionHandler(object):
header, buffer, offset = self._read_header(read_timeout)
if header.xid == PING_XID:
- self.logger.log(BLATHER, 'Received Ping')
+ self.logger.log(BLATHER, "Received Ping")
self.ping_outstanding.clear()
elif header.xid == AUTH_XID:
- self.logger.log(BLATHER, 'Received AUTH')
+ self.logger.log(BLATHER, "Received AUTH")
request, async_object, xid = client._pending.popleft()
if header.err:
@@ -448,7 +482,7 @@ class ConnectionHandler(object):
elif header.xid == WATCH_XID:
self._read_watch_event(buffer, offset)
else:
- self.logger.log(BLATHER, 'Reading for header %r', header)
+ self.logger.log(BLATHER, "Reading for header %r", header)
return self._read_response(header, buffer, offset)
@@ -501,7 +535,7 @@ class ConnectionHandler(object):
def zk_loop(self):
"""Main Zookeeper handling loop"""
- self.logger.log(BLATHER, 'ZK loop started')
+ self.logger.log(BLATHER, "ZK loop started")
self.connection_stopped.clear()
@@ -512,12 +546,14 @@ class ConnectionHandler(object):
if retry(self._connect_loop, retry) is STOP_CONNECTING:
break
except RetryFailedError:
- self.logger.warning("Failed connecting to Zookeeper "
- "within the connection retry policy.")
+ self.logger.warning(
+ "Failed connecting to Zookeeper "
+ "within the connection retry policy."
+ )
finally:
self.connection_stopped.set()
self.client._session_callback(KeeperState.CLOSED)
- self.logger.log(BLATHER, 'Connection stopped')
+ self.logger.log(BLATHER, "Connection stopped")
def _expand_client_hosts(self):
# Expand the entire list in advance so we can randomize it if needed
@@ -525,8 +561,9 @@ class ConnectionHandler(object):
for host, port in self.client.hosts:
try:
host = host.strip()
- for rhost in socket.getaddrinfo(host, port, 0, 0,
- socket.IPPROTO_TCP):
+ for rhost in socket.getaddrinfo(
+ host, port, 0, 0, socket.IPPROTO_TCP
+ ):
host_ports.append((host, rhost[4][0], rhost[4][1]))
except socket.gaierror as e:
# Skip hosts that don't resolve
@@ -543,7 +580,7 @@ class ConnectionHandler(object):
# Check for an empty hostlist, indicating none resolved
if len(host_ports) == 0:
- raise ForceRetryError('No host resolved. Reconnecting')
+ raise ForceRetryError("No host resolved. Reconnecting")
for host, hostip, port in host_ports:
if self.client._stopped.is_set():
@@ -556,7 +593,7 @@ class ConnectionHandler(object):
if status is STOP_CONNECTING:
return STOP_CONNECTING
else:
- raise ForceRetryError('Reconnecting')
+ raise ForceRetryError("Reconnecting")
def _connect_attempt(self, host, hostip, port, retry):
client = self.client
@@ -566,8 +603,9 @@ class ConnectionHandler(object):
# Were we given a r/w server? If so, use that instead
if self._rw_server:
- self.logger.log(BLATHER,
- "Found r/w server to use, %s:%s", host, port)
+ self.logger.log(
+ BLATHER, "Found r/w server to use, %s:%s", host, port
+ )
host, port = self._rw_server
self._rw_server = None
@@ -589,14 +627,16 @@ class ConnectionHandler(object):
deadline = last_send + read_timeout / 2.0 - jitter_time
# Ensure our timeout is positive
timeout = max([deadline - time.time(), jitter_time])
- s = self.handler.select([self._socket, self._read_sock],
- [], [], timeout)[0]
+ s = self.handler.select(
+ [self._socket, self._read_sock], [], [], timeout
+ )[0]
if not s:
if self.ping_outstanding.is_set():
self.ping_outstanding.clear()
raise ConnectionDropped(
- "outstanding heartbeat ping not received")
+ "outstanding heartbeat ping not received"
+ )
else:
if self._socket in s:
response = self._read_socket(read_timeout)
@@ -614,32 +654,32 @@ class ConnectionHandler(object):
if time.time() >= deadline:
self._send_ping(connect_timeout)
last_send = time.time()
- self.logger.info('Closing connection to %s:%s', host, port)
+ self.logger.info("Closing connection to %s:%s", host, port)
client._session_callback(KeeperState.CLOSED)
return STOP_CONNECTING
except (ConnectionDropped, KazooTimeoutError) as e:
if isinstance(e, ConnectionDropped):
- self.logger.warning('Connection dropped: %s', e)
+ self.logger.warning("Connection dropped: %s", e)
else:
- self.logger.warning('Connection time-out: %s', e)
+ self.logger.warning("Connection time-out: %s", e)
if client._state != KeeperState.CONNECTING:
self.logger.warning("Transition to CONNECTING")
client._session_callback(KeeperState.CONNECTING)
except AuthFailedError as err:
retry.reset()
- self.logger.warning('AUTH_FAILED closing: %s', err)
+ self.logger.warning("AUTH_FAILED closing: %s", err)
client._session_callback(KeeperState.AUTH_FAILED)
return STOP_CONNECTING
except SessionExpiredError:
retry.reset()
- self.logger.warning('Session has expired')
+ self.logger.warning("Session has expired")
client._session_callback(KeeperState.EXPIRED_SESSION)
except RWServerAvailable:
retry.reset()
- self.logger.warning('Found a RW server, dropping connection')
+ self.logger.warning("Found a RW server, dropping connection")
client._session_callback(KeeperState.CONNECTING)
except Exception:
- self.logger.exception('Unhandled exception in connection loop')
+ self.logger.exception("Unhandled exception in connection loop")
raise
finally:
if self._socket is not None:
@@ -647,13 +687,20 @@ class ConnectionHandler(object):
def _connect(self, host, hostip, port):
client = self.client
- self.logger.info('Connecting to %s(%s):%s, use_ssl: %r',
- host, hostip, port, self.client.use_ssl)
+ self.logger.info(
+ "Connecting to %s(%s):%s, use_ssl: %r",
+ host,
+ hostip,
+ port,
+ self.client.use_ssl,
+ )
- self.logger.log(BLATHER,
- ' Using session_id: %r session_passwd: %s',
- client._session_id,
- hexlify(client._session_passwd))
+ self.logger.log(
+ BLATHER,
+ " Using session_id: %r session_passwd: %s",
+ client._session_id,
+ hexlify(client._session_passwd),
+ )
with self._socket_error_handling():
self._socket = self.handler.create_connection(
@@ -669,12 +716,18 @@ class ConnectionHandler(object):
self._socket.setblocking(0)
- connect = Connect(0, client.last_zxid, client._session_timeout,
- client._session_id or 0, client._session_passwd,
- client.read_only)
+ connect = Connect(
+ 0,
+ client.last_zxid,
+ client._session_timeout,
+ client._session_id or 0,
+ client._session_passwd,
+ client.read_only,
+ )
connect_result, zxid = self._invoke(
- client._session_timeout / 1000.0 / len(client.hosts), connect)
+ client._session_timeout / 1000.0 / len(client.hosts), connect
+ )
if connect_result.time_out <= 0:
raise SessionExpiredError("Session has expired")
@@ -690,14 +743,18 @@ class ConnectionHandler(object):
read_timeout = negotiated_session_timeout * 2.0 / 3.0
client._session_passwd = connect_result.passwd
- self.logger.log(BLATHER,
- 'Session created, session_id: %r session_passwd: %s\n'
- ' negotiated session timeout: %s\n'
- ' connect timeout: %s\n'
- ' read timeout: %s', client._session_id,
- hexlify(client._session_passwd),
- negotiated_session_timeout, connect_timeout,
- read_timeout)
+ self.logger.log(
+ BLATHER,
+ "Session created, session_id: %r session_passwd: %s\n"
+ " negotiated session timeout: %s\n"
+ " connect timeout: %s\n"
+ " read timeout: %s",
+ client._session_id,
+ hexlify(client._session_passwd),
+ negotiated_session_timeout,
+ connect_timeout,
+ read_timeout,
+ )
if connect_result.read_only:
client._session_callback(KeeperState.CONNECTED_RO)
@@ -722,24 +779,22 @@ class ConnectionHandler(object):
return read_timeout, connect_timeout
def _authenticate_with_sasl(self, host, timeout):
- """Establish a SASL authenticated connection to the server.
- """
+ """Establish a SASL authenticated connection to the server."""
if not PURESASL_AVAILABLE:
- raise SASLException('Missing SASL support')
+ raise SASLException("Missing SASL support")
- if 'service' not in self.sasl_options:
- self.sasl_options['service'] = 'zookeeper'
+ if "service" not in self.sasl_options:
+ self.sasl_options["service"] = "zookeeper"
# NOTE: Zookeeper hardcoded the domain for Digest authentication
# instead of using the hostname. See
# zookeeper/util/SecurityUtils.java#L74 and Server/Client
# initializations.
- if self.sasl_options['mechanism'] == 'DIGEST-MD5':
- host = 'zk-sasl-md5'
+ if self.sasl_options["mechanism"] == "DIGEST-MD5":
+ host = "zk-sasl-md5"
sasl_cli = self.client.sasl_cli = puresasl.client.SASLClient(
- host=host,
- **self.sasl_options
+ host=host, **self.sasl_options
)
# Inititalize the process with an empty challenge token
@@ -755,26 +810,26 @@ class ConnectionHandler(object):
except puresasl.SASLError as err:
six.reraise(
SASLException,
- SASLException('library error: %s' % err.message),
- sys.exc_info()[2]
+ SASLException("library error: %s" % err.message),
+ sys.exc_info()[2],
)
except puresasl.SASLProtocolException as err:
six.reraise(
AuthFailedError,
- AuthFailedError('protocol error: %s' % err.message),
- sys.exc_info()[2]
+ AuthFailedError("protocol error: %s" % err.message),
+ sys.exc_info()[2],
)
except Exception as err:
six.reraise(
AuthFailedError,
- AuthFailedError('Unknown error: %s' % err),
- sys.exc_info()[2]
+ AuthFailedError("Unknown error: %s" % err),
+ sys.exc_info()[2],
)
if sasl_cli.complete and not response:
break
elif response is None:
- response = b''
+ response = b""
xid = (xid % 2147483647) + 1
@@ -787,13 +842,16 @@ class ConnectionHandler(object):
# Zookeeper simply drops connections with failed authentication
six.reraise(
AuthFailedError,
- AuthFailedError('Connection dropped in SASL'),
- sys.exc_info()[2]
+ AuthFailedError("Connection dropped in SASL"),
+ sys.exc_info()[2],
)
if header.xid != xid:
- raise RuntimeError('xids do not match, expected %r '
- 'received %r', xid, header.xid)
+ raise RuntimeError(
+ "xids do not match, expected %r " "received %r",
+ xid,
+ header.xid,
+ )
if header.zxid > 0:
self.client.last_zxid = header.zxid
@@ -801,7 +859,8 @@ class ConnectionHandler(object):
if header.err:
callback_exception = EXCEPTIONS[header.err]()
self.logger.debug(
- 'Received error(xid=%s) %r', xid, callback_exception)
+ "Received error(xid=%s) %r", xid, callback_exception
+ )
raise callback_exception
challenge, _ = SASL.deserialize(buffer, offset)
diff --git a/kazoo/protocol/paths.py b/kazoo/protocol/paths.py
index 7fe961c..b8bf665 100644
--- a/kazoo/protocol/paths.py
+++ b/kazoo/protocol/paths.py
@@ -1,18 +1,18 @@
def normpath(path, trailing=False):
"""Normalize path, eliminating double slashes, etc."""
- comps = path.split('/')
+ comps = path.split("/")
new_comps = []
for comp in comps:
- if comp == '':
+ if comp == "":
continue
- if comp in ('.', '..'):
- raise ValueError('relative paths not allowed')
+ if comp in (".", ".."):
+ raise ValueError("relative paths not allowed")
new_comps.append(comp)
- new_path = '/'.join(new_comps)
- if trailing is True and path.endswith('/'):
- new_path += '/'
- if path.startswith('/') and new_path != '/':
- return '/' + new_path
+ new_path = "/".join(new_comps)
+ if trailing is True and path.endswith("/"):
+ new_path += "/"
+ if path.startswith("/") and new_path != "/":
+ return "/" + new_path
return new_path
@@ -25,31 +25,32 @@ def join(a, *p):
"""
path = a
for b in p:
- if b.startswith('/'):
+ if b.startswith("/"):
path = b
- elif path == '' or path.endswith('/'):
+ elif path == "" or path.endswith("/"):
path += b
else:
- path += '/' + b
+ path += "/" + b
return path
def isabs(s):
"""Test whether a path is absolute"""
- return s.startswith('/')
+ return s.startswith("/")
def basename(p):
"""Returns the final component of a pathname"""
- i = p.rfind('/') + 1
+ i = p.rfind("/") + 1
return p[i:]
def _prefix_root(root, path, trailing=False):
- """Prepend a root to a path. """
- return normpath(join(_norm_root(root), path.lstrip('/')),
- trailing=trailing)
+ """Prepend a root to a path."""
+ return normpath(
+ join(_norm_root(root), path.lstrip("/")), trailing=trailing
+ )
def _norm_root(root):
- return normpath(join('/', root))
+ return normpath(join("/", root))
diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py
index 80fa4d1..c702318 100644
--- a/kazoo/protocol/serialization.py
+++ b/kazoo/protocol/serialization.py
@@ -11,16 +11,16 @@ from kazoo.security import Id
# Struct objects with formats compiled
-bool_struct = struct.Struct('B')
-int_struct = struct.Struct('!i')
-int_int_struct = struct.Struct('!ii')
-int_int_long_struct = struct.Struct('!iiq')
+bool_struct = struct.Struct("B")
+int_struct = struct.Struct("!i")
+int_int_struct = struct.Struct("!ii")
+int_int_long_struct = struct.Struct("!iiq")
-int_long_int_long_struct = struct.Struct('!iqiq')
-long_struct = struct.Struct('!q')
-multiheader_struct = struct.Struct('!iBi')
-reply_header_struct = struct.Struct('!iqi')
-stat_struct = struct.Struct('!qqqqiiiqiiq')
+int_long_int_long_struct = struct.Struct("!iqiq")
+long_struct = struct.Struct("!q")
+multiheader_struct = struct.Struct("!iBi")
+reply_header_struct = struct.Struct("!iqi")
+stat_struct = struct.Struct("!qqqqiiiqiiq")
def read_string(buffer, offset):
@@ -33,7 +33,7 @@ def read_string(buffer, offset):
else:
index = offset
offset += length
- return buffer[index:index + length].decode('utf-8'), offset
+ return buffer[index : index + length].decode("utf-8"), offset
def read_acl(bytes, offset):
@@ -48,7 +48,7 @@ def write_string(bytes):
if not bytes:
return int_struct.pack(-1)
else:
- utf8_str = bytes.encode('utf-8')
+ utf8_str = bytes.encode("utf-8")
return int_struct.pack(len(utf8_str)) + utf8_str
@@ -67,38 +67,50 @@ def read_buffer(bytes, offset):
else:
index = offset
offset += length
- return bytes[index:index + length], offset
+ return bytes[index : index + length], offset
-class Close(namedtuple('Close', '')):
+class Close(namedtuple("Close", "")):
type = -11
@classmethod
def serialize(cls):
- return b''
+ return b""
+
CloseInstance = Close()
-class Ping(namedtuple('Ping', '')):
+class Ping(namedtuple("Ping", "")):
type = 11
@classmethod
def serialize(cls):
- return b''
+ return b""
+
PingInstance = Ping()
-class Connect(namedtuple('Connect', 'protocol_version last_zxid_seen'
- ' time_out session_id passwd read_only')):
+class Connect(
+ namedtuple(
+ "Connect",
+ "protocol_version last_zxid_seen"
+ " time_out session_id passwd read_only",
+ )
+):
type = None
def serialize(self):
b = bytearray()
- b.extend(int_long_int_long_struct.pack(
- self.protocol_version, self.last_zxid_seen, self.time_out,
- self.session_id))
+ b.extend(
+ int_long_int_long_struct.pack(
+ self.protocol_version,
+ self.last_zxid_seen,
+ self.time_out,
+ self.session_id,
+ )
+ )
b.extend(write_buffer(self.passwd))
b.extend([1 if self.read_only else 0])
return b
@@ -106,7 +118,8 @@ class Connect(namedtuple('Connect', 'protocol_version last_zxid_seen'
@classmethod
def deserialize(cls, bytes, offset):
proto_version, timeout, session_id = int_int_long_struct.unpack_from(
- bytes, offset)
+ bytes, offset
+ )
offset += int_int_long_struct.size
password, offset = read_buffer(bytes, offset)
@@ -115,11 +128,13 @@ class Connect(namedtuple('Connect', 'protocol_version last_zxid_seen'
offset += bool_struct.size
except struct.error:
read_only = False
- return cls(proto_version, 0, timeout, session_id, password,
- read_only), offset
+ return (
+ cls(proto_version, 0, timeout, session_id, password, read_only),
+ offset,
+ )
-class Create(namedtuple('Create', 'path data acl flags')):
+class Create(namedtuple("Create", "path data acl flags")):
type = 1
def serialize(self):
@@ -128,8 +143,11 @@ class Create(namedtuple('Create', 'path data acl flags')):
b.extend(write_buffer(self.data))
b.extend(int_struct.pack(len(self.acl)))
for acl in self.acl:
- b.extend(int_struct.pack(acl.perms) +
- write_string(acl.id.scheme) + write_string(acl.id.id))
+ b.extend(
+ int_struct.pack(acl.perms)
+ + write_string(acl.id.scheme)
+ + write_string(acl.id.id)
+ )
b.extend(int_struct.pack(self.flags))
return b
@@ -138,7 +156,7 @@ class Create(namedtuple('Create', 'path data acl flags')):
return read_string(bytes, offset)[0]
-class Delete(namedtuple('Delete', 'path version')):
+class Delete(namedtuple("Delete", "path version")):
type = 2
def serialize(self):
@@ -152,7 +170,7 @@ class Delete(namedtuple('Delete', 'path version')):
return True
-class Exists(namedtuple('Exists', 'path watcher')):
+class Exists(namedtuple("Exists", "path watcher")):
type = 3
def serialize(self):
@@ -167,7 +185,7 @@ class Exists(namedtuple('Exists', 'path watcher')):
return stat if stat.czxid != -1 else None
-class GetData(namedtuple('GetData', 'path watcher')):
+class GetData(namedtuple("GetData", "path watcher")):
type = 4
def serialize(self):
@@ -183,7 +201,7 @@ class GetData(namedtuple('GetData', 'path watcher')):
return data, stat
-class SetData(namedtuple('SetData', 'path data version')):
+class SetData(namedtuple("SetData", "path data version")):
type = 5
def serialize(self):
@@ -198,7 +216,7 @@ class SetData(namedtuple('SetData', 'path data version')):
return ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
-class GetACL(namedtuple('GetACL', 'path')):
+class GetACL(namedtuple("GetACL", "path")):
type = 6
def serialize(self):
@@ -219,7 +237,7 @@ class GetACL(namedtuple('GetACL', 'path')):
return acls, stat
-class SetACL(namedtuple('SetACL', 'path acls version')):
+class SetACL(namedtuple("SetACL", "path acls version")):
type = 7
def serialize(self):
@@ -227,8 +245,11 @@ class SetACL(namedtuple('SetACL', 'path acls version')):
b.extend(write_string(self.path))
b.extend(int_struct.pack(len(self.acls)))
for acl in self.acls:
- b.extend(int_struct.pack(acl.perms) +
- write_string(acl.id.scheme) + write_string(acl.id.id))
+ b.extend(
+ int_struct.pack(acl.perms)
+ + write_string(acl.id.scheme)
+ + write_string(acl.id.id)
+ )
b.extend(int_struct.pack(self.version))
return b
@@ -237,7 +258,7 @@ class SetACL(namedtuple('SetACL', 'path acls version')):
return ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
-class GetChildren(namedtuple('GetChildren', 'path watcher')):
+class GetChildren(namedtuple("GetChildren", "path watcher")):
type = 8
def serialize(self):
@@ -260,7 +281,7 @@ class GetChildren(namedtuple('GetChildren', 'path watcher')):
return children
-class Sync(namedtuple('Sync', 'path')):
+class Sync(namedtuple("Sync", "path")):
type = 9
def serialize(self):
@@ -271,7 +292,7 @@ class Sync(namedtuple('Sync', 'path')):
return read_string(buffer, offset)[0]
-class GetChildren2(namedtuple('GetChildren2', 'path watcher')):
+class GetChildren2(namedtuple("GetChildren2", "path watcher")):
type = 12
def serialize(self):
@@ -295,7 +316,7 @@ class GetChildren2(namedtuple('GetChildren2', 'path watcher')):
return children, stat
-class CheckVersion(namedtuple('CheckVersion', 'path version')):
+class CheckVersion(namedtuple("CheckVersion", "path version")):
type = 13
def serialize(self):
@@ -305,14 +326,15 @@ class CheckVersion(namedtuple('CheckVersion', 'path version')):
return b
-class Transaction(namedtuple('Transaction', 'operations')):
+class Transaction(namedtuple("Transaction", "operations")):
type = 14
def serialize(self):
b = bytearray()
for op in self.operations:
- b.extend(MultiHeader(op.type, False, -1).serialize() +
- op.serialize())
+ b.extend(
+ MultiHeader(op.type, False, -1).serialize() + op.serialize()
+ )
return b + multiheader_struct.pack(-1, True, -1)
@classmethod
@@ -327,7 +349,8 @@ class Transaction(namedtuple('Transaction', 'operations')):
response = True
elif header.type == SetData.type:
response = ZnodeStat._make(
- stat_struct.unpack_from(bytes, offset))
+ stat_struct.unpack_from(bytes, offset)
+ )
offset += stat_struct.size
elif header.type == CheckVersion.type:
response = True
@@ -351,7 +374,7 @@ class Transaction(namedtuple('Transaction', 'operations')):
return resp
-class Create2(namedtuple('Create2', 'path data acl flags')):
+class Create2(namedtuple("Create2", "path data acl flags")):
type = 15
def serialize(self):
@@ -360,8 +383,11 @@ class Create2(namedtuple('Create2', 'path data acl flags')):
b.extend(write_buffer(self.data))
b.extend(int_struct.pack(len(self.acl)))
for acl in self.acl:
- b.extend(int_struct.pack(acl.perms) +
- write_string(acl.id.scheme) + write_string(acl.id.id))
+ b.extend(
+ int_struct.pack(acl.perms)
+ + write_string(acl.id.scheme)
+ + write_string(acl.id.id)
+ )
b.extend(int_struct.pack(self.flags))
return b
@@ -372,8 +398,9 @@ class Create2(namedtuple('Create2', 'path data acl flags')):
return path, stat
-class Reconfig(namedtuple('Reconfig',
- 'joining leaving new_members config_id')):
+class Reconfig(
+ namedtuple("Reconfig", "joining leaving new_members config_id")
+):
type = 16
def serialize(self):
@@ -391,15 +418,18 @@ class Reconfig(namedtuple('Reconfig',
return data, stat
-class Auth(namedtuple('Auth', 'auth_type scheme auth')):
+class Auth(namedtuple("Auth", "auth_type scheme auth")):
type = 100
def serialize(self):
- return (int_struct.pack(self.auth_type) + write_string(self.scheme) +
- write_string(self.auth))
+ return (
+ int_struct.pack(self.auth_type)
+ + write_string(self.scheme)
+ + write_string(self.auth)
+ )
-class SASL(namedtuple('SASL', 'challenge')):
+class SASL(namedtuple("SASL", "challenge")):
type = 102
def serialize(self):
@@ -413,7 +443,7 @@ class SASL(namedtuple('SASL', 'challenge')):
return challenge, offset
-class Watch(namedtuple('Watch', 'type state path')):
+class Watch(namedtuple("Watch", "type state path")):
@classmethod
def deserialize(cls, bytes, offset):
"""Given bytes and the current bytes offset, return the
@@ -424,17 +454,19 @@ class Watch(namedtuple('Watch', 'type state path')):
return cls(type, state, path), offset
-class ReplyHeader(namedtuple('ReplyHeader', 'xid, zxid, err')):
+class ReplyHeader(namedtuple("ReplyHeader", "xid, zxid, err")):
@classmethod
def deserialize(cls, bytes, offset):
"""Given bytes and the current bytes offset, return a
:class:`ReplyHeader` instance and the new offset"""
new_offset = offset + reply_header_struct.size
- return cls._make(
- reply_header_struct.unpack_from(bytes, offset)), new_offset
+ return (
+ cls._make(reply_header_struct.unpack_from(bytes, offset)),
+ new_offset,
+ )
-class MultiHeader(namedtuple('MultiHeader', 'type done err')):
+class MultiHeader(namedtuple("MultiHeader", "type done err")):
def serialize(self):
b = bytearray()
b.extend(int_struct.pack(self.type))
diff --git a/kazoo/protocol/states.py b/kazoo/protocol/states.py
index 66a8425..480a586 100644
--- a/kazoo/protocol/states.py
+++ b/kazoo/protocol/states.py
@@ -27,6 +27,7 @@ class KazooState(object):
use, they can be considered lost as well.
"""
+
SUSPENDED = "SUSPENDED"
CONNECTED = "CONNECTED"
LOST = "LOST"
@@ -60,12 +61,13 @@ class KeeperState(object):
gone.
"""
- AUTH_FAILED = 'AUTH_FAILED'
- CONNECTED = 'CONNECTED'
- CONNECTED_RO = 'CONNECTED_RO'
- CONNECTING = 'CONNECTING'
- CLOSED = 'CLOSED'
- EXPIRED_SESSION = 'EXPIRED_SESSION'
+
+ AUTH_FAILED = "AUTH_FAILED"
+ CONNECTED = "CONNECTED"
+ CONNECTED_RO = "CONNECTED_RO"
+ CONNECTING = "CONNECTING"
+ CLOSED = "CLOSED"
+ EXPIRED_SESSION = "EXPIRED_SESSION"
class EventType(object):
@@ -98,22 +100,24 @@ class EventType(object):
The connection state has been altered.
"""
- CREATED = 'CREATED'
- DELETED = 'DELETED'
- CHANGED = 'CHANGED'
- CHILD = 'CHILD'
- NONE = 'NONE'
+
+ CREATED = "CREATED"
+ DELETED = "DELETED"
+ CHANGED = "CHANGED"
+ CHILD = "CHILD"
+ NONE = "NONE"
+
EVENT_TYPE_MAP = {
-1: EventType.NONE,
1: EventType.CREATED,
2: EventType.DELETED,
3: EventType.CHANGED,
- 4: EventType.CHILD
+ 4: EventType.CHILD,
}
-class WatchedEvent(namedtuple('WatchedEvent', ('type', 'state', 'path'))):
+class WatchedEvent(namedtuple("WatchedEvent", ("type", "state", "path"))):
"""A change on ZooKeeper that a Watcher is able to respond to.
The :class:`WatchedEvent` includes exactly what happened, the
@@ -137,7 +141,7 @@ class WatchedEvent(namedtuple('WatchedEvent', ('type', 'state', 'path'))):
"""
-class Callback(namedtuple('Callback', ('type', 'func', 'args'))):
+class Callback(namedtuple("Callback", ("type", "func", "args"))):
"""A callback that is handed to a handler for dispatch
:param type: Type of the callback, currently is only 'watch'
@@ -147,9 +151,14 @@ class Callback(namedtuple('Callback', ('type', 'func', 'args'))):
"""
-class ZnodeStat(namedtuple('ZnodeStat', 'czxid mzxid ctime mtime version'
- ' cversion aversion ephemeralOwner dataLength'
- ' numChildren pzxid')):
+class ZnodeStat(
+ namedtuple(
+ "ZnodeStat",
+ "czxid mzxid ctime mtime version"
+ " cversion aversion ephemeralOwner dataLength"
+ " numChildren pzxid",
+ )
+):
"""A ZnodeStat structure with convenience properties
When getting the value of a znode from Zookeeper, the properties for
@@ -206,6 +215,7 @@ class ZnodeStat(namedtuple('ZnodeStat', 'czxid mzxid ctime mtime version'
The number of children of this znode.
"""
+
@property
def acl_version(self):
return self.aversion
diff --git a/kazoo/python2atexit.py b/kazoo/python2atexit.py
index a7f9e85..499dc34 100644
--- a/kazoo/python2atexit.py
+++ b/kazoo/python2atexit.py
@@ -27,6 +27,7 @@ def _run_exitfuncs():
exc_info = sys.exc_info()
except:
import traceback
+
sys.stderr.write("Error in atexit._run_exitfuncs:\n")
traceback.print_exc()
exc_info = sys.exc_info()
@@ -64,6 +65,7 @@ def unregister(func):
for e in handler_entries:
_exithandlers.remove(e)
+
if not hasattr(atexit, "unregister"):
# Only in python 2.x
atexit.register(_run_exitfuncs)
diff --git a/kazoo/recipe/barrier.py b/kazoo/recipe/barrier.py
index cd94222..683e807 100644
--- a/kazoo/recipe/barrier.py
+++ b/kazoo/recipe/barrier.py
@@ -26,6 +26,7 @@ class Barrier(object):
the connection is lost while waiting.
"""
+
def __init__(self, client, path):
"""Create a Kazoo Barrier
@@ -91,6 +92,7 @@ class DoubleBarrier(object):
policy.
"""
+
def __init__(self, client, path, num_clients, identifier=None):
"""Create a Double Barrier
@@ -107,8 +109,10 @@ class DoubleBarrier(object):
self.client = client
self.path = path
self.num_clients = num_clients
- self._identifier = identifier or '%s-%s' % (
- socket.getfqdn(), os.getpid())
+ self._identifier = identifier or "%s-%s" % (
+ socket.getfqdn(),
+ os.getpid(),
+ )
self.participating = False
self.assured_path = False
self.node_name = uuid.uuid4().hex
@@ -135,7 +139,9 @@ class DoubleBarrier(object):
try:
self.client.create(
self.create_path,
- self._identifier.encode('utf-8'), ephemeral=True)
+ self._identifier.encode("utf-8"),
+ ephemeral=True,
+ )
except NodeExistsError:
pass
@@ -143,14 +149,14 @@ class DoubleBarrier(object):
if event.type == EventType.CREATED:
ready.set()
- self.client.exists(self.path + '/' + 'ready', watch=created)
+ self.client.exists(self.path + "/" + "ready", watch=created)
children = self.client.get_children(self.path)
if len(children) < self.num_clients:
ready.wait()
else:
- self.client.ensure_path(self.path + '/ready')
+ self.client.ensure_path(self.path + "/ready")
return True
def leave(self):
@@ -165,7 +171,7 @@ class DoubleBarrier(object):
def _inner_leave(self):
# Delete the ready node if its around
try:
- self.client.delete(self.path + '/ready')
+ self.client.delete(self.path + "/ready")
except NoNodeError:
pass
@@ -188,8 +194,9 @@ class DoubleBarrier(object):
if self.node_name == children[0]:
# We're first, wait on the highest to leave
- if not self.client.exists(self.path + '/' + children[-1],
- watch=deleted):
+ if not self.client.exists(
+ self.path + "/" + children[-1], watch=deleted
+ ):
continue
ready.wait()
@@ -199,8 +206,9 @@ class DoubleBarrier(object):
self.client.delete(self.create_path)
# Wait on the first
- if not self.client.exists(self.path + '/' + children[0],
- watch=deleted):
+ if not self.client.exists(
+ self.path + "/" + children[0], watch=deleted
+ ):
continue
# Wait for the lowest to be deleted
diff --git a/kazoo/recipe/cache.py b/kazoo/recipe/cache.py
index dc3b8bd..d527ad2 100644
--- a/kazoo/recipe/cache.py
+++ b/kazoo/recipe/cache.py
@@ -73,9 +73,9 @@ class TreeCache(object):
if self._state == self.STATE_LATENT:
self._state = self.STATE_STARTED
elif self._state == self.STATE_CLOSED:
- raise KazooException('already closed')
+ raise KazooException("already closed")
else:
- raise KazooException('already started')
+ raise KazooException("already started")
self._task_thread = self._client.handler.spawn(self._do_background)
self._client.add_listener(self._session_watcher)
@@ -166,9 +166,9 @@ class TreeCache(object):
def _find_node(self, path):
if not path.startswith(self._root._path):
- raise ValueError('outside of tree')
- striped_path = path[len(self._root._path):].strip('/')
- splited_path = [p for p in striped_path.split('/') if p]
+ raise ValueError("outside of tree")
+ striped_path = path[len(self._root._path) :].strip("/")
+ splited_path = [p for p in striped_path.split("/") if p]
current_node = self._root
for node_name in splited_path:
if node_name not in current_node._children:
@@ -179,7 +179,7 @@ class TreeCache(object):
def _publish_event(self, event_type, event_data=None):
event = TreeEvent.make(event_type, event_data)
if self._state != self.STATE_CLOSED:
- logger.debug('public event: %r', event)
+ logger.debug("public event: %r", event)
self._in_background(self._do_publish_event, event)
def _do_publish_event(self, event):
@@ -222,8 +222,15 @@ class TreeNode(object):
:param parent: The parent node reference. ``None`` for root node.
"""
- __slots__ = ('_tree', '_path', '_parent', '_depth', '_children', '_state',
- '_data')
+ __slots__ = (
+ "_tree",
+ "_path",
+ "_parent",
+ "_depth",
+ "_children",
+ "_state",
+ "_data",
+ )
STATE_PENDING = 0
STATE_LIVE = 1
@@ -266,9 +273,9 @@ class TreeNode(object):
self._publish_event(TreeEvent.NODE_REMOVED, old_data)
if self._parent is None:
- self._call_client('exists', self._path) # root node
+ self._call_client("exists", self._path) # root node
else:
- child = self._path[len(self._parent._path) + 1:]
+ child = self._path[len(self._parent._path) + 1 :]
if self._parent._children.get(child) is self:
del self._parent._children[child]
self._reset_watchers()
@@ -288,26 +295,26 @@ class TreeNode(object):
self._refresh_children()
def _refresh_data(self):
- self._call_client('get', self._path)
+ self._call_client("get", self._path)
def _refresh_children(self):
# TODO max-depth checking support
- self._call_client('get_children', self._path)
+ self._call_client("get_children", self._path)
def _call_client(self, method_name, path):
- assert method_name in ('get', 'get_children', 'exists')
+ assert method_name in ("get", "get_children", "exists")
self._tree._outstanding_ops += 1
callback = functools.partial(
- self._tree._in_background, self._process_result,
- method_name, path)
- method = getattr(self._tree._client, method_name + '_async')
+ self._tree._in_background, self._process_result, method_name, path
+ )
+ method = getattr(self._tree._client, method_name + "_async")
method(path, watch=self._process_watch).rawlink(callback)
def _process_watch(self, watched_event):
- logger.debug('process_watch: %r', watched_event)
+ logger.debug("process_watch: %r", watched_event)
with handle_exception(self._tree._error_listeners):
if watched_event.type == EventType.CREATED:
- assert self._parent is None, 'unexpected CREATED on non-root'
+ assert self._parent is None, "unexpected CREATED on non-root"
self.on_created()
elif watched_event.type == EventType.DELETED:
self.on_deleted()
@@ -317,15 +324,15 @@ class TreeNode(object):
self._refresh_children()
def _process_result(self, method_name, path, result):
- logger.debug('process_result: %s %s', method_name, path)
- if method_name == 'exists':
- assert self._parent is None, 'unexpected EXISTS on non-root'
+ logger.debug("process_result: %s %s", method_name, path)
+ if method_name == "exists":
+ assert self._parent is None, "unexpected EXISTS on non-root"
# The result will be `None` if the node doesn't exist.
if result.successful() and result.get() is not None:
if self._state == self.STATE_DEAD:
self._state = self.STATE_PENDING
self.on_created()
- elif method_name == 'get_children':
+ elif method_name == "get_children":
if result.successful():
children = result.get()
for child in sorted(children):
@@ -336,11 +343,13 @@ class TreeNode(object):
node.on_created()
elif isinstance(result.exception, NoNodeError):
self.on_deleted()
- elif method_name == 'get':
+ elif method_name == "get":
if result.successful():
data, stat = result.get()
old_data, self._data = (
- self._data, NodeData.make(path, data, stat))
+ self._data,
+ NodeData.make(path, data, stat),
+ )
old_state, self._state = self._state, self.STATE_LIVE
if old_state == self.STATE_LIVE:
if old_data is None or old_data.stat.mzxid != stat.mzxid:
@@ -350,7 +359,7 @@ class TreeNode(object):
elif isinstance(result.exception, NoNodeError):
self.on_deleted()
else: # pragma: no cover
- logger.warning('unknown operation %s', method_name)
+ logger.warning("unknown operation %s", method_name)
self._tree._outstanding_ops -= 1
return
@@ -384,9 +393,14 @@ class TreeEvent(tuple):
:returns: A :class:`~kazoo.recipe.cache.TreeEvent` instance.
"""
assert event_type in (
- cls.NODE_ADDED, cls.NODE_UPDATED, cls.NODE_REMOVED,
- cls.CONNECTION_SUSPENDED, cls.CONNECTION_RECONNECTED,
- cls.CONNECTION_LOST, cls.INITIALIZED)
+ cls.NODE_ADDED,
+ cls.NODE_UPDATED,
+ cls.NODE_REMOVED,
+ cls.CONNECTION_SUSPENDED,
+ cls.CONNECTION_RECONNECTED,
+ cls.CONNECTION_LOST,
+ cls.INITIALIZED,
+ )
return cls((event_type, event_data))
@@ -416,12 +430,12 @@ def handle_exception(listeners):
try:
yield
except Exception as e:
- logger.debug('processing error: %r', e)
+ logger.debug("processing error: %r", e)
if listeners:
for listener in listeners:
try:
listener(e)
except BaseException: # pragma: no cover
- logger.exception('Exception handling exception') # oops
+ logger.exception("Exception handling exception") # oops
else:
- logger.exception('No listener to process %r', e)
+ logger.exception("No listener to process %r", e)
diff --git a/kazoo/recipe/counter.py b/kazoo/recipe/counter.py
index 9e68849..3b2cc33 100644
--- a/kazoo/recipe/counter.py
+++ b/kazoo/recipe/counter.py
@@ -8,6 +8,7 @@ from kazoo.exceptions import BadVersionError
from kazoo.retry import ForceRetryError
import struct
+
class Counter(object):
"""Kazoo Counter
@@ -56,6 +57,7 @@ class Counter(object):
counter.post_value == 1
"""
+
def __init__(self, client, path, default=0, support_curator=False):
"""Create a Kazoo Counter
@@ -75,8 +77,10 @@ class Counter(object):
self.pre_value = None
self.post_value = None
if self.support_curator and not isinstance(self.default, int):
- raise TypeError("when support_curator is enabled the default "
- "type must be an int")
+ raise TypeError(
+ "when support_curator is enabled the default "
+ "type must be an int"
+ )
def _ensure_node(self):
if not self._ensured_path:
@@ -88,9 +92,9 @@ class Counter(object):
self._ensure_node()
old, stat = self.client.get(self.path)
if self.support_curator:
- old = struct.unpack(">i", old)[0] if old != b'' else self.default
+ old = struct.unpack(">i", old)[0] if old != b"" else self.default
else:
- old = old.decode('ascii') if old != b'' else self.default
+ old = old.decode("ascii") if old != b"" else self.default
version = stat.version
data = self.default_type(old)
return data, version
@@ -101,7 +105,7 @@ class Counter(object):
def _change(self, value):
if not isinstance(value, self.default_type):
- raise TypeError('invalid type for value change')
+ raise TypeError("invalid type for value change")
self.client.retry(self._inner_change, value)
return self
@@ -111,7 +115,7 @@ class Counter(object):
if self.support_curator:
data = struct.pack(">i", post_value)
else:
- data = repr(post_value).encode('ascii')
+ data = repr(post_value).encode("ascii")
try:
self.client.set(self.path, data, version=version)
except BadVersionError: # pragma: nocover
diff --git a/kazoo/recipe/election.py b/kazoo/recipe/election.py
index f5f1be4..93bb725 100644
--- a/kazoo/recipe/election.py
+++ b/kazoo/recipe/election.py
@@ -21,6 +21,7 @@ class Election(object):
election.run(my_leader_function)
"""
+
def __init__(self, client, path, identifier=None):
"""Create a Kazoo Leader Election
diff --git a/kazoo/recipe/lease.py b/kazoo/recipe/lease.py
index 27bda8a..ce7fe56 100644
--- a/kazoo/recipe/lease.py
+++ b/kazoo/recipe/lease.py
@@ -44,10 +44,16 @@ class NonBlockingLease(object):
# Bump when storage format changes
_version = 1
_date_format = "%Y-%m-%dT%H:%M:%S"
- _byte_encoding = 'utf-8'
-
- def __init__(self, client, path, duration, identifier=None,
- utcnow=datetime.datetime.utcnow):
+ _byte_encoding = "utf-8"
+
+ def __init__(
+ self,
+ client,
+ path,
+ duration,
+ identifier=None,
+ utcnow=datetime.datetime.utcnow,
+ ):
"""Create a non-blocking lease.
:param client: A :class:`~kazoo.client.KazooClient` instance.
@@ -78,15 +84,19 @@ class NonBlockingLease(object):
if data["version"] != self._version:
# We need an upgrade, let someone else take the lease
return
- current_end = datetime.datetime.strptime(data['end'],
- self._date_format)
- if data['holder'] != ident and now < current_end:
+ current_end = datetime.datetime.strptime(
+ data["end"], self._date_format
+ )
+ if data["holder"] != ident and now < current_end:
# Another client is still holding the lease
return
client.delete(holder_path)
end_lease = (now + duration).strftime(self._date_format)
- new_data = {'version': self._version, 'holder': ident,
- 'end': end_lease}
+ new_data = {
+ "version": self._version,
+ "holder": ident,
+ "end": end_lease,
+ }
client.create(holder_path, self._encode(new_data))
self.obtained = True
@@ -128,12 +138,24 @@ class MultiNonBlockingLease(object):
"""
- def __init__(self, client, count, path, duration, identifier=None,
- utcnow=datetime.datetime.utcnow):
+ def __init__(
+ self,
+ client,
+ count,
+ path,
+ duration,
+ identifier=None,
+ utcnow=datetime.datetime.utcnow,
+ ):
self.obtained = False
for num in range(count):
- ls = NonBlockingLease(client, '%s/%d' % (path, num), duration,
- identifier=identifier, utcnow=utcnow)
+ ls = NonBlockingLease(
+ client,
+ "%s/%d" % (path, num),
+ duration,
+ identifier=identifier,
+ utcnow=utcnow,
+ )
if ls:
self.obtained = True
break
diff --git a/kazoo/recipe/lock.py b/kazoo/recipe/lock.py
index 88b98e7..d891d63 100644
--- a/kazoo/recipe/lock.py
+++ b/kazoo/recipe/lock.py
@@ -249,7 +249,7 @@ class Lock(object):
self.create_path, self.data, ephemeral=ephemeral, sequence=True
)
# strip off path to node
- node = node[len(self.path) + 1:]
+ node = node[len(self.path) + 1 :]
self.node = node
@@ -294,7 +294,7 @@ class Lock(object):
(e.g. rlock), this and also edge cases where the lock's ephemeral node
is gone.
"""
- node_sequence = node[len(self.prefix):]
+ node_sequence = node[len(self.prefix) :]
children = self.client.get_children(self.path)
found_self = False
# Filter out the contenders using the computed regex
diff --git a/kazoo/recipe/partitioner.py b/kazoo/recipe/partitioner.py
index c552ea2..21dc6ef 100644
--- a/kazoo/recipe/partitioner.py
+++ b/kazoo/recipe/partitioner.py
@@ -56,6 +56,7 @@ class PartitionState(object):
be recreated.
"""
+
ALLOCATING = "ALLOCATING"
ACQUIRED = "ACQUIRED"
RELEASE = "RELEASE"
@@ -135,9 +136,18 @@ class SetPartitioner(object):
The current partition was released and is being re-allocated.
"""
- def __init__(self, client, path, set, partition_func=None,
- identifier=None, time_boundary=30, max_reaction_time=1,
- state_change_event=None):
+
+ def __init__(
+ self,
+ client,
+ path,
+ set,
+ partition_func=None,
+ identifier=None,
+ time_boundary=30,
+ max_reaction_time=1,
+ state_change_event=None,
+ ):
"""Create a :class:`~SetPartitioner` instance
:param client: A :class:`~kazoo.client.KazooClient` instance.
@@ -159,19 +169,22 @@ class SetPartitioner(object):
# Used to differentiate two states with the same names in time
self.state_id = 0
self.state = PartitionState.ALLOCATING
- self.state_change_event = state_change_event or \
- client.handler.event_object()
+ self.state_change_event = (
+ state_change_event or client.handler.event_object()
+ )
self._client = client
self._path = path
self._set = set
self._partition_set = []
self._partition_func = partition_func or self._partitioner
- self._identifier = identifier or '%s-%s' % (
- socket.getfqdn(), os.getpid())
+ self._identifier = identifier or "%s-%s" % (
+ socket.getfqdn(),
+ os.getpid(),
+ )
self._locks = []
- self._lock_path = '/'.join([path, 'locks'])
- self._party_path = '/'.join([path, 'party'])
+ self._lock_path = "/".join([path, "locks"])
+ self._party_path = "/".join([path, "party"])
self._time_boundary = time_boundary
self._max_reaction_time = max_reaction_time
@@ -183,8 +196,9 @@ class SetPartitioner(object):
client.ensure_path(self._party_path)
# Join the party
- self._party = client.ShallowParty(self._party_path,
- identifier=self._identifier)
+ self._party = client.ShallowParty(
+ self._party_path, identifier=self._identifier
+ )
self._party.join()
self._state_change = client.handler.rlock_object()
@@ -313,11 +327,12 @@ class SetPartitioner(object):
# Split up the set
partition_set = self._partition_func(
- self._identifier, list(self._party), self._set)
+ self._identifier, list(self._party), self._set
+ )
# Proceed to acquire locks for the working set as needed
for member in partition_set:
- lock = self._client.Lock(self._lock_path + '/' + str(member))
+ lock = self._client.Lock(self._lock_path + "/" + str(member))
while True:
try:
@@ -386,8 +401,9 @@ class SetPartitioner(object):
:param client_handler: If True, deliver the result using the
client's event handler.
"""
- watcher = PatientChildrenWatch(self._client, self._party_path,
- self._time_boundary)
+ watcher = PatientChildrenWatch(
+ self._client, self._party_path, self._time_boundary
+ )
asy = watcher.start()
if func is not None:
# We spin up the function in a separate thread/greenlet
@@ -419,7 +435,7 @@ class SetPartitioner(object):
i = workers.index(identifier)
# Now return the partition list starting at our location and
# skipping the other workers
- return all_partitions[i::len(workers)]
+ return all_partitions[i :: len(workers)]
def _set_state(self, state):
self.state = state
diff --git a/kazoo/recipe/party.py b/kazoo/recipe/party.py
index 7186c10..2a0f5df 100644
--- a/kazoo/recipe/party.py
+++ b/kazoo/recipe/party.py
@@ -14,6 +14,7 @@ from kazoo.exceptions import NodeExistsError, NoNodeError
class BaseParty(object):
"""Base implementation of a party."""
+
def __init__(self, client, path, identifier=None):
"""
:param client: A :class:`~kazoo.client.KazooClient` instance.
@@ -24,7 +25,7 @@ class BaseParty(object):
"""
self.client = client
self.path = path
- self.data = str(identifier or "").encode('utf-8')
+ self.data = str(identifier or "").encode("utf-8")
self.ensured_path = False
self.participating = False
@@ -71,6 +72,7 @@ class BaseParty(object):
class Party(BaseParty):
"""Simple pool of participating processes"""
+
_NODE_NAME = "__party__"
def __init__(self, client, path, identifier=None):
@@ -84,9 +86,10 @@ class Party(BaseParty):
children = self._get_children()
for child in children:
try:
- d, _ = self.client.retry(self.client.get, self.path +
- "/" + child)
- yield d.decode('utf-8')
+ d, _ = self.client.retry(
+ self.client.get, self.path + "/" + child
+ )
+ yield d.decode("utf-8")
except NoNodeError: # pragma: nocover
pass
@@ -105,9 +108,10 @@ class ShallowParty(BaseParty):
of getting a list of participants to a single Zookeeper call.
"""
+
def __init__(self, client, path, identifier=None):
BaseParty.__init__(self, client, path, identifier=identifier)
- self.node = '-'.join([uuid.uuid4().hex, self.data.decode('utf-8')])
+ self.node = "-".join([uuid.uuid4().hex, self.data.decode("utf-8")])
self.create_path = self.path + "/" + self.node
def __iter__(self):
@@ -115,4 +119,4 @@ class ShallowParty(BaseParty):
self._ensure_parent()
children = self._get_children()
for child in children:
- yield child[child.find('-') + 1:]
+ yield child[child.find("-") + 1 :]
diff --git a/kazoo/recipe/queue.py b/kazoo/recipe/queue.py
index 3887973..30d3066 100644
--- a/kazoo/recipe/queue.py
+++ b/kazoo/recipe/queue.py
@@ -27,7 +27,7 @@ class BaseQueue(object):
self.client = client
self.path = path
self._entries_path = path
- self.structure_paths = (self.path, )
+ self.structure_paths = (self.path,)
self.ensured_path = False
def _check_put_arguments(self, value, priority=100):
@@ -87,7 +87,8 @@ class Queue(BaseQueue):
def _inner_get(self):
if not self._children:
self._children = self.client.retry(
- self.client.get_children, self.path)
+ self.client.get_children, self.path
+ )
self._children = sorted(self._children)
if not self._children:
return None
@@ -114,8 +115,9 @@ class Queue(BaseQueue):
"""
self._check_put_arguments(value, priority)
self._ensure_paths()
- path = '{path}/{prefix}{priority:03d}-'.format(
- path=self.path, prefix=self.prefix, priority=priority)
+ path = "{path}/{prefix}{priority:03d}-".format(
+ path=self.path, prefix=self.prefix, priority=priority
+ )
self.client.create(path, value, sequence=True)
@@ -143,6 +145,7 @@ class LockingQueue(BaseQueue):
:class:`LockingQueue` requires ZooKeeper 3.4 or above, since it is
using transactions.
"""
+
lock = "/taken"
entries = "/entries"
entry = "entry"
@@ -180,10 +183,11 @@ class LockingQueue(BaseQueue):
self.client.create(
"{path}/{prefix}-{priority:03d}-".format(
- path=self._entries_path,
- prefix=self.entry,
- priority=priority),
- value, sequence=True)
+ path=self._entries_path, prefix=self.entry, priority=priority
+ ),
+ value,
+ sequence=True,
+ )
def put_all(self, values, priority=100):
"""Put several entries into the queue. The action only succeeds
@@ -211,8 +215,11 @@ class LockingQueue(BaseQueue):
"{path}/{prefix}-{priority:03d}-".format(
path=self._entries_path,
prefix=self.entry,
- priority=priority),
- value, sequence=True)
+ priority=priority,
+ ),
+ value,
+ sequence=True,
+ )
def get(self, timeout=None):
"""Locks and gets an entry from the queue. If a previously got entry
@@ -253,12 +260,12 @@ class LockingQueue(BaseQueue):
if self.processing_element is not None and self.holds_lock():
id_, value = self.processing_element
with self.client.transaction() as transaction:
- transaction.delete("{path}/{id}".format(
- path=self._entries_path,
- id=id_))
- transaction.delete("{path}/{id}".format(
- path=self._lock_path,
- id=id_))
+ transaction.delete(
+ "{path}/{id}".format(path=self._entries_path, id=id_)
+ )
+ transaction.delete(
+ "{path}/{id}".format(path=self._lock_path, id=id_)
+ )
self.processing_element = None
return True
else:
@@ -274,9 +281,9 @@ class LockingQueue(BaseQueue):
if self.processing_element is not None and self.holds_lock():
id_, value = self.processing_element
with self.client.transaction() as transaction:
- transaction.delete("{path}/{id}".format(
- path=self._lock_path,
- id=id_))
+ transaction.delete(
+ "{path}/{id}".format(path=self._lock_path, id=id_)
+ )
self.processing_element = None
return True
else:
@@ -297,11 +304,13 @@ class LockingQueue(BaseQueue):
values = self.client.retry(
self.client.get_children,
self._entries_path,
- check_for_updates)
+ check_for_updates,
+ )
taken = self.client.retry(
self.client.get_children,
self._lock_path,
- check_for_updates)
+ check_for_updates,
+ )
available = self._filter_locked(values, taken)
if len(available) > 0:
ret = self._take(available[0])
@@ -324,17 +333,19 @@ class LockingQueue(BaseQueue):
def _filter_locked(self, values, taken):
taken = set(taken)
available = sorted(values)
- return (available if len(taken) == 0 else
- [x for x in available if x not in taken])
+ return (
+ available
+ if len(taken) == 0
+ else [x for x in available if x not in taken]
+ )
def _take(self, id_):
try:
self.client.create(
- "{path}/{id}".format(
- path=self._lock_path,
- id=id_),
+ "{path}/{id}".format(path=self._lock_path, id=id_),
self.id,
- ephemeral=True)
+ ephemeral=True,
+ )
except NodeExistsError:
# Item is already locked
return None
@@ -342,12 +353,12 @@ class LockingQueue(BaseQueue):
try:
value, stat = self.client.retry(
self.client.get,
- "{path}/{id}".format(path=self._entries_path, id=id_))
+ "{path}/{id}".format(path=self._entries_path, id=id_),
+ )
except NoNodeError:
# Item is already consumed
self.client.delete(
- "{path}/{id}".format(
- path=self._lock_path,
- id=id_))
+ "{path}/{id}".format(path=self._lock_path, id=id_)
+ )
return None
return (id_, value)
diff --git a/kazoo/recipe/watchers.py b/kazoo/recipe/watchers.py
index 96ec4fe..de61777 100644
--- a/kazoo/recipe/watchers.py
+++ b/kazoo/recipe/watchers.py
@@ -15,11 +15,7 @@ import logging
import time
import warnings
-from kazoo.exceptions import (
- ConnectionClosedError,
- NoNodeError,
- KazooException
-)
+from kazoo.exceptions import ConnectionClosedError, NoNodeError, KazooException
from kazoo.protocol.states import KazooState
from kazoo.retry import KazooRetry
@@ -37,6 +33,7 @@ def _ignore_closed(func):
return func(*args, **kwargs)
except ConnectionClosedError:
pass
+
return wrapper
@@ -90,6 +87,7 @@ class DataWatch(object):
passed to it and warns that they are no longer respected.
"""
+
def __init__(self, client, path, func=None, *args, **kwargs):
"""Create a data watcher for a path
@@ -110,18 +108,22 @@ class DataWatch(object):
self._stopped = False
self._run_lock = client.handler.lock_object()
self._version = None
- self._retry = KazooRetry(max_tries=None,
- sleep_func=client.handler.sleep_func)
+ self._retry = KazooRetry(
+ max_tries=None, sleep_func=client.handler.sleep_func
+ )
self._include_event = None
self._ever_called = False
self._used = False
if args or kwargs:
- warnings.warn('Passing additional arguments to DataWatch is'
- ' deprecated. ignore_missing_node is now assumed '
- ' to be True by default, and the event will be '
- ' sent if the function can handle receiving it',
- DeprecationWarning, stacklevel=2)
+ warnings.warn(
+ "Passing additional arguments to DataWatch is"
+ " deprecated. ignore_missing_node is now assumed "
+ " to be True by default, and the event will be "
+ " sent if the function can handle receiving it",
+ DeprecationWarning,
+ stacklevel=2,
+ )
# Register our session listener if we're going to resume
# across session losses
@@ -143,7 +145,8 @@ class DataWatch(object):
if self._used:
raise KazooException(
"A function has already been associated with this "
- "DataWatch instance.")
+ "DataWatch instance."
+ )
self._func = func
@@ -181,15 +184,17 @@ class DataWatch(object):
initial_version = self._version
try:
- data, stat = self._retry(self._client.get,
- self._path, self._watcher)
+ data, stat = self._retry(
+ self._client.get, self._path, self._watcher
+ )
except NoNodeError:
data = None
# This will set 'stat' to None if the node does not yet
# exist.
- stat = self._retry(self._client.exists, self._path,
- self._watcher)
+ stat = self._retry(
+ self._client.exists, self._path, self._watcher
+ )
if stat:
self._client.handler.spawn(self._get_data)
return
@@ -245,8 +250,15 @@ class ChildrenWatch(object):
# Above function is called immediately and prints children
"""
- def __init__(self, client, path, func=None,
- allow_session_lost=True, send_event=False):
+
+ def __init__(
+ self,
+ client,
+ path,
+ func=None,
+ allow_session_lost=True,
+ send_event=False,
+ ):
"""Create a children watcher for a path
:param client: A zookeeper client.
@@ -301,7 +313,8 @@ class ChildrenWatch(object):
if self._used:
raise KazooException(
"A function has already been associated with this "
- "ChildrenWatch instance.")
+ "ChildrenWatch instance."
+ )
self._func = func
@@ -318,8 +331,9 @@ class ChildrenWatch(object):
return
try:
- children = self._client.retry(self._client.get_children,
- self._path, self._watcher)
+ children = self._client.retry(
+ self._client.get_children, self._path, self._watcher
+ )
except NoNodeError:
self._stopped = True
return
@@ -327,8 +341,10 @@ class ChildrenWatch(object):
if not self._watch_established:
self._watch_established = True
- if self._prior_children is not None and \
- self._prior_children == children:
+ if (
+ self._prior_children is not None
+ and self._prior_children == children
+ ):
return
self._prior_children = children
@@ -354,8 +370,11 @@ class ChildrenWatch(object):
def _session_watcher(self, state):
if state in (KazooState.LOST, KazooState.SUSPENDED):
self._watch_established = False
- elif (state == KazooState.CONNECTED and
- not self._watch_established and not self._stopped):
+ elif (
+ state == KazooState.CONNECTED
+ and not self._watch_established
+ and not self._stopped
+ ):
self._client.handler.spawn(self._get_children)
@@ -388,6 +407,7 @@ class PatientChildrenWatch(object):
checked to see if the children have changed later.
"""
+
def __init__(self, client, path, time_boundary=30):
self.client = client
self.path = path
@@ -412,8 +432,10 @@ class PatientChildrenWatch(object):
while True:
async_result = self.client.handler.async_result()
self.children = self.client.retry(
- self.client.get_children, self.path,
- partial(self._children_watcher, async_result))
+ self.client.get_children,
+ self.path,
+ partial(self._children_watcher, async_result),
+ )
self.client.handler.sleep_func(self.time_boundary)
if self.children_changed.is_set():
diff --git a/kazoo/retry.py b/kazoo/retry.py
index 707b378..48b94e1 100644
--- a/kazoo/retry.py
+++ b/kazoo/retry.py
@@ -32,19 +32,23 @@ class InterruptedError(RetryFailedError):
class KazooRetry(object):
"""Helper for retrying a method in the face of retry-able
exceptions"""
- RETRY_EXCEPTIONS = (
- ConnectionLoss,
- OperationTimeoutError,
- ForceRetryError
- )
-
- EXPIRED_EXCEPTIONS = (
- SessionExpiredError,
- )
-
- def __init__(self, max_tries=1, delay=0.1, backoff=2, max_jitter=0.4,
- max_delay=60.0, ignore_expire=True, sleep_func=time.sleep,
- deadline=None, interrupt=None):
+
+ RETRY_EXCEPTIONS = (ConnectionLoss, OperationTimeoutError, ForceRetryError)
+
+ EXPIRED_EXCEPTIONS = (SessionExpiredError,)
+
+ def __init__(
+ self,
+ max_tries=1,
+ delay=0.1,
+ backoff=2,
+ max_jitter=0.4,
+ max_delay=60.0,
+ ignore_expire=True,
+ sleep_func=time.sleep,
+ deadline=None,
+ interrupt=None,
+ ):
"""Create a :class:`KazooRetry` instance for retrying function
calls.
@@ -92,14 +96,16 @@ class KazooRetry(object):
def copy(self):
"""Return a clone of this retry manager"""
- obj = KazooRetry(max_tries=self.max_tries,
- delay=self.delay,
- backoff=self.backoff,
- max_jitter=self.max_jitter,
- max_delay=self.max_delay,
- sleep_func=self.sleep_func,
- deadline=self.deadline,
- interrupt=self.interrupt)
+ obj = KazooRetry(
+ max_tries=self.max_tries,
+ delay=self.delay,
+ backoff=self.backoff,
+ max_jitter=self.max_jitter,
+ max_delay=self.max_delay,
+ sleep_func=self.sleep_func,
+ deadline=self.deadline,
+ interrupt=self.interrupt,
+ )
obj.retry_exceptions = self.retry_exceptions
return obj
@@ -131,12 +137,15 @@ class KazooRetry(object):
if self._attempts == self.max_tries:
raise RetryFailedError("Too many retry attempts")
self._attempts += 1
- jitter = random.uniform(1.0-self.max_jitter,
- 1.0+self.max_jitter)
+ jitter = random.uniform(
+ 1.0 - self.max_jitter, 1.0 + self.max_jitter
+ )
sleeptime = self._cur_delay * jitter
- if self._cur_stoptime is not None and \
- time.time() + sleeptime >= self._cur_stoptime:
+ if (
+ self._cur_stoptime is not None
+ and time.time() + sleeptime >= self._cur_stoptime
+ ):
raise RetryFailedError("Exceeded retry deadline")
if self.interrupt:
@@ -150,5 +159,4 @@ class KazooRetry(object):
raise InterruptedError()
else:
self.sleep_func(sleeptime)
- self._cur_delay = min(sleeptime * self.backoff,
- self.max_delay)
+ self._cur_delay = min(sleeptime * self.backoff, self.max_delay)
diff --git a/kazoo/security.py b/kazoo/security.py
index e473360..6839944 100644
--- a/kazoo/security.py
+++ b/kazoo/security.py
@@ -5,10 +5,10 @@ import hashlib
# Represents a Zookeeper ID and ACL object
-Id = namedtuple('Id', 'scheme id')
+Id = namedtuple("Id", "scheme id")
-class ACL(namedtuple('ACL', 'perms id')):
+class ACL(namedtuple("ACL", "perms id")):
"""An ACL for a Zookeeper Node
An ACL object is created by using an :class:`Id` object along with
@@ -16,27 +16,31 @@ class ACL(namedtuple('ACL', 'perms id')):
:meth:`make_digest_acl` should be used to create an ACL object with
the desired scheme, id, and permissions.
"""
+
@property
def acl_list(self):
perms = []
if self.perms & Permissions.ALL == Permissions.ALL:
- perms.append('ALL')
+ perms.append("ALL")
return perms
if self.perms & Permissions.READ == Permissions.READ:
- perms.append('READ')
+ perms.append("READ")
if self.perms & Permissions.WRITE == Permissions.WRITE:
- perms.append('WRITE')
+ perms.append("WRITE")
if self.perms & Permissions.CREATE == Permissions.CREATE:
- perms.append('CREATE')
+ perms.append("CREATE")
if self.perms & Permissions.DELETE == Permissions.DELETE:
- perms.append('DELETE')
+ perms.append("DELETE")
if self.perms & Permissions.ADMIN == Permissions.ADMIN:
- perms.append('ADMIN')
+ perms.append("ADMIN")
return perms
def __repr__(self):
- return 'ACL(perms=%r, acl_list=%s, id=%r)' % (
- self.perms, self.acl_list, self.id)
+ return "ACL(perms=%r, acl_list=%s, id=%r)" % (
+ self.perms,
+ self.acl_list,
+ self.id,
+ )
class Permissions(object):
@@ -49,8 +53,8 @@ class Permissions(object):
# Shortcuts for common Ids
-ANYONE_ID_UNSAFE = Id('world', 'anyone')
-AUTH_IDS = Id('auth', '')
+ANYONE_ID_UNSAFE = Id("world", "anyone")
+AUTH_IDS = Id("auth", "")
# Shortcuts for common ACLs
OPEN_ACL_UNSAFE = [ACL(Permissions.ALL, ANYONE_ID_UNSAFE)]
@@ -70,13 +74,21 @@ def make_digest_acl_credential(username, password):
See: https://github.com/python-zk/kazoo/pull/584
"""
- credential = username.encode('utf-8') + b":" + password.encode('utf-8')
+ credential = username.encode("utf-8") + b":" + password.encode("utf-8")
cred_hash = b64encode(hashlib.sha1(credential).digest()).strip()
- return username + ":" + cred_hash.decode('utf-8')
-
-
-def make_acl(scheme, credential, read=False, write=False,
- create=False, delete=False, admin=False, all=False):
+ return username + ":" + cred_hash.decode("utf-8")
+
+
+def make_acl(
+ scheme,
+ credential,
+ read=False,
+ write=False,
+ create=False,
+ delete=False,
+ admin=False,
+ all=False,
+):
"""Given a scheme and credential, return an :class:`ACL` object
appropriate for use with Kazoo.
@@ -118,8 +130,16 @@ def make_acl(scheme, credential, read=False, write=False,
return ACL(permissions, Id(scheme, credential))
-def make_digest_acl(username, password, read=False, write=False,
- create=False, delete=False, admin=False, all=False):
+def make_digest_acl(
+ username,
+ password,
+ read=False,
+ write=False,
+ create=False,
+ delete=False,
+ admin=False,
+ all=False,
+):
"""Create a digest ACL for Zookeeper with the given permissions
This method combines :meth:`make_digest_acl_credential` and
@@ -143,5 +163,13 @@ def make_digest_acl(username, password, read=False, write=False,
"""
cred = make_digest_acl_credential(username, password)
- return make_acl("digest", cred, read=read, write=write, create=create,
- delete=delete, admin=admin, all=all)
+ return make_acl(
+ "digest",
+ cred,
+ read=read,
+ write=write,
+ create=create,
+ delete=delete,
+ admin=admin,
+ all=all,
+ )
diff --git a/kazoo/testing/__init__.py b/kazoo/testing/__init__.py
index c1ae12c..40dbc59 100644
--- a/kazoo/testing/__init__.py
+++ b/kazoo/testing/__init__.py
@@ -1,4 +1,7 @@
from kazoo.testing.harness import KazooTestCase, KazooTestHarness
-__all__ = ('KazooTestHarness', 'KazooTestCase', )
+__all__ = (
+ "KazooTestHarness",
+ "KazooTestCase",
+)
diff --git a/kazoo/testing/common.py b/kazoo/testing/common.py
index 810b55f..7a12e2f 100644
--- a/kazoo/testing/common.py
+++ b/kazoo/testing/common.py
@@ -41,30 +41,34 @@ log = logging.getLogger(__name__)
def debug(sig, frame):
"""Interrupt running process, and provide a python prompt for
interactive debugging."""
- d = {'_frame': frame} # Allow access to frame object.
+ d = {"_frame": frame} # Allow access to frame object.
d.update(frame.f_globals) # Unless shadowed by global
d.update(frame.f_locals)
i = code.InteractiveConsole(d)
message = "Signal recieved : entering python shell.\nTraceback:\n"
- message += ''.join(traceback.format_stack(frame))
+ message += "".join(traceback.format_stack(frame))
i.interact(message)
def listen():
- if os.name != 'nt': # SIGUSR1 is not supported on Windows
+ if os.name != "nt": # SIGUSR1 is not supported on Windows
signal.signal(signal.SIGUSR1, debug) # Register handler
+
+
listen()
def to_java_compatible_path(path):
- if os.name == 'nt':
- path = path.replace('\\', '/')
+ if os.name == "nt":
+ path = path.replace("\\", "/")
return path
+
ServerInfo = namedtuple(
"ServerInfo",
- "server_id client_port election_port leader_port admin_port peer_type")
+ "server_id client_port election_port leader_port admin_port peer_type",
+)
class ManagedZooKeeper(object):
@@ -75,9 +79,16 @@ class ManagedZooKeeper(object):
future, we may want to do that, especially when run in a
Hudson/Buildbot context, to ensure more test robustness."""
- def __init__(self, software_path, server_info, peers=(), classpath=None,
- configuration_entries=(), java_system_properties=(),
- jaas_config=None):
+ def __init__(
+ self,
+ software_path,
+ server_info,
+ peers=(),
+ classpath=None,
+ configuration_entries=(),
+ java_system_properties=(),
+ jaas_config=None,
+ ):
"""Define the ZooKeeper test instance.
@param install_path: The path to the install for ZK
@@ -116,7 +127,8 @@ class ManagedZooKeeper(object):
os.mkdir(data_path)
with open(config_path, "w") as config:
- config.write("""
+ config.write(
+ """
tickTime=2000
dataDir=%s
clientPort=%s
@@ -124,72 +136,94 @@ maxClientCnxns=0
admin.serverPort=%s
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
%s
-""" % (to_java_compatible_path(data_path),
- self.server_info.client_port,
- self.server_info.admin_port,
- "\n".join(self.configuration_entries))) # NOQA
+"""
+ % (
+ to_java_compatible_path(data_path),
+ self.server_info.client_port,
+ self.server_info.admin_port,
+ "\n".join(self.configuration_entries),
+ )
+ ) # NOQA
# setup a replicated setup if peers are specified
if self.peers:
servers_cfg = []
for p in chain((self.server_info,), self.peers):
- servers_cfg.append("server.%s=localhost:%s:%s:%s" % (
- p.server_id, p.leader_port, p.election_port, p.peer_type))
+ servers_cfg.append(
+ "server.%s=localhost:%s:%s:%s"
+ % (
+ p.server_id,
+ p.leader_port,
+ p.election_port,
+ p.peer_type,
+ )
+ )
with open(config_path, "a") as config:
- config.write("""
+ config.write(
+ """
initLimit=4
syncLimit=2
%s
peerType=%s
-""" % ("\n".join(servers_cfg), self.server_info.peer_type))
+"""
+ % ("\n".join(servers_cfg), self.server_info.peer_type)
+ )
# Write server ids into datadir
with open(os.path.join(data_path, "myid"), "w") as myid_file:
myid_file.write(str(self.server_info.server_id))
# Write JAAS configuration
with open(jaas_config_path, "w") as jaas_file:
- jaas_file.write(self.jaas_config or '')
+ jaas_file.write(self.jaas_config or "")
with open(log4j_path, "w") as log4j:
- log4j.write("""
+ log4j.write(
+ """
# DEFAULT: console appender only
log4j.rootLogger=INFO, ROLLINGFILE
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=DEBUG
-log4j.appender.ROLLINGFILE.File=""" + to_java_compatible_path( # NOQA
- self.working_path + os.sep + "zookeeper.log\n"))
-
- args = [
- "java",
- "-cp", self.classpath,
-
- # make_digest_acl_credential assumes UTF-8, but ZK decodes
- # digest auth packets using the JVM's default "charset"--which
- # depends on the environment. Force it to use UTF-8 to avoid
- # test failures.
- "-Dfile.encoding=UTF-8",
-
- # "-Dlog4j.debug",
- "-Dreadonlymode.enabled=true",
- "-Dzookeeper.log.dir=%s" % log_path,
- "-Dzookeeper.root.logger=INFO,CONSOLE",
- "-Dlog4j.configuration=file:%s" % log4j_path,
-
- # OS X: Prevent java from appearing in menu bar, process dock
- # and from activation of the main workspace on run.
- "-Djava.awt.headless=true",
-
- # JAAS configuration for SASL authentication
- "-Djava.security.auth.login.config=%s" % jaas_config_path,
- ] + list(self.java_system_properties) + [
- "org.apache.zookeeper.server.quorum.QuorumPeerMain",
- config_path,
- ]
+log4j.appender.ROLLINGFILE.File="""
+ + to_java_compatible_path( # NOQA
+ self.working_path + os.sep + "zookeeper.log\n"
+ )
+ )
+
+ args = (
+ [
+ "java",
+ "-cp",
+ self.classpath,
+ # make_digest_acl_credential assumes UTF-8, but ZK decodes
+ # digest auth packets using the JVM's default "charset"--which
+ # depends on the environment. Force it to use UTF-8 to avoid
+ # test failures.
+ "-Dfile.encoding=UTF-8",
+ # "-Dlog4j.debug",
+ "-Dreadonlymode.enabled=true",
+ "-Dzookeeper.log.dir=%s" % log_path,
+ "-Dzookeeper.root.logger=INFO,CONSOLE",
+ "-Dlog4j.configuration=file:%s" % log4j_path,
+ # OS X: Prevent java from appearing in menu bar, process dock
+ # and from activation of the main workspace on run.
+ "-Djava.awt.headless=true",
+ # JAAS configuration for SASL authentication
+ "-Djava.security.auth.login.config=%s" % jaas_config_path,
+ ]
+ + list(self.java_system_properties)
+ + [
+ "org.apache.zookeeper.server.quorum.QuorumPeerMain",
+ config_path,
+ ]
+ )
self.process = subprocess.Popen(args=args)
- log.info("Started zookeeper process %s using args %s",
- self.process.pid, args)
+ log.info(
+ "Started zookeeper process %s using args %s",
+ self.process.pid,
+ args,
+ )
self._running = True
@property
@@ -201,33 +235,27 @@ log4j.appender.ROLLINGFILE.File=""" + to_java_compatible_path( # NOQA
# Two possibilities, as seen in zkEnv.sh:
# Check for a release - top-level zookeeper-*.jar?
- jars = glob((os.path.join(
- self.install_path, 'zookeeper-*.jar')))
+ jars = glob((os.path.join(self.install_path, "zookeeper-*.jar")))
if jars:
# Release build (`ant package`)
- jars.extend(glob(os.path.join(
- self.install_path,
- "lib/*.jar")))
- jars.extend(glob(os.path.join(
- self.install_path,
- "*.jar")))
+ jars.extend(glob(os.path.join(self.install_path, "lib/*.jar")))
+ jars.extend(glob(os.path.join(self.install_path, "*.jar")))
# support for different file locations on Debian/Ubuntu
- jars.extend(glob(os.path.join(
- self.install_path,
- "log4j-*.jar")))
- jars.extend(glob(os.path.join(
- self.install_path,
- "slf4j-api-*.jar")))
- jars.extend(glob(os.path.join(
- self.install_path,
- "slf4j-log4j*.jar")))
+ jars.extend(glob(os.path.join(self.install_path, "log4j-*.jar")))
+ jars.extend(
+ glob(os.path.join(self.install_path, "slf4j-api-*.jar"))
+ )
+ jars.extend(
+ glob(os.path.join(self.install_path, "slf4j-log4j*.jar"))
+ )
else:
# Development build (plain `ant`)
- jars = glob((os.path.join(
- self.install_path, 'build/zookeeper-*.jar')))
- jars.extend(glob(os.path.join(
- self.install_path,
- "build/lib/*.jar")))
+ jars = glob(
+ (os.path.join(self.install_path, "build/zookeeper-*.jar"))
+ )
+ jars.extend(
+ glob(os.path.join(self.install_path, "build/lib/*.jar"))
+ )
return os.pathsep.join(jars)
@@ -259,10 +287,13 @@ log4j.appender.ROLLINGFILE.File=""" + to_java_compatible_path( # NOQA
self.process.terminate()
self.process.wait()
if self.process.returncode != 0:
- log.warn("Zookeeper process %s failed to terminate with"
- " non-zero return code (it terminated with %s return"
- " code instead)", self.process.pid,
- self.process.returncode)
+ log.warn(
+ "Zookeeper process %s failed to terminate with"
+ " non-zero return code (it terminated with %s return"
+ " code instead)",
+ self.process.pid,
+ self.process.returncode,
+ )
self._running = False
def destroy(self):
@@ -273,23 +304,26 @@ log4j.appender.ROLLINGFILE.File=""" + to_java_compatible_path( # NOQA
shutil.rmtree(self.working_path, True)
def get_logs(self):
- log_path = pathlib.Path(
- self.working_path,
- 'zookeeper.log'
- )
+ log_path = pathlib.Path(self.working_path, "zookeeper.log")
if log_path.exists():
- log_file = log_path.open('r')
+ log_file = log_path.open("r")
lines = log_file.readlines()
return lines[-100:]
return []
-class ZookeeperCluster(object):
- def __init__(self, install_path=None, classpath=None,
- size=3, port_offset=20000, observer_start_id=-1,
- configuration_entries=(),
- java_system_properties=(),
- jaas_config=None):
+class ZookeeperCluster(object):
+ def __init__(
+ self,
+ install_path=None,
+ classpath=None,
+ size=3,
+ port_offset=20000,
+ observer_start_id=-1,
+ configuration_entries=(),
+ java_system_properties=(),
+ jaas_config=None,
+ ):
self._install_path = install_path
self._classpath = classpath
self._servers = []
@@ -301,11 +335,12 @@ class ZookeeperCluster(object):
for i in range(size):
server_id = i + 1
if observer_start_id != -1 and server_id >= observer_start_id:
- peer_type = 'observer'
+ peer_type = "observer"
else:
- peer_type = 'participant'
- info = ServerInfo(server_id, port, port + 1, port + 2, port + 3,
- peer_type)
+ peer_type = "participant"
+ info = ServerInfo(
+ server_id, port, port + 1, port + 2, port + 3, peer_type
+ )
peers.append(info)
port += 10
@@ -315,11 +350,13 @@ class ZookeeperCluster(object):
server_info = server_peers.pop(i)
self._servers.append(
ManagedZooKeeper(
- self._install_path, server_info, server_peers,
+ self._install_path,
+ server_info,
+ server_peers,
classpath=self._classpath,
configuration_entries=configuration_entries,
java_system_properties=java_system_properties,
- jaas_config=jaas_config
+ jaas_config=jaas_config,
)
)
@@ -339,6 +376,7 @@ class ZookeeperCluster(object):
# required for a client to successfully connect (2s vs. 4s without
# the sleep).
import time
+
time.sleep(2)
def stop(self):
diff --git a/kazoo/testing/harness.py b/kazoo/testing/harness.py
index d7dbad6..fa4427f 100644
--- a/kazoo/testing/harness.py
+++ b/kazoo/testing/harness.py
@@ -8,9 +8,7 @@ from kazoo import python2atexit as atexit
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
from kazoo.protocol.connection import _CONNECTION_DROP, _SESSION_EXPIRED
-from kazoo.protocol.states import (
- KazooState
-)
+from kazoo.protocol.states import KazooState
from kazoo.testing.common import ZookeeperCluster
log = logging.getLogger(__name__)
@@ -21,7 +19,7 @@ CLUSTER_DEFAULTS = {
"ZOOKEEPER_PORT_OFFSET": 20000,
"ZOOKEEPER_CLUSTER_SIZE": 3,
"ZOOKEEPER_OBSERVER_START_ID": -1,
- "ZOOKEEPER_LOCAL_SESSION_RO": "false"
+ "ZOOKEEPER_LOCAL_SESSION_RO": "false",
}
@@ -29,20 +27,22 @@ def get_global_cluster():
global CLUSTER, CLUSTER_CONF
cluster_conf = {
k: os.environ.get(k, CLUSTER_DEFAULTS.get(k))
- for k in ["ZOOKEEPER_PATH",
- "ZOOKEEPER_CLASSPATH",
- "ZOOKEEPER_PORT_OFFSET",
- "ZOOKEEPER_CLUSTER_SIZE",
- "ZOOKEEPER_VERSION",
- "ZOOKEEPER_OBSERVER_START_ID",
- "ZOOKEEPER_JAAS_AUTH",
- "ZOOKEEPER_LOCAL_SESSION_RO"]
+ for k in [
+ "ZOOKEEPER_PATH",
+ "ZOOKEEPER_CLASSPATH",
+ "ZOOKEEPER_PORT_OFFSET",
+ "ZOOKEEPER_CLUSTER_SIZE",
+ "ZOOKEEPER_VERSION",
+ "ZOOKEEPER_OBSERVER_START_ID",
+ "ZOOKEEPER_JAAS_AUTH",
+ "ZOOKEEPER_LOCAL_SESSION_RO",
+ ]
}
if CLUSTER is not None:
if CLUSTER_CONF == cluster_conf:
return CLUSTER
else:
- log.info('Config change detected. Reconfiguring cluster...')
+ log.info("Config change detected. Reconfiguring cluster...")
CLUSTER.terminate()
CLUSTER = None
# Create a new cluster
@@ -51,16 +51,17 @@ def get_global_cluster():
ZK_PORT_OFFSET = int(cluster_conf.get("ZOOKEEPER_PORT_OFFSET"))
ZK_CLUSTER_SIZE = int(cluster_conf.get("ZOOKEEPER_CLUSTER_SIZE"))
ZK_VERSION = cluster_conf.get("ZOOKEEPER_VERSION")
- if '-' in ZK_VERSION:
+ if "-" in ZK_VERSION:
# Ignore pre-release markers like -alpha
- ZK_VERSION = ZK_VERSION.split('-')[0]
- ZK_VERSION = tuple([int(n) for n in ZK_VERSION.split('.')])
+ ZK_VERSION = ZK_VERSION.split("-")[0]
+ ZK_VERSION = tuple([int(n) for n in ZK_VERSION.split(".")])
ZK_OBSERVER_START_ID = int(cluster_conf.get("ZOOKEEPER_OBSERVER_START_ID"))
assert ZK_HOME or ZK_CLASSPATH or ZK_VERSION, (
"Either ZOOKEEPER_PATH or ZOOKEEPER_CLASSPATH or "
"ZOOKEEPER_VERSION environment variable must be defined.\n"
- "For deb package installations this is /usr/share/java")
+ "For deb package installations this is /usr/share/java"
+ )
if ZK_VERSION >= (3, 5):
ZOOKEEPER_LOCAL_SESSION_RO = cluster_conf.get(
@@ -72,7 +73,7 @@ def get_global_cluster():
# required to avoid session validation error
# in read only test
"localSessionsEnabled=" + ZOOKEEPER_LOCAL_SESSION_RO,
- "localSessionsUpgradingEnabled=" + ZOOKEEPER_LOCAL_SESSION_RO
+ "localSessionsUpgradingEnabled=" + ZOOKEEPER_LOCAL_SESSION_RO,
]
# If defined, this sets the superuser password to "test"
additional_java_system_properties = [
@@ -93,9 +94,8 @@ Server {
elif ZOOKEEPER_JAAS_AUTH == "gssapi":
# Configure Zookeeper to use our test KDC.
additional_java_system_properties += [
- "-Djava.security.krb5.conf=%s" % os.path.expandvars(
- "${KRB5_CONFIG}"
- ),
+ "-Djava.security.krb5.conf=%s"
+ % os.path.expandvars("${KRB5_CONFIG}"),
"-Dsun.security.krb5.debug=true",
]
jaas_config = """
@@ -108,7 +108,9 @@ Server {
storeKey=true
useTicketCache=false
principal="zookeeper/127.0.0.1@KAZOOTEST.ORG";
-};""" % os.path.expandvars("${KRB5_TEST_ENV}/server.keytab")
+};""" % os.path.expandvars(
+ "${KRB5_TEST_ENV}/server.keytab"
+ )
else:
jaas_config = None
@@ -120,7 +122,7 @@ Server {
observer_start_id=ZK_OBSERVER_START_ID,
configuration_entries=additional_configuration_entries,
java_system_properties=additional_java_system_properties,
- jaas_config=jaas_config
+ jaas_config=jaas_config,
)
CLUSTER_CONF = cluster_conf
atexit.register(lambda cluster: cluster.terminate(), CLUSTER)
@@ -152,6 +154,7 @@ class KazooTestHarness(unittest.TestCase):
something_that_needs_zk_servers(self.servers)
"""
+
DEFAULT_CLIENT_TIMEOUT = 15
def __init__(self, *args, **kw):
@@ -173,21 +176,23 @@ class KazooTestHarness(unittest.TestCase):
return c
def _get_client(self, **client_options):
- if 'timeout' not in client_options:
- client_options['timeout'] = self.DEFAULT_CLIENT_TIMEOUT
+ if "timeout" not in client_options:
+ client_options["timeout"] = self.DEFAULT_CLIENT_TIMEOUT
c = KazooClient(self.hosts, **client_options)
self._clients.append(c)
return c
def lose_connection(self, event_factory):
"""Force client to lose connection with server"""
- self.__break_connection(_CONNECTION_DROP, KazooState.SUSPENDED,
- event_factory)
+ self.__break_connection(
+ _CONNECTION_DROP, KazooState.SUSPENDED, event_factory
+ )
def expire_session(self, event_factory):
"""Force ZK to expire a client session"""
- self.__break_connection(_SESSION_EXPIRED, KazooState.LOST,
- event_factory)
+ self.__break_connection(
+ _SESSION_EXPIRED, KazooState.LOST, event_factory
+ )
def setup_zookeeper(self, **client_options):
"""Create a ZK cluster and chrooted :class:`KazooClient`
@@ -203,8 +208,8 @@ class KazooTestHarness(unittest.TestCase):
self.cluster.start()
namespace = "/kazootests" + uuid.uuid4().hex
self.hosts = self.servers + namespace
- if 'timeout' not in client_options:
- client_options['timeout'] = self.DEFAULT_CLIENT_TIMEOUT
+ if "timeout" not in client_options:
+ client_options["timeout"] = self.DEFAULT_CLIENT_TIMEOUT
self.client = self._get_client(**client_options)
self.client.start()
self.client.ensure_path("/")
@@ -245,7 +250,7 @@ class KazooTestHarness(unittest.TestCase):
if not safe.is_set():
raise Exception("Failed to see client reconnect.")
- self.client.retry(self.client.get_async, '/')
+ self.client.retry(self.client.get_async, "/")
class KazooTestCase(KazooTestHarness):
diff --git a/kazoo/tests/conftest.py b/kazoo/tests/conftest.py
index bd43772..c7a4cc6 100644
--- a/kazoo/tests/conftest.py
+++ b/kazoo/tests/conftest.py
@@ -5,6 +5,6 @@ log = logging.getLogger(__name__)
def pytest_exception_interact(node, call, report):
cluster = node._testcase.cluster
- log.error('Zookeeper cluster logs:')
+ log.error("Zookeeper cluster logs:")
for logs in cluster.get_logs():
log.error(logs)
diff --git a/kazoo/tests/test__connection.py b/kazoo/tests/test__connection.py
index c600d59..7d362cc 100644
--- a/kazoo/tests/test__connection.py
+++ b/kazoo/tests/test__connection.py
@@ -22,7 +22,7 @@ from kazoo.tests.util import wait
from kazoo.tests.util import CI_ZK_VERSION
-class Delete(namedtuple('Delete', 'path version')):
+class Delete(namedtuple("Delete", "path version")):
type = 2
def serialize(self):
@@ -42,7 +42,7 @@ class TestConnectionHandler(KazooTestCase):
self.client._queue.append(
(Delete(self.client.chroot, -1), async_object)
)
- self.client._connection._write_sock.send(b'\0')
+ self.client._connection._write_sock.send(b"\0")
with pytest.raises(ValueError):
async_object.get()
@@ -160,7 +160,7 @@ class TestConnectionHandler(KazooTestCase):
# continues to retry. This partially reproduces a rare bug seen
# in production.
- with mock.patch.object(Connect, 'deserialize') as mock_deserialize:
+ with mock.patch.object(Connect, "deserialize") as mock_deserialize:
mock_deserialize.side_effect = bad_deserialize
try:
handler.select = delayed_select
@@ -227,7 +227,7 @@ class TestConnectionHandler(KazooTestCase):
# blow up client. simulates case where some error leaves
# a byte in the socket which doesn't correspond to the
# request queue.
- write_sock.send(b'\0')
+ write_sock.send(b"\0")
# eventually this byte should disappear from socket
wait(lambda: client.handler.select([read_sock], [], [], 0)[0] == [])
@@ -245,7 +245,7 @@ class TestConnectionDrop(KazooTestCase):
path = "/" + uuid.uuid4().hex
self.client.create(path)
self.client.add_listener(back)
- result = self.client.set_async(path, b'a' * 1000 * 1024)
+ result = self.client.set_async(path, b"a" * 1000 * 1024)
self.client._call(_CONNECTION_DROP, None)
with pytest.raises(ConnectionLoss):
@@ -273,7 +273,7 @@ class TestReadOnlyMode(KazooTestCase):
def tearDown(self):
self.client.stop()
- os.environ.pop('ZOOKEEPER_LOCAL_SESSION_RO', None)
+ os.environ.pop("ZOOKEEPER_LOCAL_SESSION_RO", None)
def test_read_only(self):
from kazoo.exceptions import NotReadOnlyCallError
@@ -294,14 +294,10 @@ class TestReadOnlyMode(KazooTestCase):
# else the test seems flaky when on CI hosts
zk_stop_threads = []
zk_stop_threads.append(
- threading.Thread(
- target=self.cluster[1].stop, daemon=True
- )
+ threading.Thread(target=self.cluster[1].stop, daemon=True)
)
zk_stop_threads.append(
- threading.Thread(
- target=self.cluster[2].stop, daemon=True
- )
+ threading.Thread(target=self.cluster[2].stop, daemon=True)
)
for thread in zk_stop_threads:
thread.start()
@@ -312,11 +308,11 @@ class TestReadOnlyMode(KazooTestCase):
assert client.client_state == KeeperState.CONNECTED_RO
# Test read only command
- assert client.get_children('/') == []
+ assert client.get_children("/") == []
# Test error with write command
with pytest.raises(NotReadOnlyCallError):
- client.create('/fred')
+ client.create("/fred")
# Wait for a ping
time.sleep(15)
@@ -373,15 +369,15 @@ class TestUnorderedXids(KazooTestCase):
ev.clear()
with pytest.raises(RuntimeError):
- self.client.get_children('/')
+ self.client.get_children("/")
ev.wait()
assert self.client.connected is False
- assert self.client.state == 'LOST'
+ assert self.client.state == "LOST"
assert self.client.client_state == KeeperState.CLOSED
args, exc_info = error_stack[-1]
- assert args == ('Unhandled exception in connection loop',)
+ assert args == ("Unhandled exception in connection loop",)
assert exc_info[0] == RuntimeError
self.client.handler.sleep_func(0.2)
diff --git a/kazoo/tests/test_barrier.py b/kazoo/tests/test_barrier.py
index d0ea870..5f79e86 100644
--- a/kazoo/tests/test_barrier.py
+++ b/kazoo/tests/test_barrier.py
@@ -136,15 +136,15 @@ class KazooDoubleBarrierTests(KazooTestCase):
t2.join()
def test_barrier_existing_parent_node(self):
- b = self.client.DoubleBarrier('/some/path', 1)
+ b = self.client.DoubleBarrier("/some/path", 1)
assert b.participating is False
- self.client.create('/some', ephemeral=True)
+ self.client.create("/some", ephemeral=True)
# the barrier cannot create children under an ephemeral node
b.enter()
assert b.participating is False
def test_barrier_existing_node(self):
- b = self.client.DoubleBarrier('/some', 1)
+ b = self.client.DoubleBarrier("/some", 1)
assert b.participating is False
self.client.ensure_path(b.path)
self.client.create(b.create_path, ephemeral=True)
diff --git a/kazoo/tests/test_build.py b/kazoo/tests/test_build.py
index 687c1dc..01dbf87 100644
--- a/kazoo/tests/test_build.py
+++ b/kazoo/tests/test_build.py
@@ -8,15 +8,15 @@ from kazoo.testing import KazooTestCase
class TestBuildEnvironment(KazooTestCase):
def setUp(self):
KazooTestCase.setUp(self)
- if not os.environ.get('CI'):
- pytest.skip('Only run build config tests on CI.')
+ if not os.environ.get("CI"):
+ pytest.skip("Only run build config tests on CI.")
def test_zookeeper_version(self):
server_version = self.client.server_version()
- server_version = '.'.join([str(i) for i in server_version])
- env_version = os.environ.get('ZOOKEEPER_VERSION')
+ server_version = ".".join([str(i) for i in server_version])
+ env_version = os.environ.get("ZOOKEEPER_VERSION")
if env_version:
- if '-' in env_version:
+ if "-" in env_version:
# Ignore pre-release markers like -alpha
- env_version = env_version.split('-')[0]
+ env_version = env_version.split("-")[0]
assert env_version == server_version
diff --git a/kazoo/tests/test_cache.py b/kazoo/tests/test_cache.py
index 33dafac..af1282a 100644
--- a/kazoo/tests/test_cache.py
+++ b/kazoo/tests/test_cache.py
@@ -13,9 +13,9 @@ from kazoo.recipe.cache import TreeCache, TreeNode, TreeEvent
class KazooAdaptiveHandlerTestCase(KazooTestHarness):
HANDLERS = (
- ('kazoo.handlers.gevent', 'SequentialGeventHandler'),
- ('kazoo.handlers.eventlet', 'SequentialEventletHandler'),
- ('kazoo.handlers.threading', 'SequentialThreadingHandler'),
+ ("kazoo.handlers.gevent", "SequentialGeventHandler"),
+ ("kazoo.handlers.eventlet", "SequentialEventletHandler"),
+ ("kazoo.handlers.threading", "SequentialThreadingHandler"),
)
def setUp(self):
@@ -35,7 +35,7 @@ class KazooAdaptiveHandlerTestCase(KazooTestHarness):
continue
else:
return cls()
- raise ImportError('No available handler')
+ raise ImportError("No available handler")
class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
@@ -59,7 +59,7 @@ class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
def make_cache(self):
if self.cache is None:
- self.path = '/' + uuid.uuid4().hex
+ self.path = "/" + uuid.uuid4().hex
self.cache = TreeCache(self.client, self.path)
self.cache.listen(lambda event: self._event_queue.put(event))
self.cache.listen_fault(lambda error: self._error_queue.put(error))
@@ -87,7 +87,7 @@ class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
# trigger switching on some coroutine handlers
self.client.handler.sleep_func(0.1)
- completion_queue = getattr(self.handler, 'completion_queue', None)
+ completion_queue = getattr(self.handler, "completion_queue", None)
if completion_queue is not None:
while not self.client.handler.completion_queue.empty():
self.client.handler.sleep_func(0.1)
@@ -101,10 +101,10 @@ class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
result = set()
for _ in range(5):
self._wait_gc()
- result.add(count_refs_by_type('TreeNode'))
+ result.add(count_refs_by_type("TreeNode"))
if len(result) == 1:
return list(result)[0]
- raise RuntimeError('could not count refs exactly')
+ raise RuntimeError("could not count refs exactly")
def test_start(self):
self.make_cache()
@@ -132,25 +132,25 @@ class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
- self.client.create(self.path + '/foo/bar/baz', makepath=True)
+ self.client.create(self.path + "/foo/bar/baz", makepath=True)
for _ in range(3):
self.wait_cache(TreeEvent.NODE_ADDED)
# setup stub watchers which are outside of tree cache
stub_data_watcher = Mock(spec=lambda event: None)
stub_child_watcher = Mock(spec=lambda event: None)
- self.client.get(self.path + '/foo', stub_data_watcher)
- self.client.get_children(self.path + '/foo', stub_child_watcher)
+ self.client.get(self.path + "/foo", stub_data_watcher)
+ self.client.get_children(self.path + "/foo", stub_child_watcher)
# watchers inside tree cache should be here
root_path = self.client.chroot + self.path
- assert len(self.client._data_watchers[root_path + '/foo']) == 2
- assert len(self.client._data_watchers[root_path + '/foo/bar']) == 1
- assert len(self.client._data_watchers[root_path + '/foo/bar/baz']) == 1
- assert len(self.client._child_watchers[root_path + '/foo']) == 2
- assert len(self.client._child_watchers[root_path + '/foo/bar']) == 1
+ assert len(self.client._data_watchers[root_path + "/foo"]) == 2
+ assert len(self.client._data_watchers[root_path + "/foo/bar"]) == 1
+ assert len(self.client._data_watchers[root_path + "/foo/bar/baz"]) == 1
+ assert len(self.client._child_watchers[root_path + "/foo"]) == 2
+ assert len(self.client._child_watchers[root_path + "/foo/bar"]) == 1
assert (
- len(self.client._child_watchers[root_path + '/foo/bar/baz']) == 1
+ len(self.client._child_watchers[root_path + "/foo/bar/baz"]) == 1
)
self.cache.close()
@@ -167,22 +167,22 @@ class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
assert self.cache._root._state != TreeNode.STATE_DEAD
# watchers should be reset
- assert len(self.client._data_watchers[root_path + '/foo']) == 1
- assert len(self.client._data_watchers[root_path + '/foo/bar']) == 0
- assert len(self.client._data_watchers[root_path + '/foo/bar/baz']) == 0
- assert len(self.client._child_watchers[root_path + '/foo']) == 1
- assert len(self.client._child_watchers[root_path + '/foo/bar']) == 0
+ assert len(self.client._data_watchers[root_path + "/foo"]) == 1
+ assert len(self.client._data_watchers[root_path + "/foo/bar"]) == 0
+ assert len(self.client._data_watchers[root_path + "/foo/bar/baz"]) == 0
+ assert len(self.client._child_watchers[root_path + "/foo"]) == 1
+ assert len(self.client._child_watchers[root_path + "/foo/bar"]) == 0
assert (
- len(self.client._child_watchers[root_path + '/foo/bar/baz']) == 0
+ len(self.client._child_watchers[root_path + "/foo/bar/baz"]) == 0
)
# outside watchers should not be deleted
assert (
- list(self.client._data_watchers[root_path + '/foo'])[0]
+ list(self.client._data_watchers[root_path + "/foo"])[0]
== stub_data_watcher
)
assert (
- list(self.client._child_watchers[root_path + '/foo'])[0]
+ list(self.client._child_watchers[root_path + "/foo"])[0]
== stub_child_watcher
)
@@ -196,11 +196,11 @@ class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
assert self.count_tree_node() == 1
- self.client.create(self.path + '/foo/bar/baz', makepath=True)
+ self.client.create(self.path + "/foo/bar/baz", makepath=True)
for _ in range(3):
self.wait_cache(TreeEvent.NODE_ADDED)
- self.client.delete(self.path + '/foo', recursive=True)
+ self.client.delete(self.path + "/foo", recursive=True)
for _ in range(3):
self.wait_cache(TreeEvent.NODE_REMOVED)
@@ -209,12 +209,12 @@ class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
# watchers should be reset
root_path = self.client.chroot + self.path
- assert self.client._data_watchers[root_path + '/foo'] == set()
- assert self.client._data_watchers[root_path + '/foo/bar'] == set()
- assert self.client._data_watchers[root_path + '/foo/bar/baz'] == set()
- assert self.client._child_watchers[root_path + '/foo'] == set()
- assert self.client._child_watchers[root_path + '/foo/bar'] == set()
- assert self.client._child_watchers[root_path + '/foo/bar/baz'] == set()
+ assert self.client._data_watchers[root_path + "/foo"] == set()
+ assert self.client._data_watchers[root_path + "/foo/bar"] == set()
+ assert self.client._data_watchers[root_path + "/foo/bar/baz"] == set()
+ assert self.client._child_watchers[root_path + "/foo"] == set()
+ assert self.client._child_watchers[root_path + "/foo/bar"] == set()
+ assert self.client._child_watchers[root_path + "/foo/bar/baz"] == set()
# should not be any leaked memory (tree node) here
assert self.count_tree_node() == 1
@@ -223,41 +223,41 @@ class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
- self.client.create(self.path + '/test_children', b'test_children_1')
+ self.client.create(self.path + "/test_children", b"test_children_1")
event = self.wait_cache(TreeEvent.NODE_ADDED)
assert event.event_type == TreeEvent.NODE_ADDED
- assert event.event_data.path == self.path + '/test_children'
- assert event.event_data.data == b'test_children_1'
+ assert event.event_data.path == self.path + "/test_children"
+ assert event.event_data.data == b"test_children_1"
assert event.event_data.stat.version == 0
- self.client.set(self.path + '/test_children', b'test_children_2')
+ self.client.set(self.path + "/test_children", b"test_children_2")
event = self.wait_cache(TreeEvent.NODE_UPDATED)
assert event.event_type == TreeEvent.NODE_UPDATED
- assert event.event_data.path == self.path + '/test_children'
- assert event.event_data.data == b'test_children_2'
+ assert event.event_data.path == self.path + "/test_children"
+ assert event.event_data.data == b"test_children_2"
assert event.event_data.stat.version == 1
- self.client.delete(self.path + '/test_children')
+ self.client.delete(self.path + "/test_children")
event = self.wait_cache(TreeEvent.NODE_REMOVED)
assert event.event_type == TreeEvent.NODE_REMOVED
- assert event.event_data.path == self.path + '/test_children'
- assert event.event_data.data == b'test_children_2'
+ assert event.event_data.path == self.path + "/test_children"
+ assert event.event_data.data == b"test_children_2"
assert event.event_data.stat.version == 1
def test_subtree_operation(self):
self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
- self.client.create(self.path + '/foo/bar/baz', makepath=True)
- for relative_path in ('/foo', '/foo/bar', '/foo/bar/baz'):
+ self.client.create(self.path + "/foo/bar/baz", makepath=True)
+ for relative_path in ("/foo", "/foo/bar", "/foo/bar/baz"):
event = self.wait_cache(TreeEvent.NODE_ADDED)
assert event.event_type == TreeEvent.NODE_ADDED
assert event.event_data.path == self.path + relative_path
- assert event.event_data.data == b''
+ assert event.event_data.data == b""
assert event.event_data.stat.version == 0
- self.client.delete(self.path + '/foo', recursive=True)
- for relative_path in ('/foo/bar/baz', '/foo/bar', '/foo'):
+ self.client.delete(self.path + "/foo", recursive=True)
+ for relative_path in ("/foo/bar/baz", "/foo/bar", "/foo"):
event = self.wait_cache(TreeEvent.NODE_REMOVED)
assert event.event_type == TreeEvent.NODE_REMOVED
assert event.event_data.path == self.path + relative_path
@@ -265,79 +265,78 @@ class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
def test_get_data(self):
cache = self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
- self.client.create(self.path + '/foo/bar/baz', b'@', makepath=True)
+ self.client.create(self.path + "/foo/bar/baz", b"@", makepath=True)
self.wait_cache(TreeEvent.NODE_ADDED)
self.wait_cache(TreeEvent.NODE_ADDED)
self.wait_cache(TreeEvent.NODE_ADDED)
- with patch.object(cache, '_client'): # disable any remote operation
- assert cache.get_data(self.path).data == b''
+ with patch.object(cache, "_client"): # disable any remote operation
+ assert cache.get_data(self.path).data == b""
assert cache.get_data(self.path).stat.version == 0
- assert cache.get_data(self.path + '/foo').data == b''
- assert cache.get_data(self.path + '/foo').stat.version == 0
+ assert cache.get_data(self.path + "/foo").data == b""
+ assert cache.get_data(self.path + "/foo").stat.version == 0
- assert cache.get_data(self.path + '/foo/bar').data == b''
- assert cache.get_data(self.path + '/foo/bar').stat.version == 0
+ assert cache.get_data(self.path + "/foo/bar").data == b""
+ assert cache.get_data(self.path + "/foo/bar").stat.version == 0
- assert cache.get_data(self.path + '/foo/bar/baz').data == b'@'
- assert cache.get_data(self.path + '/foo/bar/baz').stat.version == 0
+ assert cache.get_data(self.path + "/foo/bar/baz").data == b"@"
+ assert cache.get_data(self.path + "/foo/bar/baz").stat.version == 0
def test_get_children(self):
cache = self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
- self.client.create(self.path + '/foo/bar/baz', b'@', makepath=True)
+ self.client.create(self.path + "/foo/bar/baz", b"@", makepath=True)
self.wait_cache(TreeEvent.NODE_ADDED)
self.wait_cache(TreeEvent.NODE_ADDED)
self.wait_cache(TreeEvent.NODE_ADDED)
- with patch.object(cache, '_client'): # disable any remote operation
+ with patch.object(cache, "_client"): # disable any remote operation
assert (
- cache.get_children(self.path + '/foo/bar/baz') == frozenset()
+ cache.get_children(self.path + "/foo/bar/baz") == frozenset()
)
- assert (
- cache.get_children(self.path + '/foo/bar')
- == frozenset(['baz'])
+ assert cache.get_children(self.path + "/foo/bar") == frozenset(
+ ["baz"]
)
- assert cache.get_children(self.path + '/foo') == frozenset(['bar'])
- assert cache.get_children(self.path) == frozenset(['foo'])
+ assert cache.get_children(self.path + "/foo") == frozenset(["bar"])
+ assert cache.get_children(self.path) == frozenset(["foo"])
def test_get_data_out_of_tree(self):
self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
with pytest.raises(ValueError):
- self.cache.get_data('/out_of_tree')
+ self.cache.get_data("/out_of_tree")
def test_get_children_out_of_tree(self):
self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
with pytest.raises(ValueError):
- self.cache.get_children('/out_of_tree')
+ self.cache.get_children("/out_of_tree")
def test_get_data_no_node(self):
cache = self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
- with patch.object(cache, '_client'): # disable any remote operation
- assert cache.get_data(self.path + '/non_exists') is None
+ with patch.object(cache, "_client"): # disable any remote operation
+ assert cache.get_data(self.path + "/non_exists") is None
def test_get_children_no_node(self):
cache = self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
- with patch.object(cache, '_client'): # disable any remote operation
- assert cache.get_children(self.path + '/non_exists') is None
+ with patch.object(cache, "_client"): # disable any remote operation
+ assert cache.get_children(self.path + "/non_exists") is None
def test_session_reconnected(self):
self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
- self.client.create(self.path + '/foo')
+ self.client.create(self.path + "/foo")
event = self.wait_cache(TreeEvent.NODE_ADDED)
- assert event.event_data.path == self.path + '/foo'
+ assert event.event_data.path == self.path + "/foo"
- with self.spy_client('get_async') as get_data:
- with self.spy_client('get_children_async') as get_children:
+ with self.spy_client("get_async") as get_data:
+ with self.spy_client("get_children_async") as get_children:
# session suspended
self.lose_connection(self.client.handler.event_object)
self.wait_cache(TreeEvent.CONNECTION_SUSPENDED)
@@ -355,14 +354,14 @@ class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
# inspect in-memory nodes
_node_root = self.cache._root
- _node_foo = self.cache._root._children['foo']
+ _node_foo = self.cache._root._children["foo"]
# make sure that all nodes are refreshed
get_data.assert_has_calls(
[
call(self.path, watch=_node_root._process_watch),
call(
- self.path + '/foo', watch=_node_foo._process_watch
+ self.path + "/foo", watch=_node_foo._process_watch
),
],
any_order=True,
@@ -371,7 +370,7 @@ class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
[
call(self.path, watch=_node_root._process_watch),
call(
- self.path + '/foo', watch=_node_foo._process_watch
+ self.path + "/foo", watch=_node_foo._process_watch
),
],
any_order=True,
@@ -385,7 +384,7 @@ class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
self.client.delete(self.path)
event = self.wait_cache(TreeEvent.NODE_REMOVED)
assert event.event_type == TreeEvent.NODE_REMOVED
- assert event.event_data.data == b''
+ assert event.event_data.data == b""
assert event.event_data.path == self.path
assert event.event_data.stat.version == 0
@@ -393,19 +392,19 @@ class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
self.client.ensure_path(self.path)
event = self.wait_cache(TreeEvent.NODE_ADDED)
assert event.event_type == TreeEvent.NODE_ADDED
- assert event.event_data.data == b''
+ assert event.event_data.data == b""
assert event.event_data.path == self.path
assert event.event_data.stat.version == 0
assert self.cache._outstanding_ops >= 0, (
- 'unexpected outstanding ops %r' % self.cache._outstanding_ops
+ "unexpected outstanding ops %r" % self.cache._outstanding_ops
)
def test_exception_handler(self):
error_value = FakeException()
error_handler = Mock()
- with patch.object(TreeNode, 'on_deleted') as on_deleted:
+ with patch.object(TreeNode, "on_deleted") as on_deleted:
on_deleted.side_effect = [error_value]
self.make_cache()
@@ -424,10 +423,10 @@ class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
self.client.handler.start() # keep the async completion
self.wait_cache(since=TreeEvent.CONNECTION_LOST)
- with patch.object(TreeNode, 'on_created') as on_created:
- self.cache._root._call_client('exists', '/')
- self.cache._root._call_client('get', '/')
- self.cache._root._call_client('get_children', '/')
+ with patch.object(TreeNode, "on_created") as on_created:
+ self.cache._root._call_client("exists", "/")
+ self.cache._root._call_client("get", "/")
+ self.cache._root._call_client("get_children", "/")
self.wait_cache(since=TreeEvent.INITIALIZED)
on_created.assert_not_called()
diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py
index a0f1abd..49aafe7 100644
--- a/kazoo/tests/test_client.py
+++ b/kazoo/tests/test_client.py
@@ -29,10 +29,13 @@ from kazoo.protocol.states import KeeperState, KazooState
from kazoo.tests.util import CI_ZK_VERSION
-if sys.version_info > (3, ): # pragma: nocover
+if sys.version_info > (3,): # pragma: nocover
+
def u(s):
return s
+
else: # pragma: nocover
+
def u(s):
return unicode(s, "unicode_escape")
@@ -81,44 +84,47 @@ class TestClientConstructor(unittest.TestCase):
self._makeOne(handler=SequentialThreadingHandler)
def test_chroot(self):
- assert self._makeOne(hosts='127.0.0.1:2181/').chroot == ''
- assert self._makeOne(hosts='127.0.0.1:2181/a').chroot == '/a'
- assert self._makeOne(hosts='127.0.0.1/a').chroot == '/a'
- assert self._makeOne(hosts='127.0.0.1/a/b').chroot == '/a/b'
+ assert self._makeOne(hosts="127.0.0.1:2181/").chroot == ""
+ assert self._makeOne(hosts="127.0.0.1:2181/a").chroot == "/a"
+ assert self._makeOne(hosts="127.0.0.1/a").chroot == "/a"
+ assert self._makeOne(hosts="127.0.0.1/a/b").chroot == "/a/b"
assert (
- self._makeOne(hosts='127.0.0.1:2181,127.0.0.1:2182/a/b').chroot
- == '/a/b'
+ self._makeOne(hosts="127.0.0.1:2181,127.0.0.1:2182/a/b").chroot
+ == "/a/b"
)
def test_connection_timeout(self):
from kazoo.handlers.threading import KazooTimeoutError
- client = self._makeOne(hosts='127.0.0.1:9')
+ client = self._makeOne(hosts="127.0.0.1:9")
assert client.handler.timeout_exception is KazooTimeoutError
with pytest.raises(KazooTimeoutError):
client.start(0.1)
def test_ordered_host_selection(self):
- client = self._makeOne(hosts='127.0.0.1:9,127.0.0.2:9/a',
- randomize_hosts=False)
+ client = self._makeOne(
+ hosts="127.0.0.1:9,127.0.0.2:9/a", randomize_hosts=False
+ )
hosts = [h for h in client.hosts]
- assert hosts == [('127.0.0.1', 9), ('127.0.0.2', 9)]
+ assert hosts == [("127.0.0.1", 9), ("127.0.0.2", 9)]
def test_invalid_hostname(self):
- client = self._makeOne(hosts='nosuchhost/a')
+ client = self._makeOne(hosts="nosuchhost/a")
timeout = client.handler.timeout_exception
with pytest.raises(timeout):
client.start(0.1)
def test_another_invalid_hostname(self):
with pytest.raises(ValueError):
- self._makeOne(hosts='/nosuchhost/a')
+ self._makeOne(hosts="/nosuchhost/a")
def test_retry_options_dict(self):
from kazoo.retry import KazooRetry
- client = self._makeOne(command_retry=dict(max_tries=99),
- connection_retry=dict(delay=99))
+
+ client = self._makeOne(
+ command_retry=dict(max_tries=99), connection_retry=dict(delay=99)
+ )
assert type(client._conn_retry) is KazooRetry
assert type(client._retry) is KazooRetry
assert client._retry.max_tries == 99
@@ -174,10 +180,10 @@ class TestAuthentication(KazooTestCase):
digest_auth = "%s:%s" % (username, password)
acl = self._makeAuth(username, password, all=True)
- client = self._get_client(auth_data=[('digest', digest_auth)])
+ client = self._get_client(auth_data=[("digest", digest_auth)])
client.start()
try:
- client.create('/1', acl=(acl,))
+ client.create("/1", acl=(acl,))
# give ZK a chance to copy data to other node
time.sleep(0.1)
@@ -185,7 +191,7 @@ class TestAuthentication(KazooTestCase):
self.client.get("/1")
finally:
- client.delete('/1')
+ client.delete("/1")
client.stop()
client.close()
@@ -227,10 +233,10 @@ class TestAuthentication(KazooTestCase):
client.start()
with pytest.raises(TypeError):
- client.add_auth('digest', ('user', 'pass'))
+ client.add_auth("digest", ("user", "pass"))
with pytest.raises(TypeError):
- client.add_auth(None, ('user', 'pass'))
+ client.add_auth(None, ("user", "pass"))
def test_async_auth(self):
client = self._get_client()
@@ -272,9 +278,9 @@ class TestConnection(KazooTestCase):
def test_chroot_warning(self):
k = self._get_nonchroot_client()
- k.chroot = 'abba'
+ k.chroot = "abba"
try:
- with patch('warnings.warn') as mock_func:
+ with patch("warnings.warn") as mock_func:
k.start()
assert mock_func.called
finally:
@@ -363,7 +369,7 @@ class TestConnection(KazooTestCase):
assert client.client_id is None
with pytest.raises(ConnectionClosedError):
- client.exists('/')
+ client.exists("/")
def test_close_connecting_connection(self):
client = self.client
@@ -388,7 +394,7 @@ class TestConnection(KazooTestCase):
ev.wait(5)
with pytest.raises(ConnectionClosedError):
- self.client.create('/foobar')
+ self.client.create("/foobar")
def test_double_start(self):
assert self.client.connected is True
@@ -425,13 +431,13 @@ class TestConnection(KazooTestCase):
try:
# simulate call made after write socket is closed
with pytest.raises(ConnectionClosedError):
- client.exists('/')
+ client.exists("/")
# simulate call made after write socket is set to None
client._connection._write_sock = None
with pytest.raises(ConnectionClosedError):
- client.exists('/')
+ client.exists("/")
finally:
# reset for teardown
@@ -468,10 +474,10 @@ class TestClient(KazooTestCase):
def test_server_version_retries_fail(self):
client = self.client
side_effects = [
- '',
- 'zookeeper.version=',
- 'zookeeper.version=1.',
- 'zookeeper.ver',
+ "",
+ "zookeeper.version=",
+ "zookeeper.version=1.",
+ "zookeeper.ver",
]
client.command = mock.MagicMock()
client.command.side_effect = side_effects
@@ -480,7 +486,7 @@ class TestClient(KazooTestCase):
def test_server_version_retries_eventually_ok(self):
client = self.client
- actual_version = 'zookeeper.version=1.2'
+ actual_version = "zookeeper.version=1.2"
side_effects = []
for i in range(0, len(actual_version) + 1):
side_effects.append(actual_version[0:i])
@@ -510,17 +516,17 @@ class TestClient(KazooTestCase):
client._state = KeeperState.EXPIRED_SESSION
with pytest.raises(SessionExpiredError):
- client.create('/closedpath', b'bar')
+ client.create("/closedpath", b"bar")
client._state = KeeperState.AUTH_FAILED
with pytest.raises(AuthFailedError):
- client.create('/closedpath', b'bar')
+ client.create("/closedpath", b"bar")
client.stop()
client.close()
with pytest.raises(ConnectionClosedError):
- client.create('/closedpath', b'bar')
+ client.create("/closedpath", b"bar")
def test_create_null_data(self):
client = self.client
@@ -543,13 +549,13 @@ class TestClient(KazooTestCase):
def test_create_async_returns_unchrooted_path(self):
client = self.client
- path = client.create_async('/1').get()
+ path = client.create_async("/1").get()
assert path == "/1"
def test_create_invalid_path(self):
client = self.client
with pytest.raises(TypeError):
- client.create(('a',))
+ client.create(("a",))
with pytest.raises(ValueError):
client.create(".")
with pytest.raises(ValueError):
@@ -565,17 +571,17 @@ class TestClient(KazooTestCase):
single_acl = OPEN_ACL_UNSAFE[0]
client = self.client
with pytest.raises(TypeError):
- client.create('a', acl='all')
+ client.create("a", acl="all")
with pytest.raises(TypeError):
- client.create('a', acl=single_acl)
+ client.create("a", acl=single_acl)
with pytest.raises(TypeError):
- client.create('a', value=['a'])
+ client.create("a", value=["a"])
with pytest.raises(TypeError):
- client.create('a', ephemeral='yes')
+ client.create("a", ephemeral="yes")
with pytest.raises(TypeError):
- client.create('a', sequence='yes')
+ client.create("a", sequence="yes")
with pytest.raises(TypeError):
- client.create('a', makepath='yes')
+ client.create("a", makepath="yes")
def test_create_value(self):
client = self.client
@@ -659,8 +665,9 @@ class TestClient(KazooTestCase):
def test_create_ephemeral_sequence(self):
basepath = "/" + uuid.uuid4().hex
- realpath = self.client.create(basepath, b"sandwich",
- sequence=True, ephemeral=True)
+ realpath = self.client.create(
+ basepath, b"sandwich", sequence=True, ephemeral=True
+ )
assert basepath != realpath and realpath.startswith(basepath)
data, stat = self.client.get(realpath)
assert data == b"sandwich"
@@ -696,7 +703,7 @@ class TestClient(KazooTestCase):
self.client.create("/1/2/3/4/5", b"val2", makepath=True)
finally:
- alt_client.delete('/', recursive=True)
+ alt_client.delete("/", recursive=True)
alt_client.stop()
def test_create_no_makepath(self):
@@ -756,9 +763,9 @@ class TestClient(KazooTestCase):
def test_get_invalid_arguments(self):
client = self.client
with pytest.raises(TypeError):
- client.get(('a', 'b'))
+ client.get(("a", "b"))
with pytest.raises(TypeError):
- client.get('a', watch=True)
+ client.get("a", watch=True)
def test_bad_argument(self):
client = self.client
@@ -798,9 +805,9 @@ class TestClient(KazooTestCase):
def test_exists_invalid_arguments(self):
client = self.client
with pytest.raises(TypeError):
- client.exists(('a', 'b'))
+ client.exists(("a", "b"))
with pytest.raises(TypeError):
- client.exists('a', watch=True)
+ client.exists("a", watch=True)
def test_exists_watch(self):
nodepath = "/" + uuid.uuid4().hex
@@ -850,49 +857,49 @@ class TestClient(KazooTestCase):
def test_get_acls(self):
from kazoo.security import make_digest_acl
- user = 'user'
- passw = 'pass'
+ user = "user"
+ passw = "pass"
acl = make_digest_acl(user, passw, all=True)
client = self.client
try:
- client.create('/a', acl=[acl])
- client.add_auth('digest', '{}:{}'.format(user, passw))
- assert acl in client.get_acls('/a')[0]
+ client.create("/a", acl=[acl])
+ client.add_auth("digest", "{}:{}".format(user, passw))
+ assert acl in client.get_acls("/a")[0]
finally:
- client.delete('/a')
+ client.delete("/a")
def test_get_acls_invalid_arguments(self):
client = self.client
with pytest.raises(TypeError):
- client.get_acls(('a', 'b'))
+ client.get_acls(("a", "b"))
def test_set_acls(self):
from kazoo.security import make_digest_acl
- user = 'user'
- passw = 'pass'
+ user = "user"
+ passw = "pass"
acl = make_digest_acl(user, passw, all=True)
client = self.client
- client.create('/a')
+ client.create("/a")
try:
- client.set_acls('/a', [acl])
- client.add_auth('digest', '{}:{}'.format(user, passw))
- assert acl in client.get_acls('/a')[0]
+ client.set_acls("/a", [acl])
+ client.add_auth("digest", "{}:{}".format(user, passw))
+ assert acl in client.get_acls("/a")[0]
finally:
- client.delete('/a')
+ client.delete("/a")
def test_set_acls_empty(self):
client = self.client
- client.create('/a')
+ client.create("/a")
with pytest.raises(InvalidACLError):
- client.set_acls('/a', [])
+ client.set_acls("/a", [])
def test_set_acls_no_node(self):
from kazoo.security import OPEN_ACL_UNSAFE
client = self.client
with pytest.raises(NoNodeError):
- client.set_acls('/a', OPEN_ACL_UNSAFE)
+ client.set_acls("/a", OPEN_ACL_UNSAFE)
def test_set_acls_invalid_arguments(self):
from kazoo.security import OPEN_ACL_UNSAFE
@@ -900,20 +907,20 @@ class TestClient(KazooTestCase):
single_acl = OPEN_ACL_UNSAFE[0]
client = self.client
with pytest.raises(TypeError):
- client.set_acls(('a', 'b'), ())
+ client.set_acls(("a", "b"), ())
with pytest.raises(TypeError):
- client.set_acls('a', single_acl)
+ client.set_acls("a", single_acl)
with pytest.raises(TypeError):
- client.set_acls('a', 'all')
+ client.set_acls("a", "all")
with pytest.raises(TypeError):
- client.set_acls('a', [single_acl], 'V1')
+ client.set_acls("a", [single_acl], "V1")
def test_set(self):
client = self.client
- client.create('a', b'first')
- stat = client.set('a', b'second')
- data, stat2 = client.get('a')
- assert data == b'second'
+ client.create("a", b"first")
+ stat = client.set("a", b"second")
+ data, stat2 = client.get("a")
+ assert data == b"second"
assert stat == stat2
def test_set_null_data(self):
@@ -932,85 +939,85 @@ class TestClient(KazooTestCase):
def test_set_invalid_arguments(self):
client = self.client
- client.create('a', b'first')
+ client.create("a", b"first")
with pytest.raises(TypeError):
- client.set(('a', 'b'), b'value')
+ client.set(("a", "b"), b"value")
with pytest.raises(TypeError):
- client.set('a', ['v', 'w'])
+ client.set("a", ["v", "w"])
with pytest.raises(TypeError):
- client.set('a', b'value', 'V1')
+ client.set("a", b"value", "V1")
def test_delete(self):
client = self.client
- client.ensure_path('/a/b')
- assert 'b' in client.get_children('a')
- client.delete('/a/b')
- assert 'b' not in client.get_children('a')
+ client.ensure_path("/a/b")
+ assert "b" in client.get_children("a")
+ client.delete("/a/b")
+ assert "b" not in client.get_children("a")
def test_delete_recursive(self):
client = self.client
- client.ensure_path('/a/b/c')
- client.ensure_path('/a/b/d')
- client.delete('/a/b', recursive=True)
- client.delete('/a/b/c', recursive=True)
- assert 'b' not in client.get_children('a')
+ client.ensure_path("/a/b/c")
+ client.ensure_path("/a/b/d")
+ client.delete("/a/b", recursive=True)
+ client.delete("/a/b/c", recursive=True)
+ assert "b" not in client.get_children("a")
def test_delete_invalid_arguments(self):
client = self.client
- client.ensure_path('/a/b')
+ client.ensure_path("/a/b")
with pytest.raises(TypeError):
- client.delete('/a/b', recursive='all')
+ client.delete("/a/b", recursive="all")
with pytest.raises(TypeError):
- client.delete(('a', 'b'))
+ client.delete(("a", "b"))
with pytest.raises(TypeError):
- client.delete('/a/b', version='V1')
+ client.delete("/a/b", version="V1")
def test_get_children(self):
client = self.client
- client.ensure_path('/a/b/c')
- client.ensure_path('/a/b/d')
- assert client.get_children('/a') == ['b']
- assert set(client.get_children('/a/b')) == set(['c', 'd'])
- assert client.get_children('/a/b/c') == []
+ client.ensure_path("/a/b/c")
+ client.ensure_path("/a/b/d")
+ assert client.get_children("/a") == ["b"]
+ assert set(client.get_children("/a/b")) == set(["c", "d"])
+ assert client.get_children("/a/b/c") == []
def test_get_children2(self):
client = self.client
- client.ensure_path('/a/b')
- children, stat = client.get_children('/a', include_data=True)
- value, stat2 = client.get('/a')
- assert children == ['b']
+ client.ensure_path("/a/b")
+ children, stat = client.get_children("/a", include_data=True)
+ value, stat2 = client.get("/a")
+ assert children == ["b"]
assert stat2.version == stat.version
def test_get_children2_many_nodes(self):
client = self.client
- client.ensure_path('/a/b')
- client.ensure_path('/a/c')
- client.ensure_path('/a/d')
- children, stat = client.get_children('/a', include_data=True)
- value, stat2 = client.get('/a')
- assert set(children) == set(['b', 'c', 'd'])
+ client.ensure_path("/a/b")
+ client.ensure_path("/a/c")
+ client.ensure_path("/a/d")
+ children, stat = client.get_children("/a", include_data=True)
+ value, stat2 = client.get("/a")
+ assert set(children) == set(["b", "c", "d"])
assert stat2.version == stat.version
def test_get_children_no_node(self):
client = self.client
with pytest.raises(NoNodeError):
- client.get_children('/none')
+ client.get_children("/none")
with pytest.raises(NoNodeError):
- client.get_children('/none', include_data=True)
+ client.get_children("/none", include_data=True)
def test_get_children_invalid_path(self):
client = self.client
with pytest.raises(ValueError):
- client.get_children('../a')
+ client.get_children("../a")
def test_get_children_invalid_arguments(self):
client = self.client
with pytest.raises(TypeError):
- client.get_children(('a', 'b'))
+ client.get_children(("a", "b"))
with pytest.raises(TypeError):
- client.get_children('a', watch=True)
+ client.get_children("a", watch=True)
with pytest.raises(TypeError):
- client.get_children('a', include_data='yes')
+ client.get_children("a", include_data="yes")
def test_invalid_auth(self):
from kazoo.exceptions import AuthFailedError
@@ -1021,7 +1028,7 @@ class TestClient(KazooTestCase):
client._state = KeeperState.AUTH_FAILED
with pytest.raises(AuthFailedError):
- client.get('/')
+ client.get("/")
def test_client_state(self):
from kazoo.protocol.states import KeeperState
@@ -1039,7 +1046,7 @@ class TestClient(KazooTestCase):
# try to change the chroot, not currently allowed
with pytest.raises(ConfigurationError):
- client.set_hosts(hosts + '/new_chroot')
+ client.set_hosts(hosts + "/new_chroot")
# grow the cluster to 3
client.set_hosts(self.servers)
@@ -1107,9 +1114,9 @@ class TestClient(KazooTestCase):
# optionally cause a SessionExpiredError to occur by
# mangling the first byte of the session password.
if expire_session:
- b0 = b'\x00'
+ b0 = b"\x00"
if client._session_passwd[0] == 0:
- b0 = b'\xff'
+ b0 = b"\xff"
client._session_passwd = b0 + client._session_passwd[1:]
finally:
server.run()
@@ -1127,10 +1134,7 @@ class TestClient(KazooTestCase):
try:
result = self._request_queuing_common(
- client=client,
- server=server,
- path=path,
- expire_session=False
+ client=client, server=server, path=path, expire_session=False
)
assert result.get() == path
@@ -1144,10 +1148,7 @@ class TestClient(KazooTestCase):
try:
result = self._request_queuing_common(
- client=client,
- server=server,
- path=path,
- expire_session=True
+ client=client, server=server, path=path, expire_session=True
)
assert len(client._queue) == 0
@@ -1158,17 +1159,17 @@ class TestClient(KazooTestCase):
dummy_dict = {
- 'aversion': 1,
- 'ctime': 0,
- 'cversion': 1,
- 'czxid': 110,
- 'dataLength': 1,
- 'ephemeralOwner': 'ben',
- 'mtime': 1,
- 'mzxid': 1,
- 'numChildren': 0,
- 'pzxid': 1,
- 'version': 1,
+ "aversion": 1,
+ "ctime": 0,
+ "cversion": 1,
+ "czxid": 110,
+ "dataLength": 1,
+ "ephemeralOwner": "ben",
+ "mtime": 1,
+ "mzxid": 1,
+ "numChildren": 0,
+ "pzxid": 1,
+ "version": 1,
}
@@ -1189,18 +1190,22 @@ class TestClientTransactions(KazooTestCase):
def test_basic_create(self):
t = self.client.transaction()
- t.create('/freddy')
- t.create('/fred', ephemeral=True)
- t.create('/smith', sequence=True)
+ t.create("/freddy")
+ t.create("/fred", ephemeral=True)
+ t.create("/smith", sequence=True)
results = t.commit()
assert len(results) == 3
- assert results[0] == '/freddy'
- assert results[2].startswith('/smith0') is True
+ assert results[0] == "/freddy"
+ assert results[2].startswith("/smith0") is True
def test_bad_creates(self):
- args_list = [(True,), ('/smith', 0), ('/smith', b'', 'bleh'),
- ('/smith', b'', None, 'fred'),
- ('/smith', b'', None, True, 'fred')]
+ args_list = [
+ (True,),
+ ("/smith", 0),
+ ("/smith", b"", "bleh"),
+ ("/smith", b"", None, "fred"),
+ ("/smith", b"", None, True, "fred"),
+ ]
for args in args_list:
with pytest.raises(TypeError):
@@ -1220,19 +1225,22 @@ class TestClientTransactions(KazooTestCase):
self.client.default_acl = (acl,)
t = self.client.transaction()
- t.create('/freddy')
+ t.create("/freddy")
results = t.commit()
- assert results[0] == '/freddy'
+ assert results[0] == "/freddy"
def test_basic_delete(self):
- self.client.create('/fred')
+ self.client.create("/fred")
t = self.client.transaction()
- t.delete('/fred')
+ t.delete("/fred")
results = t.commit()
assert results[0] is True
def test_bad_deletes(self):
- args_list = [(True,), ('/smith', 'woops'), ]
+ args_list = [
+ (True,),
+ ("/smith", "woops"),
+ ]
for args in args_list:
with pytest.raises(TypeError):
@@ -1240,15 +1248,15 @@ class TestClientTransactions(KazooTestCase):
t.delete(*args)
def test_set(self):
- self.client.create('/fred', b'01')
+ self.client.create("/fred", b"01")
t = self.client.transaction()
- t.set_data('/fred', b'oops')
+ t.set_data("/fred", b"oops")
t.commit()
- res = self.client.get('/fred')
- assert res[0] == b'oops'
+ res = self.client.get("/fred")
+ assert res[0] == b"oops"
def test_bad_sets(self):
- args_list = [(42, 52), ('/smith', False), ('/smith', b'', 'oops')]
+ args_list = [(42, 52), ("/smith", False), ("/smith", b"", "oops")]
for args in args_list:
with pytest.raises(TypeError):
@@ -1256,17 +1264,17 @@ class TestClientTransactions(KazooTestCase):
t.set_data(*args)
def test_check(self):
- self.client.create('/fred')
- version = self.client.get('/fred')[1].version
+ self.client.create("/fred")
+ version = self.client.get("/fred")[1].version
t = self.client.transaction()
- t.check('/fred', version)
- t.create('/blah')
+ t.check("/fred", version)
+ t.create("/blah")
results = t.commit()
assert results[0] is True
- assert results[1] == '/blah'
+ assert results[1] == "/blah"
def test_bad_checks(self):
- args_list = [(42, 52), ('/smith', 'oops')]
+ args_list = [(42, 52), ("/smith", "oops")]
for args in args_list:
with pytest.raises(TypeError):
@@ -1277,8 +1285,8 @@ class TestClientTransactions(KazooTestCase):
from kazoo.exceptions import RolledBackError, NoNodeError
t = self.client.transaction()
- t.create('/fred')
- t.delete('/smith')
+ t.create("/fred")
+ t.delete("/smith")
results = t.commit()
assert results[0].__class__ == RolledBackError
assert results[1].__class__ == NoNodeError
@@ -1297,8 +1305,8 @@ class TestClientTransactions(KazooTestCase):
def test_context(self):
with self.client.transaction() as t:
- t.create('/smith', b'32')
- assert self.client.get('/smith')[0] == b'32'
+ t.create("/smith", b"32")
+ assert self.client.get("/smith")[0] == b"32"
class TestSessionCallbacks(unittest.TestCase):
@@ -1358,7 +1366,7 @@ class TestCallbacks(KazooTestCase):
class TestNonChrootClient(KazooTestCase):
def test_create(self):
client = self._get_nonchroot_client()
- assert client.chroot == ''
+ assert client.chroot == ""
client.start()
node = uuid.uuid4().hex
path = client.create(node, ephemeral=True)
@@ -1388,7 +1396,7 @@ class TestReconfig(KazooTestCase):
def test_no_super_auth(self):
with pytest.raises(NoAuthError):
self.client.reconfig(
- joining='server.999=0.0.0.0:1234:2345:observer;3456',
+ joining="server.999=0.0.0.0:1234:2345:observer;3456",
leaving=None,
new_members=None,
)
@@ -1396,13 +1404,13 @@ class TestReconfig(KazooTestCase):
def test_add_remove_observer(self):
def free_sock_port():
s = socket.socket()
- s.bind(('', 0))
+ s.bind(("", 0))
return s, s.getsockname()[1]
username = "super"
password = "test"
digest_auth = "%s:%s" % (username, password)
- client = self._get_client(auth_data=[('digest', digest_auth)])
+ client = self._get_client(auth_data=[("digest", digest_auth)])
client.start()
# get ports for election, zab and client endpoints. we need to use
@@ -1413,20 +1421,23 @@ class TestReconfig(KazooTestCase):
s2, port2 = free_sock_port()
s3, port3 = free_sock_port()
- joining = 'server.100=0.0.0.0:%d:%d:observer;0.0.0.0:%d' % (
- port1, port2, port3)
- data, _ = client.reconfig(joining=joining,
- leaving=None,
- new_members=None)
- assert joining.encode('utf8') in data
+ joining = "server.100=0.0.0.0:%d:%d:observer;0.0.0.0:%d" % (
+ port1,
+ port2,
+ port3,
+ )
+ data, _ = client.reconfig(
+ joining=joining, leaving=None, new_members=None
+ )
+ assert joining.encode("utf8") in data
- data, _ = client.reconfig(joining=None,
- leaving='100',
- new_members=None)
- assert joining.encode('utf8') not in data
+ data, _ = client.reconfig(
+ joining=None, leaving="100", new_members=None
+ )
+ assert joining.encode("utf8") not in data
# try to add it again, but a config number in the future
- curver = int(data.decode().split('\n')[-1].split('=')[1], base=16)
+ curver = int(data.decode().split("\n")[-1].split("=")[1], base=16)
with pytest.raises(BadVersionError):
self.client.reconfig(
joining=joining,
@@ -1438,5 +1449,5 @@ class TestReconfig(KazooTestCase):
def test_bad_input(self):
with pytest.raises(BadArgumentsError):
self.client.reconfig(
- joining='some thing', leaving=None, new_members=None
+ joining="some thing", leaving=None, new_members=None
)
diff --git a/kazoo/tests/test_election.py b/kazoo/tests/test_election.py
index 45ef0be..c7b1b59 100644
--- a/kazoo/tests/test_election.py
+++ b/kazoo/tests/test_election.py
@@ -9,8 +9,7 @@ from kazoo.tests.util import wait
class UniqueError(Exception):
- """Error raised only by test leader function
- """
+ """Error raised only by test leader function"""
class KazooElectionTests(KazooTestCase):
@@ -33,8 +32,9 @@ class KazooElectionTests(KazooTestCase):
self.thread_exc_info = None
def _spawn_contender(self, contender_id, election):
- thread = threading.Thread(target=self._election_thread,
- args=(contender_id, election))
+ thread = threading.Thread(
+ target=self._election_thread, args=(contender_id, election)
+ )
thread.daemon = True
thread.start()
return thread
@@ -75,7 +75,8 @@ class KazooElectionTests(KazooTestCase):
contender = "c" + uuid.uuid4().hex
elections[contender] = self.client.Election(self.path, contender)
threads[contender] = self._spawn_contender(
- contender, elections[contender])
+ contender, elections[contender]
+ )
# wait for a leader to be elected
times = 0
@@ -84,8 +85,9 @@ class KazooElectionTests(KazooTestCase):
self.condition.wait(5)
times += 1
if times > 5:
- raise Exception("Still not a leader: lid: %s",
- self.leader_id)
+ raise Exception(
+ "Still not a leader: lid: %s", self.leader_id
+ )
election = self.client.Election(self.path)
diff --git a/kazoo/tests/test_eventlet_handler.py b/kazoo/tests/test_eventlet_handler.py
index 43ec4f9..5e89789 100644
--- a/kazoo/tests/test_eventlet_handler.py
+++ b/kazoo/tests/test_eventlet_handler.py
@@ -138,7 +138,7 @@ class TestEventletHandler(unittest.TestCase):
try:
resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096))
except (ValueError, resource.error):
- self.skipTest('couldnt raise fd limit high enough')
+ self.skipTest("couldnt raise fd limit high enough")
fd = 0
socks = []
while fd < 4000:
diff --git a/kazoo/tests/test_exceptions.py b/kazoo/tests/test_exceptions.py
index ae869f4..d2fb9c6 100644
--- a/kazoo/tests/test_exceptions.py
+++ b/kazoo/tests/test_exceptions.py
@@ -2,15 +2,16 @@ from unittest import TestCase
import pytest
-class ExceptionsTestCase(TestCase):
+class ExceptionsTestCase(TestCase):
def _get(self):
from kazoo import exceptions
+
return exceptions
def test_backwards_alias(self):
module = self._get()
- assert hasattr(module, 'NoNodeException')
+ assert hasattr(module, "NoNodeException")
assert module.NoNodeException is module.NoNodeError
def test_exceptions_code(self):
diff --git a/kazoo/tests/test_gevent_handler.py b/kazoo/tests/test_gevent_handler.py
index 5515114..0745c06 100644
--- a/kazoo/tests/test_gevent_handler.py
+++ b/kazoo/tests/test_gevent_handler.py
@@ -150,7 +150,7 @@ class TestBasicGeventClient(KazooTestCase):
try:
resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096))
except (ValueError, resource.error):
- self.skipTest('couldnt raise fd limit high enough')
+ self.skipTest("couldnt raise fd limit high enough")
fd = 0
socks = []
while fd < 4000:
diff --git a/kazoo/tests/test_hosts.py b/kazoo/tests/test_hosts.py
index 0ad83cf..8f8a811 100644
--- a/kazoo/tests/test_hosts.py
+++ b/kazoo/tests/test_hosts.py
@@ -4,50 +4,55 @@ from kazoo.hosts import collect_hosts
class HostsTestCase(TestCase):
-
def test_ipv4(self):
- hosts, chroot = collect_hosts('127.0.0.1:2181, 192.168.1.2:2181, \
- 132.254.111.10:2181')
- assert hosts == [('127.0.0.1', 2181),
- ('192.168.1.2', 2181),
- ('132.254.111.10', 2181)]
+ hosts, chroot = collect_hosts(
+ "127.0.0.1:2181, 192.168.1.2:2181, \
+ 132.254.111.10:2181"
+ )
+ assert hosts == [
+ ("127.0.0.1", 2181),
+ ("192.168.1.2", 2181),
+ ("132.254.111.10", 2181),
+ ]
assert chroot is None
- hosts, chroot = collect_hosts(['127.0.0.1:2181',
- '192.168.1.2:2181',
- '132.254.111.10:2181'])
- assert hosts == [('127.0.0.1', 2181),
- ('192.168.1.2', 2181),
- ('132.254.111.10', 2181)]
+ hosts, chroot = collect_hosts(
+ ["127.0.0.1:2181", "192.168.1.2:2181", "132.254.111.10:2181"]
+ )
+ assert hosts == [
+ ("127.0.0.1", 2181),
+ ("192.168.1.2", 2181),
+ ("132.254.111.10", 2181),
+ ]
assert chroot is None
def test_ipv6(self):
- hosts, chroot = collect_hosts('[fe80::200:5aee:feaa:20a2]:2181')
- assert hosts == [('fe80::200:5aee:feaa:20a2', 2181)]
+ hosts, chroot = collect_hosts("[fe80::200:5aee:feaa:20a2]:2181")
+ assert hosts == [("fe80::200:5aee:feaa:20a2", 2181)]
assert chroot is None
- hosts, chroot = collect_hosts(['[fe80::200:5aee:feaa:20a2]:2181'])
- assert hosts == [('fe80::200:5aee:feaa:20a2', 2181)]
+ hosts, chroot = collect_hosts(["[fe80::200:5aee:feaa:20a2]:2181"])
+ assert hosts == [("fe80::200:5aee:feaa:20a2", 2181)]
assert chroot is None
def test_hosts_list(self):
- hosts, chroot = collect_hosts('zk01:2181, zk02:2181, zk03:2181')
- expected1 = [('zk01', 2181), ('zk02', 2181), ('zk03', 2181)]
+ hosts, chroot = collect_hosts("zk01:2181, zk02:2181, zk03:2181")
+ expected1 = [("zk01", 2181), ("zk02", 2181), ("zk03", 2181)]
assert hosts == expected1
assert chroot is None
- hosts, chroot = collect_hosts(['zk01:2181', 'zk02:2181', 'zk03:2181'])
+ hosts, chroot = collect_hosts(["zk01:2181", "zk02:2181", "zk03:2181"])
assert hosts == expected1
assert chroot is None
- expected2 = '/test'
- hosts, chroot = collect_hosts('zk01:2181, zk02:2181, zk03:2181/test')
+ expected2 = "/test"
+ hosts, chroot = collect_hosts("zk01:2181, zk02:2181, zk03:2181/test")
assert hosts == expected1
assert chroot == expected2
- hosts, chroot = collect_hosts(['zk01:2181',
- 'zk02:2181',
- 'zk03:2181', '/test'])
+ hosts, chroot = collect_hosts(
+ ["zk01:2181", "zk02:2181", "zk03:2181", "/test"]
+ )
assert hosts == expected1
assert chroot == expected2
diff --git a/kazoo/tests/test_interrupt.py b/kazoo/tests/test_interrupt.py
index 4f4a997..ad4ae5d 100644
--- a/kazoo/tests/test_interrupt.py
+++ b/kazoo/tests/test_interrupt.py
@@ -8,16 +8,16 @@ from kazoo.testing import KazooTestCase
class KazooInterruptTests(KazooTestCase):
def test_interrupted_systemcall(self):
- '''
+ """
Make sure interrupted system calls don't break the world, since we
can't control what all signals our connection thread will get
- '''
- if 'linux' not in platform:
+ """
+ if "linux" not in platform:
pytest.skip(
- 'Unable to reproduce error case on non-linux platforms'
+ "Unable to reproduce error case on non-linux platforms"
)
- path = 'interrupt_test'
+ path = "interrupt_test"
value = b"1"
self.client.create(path, value)
diff --git a/kazoo/tests/test_lease.py b/kazoo/tests/test_lease.py
index 5e7a6f2..98a1256 100644
--- a/kazoo/tests/test_lease.py
+++ b/kazoo/tests/test_lease.py
@@ -42,192 +42,302 @@ class NonBlockingLeaseTests(KazooLeaseTests):
# Use client convenience method here to test it at least once. Use
# class directly in
# other tests in order to get better IDE support.
- lease = self.client.NonBlockingLease(self.path,
- datetime.timedelta(seconds=3),
- utcnow=self.clock)
+ lease = self.client.NonBlockingLease(
+ self.path, datetime.timedelta(seconds=3), utcnow=self.clock
+ )
assert lease
assert lease.obtained is True
self.clock.forward(2)
renewed_lease = self.client.NonBlockingLease(
- self.path, datetime.timedelta(seconds=3), utcnow=self.clock)
+ self.path, datetime.timedelta(seconds=3), utcnow=self.clock
+ )
assert renewed_lease
def test_busy(self):
- lease = NonBlockingLease(self.client, self.path,
- datetime.timedelta(seconds=3),
- utcnow=self.clock)
+ lease = NonBlockingLease(
+ self.client,
+ self.path,
+ datetime.timedelta(seconds=3),
+ utcnow=self.clock,
+ )
assert lease
self.clock.forward(2)
foreigner_lease = NonBlockingLease(
- self.client2, self.path, datetime.timedelta(seconds=3),
- identifier="some.other.host", utcnow=self.clock)
+ self.client2,
+ self.path,
+ datetime.timedelta(seconds=3),
+ identifier="some.other.host",
+ utcnow=self.clock,
+ )
assert not foreigner_lease
assert foreigner_lease.obtained is False
def test_overtake(self):
- lease = NonBlockingLease(self.client, self.path,
- datetime.timedelta(seconds=3),
- utcnow=self.clock)
+ lease = NonBlockingLease(
+ self.client,
+ self.path,
+ datetime.timedelta(seconds=3),
+ utcnow=self.clock,
+ )
assert lease
self.clock.forward(4)
foreigner_lease = NonBlockingLease(
- self.client2, self.path, datetime.timedelta(seconds=3),
- identifier="some.other.host", utcnow=self.clock)
+ self.client2,
+ self.path,
+ datetime.timedelta(seconds=3),
+ identifier="some.other.host",
+ utcnow=self.clock,
+ )
assert foreigner_lease
def test_renew_no_overtake(self):
- lease = self.client.NonBlockingLease(self.path,
- datetime.timedelta(seconds=3),
- utcnow=self.clock)
+ lease = self.client.NonBlockingLease(
+ self.path, datetime.timedelta(seconds=3), utcnow=self.clock
+ )
assert lease
assert lease.obtained is True
self.clock.forward(2)
renewed_lease = self.client.NonBlockingLease(
- self.path, datetime.timedelta(seconds=3), utcnow=self.clock)
+ self.path, datetime.timedelta(seconds=3), utcnow=self.clock
+ )
assert renewed_lease
self.clock.forward(2)
foreigner_lease = NonBlockingLease(
- self.client2, self.path, datetime.timedelta(seconds=3),
- identifier="some.other.host", utcnow=self.clock)
+ self.client2,
+ self.path,
+ datetime.timedelta(seconds=3),
+ identifier="some.other.host",
+ utcnow=self.clock,
+ )
assert not foreigner_lease
def test_overtaker_renews(self):
- lease = NonBlockingLease(self.client, self.path,
- datetime.timedelta(seconds=3),
- utcnow=self.clock)
+ lease = NonBlockingLease(
+ self.client,
+ self.path,
+ datetime.timedelta(seconds=3),
+ utcnow=self.clock,
+ )
assert lease
self.clock.forward(4)
foreigner_lease = NonBlockingLease(
- self.client2, self.path, datetime.timedelta(seconds=3),
- identifier="some.other.host", utcnow=self.clock)
+ self.client2,
+ self.path,
+ datetime.timedelta(seconds=3),
+ identifier="some.other.host",
+ utcnow=self.clock,
+ )
assert foreigner_lease
self.clock.forward(2)
foreigner_renew = NonBlockingLease(
- self.client2, self.path, datetime.timedelta(seconds=3),
- identifier="some.other.host", utcnow=self.clock)
+ self.client2,
+ self.path,
+ datetime.timedelta(seconds=3),
+ identifier="some.other.host",
+ utcnow=self.clock,
+ )
assert foreigner_renew
def test_overtake_refuse_first(self):
- lease = NonBlockingLease(self.client, self.path,
- datetime.timedelta(seconds=3),
- utcnow=self.clock)
+ lease = NonBlockingLease(
+ self.client,
+ self.path,
+ datetime.timedelta(seconds=3),
+ utcnow=self.clock,
+ )
assert lease
self.clock.forward(4)
foreigner_lease = NonBlockingLease(
- self.client2, self.path, datetime.timedelta(seconds=3),
- identifier="some.other.host", utcnow=self.clock)
+ self.client2,
+ self.path,
+ datetime.timedelta(seconds=3),
+ identifier="some.other.host",
+ utcnow=self.clock,
+ )
assert foreigner_lease
self.clock.forward(2)
first_again_lease = NonBlockingLease(
- self.client, self.path, datetime.timedelta(seconds=3),
- utcnow=self.clock)
+ self.client,
+ self.path,
+ datetime.timedelta(seconds=3),
+ utcnow=self.clock,
+ )
assert not first_again_lease
def test_old_version(self):
# Skip to a future version
NonBlockingLease._version += 1
- lease = NonBlockingLease(self.client, self.path,
- datetime.timedelta(seconds=3),
- utcnow=self.clock)
+ lease = NonBlockingLease(
+ self.client,
+ self.path,
+ datetime.timedelta(seconds=3),
+ utcnow=self.clock,
+ )
assert lease
# Then back to today.
NonBlockingLease._version -= 1
self.clock.forward(4)
foreigner_lease = NonBlockingLease(
- self.client2, self.path, datetime.timedelta(seconds=3),
- identifier="some.other.host", utcnow=self.clock)
+ self.client2,
+ self.path,
+ datetime.timedelta(seconds=3),
+ identifier="some.other.host",
+ utcnow=self.clock,
+ )
# Since a newer version wrote the lease file, the lease is not taken.
assert not foreigner_lease
class MultiNonBlockingLeaseTest(KazooLeaseTests):
def test_1_renew(self):
- ls = self.client.MultiNonBlockingLease(1, self.path,
- datetime.timedelta(seconds=4),
- utcnow=self.clock)
+ ls = self.client.MultiNonBlockingLease(
+ 1, self.path, datetime.timedelta(seconds=4), utcnow=self.clock
+ )
assert ls
self.clock.forward(2)
- ls2 = MultiNonBlockingLease(self.client, 1, self.path,
- datetime.timedelta(seconds=4),
- utcnow=self.clock)
+ ls2 = MultiNonBlockingLease(
+ self.client,
+ 1,
+ self.path,
+ datetime.timedelta(seconds=4),
+ utcnow=self.clock,
+ )
assert ls2
def test_1_reject(self):
- ls = MultiNonBlockingLease(self.client, 1, self.path,
- datetime.timedelta(seconds=4),
- utcnow=self.clock)
+ ls = MultiNonBlockingLease(
+ self.client,
+ 1,
+ self.path,
+ datetime.timedelta(seconds=4),
+ utcnow=self.clock,
+ )
assert ls
self.clock.forward(2)
- ls2 = MultiNonBlockingLease(self.client2, 1, self.path,
- datetime.timedelta(seconds=4),
- identifier="some.other.host",
- utcnow=self.clock)
+ ls2 = MultiNonBlockingLease(
+ self.client2,
+ 1,
+ self.path,
+ datetime.timedelta(seconds=4),
+ identifier="some.other.host",
+ utcnow=self.clock,
+ )
assert not ls2
def test_2_renew(self):
- ls = MultiNonBlockingLease(self.client, 2, self.path,
- datetime.timedelta(seconds=7),
- utcnow=self.clock)
+ ls = MultiNonBlockingLease(
+ self.client,
+ 2,
+ self.path,
+ datetime.timedelta(seconds=7),
+ utcnow=self.clock,
+ )
assert ls
self.clock.forward(2)
- ls2 = MultiNonBlockingLease(self.client2, 2, self.path,
- datetime.timedelta(seconds=7),
- identifier="host2", utcnow=self.clock)
+ ls2 = MultiNonBlockingLease(
+ self.client2,
+ 2,
+ self.path,
+ datetime.timedelta(seconds=7),
+ identifier="host2",
+ utcnow=self.clock,
+ )
assert ls2
self.clock.forward(2)
- ls3 = MultiNonBlockingLease(self.client, 2, self.path,
- datetime.timedelta(seconds=7),
- utcnow=self.clock)
+ ls3 = MultiNonBlockingLease(
+ self.client,
+ 2,
+ self.path,
+ datetime.timedelta(seconds=7),
+ utcnow=self.clock,
+ )
assert ls3
self.clock.forward(2)
- ls4 = MultiNonBlockingLease(self.client2, 2, self.path,
- datetime.timedelta(seconds=7),
- identifier="host2", utcnow=self.clock)
+ ls4 = MultiNonBlockingLease(
+ self.client2,
+ 2,
+ self.path,
+ datetime.timedelta(seconds=7),
+ identifier="host2",
+ utcnow=self.clock,
+ )
assert ls4
def test_2_reject(self):
- ls = MultiNonBlockingLease(self.client, 2, self.path,
- datetime.timedelta(seconds=7),
- utcnow=self.clock)
+ ls = MultiNonBlockingLease(
+ self.client,
+ 2,
+ self.path,
+ datetime.timedelta(seconds=7),
+ utcnow=self.clock,
+ )
assert ls
self.clock.forward(2)
- ls2 = MultiNonBlockingLease(self.client2, 2, self.path,
- datetime.timedelta(seconds=7),
- identifier="host2", utcnow=self.clock)
+ ls2 = MultiNonBlockingLease(
+ self.client2,
+ 2,
+ self.path,
+ datetime.timedelta(seconds=7),
+ identifier="host2",
+ utcnow=self.clock,
+ )
assert ls2
self.clock.forward(2)
- ls3 = MultiNonBlockingLease(self.client3, 2, self.path,
- datetime.timedelta(seconds=7),
- identifier="host3", utcnow=self.clock)
+ ls3 = MultiNonBlockingLease(
+ self.client3,
+ 2,
+ self.path,
+ datetime.timedelta(seconds=7),
+ identifier="host3",
+ utcnow=self.clock,
+ )
assert not ls3
def test_2_handover(self):
- ls = MultiNonBlockingLease(self.client, 2, self.path,
- datetime.timedelta(seconds=4),
- utcnow=self.clock)
+ ls = MultiNonBlockingLease(
+ self.client,
+ 2,
+ self.path,
+ datetime.timedelta(seconds=4),
+ utcnow=self.clock,
+ )
assert ls
self.clock.forward(2)
- ls2 = MultiNonBlockingLease(self.client2, 2, self.path,
- datetime.timedelta(seconds=4),
- identifier="host2", utcnow=self.clock)
+ ls2 = MultiNonBlockingLease(
+ self.client2,
+ 2,
+ self.path,
+ datetime.timedelta(seconds=4),
+ identifier="host2",
+ utcnow=self.clock,
+ )
assert ls2
self.clock.forward(3)
- ls3 = MultiNonBlockingLease(self.client3, 2, self.path,
- datetime.timedelta(seconds=4),
- identifier="host3", utcnow=self.clock)
+ ls3 = MultiNonBlockingLease(
+ self.client3,
+ 2,
+ self.path,
+ datetime.timedelta(seconds=4),
+ identifier="host3",
+ utcnow=self.clock,
+ )
assert ls3
self.clock.forward(2)
- ls4 = MultiNonBlockingLease(self.client, 2, self.path,
- datetime.timedelta(seconds=4),
- utcnow=self.clock)
+ ls4 = MultiNonBlockingLease(
+ self.client,
+ 2,
+ self.path,
+ datetime.timedelta(seconds=4),
+ utcnow=self.clock,
+ )
assert ls4
diff --git a/kazoo/tests/test_lock.py b/kazoo/tests/test_lock.py
index ebb7b23..8c175f7 100644
--- a/kazoo/tests/test_lock.py
+++ b/kazoo/tests/test_lock.py
@@ -469,17 +469,17 @@ class KazooLockTests(KazooTestCase):
def test_rw_lock(self):
reader_event = self.make_event()
- reader_lock = self.client.ReadLock(self. lockpath, "reader")
+ reader_lock = self.client.ReadLock(self.lockpath, "reader")
reader_thread = self.make_thread(
target=self._thread_lock_acquire_til_event,
- args=("reader", reader_lock, reader_event)
+ args=("reader", reader_lock, reader_event),
)
writer_event = self.make_event()
- writer_lock = self.client.WriteLock(self. lockpath, "writer")
+ writer_lock = self.client.WriteLock(self.lockpath, "writer")
writer_thread = self.make_thread(
target=self._thread_lock_acquire_til_event,
- args=("writer", writer_lock, writer_event)
+ args=("writer", writer_lock, writer_event),
)
# acquire a write lock ourselves first to make the others line up
@@ -798,8 +798,7 @@ class TestSemaphore(KazooTestCase):
class TestSequence(unittest.TestCase):
def test_get_predecessor(self):
- """Validate selection of predecessors.
- """
+ """Validate selection of predecessors."""
goLock = "_c_8eb60557ba51e0da67eefc47467d3f34-lock-0000000031"
pyLock = "514e5a831836450cb1a56c741e990fd8__lock__0000000032"
children = ["hello", goLock, "world", pyLock]
diff --git a/kazoo/tests/test_partitioner.py b/kazoo/tests/test_partitioner.py
index aebc766..02ce5ca 100644
--- a/kazoo/tests/test_partitioner.py
+++ b/kazoo/tests/test_partitioner.py
@@ -17,8 +17,9 @@ class SlowLockMock:
def __init__(self, client, lock, delay_time=None):
self._client = client
self._lock = lock
- self.delay_time = self.default_delay_time \
- if delay_time is None else delay_time
+ self.delay_time = (
+ self.default_delay_time if delay_time is None else delay_time
+ )
def acquire(self, timeout=None):
sleep = self._client.handler.sleep_func
@@ -85,8 +86,9 @@ class KazooPartitionerTests(KazooTestCase):
self.__create_partitioner(size=3, identifier="2")
self.__wait()
- self.__assert_state(PartitionState.RELEASE,
- partitioners=self.__partitioners[:-1])
+ self.__assert_state(
+ PartitionState.RELEASE, partitioners=self.__partitioners[:-1]
+ )
for partitioner in self.__partitioners[-1]:
assert partitioner.state_change_event.is_set()
self.__release(self.__partitioners[:-1])
@@ -137,8 +139,9 @@ class KazooPartitionerTests(KazooTestCase):
self.__create_partitioner(identifier="2", size=3)
self.__wait()
- self.__assert_state(PartitionState.RELEASE,
- partitioners=self.__partitioners[:-1])
+ self.__assert_state(
+ PartitionState.RELEASE, partitioners=self.__partitioners[:-1]
+ )
self.__release(partitioners=self.__partitioners[:-1])
self.__wait_for_acquire()
self.__assert_state(PartitionState.ACQUIRED)
@@ -214,8 +217,11 @@ class KazooPartitionerTests(KazooTestCase):
def __create_partitioner(self, size, identifier=None):
partitioner = self.client.SetPartitioner(
- self.path, set=range(size), time_boundary=0.2,
- identifier=identifier)
+ self.path,
+ set=range(size),
+ time_boundary=0.2,
+ identifier=identifier,
+ )
self.__partitioners.append(partitioner)
return partitioner
@@ -232,8 +238,9 @@ class KazooPartitionerTests(KazooTestCase):
def __assert_partitions(self, *partitions):
assert len(partitions) == len(self.__partitioners)
- for partitioner, own_partitions in zip(self.__partitioners,
- partitions):
+ for partitioner, own_partitions in zip(
+ self.__partitioners, partitions
+ ):
assert list(partitioner) == own_partitions
def __wait(self):
diff --git a/kazoo/tests/test_party.py b/kazoo/tests/test_party.py
index 2089d54..1b32523 100644
--- a/kazoo/tests/test_party.py
+++ b/kazoo/tests/test_party.py
@@ -9,8 +9,7 @@ class KazooPartyTests(KazooTestCase):
self.path = "/" + uuid.uuid4().hex
def test_party(self):
- parties = [self.client.Party(self.path, "p%s" % i)
- for i in range(5)]
+ parties = [self.client.Party(self.path, "p%s" % i) for i in range(5)]
one_party = parties[0]
@@ -20,14 +19,14 @@ class KazooPartyTests(KazooTestCase):
participants = set()
for party in parties:
party.join()
- participants.add(party.data.decode('utf-8'))
+ participants.add(party.data.decode("utf-8"))
assert set(party) == participants
assert len(party) == len(participants)
for party in parties:
party.leave()
- participants.remove(party.data.decode('utf-8'))
+ participants.remove(party.data.decode("utf-8"))
assert set(party) == participants
assert len(party) == len(participants)
@@ -58,8 +57,9 @@ class KazooShallowPartyTests(KazooTestCase):
self.path = "/" + uuid.uuid4().hex
def test_party(self):
- parties = [self.client.ShallowParty(self.path, "p%s" % i)
- for i in range(5)]
+ parties = [
+ self.client.ShallowParty(self.path, "p%s" % i) for i in range(5)
+ ]
one_party = parties[0]
@@ -69,14 +69,14 @@ class KazooShallowPartyTests(KazooTestCase):
participants = set()
for party in parties:
party.join()
- participants.add(party.data.decode('utf-8'))
+ participants.add(party.data.decode("utf-8"))
assert set(party) == participants
assert len(party) == len(participants)
for party in parties:
party.leave()
- participants.remove(party.data.decode('utf-8'))
+ participants.remove(party.data.decode("utf-8"))
assert set(party) == participants
assert len(party) == len(participants)
diff --git a/kazoo/tests/test_paths.py b/kazoo/tests/test_paths.py
index 730a189..4e42538 100644
--- a/kazoo/tests/test_paths.py
+++ b/kazoo/tests/test_paths.py
@@ -6,100 +6,97 @@ import pytest
from kazoo.protocol import paths
-if sys.version_info > (3, ): # pragma: nocover
+if sys.version_info > (3,): # pragma: nocover
+
def u(s):
return s
+
else: # pragma: nocover
+
def u(s):
return unicode(s, "unicode_escape")
class NormPathTestCase(TestCase):
-
def test_normpath(self):
- assert paths.normpath('/a/b') == '/a/b'
+ assert paths.normpath("/a/b") == "/a/b"
def test_normpath_empty(self):
- assert paths.normpath('') == ''
+ assert paths.normpath("") == ""
def test_normpath_unicode(self):
- assert paths.normpath(u('/\xe4/b')) == u('/\xe4/b')
+ assert paths.normpath(u("/\xe4/b")) == u("/\xe4/b")
def test_normpath_dots(self):
- assert paths.normpath('/a./b../c') == '/a./b../c'
+ assert paths.normpath("/a./b../c") == "/a./b../c"
def test_normpath_slash(self):
- assert paths.normpath('/') == '/'
+ assert paths.normpath("/") == "/"
def test_normpath_multiple_slashes(self):
- assert paths.normpath('//') == '/'
- assert paths.normpath('//a/b') == '/a/b'
- assert paths.normpath('/a//b//') == '/a/b'
- assert paths.normpath('//a////b///c/') == '/a/b/c'
+ assert paths.normpath("//") == "/"
+ assert paths.normpath("//a/b") == "/a/b"
+ assert paths.normpath("/a//b//") == "/a/b"
+ assert paths.normpath("//a////b///c/") == "/a/b/c"
def test_normpath_relative(self):
with pytest.raises(ValueError):
- paths.normpath('./a/b')
+ paths.normpath("./a/b")
with pytest.raises(ValueError):
- paths.normpath('/a/../b')
+ paths.normpath("/a/../b")
def test_normpath_trailing(self):
- assert paths.normpath('/', trailing=True) == '/'
+ assert paths.normpath("/", trailing=True) == "/"
class JoinTestCase(TestCase):
-
def test_join(self):
- assert paths.join('/a') == '/a'
- assert paths.join('/a', 'b/') == '/a/b/'
- assert paths.join('/a', 'b', 'c') == '/a/b/c'
+ assert paths.join("/a") == "/a"
+ assert paths.join("/a", "b/") == "/a/b/"
+ assert paths.join("/a", "b", "c") == "/a/b/c"
def test_join_empty(self):
- assert paths.join('') == ''
- assert paths.join('', 'a', 'b') == 'a/b'
- assert paths.join('/a', '', 'b/', 'c') == '/a/b/c'
+ assert paths.join("") == ""
+ assert paths.join("", "a", "b") == "a/b"
+ assert paths.join("/a", "", "b/", "c") == "/a/b/c"
def test_join_absolute(self):
- assert paths.join('/a/b', '/c') == '/c'
+ assert paths.join("/a/b", "/c") == "/c"
class IsAbsTestCase(TestCase):
-
def test_isabs(self):
- assert paths.isabs('/') is True
- assert paths.isabs('/a') is True
- assert paths.isabs('/a//b/c') is True
- assert paths.isabs('//a/b') is True
+ assert paths.isabs("/") is True
+ assert paths.isabs("/a") is True
+ assert paths.isabs("/a//b/c") is True
+ assert paths.isabs("//a/b") is True
def test_isabs_false(self):
- assert paths.isabs('') is False
- assert paths.isabs('a/') is False
- assert paths.isabs('a/../') is False
+ assert paths.isabs("") is False
+ assert paths.isabs("a/") is False
+ assert paths.isabs("a/../") is False
class BaseNameTestCase(TestCase):
-
def test_basename(self):
- assert paths.basename('') == ''
- assert paths.basename('/') == ''
- assert paths.basename('//a') == 'a'
- assert paths.basename('//a/') == ''
- assert paths.basename('/a/b.//c..') == 'c..'
+ assert paths.basename("") == ""
+ assert paths.basename("/") == ""
+ assert paths.basename("//a") == "a"
+ assert paths.basename("//a/") == ""
+ assert paths.basename("/a/b.//c..") == "c.."
class PrefixRootTestCase(TestCase):
-
def test_prefix_root(self):
- assert paths._prefix_root('/a/', 'b/c') == '/a/b/c'
- assert paths._prefix_root('/a/b', 'c/d') == '/a/b/c/d'
- assert paths._prefix_root('/a', '/b/c') == '/a/b/c'
- assert paths._prefix_root('/a', '//b/c.') == '/a/b/c.'
+ assert paths._prefix_root("/a/", "b/c") == "/a/b/c"
+ assert paths._prefix_root("/a/b", "c/d") == "/a/b/c/d"
+ assert paths._prefix_root("/a", "/b/c") == "/a/b/c"
+ assert paths._prefix_root("/a", "//b/c.") == "/a/b/c."
class NormRootTestCase(TestCase):
-
def test_norm_root(self):
- assert paths._norm_root('') == '/'
- assert paths._norm_root('/') == '/'
- assert paths._norm_root('//a') == '/a'
- assert paths._norm_root('//a./b') == '/a./b'
+ assert paths._norm_root("") == "/"
+ assert paths._norm_root("/") == "/"
+ assert paths._norm_root("//a") == "/a"
+ assert paths._norm_root("//a./b") == "/a./b"
diff --git a/kazoo/tests/test_retry.py b/kazoo/tests/test_retry.py
index 7a30667..5b79eca 100644
--- a/kazoo/tests/test_retry.py
+++ b/kazoo/tests/test_retry.py
@@ -2,6 +2,7 @@ import unittest
import pytest
+
class TestRetrySleeper(unittest.TestCase):
def _pass(self):
pass
@@ -12,11 +13,11 @@ class TestRetrySleeper(unittest.TestCase):
scope = dict(times=0)
def inner():
- if scope['times'] >= times:
+ if scope["times"] >= times:
pass
else:
- scope['times'] += 1
- raise ForceRetryError('Failed!')
+ scope["times"] += 1
+ raise ForceRetryError("Failed!")
return inner
diff --git a/kazoo/tests/test_sasl.py b/kazoo/tests/test_sasl.py
index 3cca854..6daa2a0 100644
--- a/kazoo/tests/test_sasl.py
+++ b/kazoo/tests/test_sasl.py
@@ -31,7 +31,7 @@ class TestLegacySASLDigestAuthentication(KazooTestHarness):
def tearDown(self):
self.teardown_zookeeper()
- os.environ.pop('ZOOKEEPER_JAAS_AUTH', None)
+ os.environ.pop("ZOOKEEPER_JAAS_AUTH", None)
def test_connect_sasl_auth(self):
from kazoo.security import make_acl
@@ -81,7 +81,7 @@ class TestSASLDigestAuthentication(KazooTestHarness):
def tearDown(self):
self.teardown_zookeeper()
- os.environ.pop('ZOOKEEPER_JAAS_AUTH', None)
+ os.environ.pop("ZOOKEEPER_JAAS_AUTH", None)
def test_connect_sasl_auth(self):
from kazoo.security import make_acl
@@ -147,7 +147,7 @@ class TestSASLGSSAPIAuthentication(KazooTestHarness):
def tearDown(self):
self.teardown_zookeeper()
- os.environ.pop('ZOOKEEPER_JAAS_AUTH', None)
+ os.environ.pop("ZOOKEEPER_JAAS_AUTH", None)
def test_connect_gssapi_auth(self):
from kazoo.security import make_acl
diff --git a/kazoo/tests/test_security.py b/kazoo/tests/test_security.py
index ee740a9..bc45483 100644
--- a/kazoo/tests/test_security.py
+++ b/kazoo/tests/test_security.py
@@ -14,29 +14,41 @@ class TestACL(unittest.TestCase):
assert acl.perms & Permissions.READ == Permissions.READ
def test_all_perms(self):
- acl = self._makeOne("digest", ":", read=True, write=True,
- create=True, delete=True, admin=True)
- for perm in [Permissions.READ, Permissions.CREATE, Permissions.WRITE,
- Permissions.DELETE, Permissions.ADMIN]:
+ acl = self._makeOne(
+ "digest",
+ ":",
+ read=True,
+ write=True,
+ create=True,
+ delete=True,
+ admin=True,
+ )
+ for perm in [
+ Permissions.READ,
+ Permissions.CREATE,
+ Permissions.WRITE,
+ Permissions.DELETE,
+ Permissions.ADMIN,
+ ]:
assert acl.perms & perm == perm
def test_perm_listing(self):
from kazoo.security import ACL
- f = ACL(15, 'fred')
- assert 'READ' in f.acl_list
- assert 'WRITE' in f.acl_list
- assert 'CREATE' in f.acl_list
- assert 'DELETE' in f.acl_list
+ f = ACL(15, "fred")
+ assert "READ" in f.acl_list
+ assert "WRITE" in f.acl_list
+ assert "CREATE" in f.acl_list
+ assert "DELETE" in f.acl_list
- f = ACL(16, 'fred')
- assert 'ADMIN' in f.acl_list
+ f = ACL(16, "fred")
+ assert "ADMIN" in f.acl_list
- f = ACL(31, 'george')
- assert 'ALL' in f.acl_list
+ f = ACL(31, "george")
+ assert "ALL" in f.acl_list
def test_perm_repr(self):
from kazoo.security import ACL
- f = ACL(16, 'fred')
+ f = ACL(16, "fred")
assert "ACL(perms=16, acl_list=['ADMIN']" in repr(f)
diff --git a/kazoo/tests/test_selectors_select.py b/kazoo/tests/test_selectors_select.py
index 2baf225..19b224d 100644
--- a/kazoo/tests/test_selectors_select.py
+++ b/kazoo/tests/test_selectors_select.py
@@ -14,15 +14,16 @@ from kazoo.handlers.utils import selector_select
select = selector_select
-@unittest.skipIf((sys.platform[:3] == 'win'),
- "can't easily test on this system")
+@unittest.skipIf(
+ (sys.platform[:3] == "win"), "can't easily test on this system"
+)
class SelectTestCase(unittest.TestCase):
class Nope:
pass
class Almost:
def fileno(self):
- return 'fileno'
+ return "fileno"
def test_error_conditions(self):
self.assertRaises(TypeError, select, 1, 2, 3)
@@ -32,10 +33,12 @@ class SelectTestCase(unittest.TestCase):
self.assertRaises(ValueError, select, [], [], [], -1)
# Issue #12367: http://www.freebsd.org/cgi/query-pr.cgi?pr=kern/155606
- @unittest.skipIf(sys.platform.startswith('freebsd'),
- 'skip because of a FreeBSD bug: kern/155606')
+ @unittest.skipIf(
+ sys.platform.startswith("freebsd"),
+ "skip because of a FreeBSD bug: kern/155606",
+ )
def test_errno(self):
- with open(__file__, 'rb') as fp:
+ with open(__file__, "rb") as fp:
fd = fp.fileno()
fp.close()
try:
@@ -53,8 +56,8 @@ class SelectTestCase(unittest.TestCase):
self.assertIsNot(w, x)
def test_select(self):
- cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
- p = os.popen(cmd, 'r')
+ cmd = "for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done"
+ p = os.popen(cmd, "r")
for tout in (0, 1, 2, 4, 8, 16) + (None,) * 10:
rfd, wfd, xfd = select([p], [], [], tout)
if (rfd, wfd, xfd) == ([], [], []):
@@ -64,7 +67,7 @@ class SelectTestCase(unittest.TestCase):
if not line:
break
continue
- self.fail('Unexpected return values from select():', rfd, wfd, xfd)
+ self.fail("Unexpected return values from select():", rfd, wfd, xfd)
p.close()
# Issue 16230: Crash on select resized list
@@ -81,5 +84,5 @@ class SelectTestCase(unittest.TestCase):
self.assertEqual(select([], a, []), ([], a[:5], []))
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()
diff --git a/kazoo/tests/test_threading_handler.py b/kazoo/tests/test_threading_handler.py
index dbdccd7..8d7ed13 100644
--- a/kazoo/tests/test_threading_handler.py
+++ b/kazoo/tests/test_threading_handler.py
@@ -20,7 +20,7 @@ class TestThreadingHandler(unittest.TestCase):
h = self._makeOne()
h.start()
# In Python 3.3 _Event is gone, before Event is function
- event_class = getattr(threading, '_Event', threading.Event)
+ event_class = getattr(threading, "_Event", threading.Event)
assert isinstance(h.event_object(), event_class)
def test_matching_async(self):
@@ -52,7 +52,7 @@ class TestThreadingHandler(unittest.TestCase):
try:
resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096))
except (ValueError, resource.error):
- self.skipTest('couldnt raise fd limit high enough')
+ self.skipTest("couldnt raise fd limit high enough")
fd = 0
socks = []
while fd < 4000:
@@ -83,7 +83,7 @@ class TestThreadingAsync(unittest.TestCase):
async_result = self._makeOne(mock_handler)
assert async_result.ready() is False
- async_result.set('val')
+ async_result.set("val")
assert async_result.ready() is True
assert async_result.successful() is True
assert async_result.exception is None
@@ -94,7 +94,7 @@ class TestThreadingAsync(unittest.TestCase):
async_result = self._makeOne(mock_handler)
async_result.rawlink(lambda a: a)
- async_result.set('val')
+ async_result.set("val")
assert mock_handler.completion_queue.put.called
@@ -103,7 +103,7 @@ class TestThreadingAsync(unittest.TestCase):
mock_handler.completion_queue = mock.Mock()
async_result = self._makeOne(mock_handler)
async_result.rawlink(lambda a: a)
- async_result.set_exception(ImportError('Error occured'))
+ async_result.set_exception(ImportError("Error occured"))
assert isinstance(async_result.exception, ImportError)
assert mock_handler.completion_queue.put.called
@@ -126,9 +126,9 @@ class TestThreadingAsync(unittest.TestCase):
th.start()
bv.wait()
- async_result.set('fred')
+ async_result.set("fred")
cv.wait()
- assert lst == ['fred']
+ assert lst == ["fred"]
th.join()
def test_get_with_nowait(self):
@@ -155,7 +155,7 @@ class TestThreadingAsync(unittest.TestCase):
try:
val = async_result.get()
except ImportError:
- lst.append('oops')
+ lst.append("oops")
else:
lst.append(val)
cv.set()
@@ -166,7 +166,7 @@ class TestThreadingAsync(unittest.TestCase):
async_result.set_exception(ImportError)
cv.wait()
- assert lst == ['oops']
+ assert lst == ["oops"]
th.join()
def test_wait(self):
@@ -182,7 +182,7 @@ class TestThreadingAsync(unittest.TestCase):
try:
val = async_result.wait(10)
except ImportError:
- lst.append('oops')
+ lst.append("oops")
else:
lst.append(val)
cv.set()
@@ -230,7 +230,7 @@ class TestThreadingAsync(unittest.TestCase):
lst = []
cv = threading.Event()
- async_result.set('fred')
+ async_result.set("fred")
def wait_for_val():
val = async_result.get()
@@ -240,7 +240,7 @@ class TestThreadingAsync(unittest.TestCase):
th = threading.Thread(target=wait_for_val)
th.start()
cv.wait()
- assert lst == ['fred']
+ assert lst == ["fred"]
th.join()
def test_set_exc_before_wait(self):
@@ -255,7 +255,7 @@ class TestThreadingAsync(unittest.TestCase):
try:
val = async_result.get()
except ImportError:
- lst.append('ooops')
+ lst.append("ooops")
else:
lst.append(val)
cv.set()
@@ -263,7 +263,7 @@ class TestThreadingAsync(unittest.TestCase):
th = threading.Thread(target=wait_for_val)
th.start()
cv.wait()
- assert lst == ['ooops']
+ assert lst == ["ooops"]
th.join()
def test_linkage(self):
@@ -284,11 +284,11 @@ class TestThreadingAsync(unittest.TestCase):
th.start()
async_result.rawlink(add_on)
- async_result.set(b'fred')
+ async_result.set(b"fred")
assert mock_handler.completion_queue.put.called
async_result.unlink(add_on)
cv.wait()
- assert async_result.value == b'fred'
+ assert async_result.value == b"fred"
th.join()
def test_linkage_not_ready(self):
@@ -300,7 +300,7 @@ class TestThreadingAsync(unittest.TestCase):
def add_on():
lst.append(True)
- async_result.set('fred')
+ async_result.set("fred")
assert not mock_handler.completion_queue.called
async_result.rawlink(add_on)
assert mock_handler.completion_queue.put.called
@@ -317,7 +317,7 @@ class TestThreadingAsync(unittest.TestCase):
async_result.rawlink(add_on)
assert not mock_handler.completion_queue.put.called
async_result.unlink(add_on)
- async_result.set('fred')
+ async_result.set("fred")
assert not mock_handler.completion_queue.put.called
def test_captured_exception(self):
@@ -371,22 +371,22 @@ class TestThreadingAsync(unittest.TestCase):
@wrap(async_result)
def regular_function():
- return 'hello'
+ return "hello"
- assert regular_function() == 'hello'
+ assert regular_function() == "hello"
assert mock_handler.completion_queue.put.called
- assert async_result.get() == 'hello'
+ assert async_result.get() == "hello"
def test_multiple_callbacks(self):
- mockback1 = mock.Mock(name='mockback1')
- mockback2 = mock.Mock(name='mockback2')
+ mockback1 = mock.Mock(name="mockback1")
+ mockback2 = mock.Mock(name="mockback2")
handler = self._makeHandler()
handler.start()
async_result = self._makeOne(handler)
async_result.rawlink(mockback1)
async_result.rawlink(mockback2)
- async_result.set('howdy')
+ async_result.set("howdy")
async_result.wait()
handler.stop()
diff --git a/kazoo/tests/test_utils.py b/kazoo/tests/test_utils.py
index 7e21a40..1a9f025 100644
--- a/kazoo/tests/test_utils.py
+++ b/kazoo/tests/test_utils.py
@@ -16,38 +16,40 @@ class TestCreateTCPConnection(unittest.TestCase):
from kazoo.handlers import utils
from kazoo.handlers.utils import create_tcp_connection, socket, time
- with patch.object(socket, 'create_connection') as create_connection:
- with patch.object(utils, '_set_default_tcpsock_options'):
+ with patch.object(socket, "create_connection") as create_connection:
+ with patch.object(utils, "_set_default_tcpsock_options"):
# Ensure a gap between calls to time.time() does not result in
# create_connection being called with a negative timeout
# argument.
- with patch.object(time, 'time', side_effect=range(10)):
- create_tcp_connection(socket, ('127.0.0.1', 2181),
- timeout=1.5)
+ with patch.object(time, "time", side_effect=range(10)):
+ create_tcp_connection(
+ socket, ("127.0.0.1", 2181), timeout=1.5
+ )
for call_args in create_connection.call_args_list:
timeout = call_args[0][1]
- assert timeout >= 0, 'socket timeout must be nonnegative'
+ assert timeout >= 0, "socket timeout must be nonnegative"
def test_timeout_arg_eventlet(self):
if not EVENTLET_HANDLER_AVAILABLE:
- pytest.skip('eventlet handler not available.')
+ pytest.skip("eventlet handler not available.")
from kazoo.handlers import utils
from kazoo.handlers.utils import create_tcp_connection, time
- with patch.object(socket, 'create_connection') as create_connection:
- with patch.object(utils, '_set_default_tcpsock_options'):
+ with patch.object(socket, "create_connection") as create_connection:
+ with patch.object(utils, "_set_default_tcpsock_options"):
# Ensure a gap between calls to time.time() does not result in
# create_connection being called with a negative timeout
# argument.
- with patch.object(time, 'time', side_effect=range(10)):
- create_tcp_connection(socket, ('127.0.0.1', 2181),
- timeout=1.5)
+ with patch.object(time, "time", side_effect=range(10)):
+ create_tcp_connection(
+ socket, ("127.0.0.1", 2181), timeout=1.5
+ )
for call_args in create_connection.call_args_list:
timeout = call_args[0][1]
- assert timeout >= 0, 'socket timeout must be nonnegative'
+ assert timeout >= 0, "socket timeout must be nonnegative"
def test_slow_connect(self):
# Currently, create_tcp_connection will raise a socket timeout if it
@@ -57,15 +59,15 @@ class TestCreateTCPConnection(unittest.TestCase):
from kazoo.handlers.utils import create_tcp_connection, socket, time
# Simulate a second passing between calls to check the current time.
- with patch.object(time, 'time', side_effect=range(10)):
+ with patch.object(time, "time", side_effect=range(10)):
with pytest.raises(socket.error):
- create_tcp_connection(socket, ('127.0.0.1', 2181), timeout=0.5)
+ create_tcp_connection(socket, ("127.0.0.1", 2181), timeout=0.5)
def test_negative_timeout(self):
from kazoo.handlers.utils import create_tcp_connection, socket
with pytest.raises(socket.error):
- create_tcp_connection(socket, ('127.0.0.1', 2181), timeout=-1)
+ create_tcp_connection(socket, ("127.0.0.1", 2181), timeout=-1)
def test_zero_timeout(self):
# Rather than pass '0' through as a timeout to
@@ -75,6 +77,6 @@ class TestCreateTCPConnection(unittest.TestCase):
from kazoo.handlers.utils import create_tcp_connection, socket, time
# Simulate no time passing between calls to check the current time.
- with patch.object(time, 'time', return_value=time.time()):
+ with patch.object(time, "time", return_value=time.time()):
with pytest.raises(socket.error):
- create_tcp_connection(socket, ('127.0.0.1', 2181), timeout=0)
+ create_tcp_connection(socket, ("127.0.0.1", 2181), timeout=0)
diff --git a/kazoo/tests/test_watchers.py b/kazoo/tests/test_watchers.py
index 69d8fce..9b0c339 100644
--- a/kazoo/tests/test_watchers.py
+++ b/kazoo/tests/test_watchers.py
@@ -20,7 +20,7 @@ class KazooDataWatcherTests(KazooTestCase):
data = [True]
# Make it a non-existent path
- self.path += 'f'
+ self.path += "f"
@self.client.DataWatch(self.path)
def changed(d, stat):
@@ -32,9 +32,9 @@ class KazooDataWatcherTests(KazooTestCase):
assert data == [None]
update.clear()
- self.client.create(self.path, b'fred')
+ self.client.create(self.path, b"fred")
update.wait(10)
- assert data[0] == b'fred'
+ assert data[0] == b"fred"
update.clear()
def test_data_watcher_once(self):
@@ -42,7 +42,7 @@ class KazooDataWatcherTests(KazooTestCase):
data = [True]
# Make it a non-existent path
- self.path += 'f'
+ self.path += "f"
dwatcher = self.client.DataWatch(self.path)
@@ -69,7 +69,7 @@ class KazooDataWatcherTests(KazooTestCase):
data = [True]
# Make it a non-existent path
- self.path += 'f'
+ self.path += "f"
@self.client.DataWatch(self.path)
def changed(d, stat, event):
@@ -81,7 +81,7 @@ class KazooDataWatcherTests(KazooTestCase):
assert data == [None]
update.clear()
- self.client.create(self.path, b'fred')
+ self.client.create(self.path, b"fred")
update.wait(10)
assert data[0].type == EventType.CREATED
update.clear()
@@ -91,7 +91,7 @@ class KazooDataWatcherTests(KazooTestCase):
data = [True]
# Make it a non-existent path
- path = self.path + 'f'
+ path = self.path + "f"
def changed(d, stat):
data.pop()
@@ -104,9 +104,9 @@ class KazooDataWatcherTests(KazooTestCase):
assert data == [None]
update.clear()
- self.client.create(path, b'fred')
+ self.client.create(path, b"fred")
update.wait(10)
- assert data[0] == b'fred'
+ assert data[0] == b"fred"
update.clear()
def test_datawatch_across_session_expire(self):
@@ -124,9 +124,9 @@ class KazooDataWatcherTests(KazooTestCase):
update.clear()
self.expire_session(threading.Event)
- self.client.retry(self.client.set, self.path, b'fred')
+ self.client.retry(self.client.set, self.path, b"fred")
update.wait(25)
- assert data[0] == b'fred'
+ assert data[0] == b"fred"
def test_func_stops(self):
update = threading.Event()
@@ -149,17 +149,17 @@ class KazooDataWatcherTests(KazooTestCase):
update.clear()
fail_through.append(True)
- self.client.create(self.path, b'fred')
+ self.client.create(self.path, b"fred")
update.wait(10)
- assert data[0] == b'fred'
+ assert data[0] == b"fred"
update.clear()
- self.client.set(self.path, b'asdfasdf')
+ self.client.set(self.path, b"asdfasdf")
update.wait(0.2)
- assert data[0] == b'fred'
+ assert data[0] == b"fred"
d, stat = self.client.get(self.path)
- assert d == b'asdfasdf'
+ assert d == b"asdfasdf"
def test_no_such_node(self):
args = []
@@ -172,7 +172,7 @@ class KazooDataWatcherTests(KazooTestCase):
def test_no_such_node_for_children_watch(self):
args = []
- path = self.path + '/test_no_such_node_for_children_watch'
+ path = self.path + "/test_no_such_node_for_children_watch"
update = threading.Event()
def changed(children):
@@ -186,22 +186,22 @@ class KazooDataWatcherTests(KazooTestCase):
assert args == []
# watch a node which exists
- self.client.create(path, b'')
+ self.client.create(path, b"")
children_watch = self.client.ChildrenWatch(path, changed)
update.wait(3)
assert args == [[]]
update.clear()
# watch changes
- self.client.create(path + '/fred', b'')
+ self.client.create(path + "/fred", b"")
update.wait(3)
- assert args == [[], ['fred']]
+ assert args == [[], ["fred"]]
update.clear()
# delete children
- self.client.delete(path + '/fred')
+ self.client.delete(path + "/fred")
update.wait(3)
- assert args == [[], ['fred'], []]
+ assert args == [[], ["fred"], []]
update.clear()
# delete watching
@@ -225,7 +225,7 @@ class KazooDataWatcherTests(KazooTestCase):
watcher = WeirdWatcher()
self.client.DataWatch(self.path, watcher)
- self.client.set(self.path, b'mwahaha')
+ self.client.set(self.path, b"mwahaha")
assert watcher.called is True
def test_watcher_repeat_delete(self):
@@ -242,21 +242,21 @@ class KazooDataWatcherTests(KazooTestCase):
assert a == [None]
ev.wait(10)
ev.clear()
- self.client.create(self.path, b'blah')
+ self.client.create(self.path, b"blah")
ev.wait(10)
assert ev.is_set() is True
ev.clear()
- assert a == [None, b'blah']
+ assert a == [None, b"blah"]
self.client.delete(self.path)
ev.wait(10)
assert ev.is_set() is True
ev.clear()
- assert a == [None, b'blah', None]
- self.client.create(self.path, b'blah')
+ assert a == [None, b"blah", None]
+ self.client.create(self.path, b"blah")
ev.wait(10)
assert ev.is_set() is True
ev.clear()
- assert a == [None, b'blah', None, b'blah']
+ assert a == [None, b"blah", None, b"blah"]
def test_watcher_with_closing(self):
a = []
@@ -287,7 +287,7 @@ class KazooChildrenWatcherTests(KazooTestCase):
def test_child_watcher(self):
update = threading.Event()
- all_children = ['fred']
+ all_children = ["fred"]
@self.client.ChildrenWatch(self.path)
def changed(children):
@@ -300,18 +300,18 @@ class KazooChildrenWatcherTests(KazooTestCase):
assert all_children == []
update.clear()
- self.client.create(self.path + '/' + 'smith')
+ self.client.create(self.path + "/" + "smith")
update.wait(10)
- assert all_children == ['smith']
+ assert all_children == ["smith"]
update.clear()
- self.client.create(self.path + '/' + 'george')
+ self.client.create(self.path + "/" + "george")
update.wait(10)
- assert sorted(all_children) == ['george', 'smith']
+ assert sorted(all_children) == ["george", "smith"]
def test_child_watcher_once(self):
update = threading.Event()
- all_children = ['fred']
+ all_children = ["fred"]
cwatch = self.client.ChildrenWatch(self.path)
@@ -346,14 +346,14 @@ class KazooChildrenWatcherTests(KazooTestCase):
assert events == [None]
update.clear()
- self.client.create(self.path + '/' + 'smith')
+ self.client.create(self.path + "/" + "smith")
update.wait(10)
assert events[0].type == EventType.CHILD
update.clear()
def test_func_style_child_watcher(self):
update = threading.Event()
- all_children = ['fred']
+ all_children = ["fred"]
def changed(children):
while all_children:
@@ -367,18 +367,18 @@ class KazooChildrenWatcherTests(KazooTestCase):
assert all_children == []
update.clear()
- self.client.create(self.path + '/' + 'smith')
+ self.client.create(self.path + "/" + "smith")
update.wait(10)
- assert all_children == ['smith']
+ assert all_children == ["smith"]
update.clear()
- self.client.create(self.path + '/' + 'george')
+ self.client.create(self.path + "/" + "george")
update.wait(10)
- assert sorted(all_children) == ['george', 'smith']
+ assert sorted(all_children) == ["george", "smith"]
def test_func_stops(self):
update = threading.Event()
- all_children = ['fred']
+ all_children = ["fred"]
fail_through = []
@@ -396,18 +396,18 @@ class KazooChildrenWatcherTests(KazooTestCase):
update.clear()
fail_through.append(True)
- self.client.create(self.path + '/' + 'smith')
+ self.client.create(self.path + "/" + "smith")
update.wait(10)
- assert all_children == ['smith']
+ assert all_children == ["smith"]
update.clear()
- self.client.create(self.path + '/' + 'george')
+ self.client.create(self.path + "/" + "george")
update.wait(0.5)
- assert all_children == ['smith']
+ assert all_children == ["smith"]
def test_child_watcher_remove_session_watcher(self):
update = threading.Event()
- all_children = ['fred']
+ all_children = ["fred"]
fail_through = []
@@ -428,20 +428,20 @@ class KazooChildrenWatcherTests(KazooTestCase):
update.clear()
fail_through.append(True)
- self.client.create(self.path + '/' + 'smith')
+ self.client.create(self.path + "/" + "smith")
update.wait(10)
assert session_watcher not in self.client.state_listeners
- assert all_children == ['smith']
+ assert all_children == ["smith"]
update.clear()
- self.client.create(self.path + '/' + 'george')
+ self.client.create(self.path + "/" + "george")
update.wait(10)
assert session_watcher not in self.client.state_listeners
- assert all_children == ['smith']
+ assert all_children == ["smith"]
def test_child_watch_session_loss(self):
update = threading.Event()
- all_children = ['fred']
+ all_children = ["fred"]
@self.client.ChildrenWatch(self.path)
def changed(children):
@@ -454,20 +454,19 @@ class KazooChildrenWatcherTests(KazooTestCase):
assert all_children == []
update.clear()
- self.client.create(self.path + '/' + 'smith')
+ self.client.create(self.path + "/" + "smith")
update.wait(10)
- assert all_children == ['smith']
+ assert all_children == ["smith"]
update.clear()
self.expire_session(threading.Event)
- self.client.retry(self.client.create,
- self.path + '/' + 'george')
+ self.client.retry(self.client.create, self.path + "/" + "george")
update.wait(20)
- assert sorted(all_children) == ['george', 'smith']
+ assert sorted(all_children) == ["george", "smith"]
def test_child_stop_on_session_loss(self):
update = threading.Event()
- all_children = ['fred']
+ all_children = ["fred"]
@self.client.ChildrenWatch(self.path, allow_session_lost=False)
def changed(children):
@@ -480,20 +479,19 @@ class KazooChildrenWatcherTests(KazooTestCase):
assert all_children == []
update.clear()
- self.client.create(self.path + '/' + 'smith')
+ self.client.create(self.path + "/" + "smith")
update.wait(10)
- assert all_children == ['smith']
+ assert all_children == ["smith"]
update.clear()
self.expire_session(threading.Event)
- self.client.retry(self.client.create,
- self.path + '/' + 'george')
+ self.client.retry(self.client.create, self.path + "/" + "george")
update.wait(4)
assert update.is_set() is False
- assert all_children == ['smith']
+ assert all_children == ["smith"]
children = self.client.get_children(self.path)
- assert sorted(children) == ['george', 'smith']
+ assert sorted(children) == ["george", "smith"]
class KazooPatientChildrenWatcherTests(KazooTestCase):
@@ -514,7 +512,7 @@ class KazooPatientChildrenWatcherTests(KazooTestCase):
assert len(children) == 0
assert asy.ready() is False
- self.client.create(self.path + '/' + 'fred')
+ self.client.create(self.path + "/" + "fred")
asy.get(timeout=1)
assert asy.ready() is True
@@ -534,11 +532,11 @@ class KazooPatientChildrenWatcherTests(KazooTestCase):
assert result.ready() is False
time.sleep(0.08)
- self.client.create(self.path + '/' + uuid.uuid4().hex)
+ self.client.create(self.path + "/" + uuid.uuid4().hex)
assert result.ready() is False
time.sleep(0.08)
assert result.ready() is False
- self.client.create(self.path + '/' + uuid.uuid4().hex)
+ self.client.create(self.path + "/" + uuid.uuid4().hex)
time.sleep(0.08)
assert result.ready() is False
diff --git a/kazoo/tests/util.py b/kazoo/tests/util.py
index dbe3b48..223fe64 100644
--- a/kazoo/tests/util.py
+++ b/kazoo/tests/util.py
@@ -16,17 +16,16 @@ import logging
import os
import time
-CI = os.environ.get('CI', False)
-CI_ZK_VERSION = CI and os.environ.get('ZOOKEEPER_VERSION', None)
+CI = os.environ.get("CI", False)
+CI_ZK_VERSION = CI and os.environ.get("ZOOKEEPER_VERSION", None)
if CI_ZK_VERSION:
- if '-' in CI_ZK_VERSION:
+ if "-" in CI_ZK_VERSION:
# Ignore pre-release markers like -alpha
- CI_ZK_VERSION = CI_ZK_VERSION.split('-')[0]
- CI_ZK_VERSION = tuple([int(n) for n in CI_ZK_VERSION.split('.')])
+ CI_ZK_VERSION = CI_ZK_VERSION.split("-")[0]
+ CI_ZK_VERSION = tuple([int(n) for n in CI_ZK_VERSION.split(".")])
class Handler(logging.Handler):
-
def __init__(self, *names, **kw):
logging.Handler.__init__(self)
self.names = names
@@ -57,34 +56,48 @@ class Handler(logging.Handler):
logger.removeHandler(self)
def __str__(self):
- return '\n'.join(
- [("%s %s\n %s" %
- (record.name, record.levelname,
- '\n'.join([line
- for line in record.getMessage().split('\n')
- if line.strip()])
- )
- )
- for record in self.records])
+ return "\n".join(
+ [
+ (
+ "%s %s\n %s"
+ % (
+ record.name,
+ record.levelname,
+ "\n".join(
+ [
+ line
+ for line in record.getMessage().split("\n")
+ if line.strip()
+ ]
+ ),
+ )
+ )
+ for record in self.records
+ ]
+ )
class InstalledHandler(Handler):
-
def __init__(self, *names, **kw):
Handler.__init__(self, *names, **kw)
self.install()
class Wait(object):
-
class TimeOutWaitingFor(Exception):
"A test condition timed out"
timeout = 9
- wait = .01
+ wait = 0.01
- def __init__(self, timeout=None, wait=None, exception=None,
- getnow=(lambda: time.time), getsleep=(lambda: time.sleep)):
+ def __init__(
+ self,
+ timeout=None,
+ wait=None,
+ exception=None,
+ getnow=(lambda: time.time),
+ getsleep=(lambda: time.sleep),
+ ):
if timeout is not None:
self.timeout = timeout
@@ -120,9 +133,10 @@ class Wait(object):
return
if now() > deadline:
raise self.TimeOutWaitingFor(
- message or
- getattr(func, '__doc__') or
- getattr(func, '__name__')
- )
+ message
+ or getattr(func, "__doc__")
+ or getattr(func, "__name__")
+ )
+
wait = Wait()
diff --git a/kazoo/version.py b/kazoo/version.py
index 387cfac..43ce13d 100644
--- a/kazoo/version.py
+++ b/kazoo/version.py
@@ -1 +1 @@
-__version__ = '2.9.0'
+__version__ = "2.9.0"