summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-02-16 16:25:49 +0000
committerGerrit Code Review <review@openstack.org>2015-02-16 16:25:49 +0000
commita3f126f0e66fd5cf98db52e6ce7a03de1c4ccd3e (patch)
tree6582d8e78c5e2ed7f0a34b1dff7d4bd294a91fd1
parent14009d23341e67ebc6031b04a75401521f8daaa2 (diff)
parenta14adc3c62658228a12d45e0851ae443c8429b85 (diff)
downloadtaskflow-a3f126f0e66fd5cf98db52e6ce7a03de1c4ccd3e.tar.gz
Merge "Add retries to fetching the zookeeper server version"
-rw-r--r--requirements-py2.txt3
-rw-r--r--requirements-py3.txt3
-rw-r--r--taskflow/tests/unit/test_utils_kazoo_utils.py56
-rw-r--r--taskflow/utils/kazoo_utils.py33
4 files changed, 92 insertions, 3 deletions
diff --git a/requirements-py2.txt b/requirements-py2.txt
index 083caec..8657809 100644
--- a/requirements-py2.txt
+++ b/requirements-py2.txt
@@ -25,6 +25,9 @@ futures>=2.1.6
# Used for structured input validation
jsonschema>=2.0.0,<3.0.0
+# For utility retries...
+retrying>=1.2.3,!=1.3.0
+
# For common utilities
oslo.utils>=1.2.0 # Apache-2.0
oslo.serialization>=1.2.0 # Apache-2.0
diff --git a/requirements-py3.txt b/requirements-py3.txt
index b04fc0a..8e555f0 100644
--- a/requirements-py3.txt
+++ b/requirements-py3.txt
@@ -19,6 +19,9 @@ stevedore>=1.1.0 # Apache-2.0
# Used for structured input validation
jsonschema>=2.0.0,<3.0.0
+# For utility retries...
+retrying>=1.2.3,!=1.3.0
+
# For common utilities
oslo.utils>=1.2.0 # Apache-2.0
oslo.serialization>=1.2.0 # Apache-2.0
diff --git a/taskflow/tests/unit/test_utils_kazoo_utils.py b/taskflow/tests/unit/test_utils_kazoo_utils.py
new file mode 100644
index 0000000..9759a73
--- /dev/null
+++ b/taskflow/tests/unit/test_utils_kazoo_utils.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from kazoo import client
+from kazoo import exceptions as k_exc
+from six.moves import range as compat_range
+
+from taskflow import test
+from taskflow.test import mock
+from taskflow.utils import kazoo_utils as ku
+
+_FAKE_ZK_VER = (3, 4, 0)
+
+
+def _iter_succeed_after(attempts):
+ for i in compat_range(0, attempts):
+ if i % 2 == 0:
+ yield ValueError("Broken")
+ else:
+ yield AttributeError("Broken")
+ yield _FAKE_ZK_VER
+
+
+class KazooUtilTest(test.TestCase):
+ def test_flakey_version_fetch_fail(self):
+ m = mock.create_autospec(client.KazooClient, instance=True)
+ m.server_version.side_effect = _iter_succeed_after(11)
+ self.assertRaises(k_exc.KazooException,
+ ku.fetch_server_version, m, 10)
+ self.assertEqual(10, m.server_version.call_count)
+
+ def test_flakey_version_fetch_fail_truncated(self):
+ m = mock.create_autospec(client.KazooClient, instance=True)
+ m.server_version.side_effect = [None, [], "", [1]]
+ self.assertRaises(k_exc.KazooException,
+ ku.fetch_server_version, m, 4)
+ self.assertEqual(4, m.server_version.call_count)
+
+ def test_flakey_version_fetch_pass(self):
+ m = mock.create_autospec(client.KazooClient, instance=True)
+ m.server_version.side_effect = _iter_succeed_after(4)
+ self.assertEqual((3, 4, 0), ku.fetch_server_version(m, 5))
+ self.assertEqual(5, m.server_version.call_count)
diff --git a/taskflow/utils/kazoo_utils.py b/taskflow/utils/kazoo_utils.py
index c2869bd..689b50b 100644
--- a/taskflow/utils/kazoo_utils.py
+++ b/taskflow/utils/kazoo_utils.py
@@ -17,6 +17,7 @@
from kazoo import client
from kazoo import exceptions as k_exc
from oslo_utils import reflection
+import retrying
import six
from six.moves import zip as compat_zip
@@ -138,7 +139,32 @@ def finalize_client(client):
pass
-def check_compatible(client, min_version=None, max_version=None):
+def fetch_server_version(client, fetch_attempts):
+ """Fetches the server version but also handles its apparent flakiness.
+
+ The issue @ https://github.com/python-zk/kazoo/issues/274 explains
+ why this happens and how it may become better at some point in the
+ future; once that issue is addressed we should be able to handle this
+ better...
+ """
+ # If for some reason the parsed version is not composed of a 'major.minor'
+ # version we likely got a truncated value back and we should try again...
+ retry_on_result = lambda version: not version or len(version) <= 1
+ retry_on_exception = lambda exc: isinstance(exc, (AttributeError,
+ ValueError))
+ r = retrying.Retrying(retry_on_exception=retry_on_exception,
+ stop_max_attempt_number=fetch_attempts,
+ retry_on_result=retry_on_result)
+ try:
+ return r.call(client.server_version)
+ except (AttributeError, ValueError, retrying.RetryError):
+ raise k_exc.KazooException("Unable to fetch useable server"
+ " version after trying %s times"
+ % (fetch_attempts))
+
+
+def check_compatible(client, min_version=None, max_version=None,
+ version_fetch_attempts=3):
"""Checks if a kazoo client is backed by a zookeeper server version.
This check will verify that the zookeeper server version that the client
@@ -148,7 +174,7 @@ def check_compatible(client, min_version=None, max_version=None):
"""
server_version = None
if min_version:
- server_version = tuple((int(a) for a in client.server_version()))
+ server_version = fetch_server_version(client, version_fetch_attempts)
min_version = tuple((int(a) for a in min_version))
if server_version < min_version:
pretty_server_version = ".".join([str(a) for a in server_version])
@@ -159,7 +185,8 @@ def check_compatible(client, min_version=None, max_version=None):
min_version))
if max_version:
if server_version is None:
- server_version = tuple((int(a) for a in client.server_version()))
+ server_version = fetch_server_version(client,
+ version_fetch_attempts)
max_version = tuple((int(a) for a in max_version))
if server_version > max_version:
pretty_server_version = ".".join([str(a) for a in server_version])