diff options
-rw-r--r-- | doc/source/conf.py | 4 | ||||
-rw-r--r-- | releasenotes/notes/fix-zookeeper-option-parsing-f9d37fbc39af47f4.yaml | 7 | ||||
-rw-r--r-- | setup.cfg | 1 | ||||
-rw-r--r-- | taskflow/jobs/backends/impl_zookeeper.py | 4 | ||||
-rw-r--r-- | taskflow/persistence/backends/impl_sqlalchemy.py | 34 | ||||
-rw-r--r-- | taskflow/persistence/backends/impl_zookeeper.py | 4 | ||||
-rw-r--r-- | taskflow/tests/unit/jobs/test_zk_job.py | 44 | ||||
-rw-r--r-- | taskflow/tests/unit/test_utils_kazoo_utils.py | 67 | ||||
-rw-r--r-- | taskflow/utils/kazoo_utils.py | 29 |
9 files changed, 169 insertions, 25 deletions
diff --git a/doc/source/conf.py b/doc/source/conf.py index b46a5a8..d1006bb 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -68,8 +68,8 @@ modindex_common_prefix = ['taskflow.'] # Shortened external links. source_tree = 'https://opendev.org/openstack/taskflow/src/branch/master/' extlinks = { - 'example': (source_tree + '/taskflow/examples/%s.py', ''), - 'pybug': ('http://bugs.python.org/issue%s', ''), + 'example': (source_tree + '/taskflow/examples/%s.py', '%s'), + 'pybug': ('http://bugs.python.org/issue%s', '%s'), } diff --git a/releasenotes/notes/fix-zookeeper-option-parsing-f9d37fbc39af47f4.yaml b/releasenotes/notes/fix-zookeeper-option-parsing-f9d37fbc39af47f4.yaml new file mode 100644 index 0000000..3f81b29 --- /dev/null +++ b/releasenotes/notes/fix-zookeeper-option-parsing-f9d37fbc39af47f4.yaml @@ -0,0 +1,7 @@ +--- +fixes: + - | + Fixed an issue when the configuration options of the zookeeper jobboard + backend were passed as strings, the string ''"False"'' was wrongly + interpreted as ''True''. Now the string ''"False"'' is interpreted as the + ''False'' boolean. @@ -19,6 +19,7 @@ classifier = Programming Language :: Python :: 3 Programming Language :: Python :: 3.8 Programming Language :: Python :: 3.9 + Programming Language :: Python :: 3.10 Programming Language :: Python :: 3 :: Only Programming Language :: Python :: Implementation :: CPython Topic :: Software Development :: Libraries diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index fc9399a..1e45f0f 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -28,6 +28,7 @@ from kazoo.protocol import states as k_states from kazoo.recipe import watchers from oslo_serialization import jsonutils from oslo_utils import excutils +from oslo_utils import strutils from oslo_utils import timeutils from oslo_utils import uuidutils @@ -829,7 +830,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): excp.raise_with_cause(excp.JobFailure, "Failed to connect to zookeeper") try: - if self._conf.get('check_compatible', True): + if strutils.bool_from_string( + self._conf.get('check_compatible'), default=True): kazoo_utils.check_compatible(self._client, self.MIN_ZK_VERSION) if self._worker is None and self._emit_notifications: self._worker = futurist.ThreadPoolExecutor(max_workers=1) diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index 3ac0f3d..742403b 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -205,15 +205,17 @@ class _Alchemist(object): return atom_cls.from_dict(row) def atom_query_iter(self, conn, parent_uuid): - q = (sql.select([self._tables.atomdetails]). + q = (sql.select(self._tables.atomdetails). where(self._tables.atomdetails.c.parent_uuid == parent_uuid)) for row in conn.execute(q): + row = row._mapping yield self.convert_atom_detail(row) def flow_query_iter(self, conn, parent_uuid): - q = (sql.select([self._tables.flowdetails]). + q = (sql.select(self._tables.flowdetails). where(self._tables.flowdetails.c.parent_uuid == parent_uuid)) for row in conn.execute(q): + row = row._mapping yield self.convert_flow_detail(row) def populate_book(self, conn, book): @@ -257,7 +259,6 @@ class SQLAlchemyBackend(base.Backend): conf = copy.deepcopy(conf) engine_args = { 'echo': _as_bool(conf.pop('echo', False)), - 'convert_unicode': _as_bool(conf.pop('convert_unicode', True)), 'pool_recycle': 3600, } if 'idle_timeout' in conf: @@ -421,12 +422,13 @@ class Connection(base.Connection): try: atomdetails = self._tables.atomdetails with self._engine.begin() as conn: - q = (sql.select([atomdetails]). + q = (sql.select(atomdetails). where(atomdetails.c.uuid == atom_detail.uuid)) row = conn.execute(q).first() if not row: raise exc.NotFound("No atom details found with uuid" " '%s'" % atom_detail.uuid) + row = row._mapping e_ad = self._converter.convert_atom_detail(row) self._update_atom_details(conn, atom_detail, e_ad) return e_ad @@ -438,7 +440,7 @@ class Connection(base.Connection): def _insert_flow_details(self, conn, fd, parent_uuid): value = fd.to_dict() value['parent_uuid'] = parent_uuid - conn.execute(sql.insert(self._tables.flowdetails, value)) + conn.execute(sql.insert(self._tables.flowdetails).values(**value)) for ad in fd: self._insert_atom_details(conn, ad, fd.uuid) @@ -446,7 +448,7 @@ class Connection(base.Connection): value = ad.to_dict() value['parent_uuid'] = parent_uuid value['atom_type'] = models.atom_detail_type(ad) - conn.execute(sql.insert(self._tables.atomdetails, value)) + conn.execute(sql.insert(self._tables.atomdetails).values(**value)) def _update_atom_details(self, conn, ad, e_ad): e_ad.merge(ad) @@ -471,12 +473,13 @@ class Connection(base.Connection): try: flowdetails = self._tables.flowdetails with self._engine.begin() as conn: - q = (sql.select([flowdetails]). + q = (sql.select(flowdetails). where(flowdetails.c.uuid == flow_detail.uuid)) row = conn.execute(q).first() if not row: raise exc.NotFound("No flow details found with" " uuid '%s'" % flow_detail.uuid) + row = row._mapping e_fd = self._converter.convert_flow_detail(row) self._converter.populate_flow_detail(conn, e_fd) self._update_flow_details(conn, flow_detail, e_fd) @@ -503,10 +506,11 @@ class Connection(base.Connection): try: logbooks = self._tables.logbooks with self._engine.begin() as conn: - q = (sql.select([logbooks]). + q = (sql.select(logbooks). where(logbooks.c.uuid == book.uuid)) row = conn.execute(q).first() if row: + row = row._mapping e_lb = self._converter.convert_book(row) self._converter.populate_book(conn, e_lb) e_lb.merge(book) @@ -522,7 +526,7 @@ class Connection(base.Connection): self._update_flow_details(conn, fd, e_fd) return e_lb else: - conn.execute(sql.insert(logbooks, book.to_dict())) + conn.execute(sql.insert(logbooks).values(**book.to_dict())) for fd in book: self._insert_flow_details(conn, fd, book.uuid) return book @@ -535,12 +539,13 @@ class Connection(base.Connection): try: logbooks = self._tables.logbooks with contextlib.closing(self._engine.connect()) as conn: - q = (sql.select([logbooks]). + q = (sql.select(logbooks). where(logbooks.c.uuid == book_uuid)) row = conn.execute(q).first() if not row: raise exc.NotFound("No logbook found with" " uuid '%s'" % book_uuid) + row = row._mapping book = self._converter.convert_book(row) if not lazy: self._converter.populate_book(conn, book) @@ -553,8 +558,9 @@ class Connection(base.Connection): gathered = [] try: with contextlib.closing(self._engine.connect()) as conn: - q = sql.select([self._tables.logbooks]) + q = sql.select(self._tables.logbooks) for row in conn.execute(q): + row = row._mapping book = self._converter.convert_book(row) if not lazy: self._converter.populate_book(conn, book) @@ -584,12 +590,13 @@ class Connection(base.Connection): try: flowdetails = self._tables.flowdetails with self._engine.begin() as conn: - q = (sql.select([flowdetails]). + q = (sql.select(flowdetails). where(flowdetails.c.uuid == fd_uuid)) row = conn.execute(q).first() if not row: raise exc.NotFound("No flow details found with uuid" " '%s'" % fd_uuid) + row = row._mapping fd = self._converter.convert_flow_detail(row) if not lazy: self._converter.populate_flow_detail(conn, fd) @@ -603,12 +610,13 @@ class Connection(base.Connection): try: atomdetails = self._tables.atomdetails with self._engine.begin() as conn: - q = (sql.select([atomdetails]). + q = (sql.select(atomdetails). where(atomdetails.c.uuid == ad_uuid)) row = conn.execute(q).first() if not row: raise exc.NotFound("No atom details found with uuid" " '%s'" % ad_uuid) + row = row._mapping return self._converter.convert_atom_detail(row) except sa_exc.SQLAlchemyError: exc.raise_with_cause(exc.StorageFailure, diff --git a/taskflow/persistence/backends/impl_zookeeper.py b/taskflow/persistence/backends/impl_zookeeper.py index 1189723..e75826e 100644 --- a/taskflow/persistence/backends/impl_zookeeper.py +++ b/taskflow/persistence/backends/impl_zookeeper.py @@ -20,6 +20,7 @@ import contextlib from kazoo import exceptions as k_exc from kazoo.protocol import paths from oslo_serialization import jsonutils +from oslo_utils import strutils from taskflow import exceptions as exc from taskflow.persistence import path_based @@ -161,7 +162,8 @@ class ZkConnection(path_based.PathBasedConnection): def validate(self): with self._exc_wrapper(): try: - if self._conf.get('check_compatible', True): + if strutils.bool_from_string( + self._conf.get('check_compatible'), default=True): k_utils.check_compatible(self._client, MIN_ZK_VERSION) except exc.IncompatibleVersion: exc.raise_with_cause(exc.StorageFailure, "Backend storage is" diff --git a/taskflow/tests/unit/jobs/test_zk_job.py b/taskflow/tests/unit/jobs/test_zk_job.py index d93f148..23d0505 100644 --- a/taskflow/tests/unit/jobs/test_zk_job.py +++ b/taskflow/tests/unit/jobs/test_zk_job.py @@ -293,3 +293,47 @@ class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin): self.assertRaises(excp.NotImplementedError, self.board.register_entity, entity_instance_2) + + def test_connect_check_compatible(self): + # Valid version + client = fake_client.FakeClient() + board = impl_zookeeper.ZookeeperJobBoard( + 'test-board', {'check_compatible': True}, + client=client) + self.addCleanup(board.close) + self.addCleanup(self.close_client, client) + + with base.connect_close(board): + pass + + # Invalid version, no check + client = fake_client.FakeClient(server_version=(3, 2, 0)) + board = impl_zookeeper.ZookeeperJobBoard( + 'test-board', {'check_compatible': False}, + client=client) + self.addCleanup(board.close) + self.addCleanup(self.close_client, client) + + with base.connect_close(board): + pass + + # Invalid version, check_compatible=True + client = fake_client.FakeClient(server_version=(3, 2, 0)) + board = impl_zookeeper.ZookeeperJobBoard( + 'test-board', {'check_compatible': True}, + client=client) + self.addCleanup(board.close) + self.addCleanup(self.close_client, client) + + self.assertRaises(excp.IncompatibleVersion, board.connect) + + # Invalid version, check_compatible='False' + client = fake_client.FakeClient(server_version=(3, 2, 0)) + board = impl_zookeeper.ZookeeperJobBoard( + 'test-board', {'check_compatible': 'False'}, + client=client) + self.addCleanup(board.close) + self.addCleanup(self.close_client, client) + + with base.connect_close(board): + pass diff --git a/taskflow/tests/unit/test_utils_kazoo_utils.py b/taskflow/tests/unit/test_utils_kazoo_utils.py new file mode 100644 index 0000000..a28b3e8 --- /dev/null +++ b/taskflow/tests/unit/test_utils_kazoo_utils.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import mock + +from taskflow import test +from taskflow.utils import kazoo_utils + + +class MakeClientTest(test.TestCase): + + @mock.patch("kazoo.client.KazooClient") + def test_make_client_config(self, mock_kazoo_client): + conf = {} + expected = { + 'hosts': 'localhost:2181', + 'logger': mock.ANY, + 'read_only': False, + 'randomize_hosts': False, + 'keyfile': None, + 'keyfile_password': None, + 'certfile': None, + 'use_ssl': False, + 'verify_certs': True + } + + kazoo_utils.make_client(conf) + + mock_kazoo_client.assert_called_once_with(**expected) + + mock_kazoo_client.reset_mock() + + # With boolean passed as strings + conf = { + 'use_ssl': 'True', + 'verify_certs': 'False' + } + expected = { + 'hosts': 'localhost:2181', + 'logger': mock.ANY, + 'read_only': False, + 'randomize_hosts': False, + 'keyfile': None, + 'keyfile_password': None, + 'certfile': None, + 'use_ssl': True, + 'verify_certs': False + } + + kazoo_utils.make_client(conf) + + mock_kazoo_client.assert_called_once_with(**expected) + + mock_kazoo_client.reset_mock() diff --git a/taskflow/utils/kazoo_utils.py b/taskflow/utils/kazoo_utils.py index 505c101..2b9f7b9 100644 --- a/taskflow/utils/kazoo_utils.py +++ b/taskflow/utils/kazoo_utils.py @@ -17,6 +17,7 @@ from kazoo import client from kazoo import exceptions as k_exc from oslo_utils import reflection +from oslo_utils import strutils from taskflow import exceptions as exc from taskflow import logging @@ -24,6 +25,15 @@ from taskflow import logging LOG = logging.getLogger(__name__) +CONF_TRANSFERS = ( + ('read_only', strutils.bool_from_string, False), + ('randomize_hosts', strutils.bool_from_string, False), + ('keyfile', None, None), + ('keyfile_password', None, None), + ('certfile', None, None), + ('use_ssl', strutils.bool_from_string, False), + ('verify_certs', strutils.bool_from_string, True)) + def _parse_hosts(hosts): if isinstance(hosts, str): @@ -193,16 +203,19 @@ def make_client(conf): """ # See: https://kazoo.readthedocs.io/en/latest/api/client.html client_kwargs = { - 'read_only': bool(conf.get('read_only')), - 'randomize_hosts': bool(conf.get('randomize_hosts')), 'logger': LOG, - 'keyfile': conf.get('keyfile', None), - 'keyfile_password': conf.get('keyfile_password', None), - 'certfile': conf.get('certfile', None), - 'use_ssl': conf.get('use_ssl', False), - 'verify_certs': conf.get('verify_certs', True), - } + + for key, value_type_converter, default in CONF_TRANSFERS: + if key in conf: + if value_type_converter is not None: + client_kwargs[key] = value_type_converter(conf[key], + default=default) + else: + client_kwargs[key] = conf[key] + else: + client_kwargs[key] = default + # See: https://kazoo.readthedocs.io/en/latest/api/retry.html if 'command_retry' in conf: client_kwargs['command_retry'] = conf['command_retry'] |