-rw-r--r--  doc/source/conf.py                                                        4
-rw-r--r--  releasenotes/notes/fix-zookeeper-option-parsing-f9d37fbc39af47f4.yaml     7
-rw-r--r--  setup.cfg                                                                 1
-rw-r--r--  taskflow/jobs/backends/impl_zookeeper.py                                  4
-rw-r--r--  taskflow/persistence/backends/impl_sqlalchemy.py                         34
-rw-r--r--  taskflow/persistence/backends/impl_zookeeper.py                           4
-rw-r--r--  taskflow/tests/unit/jobs/test_zk_job.py                                  44
-rw-r--r--  taskflow/tests/unit/test_utils_kazoo_utils.py                            67
-rw-r--r--  taskflow/utils/kazoo_utils.py                                            29
9 files changed, 169 insertions, 25 deletions
diff --git a/doc/source/conf.py b/doc/source/conf.py
index b46a5a8..d1006bb 100644
--- a/doc/source/conf.py
+++ b/doc/source/conf.py
@@ -68,8 +68,8 @@ modindex_common_prefix = ['taskflow.']
# Shortened external links.
source_tree = 'https://opendev.org/openstack/taskflow/src/branch/master/'
extlinks = {
- 'example': (source_tree + '/taskflow/examples/%s.py', ''),
- 'pybug': ('http://bugs.python.org/issue%s', ''),
+ 'example': (source_tree + '/taskflow/examples/%s.py', '%s'),
+ 'pybug': ('http://bugs.python.org/issue%s', '%s'),
}
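Note: recent Sphinx releases expect each extlinks caption to be a format string containing a single '%s' (or None), which is why the empty-string captions above gain one. A minimal sketch of the resulting behaviour, assuming the sphinx.ext.extlinks extension is enabled:

    # doc/source/conf.py (sketch, illustrative beyond the two lines changed above)
    extlinks = {
        'pybug': ('http://bugs.python.org/issue%s', '%s'),
    }
    # In the docs, :pybug:`12345` then renders with link text "12345"
    # pointing at http://bugs.python.org/issue12345.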
diff --git a/releasenotes/notes/fix-zookeeper-option-parsing-f9d37fbc39af47f4.yaml b/releasenotes/notes/fix-zookeeper-option-parsing-f9d37fbc39af47f4.yaml
new file mode 100644
index 0000000..3f81b29
--- /dev/null
+++ b/releasenotes/notes/fix-zookeeper-option-parsing-f9d37fbc39af47f4.yaml
@@ -0,0 +1,7 @@
+---
+fixes:
+ - |
+    Fixed an issue where, when the configuration options of the zookeeper
+    jobboard backend were passed as strings, the string ``"False"`` was
+    wrongly interpreted as ``True``. The string ``"False"`` is now
+    interpreted as the boolean ``False``.
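For context, a small illustration of the parsing behaviour the fix relies on (assuming oslo.utils is available). Any non-empty string, including "False", is truthy in Python, which was the root cause; strutils.bool_from_string parses the string form explicitly:

    from oslo_utils import strutils

    bool('False')                                       # True  (the old bug)
    strutils.bool_from_string('False', default=True)    # False
    strutils.bool_from_string('true', default=True)     # True
    strutils.bool_from_string(None, default=True)       # True  (default used)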
diff --git a/setup.cfg b/setup.cfg
index b97eea8..0d0f804 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -19,6 +19,7 @@ classifier =
Programming Language :: Python :: 3
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
+ Programming Language :: Python :: 3.10
Programming Language :: Python :: 3 :: Only
Programming Language :: Python :: Implementation :: CPython
Topic :: Software Development :: Libraries
diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py
index fc9399a..1e45f0f 100644
--- a/taskflow/jobs/backends/impl_zookeeper.py
+++ b/taskflow/jobs/backends/impl_zookeeper.py
@@ -28,6 +28,7 @@ from kazoo.protocol import states as k_states
from kazoo.recipe import watchers
from oslo_serialization import jsonutils
from oslo_utils import excutils
+from oslo_utils import strutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
@@ -829,7 +830,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
excp.raise_with_cause(excp.JobFailure,
"Failed to connect to zookeeper")
try:
- if self._conf.get('check_compatible', True):
+ if strutils.bool_from_string(
+ self._conf.get('check_compatible'), default=True):
kazoo_utils.check_compatible(self._client, self.MIN_ZK_VERSION)
if self._worker is None and self._emit_notifications:
self._worker = futurist.ThreadPoolExecutor(max_workers=1)
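A hedged usage sketch of the effect on the jobboard (the board name, hosts and surrounding setup below are illustrative, not taken from this change): options that arrive as strings, for example from an INI file, now disable the compatibility check as intended:

    import contextlib

    from taskflow.jobs import backends as job_backends

    conf = {
        'board': 'zookeeper',
        'hosts': 'localhost:2181',
        'check_compatible': 'False',  # parsed by bool_from_string -> False
    }
    with contextlib.closing(job_backends.fetch('my-board', conf)) as board:
        board.connect()  # the MIN_ZK_VERSION check is skipped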
diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py
index 3ac0f3d..742403b 100644
--- a/taskflow/persistence/backends/impl_sqlalchemy.py
+++ b/taskflow/persistence/backends/impl_sqlalchemy.py
@@ -205,15 +205,17 @@ class _Alchemist(object):
return atom_cls.from_dict(row)
def atom_query_iter(self, conn, parent_uuid):
- q = (sql.select([self._tables.atomdetails]).
+ q = (sql.select(self._tables.atomdetails).
where(self._tables.atomdetails.c.parent_uuid == parent_uuid))
for row in conn.execute(q):
+ row = row._mapping
yield self.convert_atom_detail(row)
def flow_query_iter(self, conn, parent_uuid):
- q = (sql.select([self._tables.flowdetails]).
+ q = (sql.select(self._tables.flowdetails).
where(self._tables.flowdetails.c.parent_uuid == parent_uuid))
for row in conn.execute(q):
+ row = row._mapping
yield self.convert_flow_detail(row)
def populate_book(self, conn, book):
@@ -257,7 +259,6 @@ class SQLAlchemyBackend(base.Backend):
conf = copy.deepcopy(conf)
engine_args = {
'echo': _as_bool(conf.pop('echo', False)),
- 'convert_unicode': _as_bool(conf.pop('convert_unicode', True)),
'pool_recycle': 3600,
}
if 'idle_timeout' in conf:
@@ -421,12 +422,13 @@ class Connection(base.Connection):
try:
atomdetails = self._tables.atomdetails
with self._engine.begin() as conn:
- q = (sql.select([atomdetails]).
+ q = (sql.select(atomdetails).
where(atomdetails.c.uuid == atom_detail.uuid))
row = conn.execute(q).first()
if not row:
raise exc.NotFound("No atom details found with uuid"
" '%s'" % atom_detail.uuid)
+ row = row._mapping
e_ad = self._converter.convert_atom_detail(row)
self._update_atom_details(conn, atom_detail, e_ad)
return e_ad
@@ -438,7 +440,7 @@ class Connection(base.Connection):
def _insert_flow_details(self, conn, fd, parent_uuid):
value = fd.to_dict()
value['parent_uuid'] = parent_uuid
- conn.execute(sql.insert(self._tables.flowdetails, value))
+ conn.execute(sql.insert(self._tables.flowdetails).values(**value))
for ad in fd:
self._insert_atom_details(conn, ad, fd.uuid)
@@ -446,7 +448,7 @@ class Connection(base.Connection):
value = ad.to_dict()
value['parent_uuid'] = parent_uuid
value['atom_type'] = models.atom_detail_type(ad)
- conn.execute(sql.insert(self._tables.atomdetails, value))
+ conn.execute(sql.insert(self._tables.atomdetails).values(**value))
def _update_atom_details(self, conn, ad, e_ad):
e_ad.merge(ad)
@@ -471,12 +473,13 @@ class Connection(base.Connection):
try:
flowdetails = self._tables.flowdetails
with self._engine.begin() as conn:
- q = (sql.select([flowdetails]).
+ q = (sql.select(flowdetails).
where(flowdetails.c.uuid == flow_detail.uuid))
row = conn.execute(q).first()
if not row:
raise exc.NotFound("No flow details found with"
" uuid '%s'" % flow_detail.uuid)
+ row = row._mapping
e_fd = self._converter.convert_flow_detail(row)
self._converter.populate_flow_detail(conn, e_fd)
self._update_flow_details(conn, flow_detail, e_fd)
@@ -503,10 +506,11 @@ class Connection(base.Connection):
try:
logbooks = self._tables.logbooks
with self._engine.begin() as conn:
- q = (sql.select([logbooks]).
+ q = (sql.select(logbooks).
where(logbooks.c.uuid == book.uuid))
row = conn.execute(q).first()
if row:
+ row = row._mapping
e_lb = self._converter.convert_book(row)
self._converter.populate_book(conn, e_lb)
e_lb.merge(book)
@@ -522,7 +526,7 @@ class Connection(base.Connection):
self._update_flow_details(conn, fd, e_fd)
return e_lb
else:
- conn.execute(sql.insert(logbooks, book.to_dict()))
+ conn.execute(sql.insert(logbooks).values(**book.to_dict()))
for fd in book:
self._insert_flow_details(conn, fd, book.uuid)
return book
@@ -535,12 +539,13 @@ class Connection(base.Connection):
try:
logbooks = self._tables.logbooks
with contextlib.closing(self._engine.connect()) as conn:
- q = (sql.select([logbooks]).
+ q = (sql.select(logbooks).
where(logbooks.c.uuid == book_uuid))
row = conn.execute(q).first()
if not row:
raise exc.NotFound("No logbook found with"
" uuid '%s'" % book_uuid)
+ row = row._mapping
book = self._converter.convert_book(row)
if not lazy:
self._converter.populate_book(conn, book)
@@ -553,8 +558,9 @@ class Connection(base.Connection):
gathered = []
try:
with contextlib.closing(self._engine.connect()) as conn:
- q = sql.select([self._tables.logbooks])
+ q = sql.select(self._tables.logbooks)
for row in conn.execute(q):
+ row = row._mapping
book = self._converter.convert_book(row)
if not lazy:
self._converter.populate_book(conn, book)
@@ -584,12 +590,13 @@ class Connection(base.Connection):
try:
flowdetails = self._tables.flowdetails
with self._engine.begin() as conn:
- q = (sql.select([flowdetails]).
+ q = (sql.select(flowdetails).
where(flowdetails.c.uuid == fd_uuid))
row = conn.execute(q).first()
if not row:
raise exc.NotFound("No flow details found with uuid"
" '%s'" % fd_uuid)
+ row = row._mapping
fd = self._converter.convert_flow_detail(row)
if not lazy:
self._converter.populate_flow_detail(conn, fd)
@@ -603,12 +610,13 @@ class Connection(base.Connection):
try:
atomdetails = self._tables.atomdetails
with self._engine.begin() as conn:
- q = (sql.select([atomdetails]).
+ q = (sql.select(atomdetails).
where(atomdetails.c.uuid == ad_uuid))
row = conn.execute(q).first()
if not row:
raise exc.NotFound("No atom details found with uuid"
" '%s'" % ad_uuid)
+ row = row._mapping
return self._converter.convert_atom_detail(row)
except sa_exc.SQLAlchemyError:
exc.raise_with_cause(exc.StorageFailure,
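The SQLAlchemy hunks above track the 1.4/2.0-style API: sql.select() takes the selectable positionally instead of wrapped in a list, rows are tuple-like so dict-style access goes through Row._mapping, insert() values are supplied via .values(), and the removed convert_unicode engine argument no longer exists in recent releases. A self-contained sketch of the new idioms against an in-memory SQLite table (table and column names are illustrative only; requires SQLAlchemy >= 1.4):

    import sqlalchemy as sa

    engine = sa.create_engine('sqlite://')
    meta = sa.MetaData()
    logbooks = sa.Table('logbooks', meta,
                        sa.Column('uuid', sa.String, primary_key=True),
                        sa.Column('name', sa.String))
    meta.create_all(engine)

    with engine.begin() as conn:
        # insert(table, values) -> insert(table).values(**values)
        conn.execute(sa.insert(logbooks).values(uuid='u-1', name='demo'))

        # select([table]) -> select(table)
        q = sa.select(logbooks).where(logbooks.c.uuid == 'u-1')
        row = conn.execute(q).first()

        # Rows are tuple-like in 1.4+; mapping-style access uses ._mapping.
        assert row._mapping['name'] == 'demo'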
diff --git a/taskflow/persistence/backends/impl_zookeeper.py b/taskflow/persistence/backends/impl_zookeeper.py
index 1189723..e75826e 100644
--- a/taskflow/persistence/backends/impl_zookeeper.py
+++ b/taskflow/persistence/backends/impl_zookeeper.py
@@ -20,6 +20,7 @@ import contextlib
from kazoo import exceptions as k_exc
from kazoo.protocol import paths
from oslo_serialization import jsonutils
+from oslo_utils import strutils
from taskflow import exceptions as exc
from taskflow.persistence import path_based
@@ -161,7 +162,8 @@ class ZkConnection(path_based.PathBasedConnection):
def validate(self):
with self._exc_wrapper():
try:
- if self._conf.get('check_compatible', True):
+ if strutils.bool_from_string(
+ self._conf.get('check_compatible'), default=True):
k_utils.check_compatible(self._client, MIN_ZK_VERSION)
except exc.IncompatibleVersion:
exc.raise_with_cause(exc.StorageFailure, "Backend storage is"
diff --git a/taskflow/tests/unit/jobs/test_zk_job.py b/taskflow/tests/unit/jobs/test_zk_job.py
index d93f148..23d0505 100644
--- a/taskflow/tests/unit/jobs/test_zk_job.py
+++ b/taskflow/tests/unit/jobs/test_zk_job.py
@@ -293,3 +293,47 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
self.assertRaises(excp.NotImplementedError,
self.board.register_entity,
entity_instance_2)
+
+ def test_connect_check_compatible(self):
+ # Valid version
+ client = fake_client.FakeClient()
+ board = impl_zookeeper.ZookeeperJobBoard(
+ 'test-board', {'check_compatible': True},
+ client=client)
+ self.addCleanup(board.close)
+ self.addCleanup(self.close_client, client)
+
+ with base.connect_close(board):
+ pass
+
+ # Invalid version, no check
+ client = fake_client.FakeClient(server_version=(3, 2, 0))
+ board = impl_zookeeper.ZookeeperJobBoard(
+ 'test-board', {'check_compatible': False},
+ client=client)
+ self.addCleanup(board.close)
+ self.addCleanup(self.close_client, client)
+
+ with base.connect_close(board):
+ pass
+
+ # Invalid version, check_compatible=True
+ client = fake_client.FakeClient(server_version=(3, 2, 0))
+ board = impl_zookeeper.ZookeeperJobBoard(
+ 'test-board', {'check_compatible': True},
+ client=client)
+ self.addCleanup(board.close)
+ self.addCleanup(self.close_client, client)
+
+ self.assertRaises(excp.IncompatibleVersion, board.connect)
+
+ # Invalid version, check_compatible='False'
+ client = fake_client.FakeClient(server_version=(3, 2, 0))
+ board = impl_zookeeper.ZookeeperJobBoard(
+ 'test-board', {'check_compatible': 'False'},
+ client=client)
+ self.addCleanup(board.close)
+ self.addCleanup(self.close_client, client)
+
+ with base.connect_close(board):
+ pass
diff --git a/taskflow/tests/unit/test_utils_kazoo_utils.py b/taskflow/tests/unit/test_utils_kazoo_utils.py
new file mode 100644
index 0000000..a28b3e8
--- /dev/null
+++ b/taskflow/tests/unit/test_utils_kazoo_utils.py
@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+from taskflow import test
+from taskflow.utils import kazoo_utils
+
+
+class MakeClientTest(test.TestCase):
+
+ @mock.patch("kazoo.client.KazooClient")
+ def test_make_client_config(self, mock_kazoo_client):
+ conf = {}
+ expected = {
+ 'hosts': 'localhost:2181',
+ 'logger': mock.ANY,
+ 'read_only': False,
+ 'randomize_hosts': False,
+ 'keyfile': None,
+ 'keyfile_password': None,
+ 'certfile': None,
+ 'use_ssl': False,
+ 'verify_certs': True
+ }
+
+ kazoo_utils.make_client(conf)
+
+ mock_kazoo_client.assert_called_once_with(**expected)
+
+ mock_kazoo_client.reset_mock()
+
+ # With boolean passed as strings
+ conf = {
+ 'use_ssl': 'True',
+ 'verify_certs': 'False'
+ }
+ expected = {
+ 'hosts': 'localhost:2181',
+ 'logger': mock.ANY,
+ 'read_only': False,
+ 'randomize_hosts': False,
+ 'keyfile': None,
+ 'keyfile_password': None,
+ 'certfile': None,
+ 'use_ssl': True,
+ 'verify_certs': False
+ }
+
+ kazoo_utils.make_client(conf)
+
+ mock_kazoo_client.assert_called_once_with(**expected)
+
+ mock_kazoo_client.reset_mock()
diff --git a/taskflow/utils/kazoo_utils.py b/taskflow/utils/kazoo_utils.py
index 505c101..2b9f7b9 100644
--- a/taskflow/utils/kazoo_utils.py
+++ b/taskflow/utils/kazoo_utils.py
@@ -17,6 +17,7 @@
from kazoo import client
from kazoo import exceptions as k_exc
from oslo_utils import reflection
+from oslo_utils import strutils
from taskflow import exceptions as exc
from taskflow import logging
@@ -24,6 +25,15 @@ from taskflow import logging
LOG = logging.getLogger(__name__)
+CONF_TRANSFERS = (
+ ('read_only', strutils.bool_from_string, False),
+ ('randomize_hosts', strutils.bool_from_string, False),
+ ('keyfile', None, None),
+ ('keyfile_password', None, None),
+ ('certfile', None, None),
+ ('use_ssl', strutils.bool_from_string, False),
+ ('verify_certs', strutils.bool_from_string, True))
+
def _parse_hosts(hosts):
if isinstance(hosts, str):
@@ -193,16 +203,19 @@ def make_client(conf):
"""
# See: https://kazoo.readthedocs.io/en/latest/api/client.html
client_kwargs = {
- 'read_only': bool(conf.get('read_only')),
- 'randomize_hosts': bool(conf.get('randomize_hosts')),
'logger': LOG,
- 'keyfile': conf.get('keyfile', None),
- 'keyfile_password': conf.get('keyfile_password', None),
- 'certfile': conf.get('certfile', None),
- 'use_ssl': conf.get('use_ssl', False),
- 'verify_certs': conf.get('verify_certs', True),
-
}
+
+ for key, value_type_converter, default in CONF_TRANSFERS:
+ if key in conf:
+ if value_type_converter is not None:
+ client_kwargs[key] = value_type_converter(conf[key],
+ default=default)
+ else:
+ client_kwargs[key] = conf[key]
+ else:
+ client_kwargs[key] = default
+
# See: https://kazoo.readthedocs.io/en/latest/api/retry.html
if 'command_retry' in conf:
client_kwargs['command_retry'] = conf['command_retry']
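Finally, a hedged usage sketch of make_client() with string-typed options (the host list is illustrative; no connection is attempted until KazooClient.start() is called):

    from taskflow.utils import kazoo_utils

    conf = {
        'hosts': 'zk1:2181,zk2:2181',
        'use_ssl': 'True',        # coerced to the boolean True
        'verify_certs': 'False',  # coerced to the boolean False
    }
    client = kazoo_utils.make_client(conf)
    # make_client() only builds the KazooClient; client.start() would be
    # needed to actually open a connection to the listed servers.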