diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-02-16 16:25:49 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-02-16 16:25:49 +0000 |
commit | a3f126f0e66fd5cf98db52e6ce7a03de1c4ccd3e (patch) | |
tree | 6582d8e78c5e2ed7f0a34b1dff7d4bd294a91fd1 | |
parent | 14009d23341e67ebc6031b04a75401521f8daaa2 (diff) | |
parent | a14adc3c62658228a12d45e0851ae443c8429b85 (diff) | |
download | taskflow-a3f126f0e66fd5cf98db52e6ce7a03de1c4ccd3e.tar.gz |
Merge "Add retries to fetching the zookeeper server version"
-rw-r--r-- | requirements-py2.txt | 3 | ||||
-rw-r--r-- | requirements-py3.txt | 3 | ||||
-rw-r--r-- | taskflow/tests/unit/test_utils_kazoo_utils.py | 56 | ||||
-rw-r--r-- | taskflow/utils/kazoo_utils.py | 33 |
4 files changed, 92 insertions, 3 deletions
diff --git a/requirements-py2.txt b/requirements-py2.txt index 083caec..8657809 100644 --- a/requirements-py2.txt +++ b/requirements-py2.txt @@ -25,6 +25,9 @@ futures>=2.1.6 # Used for structured input validation jsonschema>=2.0.0,<3.0.0 +# For utility retries... +retrying>=1.2.3,!=1.3.0 + # For common utilities oslo.utils>=1.2.0 # Apache-2.0 oslo.serialization>=1.2.0 # Apache-2.0 diff --git a/requirements-py3.txt b/requirements-py3.txt index b04fc0a..8e555f0 100644 --- a/requirements-py3.txt +++ b/requirements-py3.txt @@ -19,6 +19,9 @@ stevedore>=1.1.0 # Apache-2.0 # Used for structured input validation jsonschema>=2.0.0,<3.0.0 +# For utility retries... +retrying>=1.2.3,!=1.3.0 + # For common utilities oslo.utils>=1.2.0 # Apache-2.0 oslo.serialization>=1.2.0 # Apache-2.0 diff --git a/taskflow/tests/unit/test_utils_kazoo_utils.py b/taskflow/tests/unit/test_utils_kazoo_utils.py new file mode 100644 index 0000000..9759a73 --- /dev/null +++ b/taskflow/tests/unit/test_utils_kazoo_utils.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from kazoo import client +from kazoo import exceptions as k_exc +from six.moves import range as compat_range + +from taskflow import test +from taskflow.test import mock +from taskflow.utils import kazoo_utils as ku + +_FAKE_ZK_VER = (3, 4, 0) + + +def _iter_succeed_after(attempts): + for i in compat_range(0, attempts): + if i % 2 == 0: + yield ValueError("Broken") + else: + yield AttributeError("Broken") + yield _FAKE_ZK_VER + + +class KazooUtilTest(test.TestCase): + def test_flakey_version_fetch_fail(self): + m = mock.create_autospec(client.KazooClient, instance=True) + m.server_version.side_effect = _iter_succeed_after(11) + self.assertRaises(k_exc.KazooException, + ku.fetch_server_version, m, 10) + self.assertEqual(10, m.server_version.call_count) + + def test_flakey_version_fetch_fail_truncated(self): + m = mock.create_autospec(client.KazooClient, instance=True) + m.server_version.side_effect = [None, [], "", [1]] + self.assertRaises(k_exc.KazooException, + ku.fetch_server_version, m, 4) + self.assertEqual(4, m.server_version.call_count) + + def test_flakey_version_fetch_pass(self): + m = mock.create_autospec(client.KazooClient, instance=True) + m.server_version.side_effect = _iter_succeed_after(4) + self.assertEqual((3, 4, 0), ku.fetch_server_version(m, 5)) + self.assertEqual(5, m.server_version.call_count) diff --git a/taskflow/utils/kazoo_utils.py b/taskflow/utils/kazoo_utils.py index c2869bd..689b50b 100644 --- a/taskflow/utils/kazoo_utils.py +++ b/taskflow/utils/kazoo_utils.py @@ -17,6 +17,7 @@ from kazoo import client from kazoo import exceptions as k_exc from oslo_utils import reflection +import retrying import six from six.moves import zip as compat_zip @@ -138,7 +139,32 @@ def finalize_client(client): pass -def check_compatible(client, min_version=None, max_version=None): +def fetch_server_version(client, fetch_attempts): + """Fetches the server version but also handles its apparent flakiness. + + The issue @ https://github.com/python-zk/kazoo/issues/274 explains + why this happens and how it may become better at some point in the + future; once that issue is addressed we should be able to handle this + better... + """ + # If for some reason the parsed version is not composed of a 'major.minor' + # version we likely got a truncated value back and we should try again... + retry_on_result = lambda version: not version or len(version) <= 1 + retry_on_exception = lambda exc: isinstance(exc, (AttributeError, + ValueError)) + r = retrying.Retrying(retry_on_exception=retry_on_exception, + stop_max_attempt_number=fetch_attempts, + retry_on_result=retry_on_result) + try: + return r.call(client.server_version) + except (AttributeError, ValueError, retrying.RetryError): + raise k_exc.KazooException("Unable to fetch useable server" + " version after trying %s times" + % (fetch_attempts)) + + +def check_compatible(client, min_version=None, max_version=None, + version_fetch_attempts=3): """Checks if a kazoo client is backed by a zookeeper server version. This check will verify that the zookeeper server version that the client @@ -148,7 +174,7 @@ def check_compatible(client, min_version=None, max_version=None): """ server_version = None if min_version: - server_version = tuple((int(a) for a in client.server_version())) + server_version = fetch_server_version(client, version_fetch_attempts) min_version = tuple((int(a) for a in min_version)) if server_version < min_version: pretty_server_version = ".".join([str(a) for a in server_version]) @@ -159,7 +185,8 @@ def check_compatible(client, min_version=None, max_version=None): min_version)) if max_version: if server_version is None: - server_version = tuple((int(a) for a in client.server_version())) + server_version = fetch_server_version(client, + version_fetch_attempts) max_version = tuple((int(a) for a in max_version)) if server_version > max_version: pretty_server_version = ".".join([str(a) for a in server_version]) |