 requirements.txt                                            |  10
 taskflow/conductors/backends/impl_blocking.py               |  11
 taskflow/engines/action_engine/engine.py                    |  23
 taskflow/examples/dump_memory_backend.py                    |   2
 taskflow/examples/job_board_no_test.py                      | 171
 taskflow/persistence/backends/impl_dir.py                   |  23
 taskflow/persistence/backends/impl_memory.py                |  61
 taskflow/persistence/logbook.py                             |   8
 taskflow/retry.py                                           |  31
 taskflow/storage.py                                         |  33
 taskflow/tests/test_examples.py                             |   2
 taskflow/tests/unit/persistence/test_dir_persistence.py    |  61
 taskflow/tests/unit/persistence/test_memory_persistence.py |  60
 taskflow/tests/unit/persistence/test_sql_persistence.py    | 111
 taskflow/tests/unit/test_engines.py                         |  54
 taskflow/tests/unit/test_retries.py                         | 251
 taskflow/tests/unit/worker_based/test_worker.py             |   2
 taskflow/tests/utils.py                                     |   7
 taskflow/utils/misc.py                                      |  32
 test-requirements.txt                                       |   2
 20 files changed, 618 insertions(+), 337 deletions(-)
diff --git a/requirements.txt b/requirements.txt
index e9d0550..f85b92d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -22,18 +22,24 @@ fasteners>=0.7 # Apache-2.0
# Very nice graph library
networkx>=1.8
+# For contextlib new additions/compatibility for <= python 3.3
+contextlib2>=0.4.0 # PSF License
+
# Used for backend storage engine loading.
-stevedore>=1.3.0 # Apache-2.0
+stevedore>=1.5.0 # Apache-2.0
# Backport for concurrent.futures which exists in 3.2+
futures>=3.0
# Used for structured input validation
-jsonschema>=2.0.0,<3.0.0
+jsonschema>=2.0.0,<3.0.0,!=2.5.0
# For common utilities
oslo.utils>=1.4.0 # Apache-2.0
oslo.serialization>=1.4.0 # Apache-2.0
+# For lru caches and such
+cachetools>=1.0.0 # MIT License
+
# For deprecation of things
debtcollector>=0.3.0 # Apache-2.0
diff --git a/taskflow/conductors/backends/impl_blocking.py b/taskflow/conductors/backends/impl_blocking.py
index 5156da1..fb8a3c3 100644
--- a/taskflow/conductors/backends/impl_blocking.py
+++ b/taskflow/conductors/backends/impl_blocking.py
@@ -12,6 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
+try:
+ from contextlib import ExitStack # noqa
+except ImportError:
+ from contextlib2 import ExitStack # noqa
+
from debtcollector import removals
import six
@@ -21,7 +26,6 @@ from taskflow.listeners import logging as logging_listener
from taskflow import logging
from taskflow.types import timing as tt
from taskflow.utils import async_utils
-from taskflow.utils import misc
from taskflow.utils import threading_utils
LOG = logging.getLogger(__name__)
@@ -97,8 +101,9 @@ class BlockingConductor(base.Conductor):
def _dispatch_job(self, job):
engine = self._engine_from_job(job)
listeners = self._listeners_from_job(job, engine)
- with misc.ListenerStack(LOG) as stack:
- stack.register(listeners)
+ with ExitStack() as stack:
+ for listener in listeners:
+ stack.enter_context(listener)
LOG.debug("Dispatching engine %s for job: %s", engine, job)
consume = True
try:
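The removed misc.ListenerStack (see the taskflow/utils/misc.py hunk below) was a hand-rolled version of what contextlib.ExitStack already provides. A minimal sketch of the pattern now used above, with a hypothetical Listener class standing in for taskflow's listener objects (which are already context managers):

    try:
        from contextlib import ExitStack  # Python 3.3+
    except ImportError:
        from contextlib2 import ExitStack  # PSF-licensed backport

    class Listener(object):
        """Hypothetical context-manager listener for illustration."""
        def __init__(self, name):
            self.name = name
        def __enter__(self):
            print("register %s" % self.name)
            return self
        def __exit__(self, exc_type, exc_val, exc_tb):
            print("deregister %s" % self.name)

    with ExitStack() as stack:
        for listener in [Listener("a"), Listener("b")]:
            stack.enter_context(listener)
        # ... dispatch the job; both listeners deregister in LIFO order
        # on exit, even if dispatching raises.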
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index 716279e..124b8a5 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -82,10 +82,18 @@ class ActionEngine(base.Engine):
self._state_lock = threading.RLock()
self._storage_ensured = False
+ def _check(self, name, check_compiled, check_storage_ensured):
+ """Check (and raise) if the engine has not reached a certain stage."""
+ if check_compiled and not self._compiled:
+ raise exc.InvalidState("Can not %s an engine which"
+ " has not been compiled" % name)
+ if check_storage_ensured and not self._storage_ensured:
+ raise exc.InvalidState("Can not %s an engine"
+ " which has not has its storage"
+ " populated" % name)
+
def suspend(self):
- if not self._compiled:
- raise exc.InvalidState("Can not suspend an engine"
- " which has not been compiled")
+ self._check('suspend', True, False)
self._change_state(states.SUSPENDING)
@property
@@ -216,10 +224,7 @@ class ActionEngine(base.Engine):
@fasteners.locked
def validate(self):
- if not self._storage_ensured:
- raise exc.InvalidState("Can not validate an engine"
- " which has not has its storage"
- " populated")
+ self._check('validate', True, True)
# At this point we can check to ensure all dependencies are either
# flow/task provided or storage provided, if there are still missing
# dependencies then this flow will fail at runtime (which we can avoid
@@ -263,9 +268,7 @@ class ActionEngine(base.Engine):
@fasteners.locked
def prepare(self):
- if not self._compiled:
- raise exc.InvalidState("Can not prepare an engine"
- " which has not been compiled")
+ self._check('prepare', True, False)
if not self._storage_ensured:
# Set our own state to resuming -> (ensure atoms exist
# in storage) -> suspended in the storage unit and notify any
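The new _check helper folds the per-method InvalidState guards into one place; the stage ordering it enforces is compile -> prepare -> validate -> run, with suspend legal once compiled. A hedged walkthrough (the no-op flow is illustrative, not part of this change):

    from taskflow import engines
    from taskflow.patterns import linear_flow as lf
    from taskflow import task

    class Noop(task.Task):
        def execute(self):
            pass

    engine = engines.load(lf.Flow('f').add(Noop('t')))
    # engine.suspend() here would raise InvalidState: not yet compiled.
    engine.compile()    # suspend() becomes legal; storage still unpopulated
    engine.prepare()    # populates storage, so validate()/run() now pass
    engine.validate()
    engine.run()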
diff --git a/taskflow/examples/dump_memory_backend.py b/taskflow/examples/dump_memory_backend.py
index 2e3aee7..6c6d548 100644
--- a/taskflow/examples/dump_memory_backend.py
+++ b/taskflow/examples/dump_memory_backend.py
@@ -70,7 +70,7 @@ e.run()
print("---------")
print("After run")
print("---------")
-for path in backend.memory.ls(backend.memory.root_path, recursive=True):
+for path in backend.memory.ls_r(backend.memory.root_path, absolute=True):
value = backend.memory[path]
if value:
print("%s -> %s" % (path, value))
diff --git a/taskflow/examples/job_board_no_test.py b/taskflow/examples/job_board_no_test.py
deleted file mode 100644
index d37c96a..0000000
--- a/taskflow/examples/job_board_no_test.py
+++ /dev/null
@@ -1,171 +0,0 @@
-# -*- encoding: utf-8 -*-
-#
-# Copyright © 2013 eNovance <licensing@enovance.com>
-#
-# Authors: Dan Krause <dan@dankrause.net>
-# Cyril Roelandt <cyril.roelandt@enovance.com>
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# This example shows how to use the job board feature.
-#
-# Let's start by creating some jobs:
-# $ python job_board_no_test.py create my-board my-job '{}'
-# $ python job_board_no_test.py create my-board my-job '{"foo": "bar"}'
-# $ python job_board_no_test.py create my-board my-job '{"foo": "baz"}'
-# $ python job_board_no_test.py create my-board my-job '{"foo": "barbaz"}'
-#
-# Make sure they were registered:
-# $ python job_board_no_test.py list my-board
-# 7277181a-1f83-473d-8233-f361615bae9e - {}
-# 84a396e8-d02e-450d-8566-d93cb68550c0 - {u'foo': u'bar'}
-# 4d355d6a-2c72-44a2-a558-19ae52e8ae2c - {u'foo': u'baz'}
-# cd9aae2c-fd64-416d-8ba0-426fa8e3d59c - {u'foo': u'barbaz'}
-#
-# Perform one job:
-# $ python job_board_no_test.py consume my-board \
-# 84a396e8-d02e-450d-8566-d93cb68550c0
-# Performing job 84a396e8-d02e-450d-8566-d93cb68550c0 with args \
-# {u'foo': u'bar'}
-# $ python job_board_no_test.py list my-board
-# 7277181a-1f83-473d-8233-f361615bae9e - {}
-# 4d355d6a-2c72-44a2-a558-19ae52e8ae2c - {u'foo': u'baz'}
-# cd9aae2c-fd64-416d-8ba0-426fa8e3d59c - {u'foo': u'barbaz'}
-#
-# Delete a job:
-# $ python job_board_no_test.py delete my-board \
-# cd9aae2c-fd64-416d-8ba0-426fa8e3d59c
-# $ python job_board_no_test.py list my-board
-# 7277181a-1f83-473d-8233-f361615bae9e - {}
-# 4d355d6a-2c72-44a2-a558-19ae52e8ae2c - {u'foo': u'baz'}
-#
-# Delete all the remaining jobs
-# $ python job_board_no_test.py clear my-board
-# $ python job_board_no_test.py list my-board
-# $
-
-import argparse
-import contextlib
-import json
-import os
-import sys
-import tempfile
-
-import taskflow.jobs.backends as job_backends
-from taskflow.persistence import logbook
-
-import example_utils # noqa
-
-
-@contextlib.contextmanager
-def jobboard(*args, **kwargs):
- jb = job_backends.fetch(*args, **kwargs)
- jb.connect()
- yield jb
- jb.close()
-
-
-conf = {
- 'board': 'zookeeper',
- 'hosts': ['127.0.0.1:2181']
-}
-
-
-def consume_job(args):
- def perform_job(job):
- print("Performing job %s with args %s" % (job.uuid, job.details))
-
- with jobboard(args.board_name, conf) as jb:
- for job in jb.iterjobs(ensure_fresh=True):
- if job.uuid == args.job_uuid:
- jb.claim(job, "test-client")
- perform_job(job)
- jb.consume(job, "test-client")
-
-
-def clear_jobs(args):
- with jobboard(args.board_name, conf) as jb:
- for job in jb.iterjobs(ensure_fresh=True):
- jb.claim(job, "test-client")
- jb.consume(job, "test-client")
-
-
-def create_job(args):
- store = json.loads(args.details)
- book = logbook.LogBook(args.job_name)
- if example_utils.SQLALCHEMY_AVAILABLE:
- persist_path = os.path.join(tempfile.gettempdir(), "persisting.db")
- backend_uri = "sqlite:///%s" % (persist_path)
- else:
- persist_path = os.path.join(tempfile.gettempdir(), "persisting")
- backend_uri = "file:///%s" % (persist_path)
- with example_utils.get_backend(backend_uri) as backend:
- backend.get_connection().save_logbook(book)
- with jobboard(args.board_name, conf, persistence=backend) as jb:
- jb.post(args.job_name, book, details=store)
-
-
-def list_jobs(args):
- with jobboard(args.board_name, conf) as jb:
- for job in jb.iterjobs(ensure_fresh=True):
- print("%s - %s" % (job.uuid, job.details))
-
-
-def delete_job(args):
- with jobboard(args.board_name, conf) as jb:
- for job in jb.iterjobs(ensure_fresh=True):
- if job.uuid == args.job_uuid:
- jb.claim(job, "test-client")
- jb.consume(job, "test-client")
-
-
-def main(argv):
- parser = argparse.ArgumentParser()
- subparsers = parser.add_subparsers(title='subcommands',
- description='valid subcommands',
- help='additional help')
-
- # Consume command
- parser_consume = subparsers.add_parser('consume')
- parser_consume.add_argument('board_name')
- parser_consume.add_argument('job_uuid')
- parser_consume.set_defaults(func=consume_job)
-
- # Clear command
- parser_consume = subparsers.add_parser('clear')
- parser_consume.add_argument('board_name')
- parser_consume.set_defaults(func=clear_jobs)
-
- # Create command
- parser_create = subparsers.add_parser('create')
- parser_create.add_argument('board_name')
- parser_create.add_argument('job_name')
- parser_create.add_argument('details')
- parser_create.set_defaults(func=create_job)
-
- # Delete command
- parser_delete = subparsers.add_parser('delete')
- parser_delete.add_argument('board_name')
- parser_delete.add_argument('job_uuid')
- parser_delete.set_defaults(func=delete_job)
-
- # List command
- parser_list = subparsers.add_parser('list')
- parser_list.add_argument('board_name')
- parser_list.set_defaults(func=list_jobs)
-
- args = parser.parse_args(argv)
- args.func(args)
-
-if __name__ == '__main__':
- main(sys.argv[1:])
diff --git a/taskflow/persistence/backends/impl_dir.py b/taskflow/persistence/backends/impl_dir.py
index 940b9c4..1047d67 100644
--- a/taskflow/persistence/backends/impl_dir.py
+++ b/taskflow/persistence/backends/impl_dir.py
@@ -20,6 +20,7 @@ import errno
import os
import shutil
+import cachetools
import fasteners
from oslo_serialization import jsonutils
@@ -54,13 +55,29 @@ class DirBackend(path_based.PathBasedBackend):
Example configuration::
conf = {
- "path": "/tmp/taskflow",
+ "path": "/tmp/taskflow", # save data to this root directory
+ "max_cache_size": 1024, # keep up-to 1024 entries in memory
}
"""
+
+ DEFAULT_FILE_ENCODING = 'utf-8'
+ """
+ Default encoding used when decoding or encoding files into or from
+ text/unicode into binary or binary into text/unicode.
+ """
+
def __init__(self, conf):
super(DirBackend, self).__init__(conf)
- self.file_cache = {}
- self.encoding = self._conf.get('encoding', 'utf-8')
+ max_cache_size = self._conf.get('max_cache_size')
+ if max_cache_size is not None:
+ max_cache_size = int(max_cache_size)
+ if max_cache_size < 1:
+ raise ValueError("Maximum cache size must be greater than"
+ " or equal to one")
+ self.file_cache = cachetools.LRUCache(max_cache_size)
+ else:
+ self.file_cache = {}
+ self.encoding = self._conf.get('encoding', self.DEFAULT_FILE_ENCODING)
if not self._path:
raise ValueError("Empty path is disallowed")
self._path = os.path.abspath(self._path)
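The file cache switches from an unbounded dict to cachetools.LRUCache when max_cache_size is configured. The cache is dict-like but evicts the least-recently-used entry at capacity; a quick illustration of the cachetools behavior relied on here (not taskflow code):

    import cachetools

    cache = cachetools.LRUCache(maxsize=2)
    cache['a'] = 1
    cache['b'] = 2
    _ = cache['a']   # touch 'a'; 'b' is now least recently used
    cache['c'] = 3   # inserting at capacity evicts 'b'
    assert 'b' not in cache
    assert cache.currsize <= cache.maxsize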
diff --git a/taskflow/persistence/backends/impl_memory.py b/taskflow/persistence/backends/impl_memory.py
index 0aac58e..43207b8 100644
--- a/taskflow/persistence/backends/impl_memory.py
+++ b/taskflow/persistence/backends/impl_memory.py
@@ -20,6 +20,7 @@ import copy
import itertools
import posixpath as pp
+from debtcollector import removals
import fasteners
import six
@@ -166,13 +167,63 @@ class FakeFilesystem(object):
else:
return self._copier(node.metadata['value'])
+ def _up_to_root_selector(self, root_node, child_node):
+ # Build the path from the child to the root and stop at the
+ # root, and then form a path string...
+ path_pieces = [child_node.item]
+ for parent_node in child_node.path_iter(include_self=False):
+ if parent_node is root_node:
+ break
+ path_pieces.append(parent_node.item)
+ if len(path_pieces) > 1:
+ path_pieces.reverse()
+ return self.join(*path_pieces)
+
+ @staticmethod
+ def _metadata_path_selector(root_node, child_node):
+ return child_node.metadata['path']
+
+ def ls_r(self, path, absolute=False):
+ """Return list of all children of the given path (recursively)."""
+ node = self._fetch_node(path)
+ if absolute:
+ selector_func = self._metadata_path_selector
+ else:
+ selector_func = self._up_to_root_selector
+ return [selector_func(node, child_node)
+ for child_node in node.bfs_iter()]
+
+ @removals.removed_kwarg('recursive', version="0.11", removal_version="?")
def ls(self, path, recursive=False):
- """Return list of all children of the given path."""
- if not recursive:
- return [node.item for node in self._fetch_node(path)]
+ """Return list of all children of the given path.
+
+ NOTE(harlowja): if ``recursive`` is passed in as truthy then the
+ absolute path is **always** returned (not the relative path). If
+ ``recursive`` is left as the default or falsey then the
+ relative path is **always** returned.
+
+ This is documented in bug `1458114`_ and the existing behavior is
+ being maintained, to get a recursive version that is absolute (or is
+ not absolute) it is recommended to use the :py:meth:`.ls_r` method
+ instead.
+
+ .. deprecated:: 0.11
+
+ In a future release the ``recursive`` keyword argument will
+ be removed (so preferring and moving to the :py:meth:`.ls_r` should
+ occur earlier rather than later).
+
+ .. _1458114: https://bugs.launchpad.net/taskflow/+bug/1458114
+ """
+ node = self._fetch_node(path)
+ if recursive:
+ selector_func = self._metadata_path_selector
+ child_node_it = node.bfs_iter()
else:
- node = self._fetch_node(path)
- return [child.metadata['path'] for child in node.bfs_iter()]
+ selector_func = self._up_to_root_selector
+ child_node_it = iter(node)
+ return [selector_func(node, child_node)
+ for child_node in child_node_it]
def clear(self):
"""Remove all nodes (except the root) from this filesystem."""
diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py
index c7770fe..be254ea1 100644
--- a/taskflow/persistence/logbook.py
+++ b/taskflow/persistence/logbook.py
@@ -732,8 +732,8 @@ class RetryDetail(AtomDetail):
"""The last result that was produced."""
try:
return self.results[-1][0]
- except IndexError as e:
- raise exc.NotFound("Last results not found", e)
+ except IndexError:
+ exc.raise_with_cause(exc.NotFound, "Last results not found")
@property
def last_failures(self):
@@ -748,8 +748,8 @@ class RetryDetail(AtomDetail):
"""
try:
return self.results[-1][1]
- except IndexError as e:
- raise exc.NotFound("Last failures not found", e)
+ except IndexError:
+ exc.raise_with_cause(exc.NotFound, "Last failures not found")
def put(self, state, result):
"""Puts a result (acquired in the given state) into this detail.
diff --git a/taskflow/retry.py b/taskflow/retry.py
index b7135a9..3015c79 100644
--- a/taskflow/retry.py
+++ b/taskflow/retry.py
@@ -241,15 +241,20 @@ class Times(Retry):
"""Retries subflow given number of times. Returns attempt number."""
def __init__(self, attempts=1, name=None, provides=None, requires=None,
- auto_extract=True, rebind=None):
+ auto_extract=True, rebind=None, revert_all=False):
super(Times, self).__init__(name, provides, requires,
auto_extract, rebind)
self._attempts = attempts
+ if revert_all:
+ self._revert_action = REVERT_ALL
+ else:
+ self._revert_action = REVERT
+
def on_failure(self, history, *args, **kwargs):
if len(history) < self._attempts:
return RETRY
- return REVERT
+ return self._revert_action
def execute(self, history, *args, **kwargs):
return len(history) + 1
@@ -258,6 +263,16 @@ class Times(Retry):
class ForEachBase(Retry):
"""Base class for retries that iterate over a given collection."""
+ def __init__(self, name=None, provides=None, requires=None,
+ auto_extract=True, rebind=None, revert_all=False):
+ super(ForEachBase, self).__init__(name, provides, requires,
+ auto_extract, rebind)
+
+ if revert_all:
+ self._revert_action = REVERT_ALL
+ else:
+ self._revert_action = REVERT
+
def _get_next_value(self, values, history):
# Fetches the next resolution result to try, removes overlapping
# entries with what has already been tried and then returns the first
@@ -272,7 +287,7 @@ class ForEachBase(Retry):
try:
self._get_next_value(values, history)
except exc.NotFound:
- return REVERT
+ return self._revert_action
else:
return RETRY
@@ -285,9 +300,9 @@ class ForEach(ForEachBase):
"""
def __init__(self, values, name=None, provides=None, requires=None,
- auto_extract=True, rebind=None):
+ auto_extract=True, rebind=None, revert_all=False):
super(ForEach, self).__init__(name, provides, requires,
- auto_extract, rebind)
+ auto_extract, rebind, revert_all)
self._values = values
def on_failure(self, history, *args, **kwargs):
@@ -307,6 +322,12 @@ class ParameterizedForEach(ForEachBase):
each try.
"""
+ def __init__(self, name=None, provides=None, requires=None,
+ auto_extract=True, rebind=None, revert_all=False):
+ super(ParameterizedForEach, self).__init__(name, provides, requires,
+ auto_extract, rebind,
+ revert_all)
+
def on_failure(self, values, history, *args, **kwargs):
return self._on_failure(values, history)
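With revert_all=True the retry controller answers REVERT_ALL instead of REVERT once its attempts (or values) are exhausted, rolling back the whole flow rather than just the subflow it guards. A hedged construction sketch matching the new tests below (the two task classes are illustrative):

    from taskflow import engines
    from taskflow.patterns import linear_flow as lf
    from taskflow import retry
    from taskflow import task

    class Harmless(task.Task):
        def execute(self):
            pass

    class Doomed(task.Task):
        def execute(self):
            raise RuntimeError("Woot!")

    # Without revert_all, exhausting 'r1' reverts only flow-2's contents;
    # with it, 'task1' outside the retry's scope is rolled back too.
    flow = lf.Flow('flow-1').add(
        Harmless('task1'),
        lf.Flow('flow-2', retry.Times(attempts=1, name='r1',
                                      revert_all=True)).add(
            Doomed('task2'),
        ),
    )
    engines.load(flow).run()  # raises RuntimeError after the full revert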
diff --git a/taskflow/storage.py b/taskflow/storage.py
index 7478cdd..6be4b26 100644
--- a/taskflow/storage.py
+++ b/taskflow/storage.py
@@ -94,7 +94,8 @@ def _item_from_single(provider, container, looking_for):
try:
return _item_from(container, provider.index)
except _EXTRACTION_EXCEPTIONS:
- raise exceptions.NotFound(
+ exceptions.raise_with_cause(
+ exceptions.NotFound,
"Unable to find result %r, expected to be able to find it"
" created by %s but was unable to perform successful"
" extraction" % (looking_for, provider))
@@ -285,7 +286,8 @@ class Storage(object):
try:
ad = self._flowdetail.find(self._atom_name_to_uuid[atom_name])
except KeyError:
- raise exceptions.NotFound("Unknown atom name: %s" % atom_name)
+ exceptions.raise_with_cause(exceptions.NotFound,
+ "Unknown atom name: %s" % atom_name)
else:
# TODO(harlowja): we need to figure out how to get away from doing
# these kinds of type checks in general (since they likely mean
@@ -458,10 +460,11 @@ class Storage(object):
retry_name, expected_type=logbook.RetryDetail, clone=True)
try:
failures = clone.last_failures
- except exceptions.NotFound as e:
- raise exceptions.StorageFailure("Unable to fetch most recent"
- " retry failures so new retry"
- " failure can be inserted", e)
+ except exceptions.NotFound:
+ exceptions.raise_with_cause(exceptions.StorageFailure,
+ "Unable to fetch most recent retry"
+ " failures so new retry failure can"
+ " be inserted")
else:
if failed_atom_name not in failures:
failures[failed_atom_name] = failure
@@ -669,9 +672,9 @@ class Storage(object):
try:
providers = self._reverse_mapping[name]
except KeyError:
- raise exceptions.NotFound("Name %r is not mapped as a"
- " produced output by any"
- " providers" % name)
+ exceptions.raise_with_cause(exceptions.NotFound,
+ "Name %r is not mapped as a produced"
+ " output by any providers" % name)
values = []
for provider in providers:
if provider.name is _TRANSIENT_PROVIDER:
@@ -813,11 +816,13 @@ class Storage(object):
"""Gets the results saved for a given provider."""
try:
return self._get(provider.name, only_last=True)
- except exceptions.NotFound as e:
- raise exceptions.NotFound(
- "Expected to be able to find output %r produced"
- " by %s but was unable to get at that providers"
- " results" % (looking_for, provider), e)
+ except exceptions.NotFound:
+ exceptions.raise_with_cause(exceptions.NotFound,
+ "Expected to be able to find"
+ " output %r produced by %s but was"
+ " unable to get at that providers"
+ " results" % (looking_for,
+ provider))
def _locate_providers(looking_for, possible_providers,
scope_walker=None):
diff --git a/taskflow/tests/test_examples.py b/taskflow/tests/test_examples.py
index a7a297c..ce795dd 100644
--- a/taskflow/tests/test_examples.py
+++ b/taskflow/tests/test_examples.py
@@ -95,7 +95,7 @@ def iter_examples():
name, ext = os.path.splitext(filename)
if ext != ".py":
continue
- if not any(name.endswith(i) for i in ("utils", "no_test")):
+ if not name.endswith('utils'):
safe_name = safe_filename(name)
if safe_name:
yield name, safe_name
diff --git a/taskflow/tests/unit/persistence/test_dir_persistence.py b/taskflow/tests/unit/persistence/test_dir_persistence.py
index 8c1171c..7445145 100644
--- a/taskflow/tests/unit/persistence/test_dir_persistence.py
+++ b/taskflow/tests/unit/persistence/test_dir_persistence.py
@@ -19,37 +19,80 @@ import os
import shutil
import tempfile
+from oslo_utils import uuidutils
+import testscenarios
+
+from taskflow import exceptions as exc
from taskflow.persistence import backends
from taskflow.persistence.backends import impl_dir
+from taskflow.persistence import logbook
from taskflow import test
from taskflow.tests.unit.persistence import base
-class DirPersistenceTest(test.TestCase, base.PersistenceTestMixin):
+class DirPersistenceTest(testscenarios.TestWithScenarios,
+ test.TestCase, base.PersistenceTestMixin):
+
+ scenarios = [
+ ('no_cache', {'max_cache_size': None}),
+ ('one', {'max_cache_size': 1}),
+ ('tiny', {'max_cache_size': 256}),
+        ('medium', {'max_cache_size': 512}),
+ ('large', {'max_cache_size': 1024}),
+ ]
+
def _get_connection(self):
- conf = {
- 'path': self.path,
- }
- return impl_dir.DirBackend(conf).get_connection()
+ return self.backend.get_connection()
def setUp(self):
super(DirPersistenceTest, self).setUp()
self.path = tempfile.mkdtemp()
- conn = self._get_connection()
- conn.upgrade()
+ self.backend = impl_dir.DirBackend({
+ 'path': self.path,
+ 'max_cache_size': self.max_cache_size,
+ })
+ with contextlib.closing(self._get_connection()) as conn:
+ conn.upgrade()
def tearDown(self):
super(DirPersistenceTest, self).tearDown()
- conn = self._get_connection()
- conn.clear_all()
if self.path and os.path.isdir(self.path):
shutil.rmtree(self.path)
self.path = None
+ self.backend = None
def _check_backend(self, conf):
with contextlib.closing(backends.fetch(conf)) as be:
self.assertIsInstance(be, impl_dir.DirBackend)
+ def test_dir_backend_invalid_cache_size(self):
+ for invalid_size in [-1024, 0, -1]:
+ conf = {
+ 'path': self.path,
+ 'max_cache_size': invalid_size,
+ }
+ self.assertRaises(ValueError, impl_dir.DirBackend, conf)
+
+ def test_dir_backend_cache_overfill(self):
+ if self.max_cache_size is not None:
+ # Ensure cache never goes past the desired max size...
+ books_ids_made = []
+ with contextlib.closing(self._get_connection()) as conn:
+ for i in range(0, int(1.5 * self.max_cache_size)):
+ lb_name = 'book-%s' % (i)
+ lb_id = uuidutils.generate_uuid()
+ lb = logbook.LogBook(name=lb_name, uuid=lb_id)
+ self.assertRaises(exc.NotFound, conn.get_logbook, lb_id)
+ conn.save_logbook(lb)
+ books_ids_made.append(lb_id)
+ self.assertLessEqual(self.backend.file_cache.currsize,
+ self.max_cache_size)
+ # Also ensure that we can still read all created books...
+ with contextlib.closing(self._get_connection()) as conn:
+ for lb_id in books_ids_made:
+ lb = conn.get_logbook(lb_id)
+ self.assertIsNotNone(lb)
+
def test_dir_backend_entry_point(self):
self._check_backend(dict(connection='dir:', path=self.path))
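The scenarios list above comes from testscenarios: each test method runs once per (name, attributes) pair, with the dict entries injected as instance attributes. A minimal sketch of the mechanism (standalone, not taskflow code):

    import unittest

    import testscenarios

    class CacheSizeTest(testscenarios.TestWithScenarios, unittest.TestCase):
        scenarios = [
            ('no_cache', {'max_cache_size': None}),
            ('tiny', {'max_cache_size': 256}),
        ]

        def test_attribute_injected(self):
            # Runs twice; self.max_cache_size comes from the scenario dict.
            self.assertIn(self.max_cache_size, (None, 256))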
diff --git a/taskflow/tests/unit/persistence/test_memory_persistence.py b/taskflow/tests/unit/persistence/test_memory_persistence.py
index 24f76aa..8068639 100644
--- a/taskflow/tests/unit/persistence/test_memory_persistence.py
+++ b/taskflow/tests/unit/persistence/test_memory_persistence.py
@@ -71,7 +71,7 @@ class MemoryFilesystemTest(test.TestCase):
self.assertEqual('c', fs['/c'])
self.assertEqual('db', fs['/d/b'])
- def test_ls_recursive(self):
+ def test_old_ls_recursive(self):
fs = impl_memory.FakeFilesystem()
fs.ensure_path("/d")
fs.ensure_path("/c/d")
@@ -91,12 +91,70 @@ class MemoryFilesystemTest(test.TestCase):
'/a/b/c/d',
], contents)
+ def test_ls_recursive(self):
+ fs = impl_memory.FakeFilesystem()
+ fs.ensure_path("/d")
+ fs.ensure_path("/c/d")
+ fs.ensure_path("/b/c/d")
+ fs.ensure_path("/a/b/c/d")
+ contents = fs.ls_r("/", absolute=False)
+ self.assertEqual([
+ 'a',
+ 'b',
+ 'c',
+ 'd',
+ 'a/b',
+ 'b/c',
+ 'c/d',
+ 'a/b/c',
+ 'b/c/d',
+ 'a/b/c/d',
+ ], contents)
+
+ def test_ls_recursive_absolute(self):
+ fs = impl_memory.FakeFilesystem()
+ fs.ensure_path("/d")
+ fs.ensure_path("/c/d")
+ fs.ensure_path("/b/c/d")
+ fs.ensure_path("/a/b/c/d")
+ contents = fs.ls_r("/", absolute=True)
+ self.assertEqual([
+ '/a',
+ '/b',
+ '/c',
+ '/d',
+ '/a/b',
+ '/b/c',
+ '/c/d',
+ '/a/b/c',
+ '/b/c/d',
+ '/a/b/c/d',
+ ], contents)
+
def test_ls_recursive_targeted(self):
fs = impl_memory.FakeFilesystem()
fs.ensure_path("/d")
fs.ensure_path("/c/d")
fs.ensure_path("/b/c/d")
fs.ensure_path("/a/b/c/d")
+ contents = fs.ls_r("/a/b", absolute=False)
+ self.assertEqual(['c', 'c/d'], contents)
+
+ def test_ls_recursive_targeted_absolute(self):
+ fs = impl_memory.FakeFilesystem()
+ fs.ensure_path("/d")
+ fs.ensure_path("/c/d")
+ fs.ensure_path("/b/c/d")
+ fs.ensure_path("/a/b/c/d")
+ contents = fs.ls_r("/a/b", absolute=True)
+ self.assertEqual(['/a/b/c', '/a/b/c/d'], contents)
+
+ def test_old_ls_recursive_targeted_absolute(self):
+ fs = impl_memory.FakeFilesystem()
+ fs.ensure_path("/d")
+ fs.ensure_path("/c/d")
+ fs.ensure_path("/b/c/d")
+ fs.ensure_path("/a/b/c/d")
contents = fs.ls("/a/b", recursive=True)
self.assertEqual(['/a/b/c', '/a/b/c/d'], contents)
diff --git a/taskflow/tests/unit/persistence/test_sql_persistence.py b/taskflow/tests/unit/persistence/test_sql_persistence.py
index 8489160..8a8e22c 100644
--- a/taskflow/tests/unit/persistence/test_sql_persistence.py
+++ b/taskflow/tests/unit/persistence/test_sql_persistence.py
@@ -40,18 +40,10 @@ PASSWD = "openstack_citest"
DATABASE = "tftest_" + ''.join(random.choice('0123456789')
for _ in range(12))
-try:
- from taskflow.persistence.backends import impl_sqlalchemy
-
- import sqlalchemy as sa
- SQLALCHEMY_AVAILABLE = True
-except Exception:
- SQLALCHEMY_AVAILABLE = False
-
-# Testing will try to run against these two mysql library variants.
-MYSQL_VARIANTS = ('mysqldb', 'pymysql')
+import sqlalchemy as sa
from taskflow.persistence import backends
+from taskflow.persistence.backends import impl_sqlalchemy
from taskflow import test
from taskflow.tests.unit.persistence import base
@@ -64,7 +56,7 @@ def _get_connect_string(backend, user, passwd, database=None, variant=None):
backend = "postgresql+%s" % (variant)
elif backend == "mysql":
if not variant:
- variant = 'mysqldb'
+ variant = 'pymysql'
backend = "mysql+%s" % (variant)
else:
raise Exception("Unrecognized backend: '%s'" % backend)
@@ -74,30 +66,24 @@ def _get_connect_string(backend, user, passwd, database=None, variant=None):
def _mysql_exists():
- if not SQLALCHEMY_AVAILABLE:
- return False
- for variant in MYSQL_VARIANTS:
- engine = None
- try:
- db_uri = _get_connect_string('mysql', USER, PASSWD,
- variant=variant)
- engine = sa.create_engine(db_uri)
- with contextlib.closing(engine.connect()):
- return True
- except Exception:
- pass
- finally:
- if engine is not None:
- try:
- engine.dispose()
- except Exception:
- pass
+ engine = None
+ try:
+ db_uri = _get_connect_string('mysql', USER, PASSWD)
+ engine = sa.create_engine(db_uri)
+ with contextlib.closing(engine.connect()):
+ return True
+ except Exception:
+ pass
+ finally:
+ if engine is not None:
+ try:
+ engine.dispose()
+ except Exception:
+ pass
return False
def _postgres_exists():
- if not SQLALCHEMY_AVAILABLE:
- return False
engine = None
try:
db_uri = _get_connect_string('postgres', USER, PASSWD, 'postgres')
@@ -114,7 +100,6 @@ def _postgres_exists():
pass
-@testtools.skipIf(not SQLALCHEMY_AVAILABLE, 'sqlalchemy is not available')
class SqlitePersistenceTest(test.TestCase, base.PersistenceTestMixin):
"""Inherits from the base test and sets up a sqlite temporary db."""
def _get_connection(self):
@@ -185,43 +170,26 @@ class BackendPersistenceTestMixin(base.PersistenceTestMixin):
" testing being skipped due to: %s" % (e))
-@testtools.skipIf(not SQLALCHEMY_AVAILABLE, 'sqlalchemy is not available')
@testtools.skipIf(not _mysql_exists(), 'mysql is not available')
class MysqlPersistenceTest(BackendPersistenceTestMixin, test.TestCase):
- def __init__(self, *args, **kwargs):
- test.TestCase.__init__(self, *args, **kwargs)
-
def _init_db(self):
- working_variant = None
- for variant in MYSQL_VARIANTS:
- engine = None
- try:
- db_uri = _get_connect_string('mysql', USER, PASSWD,
- variant=variant)
- engine = sa.create_engine(db_uri)
- with contextlib.closing(engine.connect()) as conn:
- conn.execute("CREATE DATABASE %s" % DATABASE)
- working_variant = variant
- except Exception:
- pass
- finally:
- if engine is not None:
- try:
- engine.dispose()
- except Exception:
- pass
- if working_variant:
- break
- if not working_variant:
- variants = ", ".join(MYSQL_VARIANTS)
- raise Exception("Failed to initialize MySQL db."
- " Tried these variants: %s; MySQL testing"
- " being skipped" % (variants))
- else:
- return _get_connect_string('mysql', USER, PASSWD,
- database=DATABASE,
- variant=working_variant)
+ engine = None
+ try:
+ db_uri = _get_connect_string('mysql', USER, PASSWD)
+ engine = sa.create_engine(db_uri)
+ with contextlib.closing(engine.connect()) as conn:
+ conn.execute("CREATE DATABASE %s" % DATABASE)
+ except Exception as e:
+ raise Exception('Failed to initialize MySQL db: %s' % (e))
+ finally:
+ if engine is not None:
+ try:
+ engine.dispose()
+ except Exception:
+ pass
+ return _get_connect_string('mysql', USER, PASSWD,
+ database=DATABASE)
def _remove_db(self):
engine = None
@@ -239,13 +207,9 @@ class MysqlPersistenceTest(BackendPersistenceTestMixin, test.TestCase):
pass
-@testtools.skipIf(not SQLALCHEMY_AVAILABLE, 'sqlalchemy is not available')
@testtools.skipIf(not _postgres_exists(), 'postgres is not available')
class PostgresPersistenceTest(BackendPersistenceTestMixin, test.TestCase):
- def __init__(self, *args, **kwargs):
- test.TestCase.__init__(self, *args, **kwargs)
-
def _init_db(self):
engine = None
try:
@@ -293,7 +257,6 @@ class PostgresPersistenceTest(BackendPersistenceTestMixin, test.TestCase):
pass
-@testtools.skipIf(not SQLALCHEMY_AVAILABLE, 'sqlalchemy is not available')
class SQLBackendFetchingTest(test.TestCase):
def test_sqlite_persistence_entry_point(self):
@@ -301,16 +264,16 @@ class SQLBackendFetchingTest(test.TestCase):
with contextlib.closing(backends.fetch(conf)) as be:
self.assertIsInstance(be, impl_sqlalchemy.SQLAlchemyBackend)
- @testtools.skipIf(not _postgres_exists(), 'postgres is not available')
+ @testtools.skipIf(not _mysql_exists(), 'mysql is not available')
def test_mysql_persistence_entry_point(self):
- uri = "mysql://%s:%s@localhost/%s" % (USER, PASSWD, DATABASE)
+ uri = _get_connect_string('mysql', USER, PASSWD, database=DATABASE)
conf = {'connection': uri}
with contextlib.closing(backends.fetch(conf)) as be:
self.assertIsInstance(be, impl_sqlalchemy.SQLAlchemyBackend)
- @testtools.skipIf(not _mysql_exists(), 'mysql is not available')
+ @testtools.skipIf(not _postgres_exists(), 'postgres is not available')
def test_postgres_persistence_entry_point(self):
- uri = "postgresql://%s:%s@localhost/%s" % (USER, PASSWD, DATABASE)
+ uri = _get_connect_string('postgres', USER, PASSWD, database=DATABASE)
conf = {'connection': uri}
with contextlib.closing(backends.fetch(conf)) as be:
self.assertIsInstance(be, impl_sqlalchemy.SQLAlchemyBackend)
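Dropping the mysqldb/pymysql variant probing means connect strings now default to the pymysql dialect; for reference, the URLs that _get_connect_string builds look like this (the database name here is illustrative, the real one is randomized per run):

    import sqlalchemy as sa

    # mysql now implies the pymysql driver by default:
    engine = sa.create_engine(
        "mysql+pymysql://openstack_citest:openstack_citest"
        "@localhost/tftest_example")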
diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py
index cec68f3..4e38dfa 100644
--- a/taskflow/tests/unit/test_engines.py
+++ b/taskflow/tests/unit/test_engines.py
@@ -159,6 +159,60 @@ class EngineMultipleResultsTest(utils.EngineTestBase):
result = engine.storage.fetch('x')
self.assertEqual(result, 1)
+ def test_many_results_visible_to(self):
+ flow = lf.Flow("flow")
+ flow.add(utils.AddOneSameProvidesRequires(
+ 'a', rebind={'value': 'source'}))
+ flow.add(utils.AddOneSameProvidesRequires('b'))
+ flow.add(utils.AddOneSameProvidesRequires('c'))
+ engine = self._make_engine(flow, store={'source': 0})
+ engine.run()
+
+        # Check what each task in the flow should be seeing...
+ atoms = list(flow)
+ a = atoms[0]
+ a_kwargs = engine.storage.fetch_mapped_args(a.rebind,
+ atom_name='a')
+ self.assertEqual({'value': 0}, a_kwargs)
+
+ b = atoms[1]
+ b_kwargs = engine.storage.fetch_mapped_args(b.rebind,
+ atom_name='b')
+ self.assertEqual({'value': 1}, b_kwargs)
+
+ c = atoms[2]
+ c_kwargs = engine.storage.fetch_mapped_args(c.rebind,
+ atom_name='c')
+ self.assertEqual({'value': 2}, c_kwargs)
+
+ def test_many_results_storage_provided_visible_to(self):
+ # This works as expected due to docs listed at
+ #
+ # http://docs.openstack.org/developer/taskflow/engines.html#scoping
+ flow = lf.Flow("flow")
+ flow.add(utils.AddOneSameProvidesRequires('a'))
+ flow.add(utils.AddOneSameProvidesRequires('b'))
+ flow.add(utils.AddOneSameProvidesRequires('c'))
+ engine = self._make_engine(flow, store={'value': 0})
+ engine.run()
+
+        # Check what each task in the flow should be seeing...
+ atoms = list(flow)
+ a = atoms[0]
+ a_kwargs = engine.storage.fetch_mapped_args(a.rebind,
+ atom_name='a')
+ self.assertEqual({'value': 0}, a_kwargs)
+
+ b = atoms[1]
+ b_kwargs = engine.storage.fetch_mapped_args(b.rebind,
+ atom_name='b')
+ self.assertEqual({'value': 0}, b_kwargs)
+
+ c = atoms[2]
+ c_kwargs = engine.storage.fetch_mapped_args(c.rebind,
+ atom_name='c')
+ self.assertEqual({'value': 0}, c_kwargs)
+
def test_fetch_with_two_results(self):
flow = lf.Flow("flow")
flow.add(utils.TaskOneReturn(provides='x'))
diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py
index edcc6d8..ddb256b 100644
--- a/taskflow/tests/unit/test_retries.py
+++ b/taskflow/tests/unit/test_retries.py
@@ -336,6 +336,66 @@ class RetryTest(utils.EngineTestBase):
'flow-1.f SUCCESS']
self.assertEqual(expected, capturer.values)
+ def test_nested_flow_with_retry_revert(self):
+ retry1 = retry.Times(0, 'r1', provides='x2')
+ flow = lf.Flow('flow-1').add(
+ utils.ProgressingTask("task1"),
+ lf.Flow('flow-2', retry1).add(
+ utils.ConditionalTask("task2", inject={'x': 1}))
+ )
+ engine = self._make_engine(flow)
+ engine.storage.inject({'y': 2})
+ with utils.CaptureListener(engine) as capturer:
+ try:
+ engine.run()
+ except Exception:
+ pass
+ self.assertEqual(engine.storage.fetch_all(), {'y': 2})
+ expected = ['flow-1.f RUNNING',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(1)',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'r1.r REVERTING',
+ 'r1.r REVERTED',
+ 'flow-1.f REVERTED']
+ self.assertEqual(expected, capturer.values)
+
+ def test_nested_flow_with_retry_revert_all(self):
+ retry1 = retry.Times(0, 'r1', provides='x2', revert_all=True)
+ flow = lf.Flow('flow-1').add(
+ utils.ProgressingTask("task1"),
+ lf.Flow('flow-2', retry1).add(
+ utils.ConditionalTask("task2", inject={'x': 1}))
+ )
+ engine = self._make_engine(flow)
+ engine.storage.inject({'y': 2})
+ with utils.CaptureListener(engine) as capturer:
+ try:
+ engine.run()
+ except Exception:
+ pass
+ self.assertEqual(engine.storage.fetch_all(), {'y': 2})
+ expected = ['flow-1.f RUNNING',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(1)',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'r1.r REVERTING',
+ 'r1.r REVERTED',
+ 'task1.t REVERTING',
+ 'task1.t REVERTED',
+ 'flow-1.f REVERTED']
+ self.assertEqual(expected, capturer.values)
+
def test_revert_all_retry(self):
flow = lf.Flow('flow-1', retry.Times(3, 'r1', provides='x')).add(
utils.ProgressingTask("task1"),
@@ -594,6 +654,108 @@ class RetryTest(utils.EngineTestBase):
'flow-1.f REVERTED']
self.assertItemsEqual(capturer.values, expected)
+ def test_nested_for_each_revert(self):
+ collection = [3, 2, 3, 5]
+ retry1 = retry.ForEach(collection, 'r1', provides='x')
+ flow = lf.Flow('flow-1').add(
+ utils.ProgressingTask("task1"),
+ lf.Flow('flow-2', retry1).add(
+ utils.FailingTaskWithOneArg('task2')
+ )
+ )
+ engine = self._make_engine(flow)
+ with utils.CaptureListener(engine) as capturer:
+ self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ expected = ['flow-1.f RUNNING',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(3)',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'r1.r RETRYING',
+ 'task2.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(2)',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot with 2)',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'r1.r RETRYING',
+ 'task2.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(3)',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'r1.r RETRYING',
+ 'task2.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(5)',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot with 5)',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'r1.r REVERTING',
+ 'r1.r REVERTED',
+ 'flow-1.f REVERTED']
+ self.assertEqual(expected, capturer.values)
+
+ def test_nested_for_each_revert_all(self):
+ collection = [3, 2, 3, 5]
+ retry1 = retry.ForEach(collection, 'r1', provides='x', revert_all=True)
+ flow = lf.Flow('flow-1').add(
+ utils.ProgressingTask("task1"),
+ lf.Flow('flow-2', retry1).add(
+ utils.FailingTaskWithOneArg('task2')
+ )
+ )
+ engine = self._make_engine(flow)
+ with utils.CaptureListener(engine) as capturer:
+ self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ expected = ['flow-1.f RUNNING',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(3)',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'r1.r RETRYING',
+ 'task2.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(2)',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot with 2)',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'r1.r RETRYING',
+ 'task2.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(3)',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'r1.r RETRYING',
+ 'task2.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(5)',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot with 5)',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'r1.r REVERTING',
+ 'r1.r REVERTED',
+ 'task1.t REVERTING',
+ 'task1.t REVERTED',
+ 'flow-1.f REVERTED']
+ self.assertEqual(expected, capturer.values)
+
def test_for_each_empty_collection(self):
values = []
retry1 = retry.ForEach(values, 'r1', provides='x')
@@ -674,6 +836,95 @@ class RetryTest(utils.EngineTestBase):
'flow-1.f REVERTED']
self.assertItemsEqual(capturer.values, expected)
+ def test_nested_parameterized_for_each_revert(self):
+ values = [3, 2, 5]
+ retry1 = retry.ParameterizedForEach('r1', provides='x')
+ flow = lf.Flow('flow-1').add(
+ utils.ProgressingTask('task-1'),
+ lf.Flow('flow-2', retry1).add(
+ utils.FailingTaskWithOneArg('task-2')
+ )
+ )
+ engine = self._make_engine(flow)
+ engine.storage.inject({'values': values, 'y': 1})
+ with utils.CaptureListener(engine) as capturer:
+ self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ expected = ['flow-1.f RUNNING',
+ 'task-1.t RUNNING',
+ 'task-1.t SUCCESS(5)',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(3)',
+ 'task-2.t RUNNING',
+ 'task-2.t FAILURE(Failure: RuntimeError: Woot with 3)',
+ 'task-2.t REVERTING',
+ 'task-2.t REVERTED',
+ 'r1.r RETRYING',
+ 'task-2.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(2)',
+ 'task-2.t RUNNING',
+ 'task-2.t FAILURE(Failure: RuntimeError: Woot with 2)',
+ 'task-2.t REVERTING',
+ 'task-2.t REVERTED',
+ 'r1.r RETRYING',
+ 'task-2.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(5)',
+ 'task-2.t RUNNING',
+ 'task-2.t FAILURE(Failure: RuntimeError: Woot with 5)',
+ 'task-2.t REVERTING',
+ 'task-2.t REVERTED',
+ 'r1.r REVERTING',
+ 'r1.r REVERTED',
+ 'flow-1.f REVERTED']
+ self.assertEqual(expected, capturer.values)
+
+ def test_nested_parameterized_for_each_revert_all(self):
+ values = [3, 2, 5]
+ retry1 = retry.ParameterizedForEach('r1', provides='x',
+ revert_all=True)
+ flow = lf.Flow('flow-1').add(
+ utils.ProgressingTask('task-1'),
+ lf.Flow('flow-2', retry1).add(
+ utils.FailingTaskWithOneArg('task-2')
+ )
+ )
+ engine = self._make_engine(flow)
+ engine.storage.inject({'values': values, 'y': 1})
+ with utils.CaptureListener(engine) as capturer:
+ self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ expected = ['flow-1.f RUNNING',
+ 'task-1.t RUNNING',
+ 'task-1.t SUCCESS(5)',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(3)',
+ 'task-2.t RUNNING',
+ 'task-2.t FAILURE(Failure: RuntimeError: Woot with 3)',
+ 'task-2.t REVERTING',
+ 'task-2.t REVERTED',
+ 'r1.r RETRYING',
+ 'task-2.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(2)',
+ 'task-2.t RUNNING',
+ 'task-2.t FAILURE(Failure: RuntimeError: Woot with 2)',
+ 'task-2.t REVERTING',
+ 'task-2.t REVERTED',
+ 'r1.r RETRYING',
+ 'task-2.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(5)',
+ 'task-2.t RUNNING',
+ 'task-2.t FAILURE(Failure: RuntimeError: Woot with 5)',
+ 'task-2.t REVERTING',
+ 'task-2.t REVERTED',
+ 'r1.r REVERTING',
+ 'r1.r REVERTED',
+ 'task-1.t REVERTING',
+ 'task-1.t REVERTED',
+ 'flow-1.f REVERTED']
+ self.assertEqual(expected, capturer.values)
+
def test_parameterized_for_each_empty_collection(self):
values = []
retry1 = retry.ParameterizedForEach('r1', provides='x')
diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py
index a475c51..3acf245 100644
--- a/taskflow/tests/unit/worker_based/test_worker.py
+++ b/taskflow/tests/unit/worker_based/test_worker.py
@@ -33,7 +33,7 @@ class TestWorker(test.MockTestCase):
self.broker_url = 'test-url'
self.exchange = 'test-exchange'
self.topic = 'test-topic'
- self.endpoint_count = 24
+ self.endpoint_count = 25
# patch classes
self.executor_mock, self.executor_inst_mock = self.patchClass(
diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py
index 031dd70..43f208b 100644
--- a/taskflow/tests/utils.py
+++ b/taskflow/tests/utils.py
@@ -89,6 +89,13 @@ class DummyTask(task.Task):
pass
+class AddOneSameProvidesRequires(task.Task):
+ default_provides = 'value'
+
+ def execute(self, value):
+ return value + 1
+
+
class AddOne(task.Task):
default_provides = 'result'
diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py
index 005f394..c097bbe 100644
--- a/taskflow/utils/misc.py
+++ b/taskflow/utils/misc.py
@@ -449,38 +449,6 @@ def get_duplicate_keys(iterable, key=None):
return duplicates
-class ListenerStack(object):
- """Listeners that are deregistered on context manager exit.
-
- TODO(harlowja): replace this with ``contextlib.ExitStack`` or equivalent
- in the future (that code is in python3.2+ and in a few backports that
- provide nearly equivalent functionality). When/if
- https://review.openstack.org/#/c/164222/ merges we should be able to
- remove this since listeners are already context managers.
- """
-
- def __init__(self, log):
- self._registered = []
- self._log = log
-
- def register(self, listeners):
- for listener in listeners:
- listener.register()
- self._registered.append(listener)
-
- def __enter__(self):
- return self
-
- def __exit__(self, type, value, tb):
- while self._registered:
- listener = self._registered.pop()
- try:
- listener.deregister()
- except Exception:
- self._log.warn("Failed deregistering listener '%s'",
- listener, exc_info=True)
-
-
class ExponentialBackoff(object):
"""An iterable object that will yield back an exponential delay sequence.
diff --git a/test-requirements.txt b/test-requirements.txt
index 16fc8b2..f25f6d4 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -16,7 +16,7 @@ zake>=0.1.6 # Apache-2.0
kazoo>=1.3.1,!=2.1
# Used for testing database persistence backends.
-SQLAlchemy>=0.9.7,<=0.9.99
+SQLAlchemy>=0.9.7,<1.1.0
alembic>=0.7.2
psycopg2
PyMySQL>=0.6.2 # MIT License