diff options
-rw-r--r-- | requirements.txt | 10 | ||||
-rw-r--r-- | taskflow/conductors/backends/impl_blocking.py | 11 | ||||
-rw-r--r-- | taskflow/engines/action_engine/engine.py | 23 | ||||
-rw-r--r-- | taskflow/examples/dump_memory_backend.py | 2 | ||||
-rw-r--r-- | taskflow/examples/job_board_no_test.py | 171 | ||||
-rw-r--r-- | taskflow/persistence/backends/impl_dir.py | 23 | ||||
-rw-r--r-- | taskflow/persistence/backends/impl_memory.py | 61 | ||||
-rw-r--r-- | taskflow/persistence/logbook.py | 8 | ||||
-rw-r--r-- | taskflow/retry.py | 31 | ||||
-rw-r--r-- | taskflow/storage.py | 33 | ||||
-rw-r--r-- | taskflow/tests/test_examples.py | 2 | ||||
-rw-r--r-- | taskflow/tests/unit/persistence/test_dir_persistence.py | 61 | ||||
-rw-r--r-- | taskflow/tests/unit/persistence/test_memory_persistence.py | 60 | ||||
-rw-r--r-- | taskflow/tests/unit/persistence/test_sql_persistence.py | 111 | ||||
-rw-r--r-- | taskflow/tests/unit/test_engines.py | 54 | ||||
-rw-r--r-- | taskflow/tests/unit/test_retries.py | 251 | ||||
-rw-r--r-- | taskflow/tests/unit/worker_based/test_worker.py | 2 | ||||
-rw-r--r-- | taskflow/tests/utils.py | 7 | ||||
-rw-r--r-- | taskflow/utils/misc.py | 32 | ||||
-rw-r--r-- | test-requirements.txt | 2 |
20 files changed, 618 insertions, 337 deletions
diff --git a/requirements.txt b/requirements.txt index e9d0550..f85b92d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,18 +22,24 @@ fasteners>=0.7 # Apache-2.0 # Very nice graph library networkx>=1.8 +# For contextlib new additions/compatibility for <= python 3.3 +contextlib2>=0.4.0 # PSF License + # Used for backend storage engine loading. -stevedore>=1.3.0 # Apache-2.0 +stevedore>=1.5.0 # Apache-2.0 # Backport for concurrent.futures which exists in 3.2+ futures>=3.0 # Used for structured input validation -jsonschema>=2.0.0,<3.0.0 +jsonschema>=2.0.0,<3.0.0,!=2.5.0 # For common utilities oslo.utils>=1.4.0 # Apache-2.0 oslo.serialization>=1.4.0 # Apache-2.0 +# For lru caches and such +cachetools>=1.0.0 # MIT License + # For deprecation of things debtcollector>=0.3.0 # Apache-2.0 diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py index 5156da1..fb8a3c3 100644 --- a/taskflow/conductors/backends/impl_blocking.py +++ b/taskflow/conductors/backends/impl_blocking.py @@ -12,6 +12,11 @@ # License for the specific language governing permissions and limitations # under the License. +try: + from contextlib import ExitStack # noqa +except ImportError: + from contextlib2 import ExitStack # noqa + from debtcollector import removals import six @@ -21,7 +26,6 @@ from taskflow.listeners import logging as logging_listener from taskflow import logging from taskflow.types import timing as tt from taskflow.utils import async_utils -from taskflow.utils import misc from taskflow.utils import threading_utils LOG = logging.getLogger(__name__) @@ -97,8 +101,9 @@ class BlockingConductor(base.Conductor): def _dispatch_job(self, job): engine = self._engine_from_job(job) listeners = self._listeners_from_job(job, engine) - with misc.ListenerStack(LOG) as stack: - stack.register(listeners) + with ExitStack() as stack: + for listener in listeners: + stack.enter_context(listener) LOG.debug("Dispatching engine %s for job: %s", engine, job) consume = True try: diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 716279e..124b8a5 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -82,10 +82,18 @@ class ActionEngine(base.Engine): self._state_lock = threading.RLock() self._storage_ensured = False + def _check(self, name, check_compiled, check_storage_ensured): + """Check (and raise) if the engine has not reached a certain stage.""" + if check_compiled and not self._compiled: + raise exc.InvalidState("Can not %s an engine which" + " has not been compiled" % name) + if check_storage_ensured and not self._storage_ensured: + raise exc.InvalidState("Can not %s an engine" + " which has not has its storage" + " populated" % name) + def suspend(self): - if not self._compiled: - raise exc.InvalidState("Can not suspend an engine" - " which has not been compiled") + self._check('suspend', True, False) self._change_state(states.SUSPENDING) @property @@ -216,10 +224,7 @@ class ActionEngine(base.Engine): @fasteners.locked def validate(self): - if not self._storage_ensured: - raise exc.InvalidState("Can not validate an engine" - " which has not has its storage" - " populated") + self._check('validate', True, True) # At this point we can check to ensure all dependencies are either # flow/task provided or storage provided, if there are still missing # dependencies then this flow will fail at runtime (which we can avoid @@ -263,9 +268,7 @@ class ActionEngine(base.Engine): @fasteners.locked def prepare(self): - if not self._compiled: - raise exc.InvalidState("Can not prepare an engine" - " which has not been compiled") + self._check('prepare', True, False) if not self._storage_ensured: # Set our own state to resuming -> (ensure atoms exist # in storage) -> suspended in the storage unit and notify any diff --git a/taskflow/examples/dump_memory_backend.py b/taskflow/examples/dump_memory_backend.py index 2e3aee7..6c6d548 100644 --- a/taskflow/examples/dump_memory_backend.py +++ b/taskflow/examples/dump_memory_backend.py @@ -70,7 +70,7 @@ e.run() print("---------") print("After run") print("---------") -for path in backend.memory.ls(backend.memory.root_path, recursive=True): +for path in backend.memory.ls_r(backend.memory.root_path, absolute=True): value = backend.memory[path] if value: print("%s -> %s" % (path, value)) diff --git a/taskflow/examples/job_board_no_test.py b/taskflow/examples/job_board_no_test.py deleted file mode 100644 index d37c96a..0000000 --- a/taskflow/examples/job_board_no_test.py +++ /dev/null @@ -1,171 +0,0 @@ -# -*- encoding: utf-8 -*- -# -# Copyright © 2013 eNovance <licensing@enovance.com> -# -# Authors: Dan Krause <dan@dankrause.net> -# Cyril Roelandt <cyril.roelandt@enovance.com> -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# This example shows how to use the job board feature. -# -# Let's start by creating some jobs: -# $ python job_board_no_test.py create my-board my-job '{}' -# $ python job_board_no_test.py create my-board my-job '{"foo": "bar"}' -# $ python job_board_no_test.py create my-board my-job '{"foo": "baz"}' -# $ python job_board_no_test.py create my-board my-job '{"foo": "barbaz"}' -# -# Make sure they were registered: -# $ python job_board_no_test.py list my-board -# 7277181a-1f83-473d-8233-f361615bae9e - {} -# 84a396e8-d02e-450d-8566-d93cb68550c0 - {u'foo': u'bar'} -# 4d355d6a-2c72-44a2-a558-19ae52e8ae2c - {u'foo': u'baz'} -# cd9aae2c-fd64-416d-8ba0-426fa8e3d59c - {u'foo': u'barbaz'} -# -# Perform one job: -# $ python job_board_no_test.py consume my-board \ -# 84a396e8-d02e-450d-8566-d93cb68550c0 -# Performing job 84a396e8-d02e-450d-8566-d93cb68550c0 with args \ -# {u'foo': u'bar'} -# $ python job_board_no_test.py list my-board -# 7277181a-1f83-473d-8233-f361615bae9e - {} -# 4d355d6a-2c72-44a2-a558-19ae52e8ae2c - {u'foo': u'baz'} -# cd9aae2c-fd64-416d-8ba0-426fa8e3d59c - {u'foo': u'barbaz'} -# -# Delete a job: -# $ python job_board_no_test.py delete my-board \ -# cd9aae2c-fd64-416d-8ba0-426fa8e3d59c -# $ python job_board_no_test.py list my-board -# 7277181a-1f83-473d-8233-f361615bae9e - {} -# 4d355d6a-2c72-44a2-a558-19ae52e8ae2c - {u'foo': u'baz'} -# -# Delete all the remaining jobs -# $ python job_board_no_test.py clear my-board -# $ python job_board_no_test.py list my-board -# $ - -import argparse -import contextlib -import json -import os -import sys -import tempfile - -import taskflow.jobs.backends as job_backends -from taskflow.persistence import logbook - -import example_utils # noqa - - -@contextlib.contextmanager -def jobboard(*args, **kwargs): - jb = job_backends.fetch(*args, **kwargs) - jb.connect() - yield jb - jb.close() - - -conf = { - 'board': 'zookeeper', - 'hosts': ['127.0.0.1:2181'] -} - - -def consume_job(args): - def perform_job(job): - print("Performing job %s with args %s" % (job.uuid, job.details)) - - with jobboard(args.board_name, conf) as jb: - for job in jb.iterjobs(ensure_fresh=True): - if job.uuid == args.job_uuid: - jb.claim(job, "test-client") - perform_job(job) - jb.consume(job, "test-client") - - -def clear_jobs(args): - with jobboard(args.board_name, conf) as jb: - for job in jb.iterjobs(ensure_fresh=True): - jb.claim(job, "test-client") - jb.consume(job, "test-client") - - -def create_job(args): - store = json.loads(args.details) - book = logbook.LogBook(args.job_name) - if example_utils.SQLALCHEMY_AVAILABLE: - persist_path = os.path.join(tempfile.gettempdir(), "persisting.db") - backend_uri = "sqlite:///%s" % (persist_path) - else: - persist_path = os.path.join(tempfile.gettempdir(), "persisting") - backend_uri = "file:///%s" % (persist_path) - with example_utils.get_backend(backend_uri) as backend: - backend.get_connection().save_logbook(book) - with jobboard(args.board_name, conf, persistence=backend) as jb: - jb.post(args.job_name, book, details=store) - - -def list_jobs(args): - with jobboard(args.board_name, conf) as jb: - for job in jb.iterjobs(ensure_fresh=True): - print("%s - %s" % (job.uuid, job.details)) - - -def delete_job(args): - with jobboard(args.board_name, conf) as jb: - for job in jb.iterjobs(ensure_fresh=True): - if job.uuid == args.job_uuid: - jb.claim(job, "test-client") - jb.consume(job, "test-client") - - -def main(argv): - parser = argparse.ArgumentParser() - subparsers = parser.add_subparsers(title='subcommands', - description='valid subcommands', - help='additional help') - - # Consume command - parser_consume = subparsers.add_parser('consume') - parser_consume.add_argument('board_name') - parser_consume.add_argument('job_uuid') - parser_consume.set_defaults(func=consume_job) - - # Clear command - parser_consume = subparsers.add_parser('clear') - parser_consume.add_argument('board_name') - parser_consume.set_defaults(func=clear_jobs) - - # Create command - parser_create = subparsers.add_parser('create') - parser_create.add_argument('board_name') - parser_create.add_argument('job_name') - parser_create.add_argument('details') - parser_create.set_defaults(func=create_job) - - # Delete command - parser_delete = subparsers.add_parser('delete') - parser_delete.add_argument('board_name') - parser_delete.add_argument('job_uuid') - parser_delete.set_defaults(func=delete_job) - - # List command - parser_list = subparsers.add_parser('list') - parser_list.add_argument('board_name') - parser_list.set_defaults(func=list_jobs) - - args = parser.parse_args(argv) - args.func(args) - -if __name__ == '__main__': - main(sys.argv[1:]) diff --git a/taskflow/persistence/backends/impl_dir.py b/taskflow/persistence/backends/impl_dir.py index 940b9c4..1047d67 100644 --- a/taskflow/persistence/backends/impl_dir.py +++ b/taskflow/persistence/backends/impl_dir.py @@ -20,6 +20,7 @@ import errno import os import shutil +import cachetools import fasteners from oslo_serialization import jsonutils @@ -54,13 +55,29 @@ class DirBackend(path_based.PathBasedBackend): Example configuration:: conf = { - "path": "/tmp/taskflow", + "path": "/tmp/taskflow", # save data to this root directory + "max_cache_size": 1024, # keep up-to 1024 entries in memory } """ + + DEFAULT_FILE_ENCODING = 'utf-8' + """ + Default encoding used when decoding or encoding files into or from + text/unicode into binary or binary into text/unicode. + """ + def __init__(self, conf): super(DirBackend, self).__init__(conf) - self.file_cache = {} - self.encoding = self._conf.get('encoding', 'utf-8') + max_cache_size = self._conf.get('max_cache_size') + if max_cache_size is not None: + max_cache_size = int(max_cache_size) + if max_cache_size < 1: + raise ValueError("Maximum cache size must be greater than" + " or equal to one") + self.file_cache = cachetools.LRUCache(max_cache_size) + else: + self.file_cache = {} + self.encoding = self._conf.get('encoding', self.DEFAULT_FILE_ENCODING) if not self._path: raise ValueError("Empty path is disallowed") self._path = os.path.abspath(self._path) diff --git a/taskflow/persistence/backends/impl_memory.py b/taskflow/persistence/backends/impl_memory.py index 0aac58e..43207b8 100644 --- a/taskflow/persistence/backends/impl_memory.py +++ b/taskflow/persistence/backends/impl_memory.py @@ -20,6 +20,7 @@ import copy import itertools import posixpath as pp +from debtcollector import removals import fasteners import six @@ -166,13 +167,63 @@ class FakeFilesystem(object): else: return self._copier(node.metadata['value']) + def _up_to_root_selector(self, root_node, child_node): + # Build the path from the child to the root and stop at the + # root, and then form a path string... + path_pieces = [child_node.item] + for parent_node in child_node.path_iter(include_self=False): + if parent_node is root_node: + break + path_pieces.append(parent_node.item) + if len(path_pieces) > 1: + path_pieces.reverse() + return self.join(*path_pieces) + + @staticmethod + def _metadata_path_selector(root_node, child_node): + return child_node.metadata['path'] + + def ls_r(self, path, absolute=False): + """Return list of all children of the given path (recursively).""" + node = self._fetch_node(path) + if absolute: + selector_func = self._metadata_path_selector + else: + selector_func = self._up_to_root_selector + return [selector_func(node, child_node) + for child_node in node.bfs_iter()] + + @removals.removed_kwarg('recursive', version="0.11", removal_version="?") def ls(self, path, recursive=False): - """Return list of all children of the given path.""" - if not recursive: - return [node.item for node in self._fetch_node(path)] + """Return list of all children of the given path. + + NOTE(harlowja): if ``recursive`` is passed in as truthy then the + absolute path is **always** returned (not the relative path). If + ``recursive`` is left as the default or falsey then the + relative path is **always** returned. + + This is documented in bug `1458114`_ and the existing behavior is + being maintained, to get a recursive version that is absolute (or is + not absolute) it is recommended to use the :py:meth:`.ls_r` method + instead. + + .. deprecated:: 0.11 + + In a future release the ``recursive`` keyword argument will + be removed (so preferring and moving to the :py:meth:`.ls_r` should + occur earlier rather than later). + + .. _1458114: https://bugs.launchpad.net/taskflow/+bug/1458114 + """ + node = self._fetch_node(path) + if recursive: + selector_func = self._metadata_path_selector + child_node_it = node.bfs_iter() else: - node = self._fetch_node(path) - return [child.metadata['path'] for child in node.bfs_iter()] + selector_func = self._up_to_root_selector + child_node_it = iter(node) + return [selector_func(node, child_node) + for child_node in child_node_it] def clear(self): """Remove all nodes (except the root) from this filesystem.""" diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index c7770fe..be254ea1 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -732,8 +732,8 @@ class RetryDetail(AtomDetail): """The last result that was produced.""" try: return self.results[-1][0] - except IndexError as e: - raise exc.NotFound("Last results not found", e) + except IndexError: + exc.raise_with_cause(exc.NotFound, "Last results not found") @property def last_failures(self): @@ -748,8 +748,8 @@ class RetryDetail(AtomDetail): """ try: return self.results[-1][1] - except IndexError as e: - raise exc.NotFound("Last failures not found", e) + except IndexError: + exc.raise_with_cause(exc.NotFound, "Last failures not found") def put(self, state, result): """Puts a result (acquired in the given state) into this detail. diff --git a/taskflow/retry.py b/taskflow/retry.py index b7135a9..3015c79 100644 --- a/taskflow/retry.py +++ b/taskflow/retry.py @@ -241,15 +241,20 @@ class Times(Retry): """Retries subflow given number of times. Returns attempt number.""" def __init__(self, attempts=1, name=None, provides=None, requires=None, - auto_extract=True, rebind=None): + auto_extract=True, rebind=None, revert_all=False): super(Times, self).__init__(name, provides, requires, auto_extract, rebind) self._attempts = attempts + if revert_all: + self._revert_action = REVERT_ALL + else: + self._revert_action = REVERT + def on_failure(self, history, *args, **kwargs): if len(history) < self._attempts: return RETRY - return REVERT + return self._revert_action def execute(self, history, *args, **kwargs): return len(history) + 1 @@ -258,6 +263,16 @@ class Times(Retry): class ForEachBase(Retry): """Base class for retries that iterate over a given collection.""" + def __init__(self, name=None, provides=None, requires=None, + auto_extract=True, rebind=None, revert_all=False): + super(ForEachBase, self).__init__(name, provides, requires, + auto_extract, rebind) + + if revert_all: + self._revert_action = REVERT_ALL + else: + self._revert_action = REVERT + def _get_next_value(self, values, history): # Fetches the next resolution result to try, removes overlapping # entries with what has already been tried and then returns the first @@ -272,7 +287,7 @@ class ForEachBase(Retry): try: self._get_next_value(values, history) except exc.NotFound: - return REVERT + return self._revert_action else: return RETRY @@ -285,9 +300,9 @@ class ForEach(ForEachBase): """ def __init__(self, values, name=None, provides=None, requires=None, - auto_extract=True, rebind=None): + auto_extract=True, rebind=None, revert_all=False): super(ForEach, self).__init__(name, provides, requires, - auto_extract, rebind) + auto_extract, rebind, revert_all) self._values = values def on_failure(self, history, *args, **kwargs): @@ -307,6 +322,12 @@ class ParameterizedForEach(ForEachBase): each try. """ + def __init__(self, name=None, provides=None, requires=None, + auto_extract=True, rebind=None, revert_all=False): + super(ParameterizedForEach, self).__init__(name, provides, requires, + auto_extract, rebind, + revert_all) + def on_failure(self, values, history, *args, **kwargs): return self._on_failure(values, history) diff --git a/taskflow/storage.py b/taskflow/storage.py index 7478cdd..6be4b26 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -94,7 +94,8 @@ def _item_from_single(provider, container, looking_for): try: return _item_from(container, provider.index) except _EXTRACTION_EXCEPTIONS: - raise exceptions.NotFound( + exceptions.raise_with_cause( + exceptions.NotFound, "Unable to find result %r, expected to be able to find it" " created by %s but was unable to perform successful" " extraction" % (looking_for, provider)) @@ -285,7 +286,8 @@ class Storage(object): try: ad = self._flowdetail.find(self._atom_name_to_uuid[atom_name]) except KeyError: - raise exceptions.NotFound("Unknown atom name: %s" % atom_name) + exceptions.raise_with_cause(exceptions.NotFound, + "Unknown atom name: %s" % atom_name) else: # TODO(harlowja): we need to figure out how to get away from doing # these kinds of type checks in general (since they likely mean @@ -458,10 +460,11 @@ class Storage(object): retry_name, expected_type=logbook.RetryDetail, clone=True) try: failures = clone.last_failures - except exceptions.NotFound as e: - raise exceptions.StorageFailure("Unable to fetch most recent" - " retry failures so new retry" - " failure can be inserted", e) + except exceptions.NotFound: + exceptions.raise_with_cause(exceptions.StorageFailure, + "Unable to fetch most recent retry" + " failures so new retry failure can" + " be inserted") else: if failed_atom_name not in failures: failures[failed_atom_name] = failure @@ -669,9 +672,9 @@ class Storage(object): try: providers = self._reverse_mapping[name] except KeyError: - raise exceptions.NotFound("Name %r is not mapped as a" - " produced output by any" - " providers" % name) + exceptions.raise_with_cause(exceptions.NotFound, + "Name %r is not mapped as a produced" + " output by any providers" % name) values = [] for provider in providers: if provider.name is _TRANSIENT_PROVIDER: @@ -813,11 +816,13 @@ class Storage(object): """Gets the results saved for a given provider.""" try: return self._get(provider.name, only_last=True) - except exceptions.NotFound as e: - raise exceptions.NotFound( - "Expected to be able to find output %r produced" - " by %s but was unable to get at that providers" - " results" % (looking_for, provider), e) + except exceptions.NotFound: + exceptions.raise_with_cause(exceptions.NotFound, + "Expected to be able to find" + " output %r produced by %s but was" + " unable to get at that providers" + " results" % (looking_for, + provider)) def _locate_providers(looking_for, possible_providers, scope_walker=None): diff --git a/taskflow/tests/test_examples.py b/taskflow/tests/test_examples.py index a7a297c..ce795dd 100644 --- a/taskflow/tests/test_examples.py +++ b/taskflow/tests/test_examples.py @@ -95,7 +95,7 @@ def iter_examples(): name, ext = os.path.splitext(filename) if ext != ".py": continue - if not any(name.endswith(i) for i in ("utils", "no_test")): + if not name.endswith('utils'): safe_name = safe_filename(name) if safe_name: yield name, safe_name diff --git a/taskflow/tests/unit/persistence/test_dir_persistence.py b/taskflow/tests/unit/persistence/test_dir_persistence.py index 8c1171c..7445145 100644 --- a/taskflow/tests/unit/persistence/test_dir_persistence.py +++ b/taskflow/tests/unit/persistence/test_dir_persistence.py @@ -19,37 +19,80 @@ import os import shutil import tempfile +from oslo_utils import uuidutils +import testscenarios + +from taskflow import exceptions as exc from taskflow.persistence import backends from taskflow.persistence.backends import impl_dir +from taskflow.persistence import logbook from taskflow import test from taskflow.tests.unit.persistence import base -class DirPersistenceTest(test.TestCase, base.PersistenceTestMixin): +class DirPersistenceTest(testscenarios.TestWithScenarios, + test.TestCase, base.PersistenceTestMixin): + + scenarios = [ + ('no_cache', {'max_cache_size': None}), + ('one', {'max_cache_size': 1}), + ('tiny', {'max_cache_size': 256}), + ('medimum', {'max_cache_size': 512}), + ('large', {'max_cache_size': 1024}), + ] + def _get_connection(self): - conf = { - 'path': self.path, - } - return impl_dir.DirBackend(conf).get_connection() + return self.backend.get_connection() def setUp(self): super(DirPersistenceTest, self).setUp() self.path = tempfile.mkdtemp() - conn = self._get_connection() - conn.upgrade() + self.backend = impl_dir.DirBackend({ + 'path': self.path, + 'max_cache_size': self.max_cache_size, + }) + with contextlib.closing(self._get_connection()) as conn: + conn.upgrade() def tearDown(self): super(DirPersistenceTest, self).tearDown() - conn = self._get_connection() - conn.clear_all() if self.path and os.path.isdir(self.path): shutil.rmtree(self.path) self.path = None + self.backend = None def _check_backend(self, conf): with contextlib.closing(backends.fetch(conf)) as be: self.assertIsInstance(be, impl_dir.DirBackend) + def test_dir_backend_invalid_cache_size(self): + for invalid_size in [-1024, 0, -1]: + conf = { + 'path': self.path, + 'max_cache_size': invalid_size, + } + self.assertRaises(ValueError, impl_dir.DirBackend, conf) + + def test_dir_backend_cache_overfill(self): + if self.max_cache_size is not None: + # Ensure cache never goes past the desired max size... + books_ids_made = [] + with contextlib.closing(self._get_connection()) as conn: + for i in range(0, int(1.5 * self.max_cache_size)): + lb_name = 'book-%s' % (i) + lb_id = uuidutils.generate_uuid() + lb = logbook.LogBook(name=lb_name, uuid=lb_id) + self.assertRaises(exc.NotFound, conn.get_logbook, lb_id) + conn.save_logbook(lb) + books_ids_made.append(lb_id) + self.assertLessEqual(self.backend.file_cache.currsize, + self.max_cache_size) + # Also ensure that we can still read all created books... + with contextlib.closing(self._get_connection()) as conn: + for lb_id in books_ids_made: + lb = conn.get_logbook(lb_id) + self.assertIsNotNone(lb) + def test_dir_backend_entry_point(self): self._check_backend(dict(connection='dir:', path=self.path)) diff --git a/taskflow/tests/unit/persistence/test_memory_persistence.py b/taskflow/tests/unit/persistence/test_memory_persistence.py index 24f76aa..8068639 100644 --- a/taskflow/tests/unit/persistence/test_memory_persistence.py +++ b/taskflow/tests/unit/persistence/test_memory_persistence.py @@ -71,7 +71,7 @@ class MemoryFilesystemTest(test.TestCase): self.assertEqual('c', fs['/c']) self.assertEqual('db', fs['/d/b']) - def test_ls_recursive(self): + def test_old_ls_recursive(self): fs = impl_memory.FakeFilesystem() fs.ensure_path("/d") fs.ensure_path("/c/d") @@ -91,12 +91,70 @@ class MemoryFilesystemTest(test.TestCase): '/a/b/c/d', ], contents) + def test_ls_recursive(self): + fs = impl_memory.FakeFilesystem() + fs.ensure_path("/d") + fs.ensure_path("/c/d") + fs.ensure_path("/b/c/d") + fs.ensure_path("/a/b/c/d") + contents = fs.ls_r("/", absolute=False) + self.assertEqual([ + 'a', + 'b', + 'c', + 'd', + 'a/b', + 'b/c', + 'c/d', + 'a/b/c', + 'b/c/d', + 'a/b/c/d', + ], contents) + + def test_ls_recursive_absolute(self): + fs = impl_memory.FakeFilesystem() + fs.ensure_path("/d") + fs.ensure_path("/c/d") + fs.ensure_path("/b/c/d") + fs.ensure_path("/a/b/c/d") + contents = fs.ls_r("/", absolute=True) + self.assertEqual([ + '/a', + '/b', + '/c', + '/d', + '/a/b', + '/b/c', + '/c/d', + '/a/b/c', + '/b/c/d', + '/a/b/c/d', + ], contents) + def test_ls_recursive_targeted(self): fs = impl_memory.FakeFilesystem() fs.ensure_path("/d") fs.ensure_path("/c/d") fs.ensure_path("/b/c/d") fs.ensure_path("/a/b/c/d") + contents = fs.ls_r("/a/b", absolute=False) + self.assertEqual(['c', 'c/d'], contents) + + def test_ls_recursive_targeted_absolute(self): + fs = impl_memory.FakeFilesystem() + fs.ensure_path("/d") + fs.ensure_path("/c/d") + fs.ensure_path("/b/c/d") + fs.ensure_path("/a/b/c/d") + contents = fs.ls_r("/a/b", absolute=True) + self.assertEqual(['/a/b/c', '/a/b/c/d'], contents) + + def test_old_ls_recursive_targeted_absolute(self): + fs = impl_memory.FakeFilesystem() + fs.ensure_path("/d") + fs.ensure_path("/c/d") + fs.ensure_path("/b/c/d") + fs.ensure_path("/a/b/c/d") contents = fs.ls("/a/b", recursive=True) self.assertEqual(['/a/b/c', '/a/b/c/d'], contents) diff --git a/taskflow/tests/unit/persistence/test_sql_persistence.py b/taskflow/tests/unit/persistence/test_sql_persistence.py index 8489160..8a8e22c 100644 --- a/taskflow/tests/unit/persistence/test_sql_persistence.py +++ b/taskflow/tests/unit/persistence/test_sql_persistence.py @@ -40,18 +40,10 @@ PASSWD = "openstack_citest" DATABASE = "tftest_" + ''.join(random.choice('0123456789') for _ in range(12)) -try: - from taskflow.persistence.backends import impl_sqlalchemy - - import sqlalchemy as sa - SQLALCHEMY_AVAILABLE = True -except Exception: - SQLALCHEMY_AVAILABLE = False - -# Testing will try to run against these two mysql library variants. -MYSQL_VARIANTS = ('mysqldb', 'pymysql') +import sqlalchemy as sa from taskflow.persistence import backends +from taskflow.persistence.backends import impl_sqlalchemy from taskflow import test from taskflow.tests.unit.persistence import base @@ -64,7 +56,7 @@ def _get_connect_string(backend, user, passwd, database=None, variant=None): backend = "postgresql+%s" % (variant) elif backend == "mysql": if not variant: - variant = 'mysqldb' + variant = 'pymysql' backend = "mysql+%s" % (variant) else: raise Exception("Unrecognized backend: '%s'" % backend) @@ -74,30 +66,24 @@ def _get_connect_string(backend, user, passwd, database=None, variant=None): def _mysql_exists(): - if not SQLALCHEMY_AVAILABLE: - return False - for variant in MYSQL_VARIANTS: - engine = None - try: - db_uri = _get_connect_string('mysql', USER, PASSWD, - variant=variant) - engine = sa.create_engine(db_uri) - with contextlib.closing(engine.connect()): - return True - except Exception: - pass - finally: - if engine is not None: - try: - engine.dispose() - except Exception: - pass + engine = None + try: + db_uri = _get_connect_string('mysql', USER, PASSWD) + engine = sa.create_engine(db_uri) + with contextlib.closing(engine.connect()): + return True + except Exception: + pass + finally: + if engine is not None: + try: + engine.dispose() + except Exception: + pass return False def _postgres_exists(): - if not SQLALCHEMY_AVAILABLE: - return False engine = None try: db_uri = _get_connect_string('postgres', USER, PASSWD, 'postgres') @@ -114,7 +100,6 @@ def _postgres_exists(): pass -@testtools.skipIf(not SQLALCHEMY_AVAILABLE, 'sqlalchemy is not available') class SqlitePersistenceTest(test.TestCase, base.PersistenceTestMixin): """Inherits from the base test and sets up a sqlite temporary db.""" def _get_connection(self): @@ -185,43 +170,26 @@ class BackendPersistenceTestMixin(base.PersistenceTestMixin): " testing being skipped due to: %s" % (e)) -@testtools.skipIf(not SQLALCHEMY_AVAILABLE, 'sqlalchemy is not available') @testtools.skipIf(not _mysql_exists(), 'mysql is not available') class MysqlPersistenceTest(BackendPersistenceTestMixin, test.TestCase): - def __init__(self, *args, **kwargs): - test.TestCase.__init__(self, *args, **kwargs) - def _init_db(self): - working_variant = None - for variant in MYSQL_VARIANTS: - engine = None - try: - db_uri = _get_connect_string('mysql', USER, PASSWD, - variant=variant) - engine = sa.create_engine(db_uri) - with contextlib.closing(engine.connect()) as conn: - conn.execute("CREATE DATABASE %s" % DATABASE) - working_variant = variant - except Exception: - pass - finally: - if engine is not None: - try: - engine.dispose() - except Exception: - pass - if working_variant: - break - if not working_variant: - variants = ", ".join(MYSQL_VARIANTS) - raise Exception("Failed to initialize MySQL db." - " Tried these variants: %s; MySQL testing" - " being skipped" % (variants)) - else: - return _get_connect_string('mysql', USER, PASSWD, - database=DATABASE, - variant=working_variant) + engine = None + try: + db_uri = _get_connect_string('mysql', USER, PASSWD) + engine = sa.create_engine(db_uri) + with contextlib.closing(engine.connect()) as conn: + conn.execute("CREATE DATABASE %s" % DATABASE) + except Exception as e: + raise Exception('Failed to initialize MySQL db: %s' % (e)) + finally: + if engine is not None: + try: + engine.dispose() + except Exception: + pass + return _get_connect_string('mysql', USER, PASSWD, + database=DATABASE) def _remove_db(self): engine = None @@ -239,13 +207,9 @@ class MysqlPersistenceTest(BackendPersistenceTestMixin, test.TestCase): pass -@testtools.skipIf(not SQLALCHEMY_AVAILABLE, 'sqlalchemy is not available') @testtools.skipIf(not _postgres_exists(), 'postgres is not available') class PostgresPersistenceTest(BackendPersistenceTestMixin, test.TestCase): - def __init__(self, *args, **kwargs): - test.TestCase.__init__(self, *args, **kwargs) - def _init_db(self): engine = None try: @@ -293,7 +257,6 @@ class PostgresPersistenceTest(BackendPersistenceTestMixin, test.TestCase): pass -@testtools.skipIf(not SQLALCHEMY_AVAILABLE, 'sqlalchemy is not available') class SQLBackendFetchingTest(test.TestCase): def test_sqlite_persistence_entry_point(self): @@ -301,16 +264,16 @@ class SQLBackendFetchingTest(test.TestCase): with contextlib.closing(backends.fetch(conf)) as be: self.assertIsInstance(be, impl_sqlalchemy.SQLAlchemyBackend) - @testtools.skipIf(not _postgres_exists(), 'postgres is not available') + @testtools.skipIf(not _mysql_exists(), 'mysql is not available') def test_mysql_persistence_entry_point(self): - uri = "mysql://%s:%s@localhost/%s" % (USER, PASSWD, DATABASE) + uri = _get_connect_string('mysql', USER, PASSWD, database=DATABASE) conf = {'connection': uri} with contextlib.closing(backends.fetch(conf)) as be: self.assertIsInstance(be, impl_sqlalchemy.SQLAlchemyBackend) - @testtools.skipIf(not _mysql_exists(), 'mysql is not available') + @testtools.skipIf(not _postgres_exists(), 'postgres is not available') def test_postgres_persistence_entry_point(self): - uri = "postgresql://%s:%s@localhost/%s" % (USER, PASSWD, DATABASE) + uri = _get_connect_string('postgres', USER, PASSWD, database=DATABASE) conf = {'connection': uri} with contextlib.closing(backends.fetch(conf)) as be: self.assertIsInstance(be, impl_sqlalchemy.SQLAlchemyBackend) diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index cec68f3..4e38dfa 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -159,6 +159,60 @@ class EngineMultipleResultsTest(utils.EngineTestBase): result = engine.storage.fetch('x') self.assertEqual(result, 1) + def test_many_results_visible_to(self): + flow = lf.Flow("flow") + flow.add(utils.AddOneSameProvidesRequires( + 'a', rebind={'value': 'source'})) + flow.add(utils.AddOneSameProvidesRequires('b')) + flow.add(utils.AddOneSameProvidesRequires('c')) + engine = self._make_engine(flow, store={'source': 0}) + engine.run() + + # Check what each task in the prior should be seeing... + atoms = list(flow) + a = atoms[0] + a_kwargs = engine.storage.fetch_mapped_args(a.rebind, + atom_name='a') + self.assertEqual({'value': 0}, a_kwargs) + + b = atoms[1] + b_kwargs = engine.storage.fetch_mapped_args(b.rebind, + atom_name='b') + self.assertEqual({'value': 1}, b_kwargs) + + c = atoms[2] + c_kwargs = engine.storage.fetch_mapped_args(c.rebind, + atom_name='c') + self.assertEqual({'value': 2}, c_kwargs) + + def test_many_results_storage_provided_visible_to(self): + # This works as expected due to docs listed at + # + # http://docs.openstack.org/developer/taskflow/engines.html#scoping + flow = lf.Flow("flow") + flow.add(utils.AddOneSameProvidesRequires('a')) + flow.add(utils.AddOneSameProvidesRequires('b')) + flow.add(utils.AddOneSameProvidesRequires('c')) + engine = self._make_engine(flow, store={'value': 0}) + engine.run() + + # Check what each task in the prior should be seeing... + atoms = list(flow) + a = atoms[0] + a_kwargs = engine.storage.fetch_mapped_args(a.rebind, + atom_name='a') + self.assertEqual({'value': 0}, a_kwargs) + + b = atoms[1] + b_kwargs = engine.storage.fetch_mapped_args(b.rebind, + atom_name='b') + self.assertEqual({'value': 0}, b_kwargs) + + c = atoms[2] + c_kwargs = engine.storage.fetch_mapped_args(c.rebind, + atom_name='c') + self.assertEqual({'value': 0}, c_kwargs) + def test_fetch_with_two_results(self): flow = lf.Flow("flow") flow.add(utils.TaskOneReturn(provides='x')) diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index edcc6d8..ddb256b 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -336,6 +336,66 @@ class RetryTest(utils.EngineTestBase): 'flow-1.f SUCCESS'] self.assertEqual(expected, capturer.values) + def test_nested_flow_with_retry_revert(self): + retry1 = retry.Times(0, 'r1', provides='x2') + flow = lf.Flow('flow-1').add( + utils.ProgressingTask("task1"), + lf.Flow('flow-2', retry1).add( + utils.ConditionalTask("task2", inject={'x': 1})) + ) + engine = self._make_engine(flow) + engine.storage.inject({'y': 2}) + with utils.CaptureListener(engine) as capturer: + try: + engine.run() + except Exception: + pass + self.assertEqual(engine.storage.fetch_all(), {'y': 2}) + expected = ['flow-1.f RUNNING', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot!)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) + + def test_nested_flow_with_retry_revert_all(self): + retry1 = retry.Times(0, 'r1', provides='x2', revert_all=True) + flow = lf.Flow('flow-1').add( + utils.ProgressingTask("task1"), + lf.Flow('flow-2', retry1).add( + utils.ConditionalTask("task2", inject={'x': 1})) + ) + engine = self._make_engine(flow) + engine.storage.inject({'y': 2}) + with utils.CaptureListener(engine) as capturer: + try: + engine.run() + except Exception: + pass + self.assertEqual(engine.storage.fetch_all(), {'y': 2}) + expected = ['flow-1.f RUNNING', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot!)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'task1.t REVERTING', + 'task1.t REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) + def test_revert_all_retry(self): flow = lf.Flow('flow-1', retry.Times(3, 'r1', provides='x')).add( utils.ProgressingTask("task1"), @@ -594,6 +654,108 @@ class RetryTest(utils.EngineTestBase): 'flow-1.f REVERTED'] self.assertItemsEqual(capturer.values, expected) + def test_nested_for_each_revert(self): + collection = [3, 2, 3, 5] + retry1 = retry.ForEach(collection, 'r1', provides='x') + flow = lf.Flow('flow-1').add( + utils.ProgressingTask("task1"), + lf.Flow('flow-2', retry1).add( + utils.FailingTaskWithOneArg('task2') + ) + ) + engine = self._make_engine(flow) + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = ['flow-1.f RUNNING', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r RETRYING', + 'task2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot with 2)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r RETRYING', + 'task2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r RETRYING', + 'task2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(5)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot with 5)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) + + def test_nested_for_each_revert_all(self): + collection = [3, 2, 3, 5] + retry1 = retry.ForEach(collection, 'r1', provides='x', revert_all=True) + flow = lf.Flow('flow-1').add( + utils.ProgressingTask("task1"), + lf.Flow('flow-2', retry1).add( + utils.FailingTaskWithOneArg('task2') + ) + ) + engine = self._make_engine(flow) + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = ['flow-1.f RUNNING', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r RETRYING', + 'task2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot with 2)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r RETRYING', + 'task2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r RETRYING', + 'task2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(5)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot with 5)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'task1.t REVERTING', + 'task1.t REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) + def test_for_each_empty_collection(self): values = [] retry1 = retry.ForEach(values, 'r1', provides='x') @@ -674,6 +836,95 @@ class RetryTest(utils.EngineTestBase): 'flow-1.f REVERTED'] self.assertItemsEqual(capturer.values, expected) + def test_nested_parameterized_for_each_revert(self): + values = [3, 2, 5] + retry1 = retry.ParameterizedForEach('r1', provides='x') + flow = lf.Flow('flow-1').add( + utils.ProgressingTask('task-1'), + lf.Flow('flow-2', retry1).add( + utils.FailingTaskWithOneArg('task-2') + ) + ) + engine = self._make_engine(flow) + engine.storage.inject({'values': values, 'y': 1}) + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = ['flow-1.f RUNNING', + 'task-1.t RUNNING', + 'task-1.t SUCCESS(5)', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 'task-2.t RUNNING', + 'task-2.t FAILURE(Failure: RuntimeError: Woot with 3)', + 'task-2.t REVERTING', + 'task-2.t REVERTED', + 'r1.r RETRYING', + 'task-2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 'task-2.t RUNNING', + 'task-2.t FAILURE(Failure: RuntimeError: Woot with 2)', + 'task-2.t REVERTING', + 'task-2.t REVERTED', + 'r1.r RETRYING', + 'task-2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(5)', + 'task-2.t RUNNING', + 'task-2.t FAILURE(Failure: RuntimeError: Woot with 5)', + 'task-2.t REVERTING', + 'task-2.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) + + def test_nested_parameterized_for_each_revert_all(self): + values = [3, 2, 5] + retry1 = retry.ParameterizedForEach('r1', provides='x', + revert_all=True) + flow = lf.Flow('flow-1').add( + utils.ProgressingTask('task-1'), + lf.Flow('flow-2', retry1).add( + utils.FailingTaskWithOneArg('task-2') + ) + ) + engine = self._make_engine(flow) + engine.storage.inject({'values': values, 'y': 1}) + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = ['flow-1.f RUNNING', + 'task-1.t RUNNING', + 'task-1.t SUCCESS(5)', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 'task-2.t RUNNING', + 'task-2.t FAILURE(Failure: RuntimeError: Woot with 3)', + 'task-2.t REVERTING', + 'task-2.t REVERTED', + 'r1.r RETRYING', + 'task-2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 'task-2.t RUNNING', + 'task-2.t FAILURE(Failure: RuntimeError: Woot with 2)', + 'task-2.t REVERTING', + 'task-2.t REVERTED', + 'r1.r RETRYING', + 'task-2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(5)', + 'task-2.t RUNNING', + 'task-2.t FAILURE(Failure: RuntimeError: Woot with 5)', + 'task-2.t REVERTING', + 'task-2.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'task-1.t REVERTING', + 'task-1.t REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) + def test_parameterized_for_each_empty_collection(self): values = [] retry1 = retry.ParameterizedForEach('r1', provides='x') diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py index a475c51..3acf245 100644 --- a/taskflow/tests/unit/worker_based/test_worker.py +++ b/taskflow/tests/unit/worker_based/test_worker.py @@ -33,7 +33,7 @@ class TestWorker(test.MockTestCase): self.broker_url = 'test-url' self.exchange = 'test-exchange' self.topic = 'test-topic' - self.endpoint_count = 24 + self.endpoint_count = 25 # patch classes self.executor_mock, self.executor_inst_mock = self.patchClass( diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 031dd70..43f208b 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -89,6 +89,13 @@ class DummyTask(task.Task): pass +class AddOneSameProvidesRequires(task.Task): + default_provides = 'value' + + def execute(self, value): + return value + 1 + + class AddOne(task.Task): default_provides = 'result' diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index 005f394..c097bbe 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -449,38 +449,6 @@ def get_duplicate_keys(iterable, key=None): return duplicates -class ListenerStack(object): - """Listeners that are deregistered on context manager exit. - - TODO(harlowja): replace this with ``contextlib.ExitStack`` or equivalent - in the future (that code is in python3.2+ and in a few backports that - provide nearly equivalent functionality). When/if - https://review.openstack.org/#/c/164222/ merges we should be able to - remove this since listeners are already context managers. - """ - - def __init__(self, log): - self._registered = [] - self._log = log - - def register(self, listeners): - for listener in listeners: - listener.register() - self._registered.append(listener) - - def __enter__(self): - return self - - def __exit__(self, type, value, tb): - while self._registered: - listener = self._registered.pop() - try: - listener.deregister() - except Exception: - self._log.warn("Failed deregistering listener '%s'", - listener, exc_info=True) - - class ExponentialBackoff(object): """An iterable object that will yield back an exponential delay sequence. diff --git a/test-requirements.txt b/test-requirements.txt index 16fc8b2..f25f6d4 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -16,7 +16,7 @@ zake>=0.1.6 # Apache-2.0 kazoo>=1.3.1,!=2.1 # Used for testing database persistence backends. -SQLAlchemy>=0.9.7,<=0.9.99 +SQLAlchemy>=0.9.7,<1.1.0 alembic>=0.7.2 psycopg2 PyMySQL>=0.6.2 # MIT License |