summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-02-05 16:31:33 -0800
committerJoshua Harlow <harlowja@gmail.com>2015-02-05 23:19:08 -0800
commit08a184639635c57735c5ce0ca60aa727777c9e69 (patch)
tree41b396b7d68c31dd5d428b5176099d1159853b55
parent2cd90745ea173825e23a94ab31b0c3b2b5a302e5 (diff)
downloadtaskflow-08a184639635c57735c5ce0ca60aa727777c9e69.tar.gz
Default to using a thread-safe storage unit
Instead of having a nominally useful single-threaded storage unit that uses a dummy r/w lock and a multi-threaded storage unit just have the storage unit by default protect itself from multi-threading calls being used on it (with the appropriate reader/writer locks being activated to make this work correctly). Change-Id: Ib6879edb465156a8e54fd5b4002550d1cec49137
-rw-r--r--doc/source/engines.rst9
-rw-r--r--doc/source/persistence.rst5
-rw-r--r--taskflow/engines/action_engine/engine.py4
-rw-r--r--taskflow/engines/base.py7
-rw-r--r--taskflow/engines/worker_based/engine.py3
-rw-r--r--taskflow/storage.py32
-rw-r--r--taskflow/tests/unit/action_engine/test_runner.py2
-rw-r--r--taskflow/tests/unit/test_storage.py13
-rw-r--r--taskflow/utils/lock_utils.py30
9 files changed, 26 insertions, 79 deletions
diff --git a/doc/source/engines.rst b/doc/source/engines.rst
index c9b0c5d..9e931ad 100644
--- a/doc/source/engines.rst
+++ b/doc/source/engines.rst
@@ -242,9 +242,9 @@ This stage starts by setting up the storage needed for all atoms in the
previously created graph, ensuring that corresponding
:py:class:`~taskflow.persistence.logbook.AtomDetail` (or subclass of) objects
are created for each node in the graph. Once this is done final validation
-occurs on the requirements that are needed to start execution and what storage
-provides. If there is any atom or flow requirements not satisfied then
-execution will not be allowed to continue.
+occurs on the requirements that are needed to start execution and what
+:py:class:`~taskflow.storage.Storage` provides. If there is any atom or flow
+requirements not satisfied then execution will not be allowed to continue.
Execution
---------
@@ -311,7 +311,8 @@ atoms result will be examined and finalized using a
:py:class:`~taskflow.engines.action_engine.completer.Completer` implementation.
It typically will persist results to a provided persistence backend (saved
into the corresponding :py:class:`~taskflow.persistence.logbook.AtomDetail`
-and :py:class:`~taskflow.persistence.logbook.FlowDetail` objects) and reflect
+and :py:class:`~taskflow.persistence.logbook.FlowDetail` objects via the
+:py:class:`~taskflow.storage.Storage` helper) and reflect
the new state of the atom. At this point what typically happens falls into two
categories, one for if that atom failed and one for if it did not. If the atom
failed it may be set to a new intention such as ``RETRY`` or
diff --git a/doc/source/persistence.rst b/doc/source/persistence.rst
index 9cb9989..311865f 100644
--- a/doc/source/persistence.rst
+++ b/doc/source/persistence.rst
@@ -279,6 +279,11 @@ Implementations
.. automodule:: taskflow.persistence.backends.impl_sqlalchemy
.. automodule:: taskflow.persistence.backends.impl_zookeeper
+Storage
+=======
+
+.. automodule:: taskflow.storage
+
Hierarchy
=========
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index 51df569..b40c5fd 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -28,7 +28,6 @@ from taskflow.engines.action_engine import runtime
from taskflow.engines import base
from taskflow import exceptions as exc
from taskflow import states
-from taskflow import storage as atom_storage
from taskflow.types import failure
from taskflow.utils import lock_utils
from taskflow.utils import misc
@@ -218,7 +217,6 @@ class ActionEngine(base.Engine):
class SerialActionEngine(ActionEngine):
"""Engine that runs tasks in serial manner."""
- _storage_factory = atom_storage.SingleThreadedStorage
def __init__(self, flow, flow_detail, backend, options):
super(SerialActionEngine, self).__init__(flow, flow_detail,
@@ -276,8 +274,6 @@ String (case insensitive) Executor used
.. |cf| replace:: concurrent.futures
"""
- _storage_factory = atom_storage.MultiThreadedStorage
-
# One of these types should match when a object (non-string) is provided
# for the 'executor' option.
#
diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py
index a97cf3b..7ea8e70 100644
--- a/taskflow/engines/base.py
+++ b/taskflow/engines/base.py
@@ -19,6 +19,7 @@ import abc
import six
+from taskflow import storage
from taskflow.types import notifier
from taskflow.utils import deprecation
from taskflow.utils import misc
@@ -74,11 +75,7 @@ class Engine(object):
@misc.cachedproperty
def storage(self):
"""The storage unit for this flow."""
- return self._storage_factory(self._flow_detail, self._backend)
-
- @abc.abstractproperty
- def _storage_factory(self):
- """Storage factory that will be used to generate storage objects."""
+ return storage.Storage(self._flow_detail, backend=self._backend)
@abc.abstractmethod
def compile(self):
diff --git a/taskflow/engines/worker_based/engine.py b/taskflow/engines/worker_based/engine.py
index aee39e8..31dcade 100644
--- a/taskflow/engines/worker_based/engine.py
+++ b/taskflow/engines/worker_based/engine.py
@@ -17,7 +17,6 @@
from taskflow.engines.action_engine import engine
from taskflow.engines.worker_based import executor
from taskflow.engines.worker_based import protocol as pr
-from taskflow import storage as t_storage
class WorkerBasedActionEngine(engine.ActionEngine):
@@ -46,8 +45,6 @@ class WorkerBasedActionEngine(engine.ActionEngine):
(see: :py:attr:`~.proxy.Proxy.DEFAULT_RETRY_OPTIONS`)
"""
- _storage_factory = t_storage.SingleThreadedStorage
-
def __init__(self, flow, flow_detail, backend, options):
super(WorkerBasedActionEngine, self).__init__(flow, flow_detail,
backend, options)
diff --git a/taskflow/storage.py b/taskflow/storage.py
index df80148..8734b2a 100644
--- a/taskflow/storage.py
+++ b/taskflow/storage.py
@@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import abc
import contextlib
from oslo_utils import reflection
@@ -107,9 +106,8 @@ def _item_from_first_of(providers, looking_for):
" extraction" % (looking_for, providers))
-@six.add_metaclass(abc.ABCMeta)
class Storage(object):
- """Interface between engines and logbook.
+ """Interface between engines and logbook and its backend (if any).
This class provides a simple interface to save atoms of a given flow and
associated activity and results to persistence layer (logbook,
@@ -119,15 +117,21 @@ class Storage(object):
"""
injector_name = '_TaskFlow_INJECTOR'
+ """Injector task detail name.
+
+ This task detail is a **special** detail that will be automatically
+ created and saved to store **persistent** injected values (name conflicts
+ with it must be avoided) that are *global* to the flow being executed.
+ """
def __init__(self, flow_detail, backend=None):
self._result_mappings = {}
self._reverse_mapping = {}
self._backend = backend
self._flowdetail = flow_detail
- self._lock = self._lock_cls()
self._transients = {}
self._injected_args = {}
+ self._lock = lock_utils.ReaderWriterLock()
# NOTE(imelnikov): failure serialization looses information,
# so we cache failures here, in atom name -> failure mapping.
@@ -150,16 +154,6 @@ class Storage(object):
self._set_result_mapping(injector_td.name,
dict((name, name) for name in names))
- @abc.abstractproperty
- def _lock_cls(self):
- """Lock class used to generate reader/writer locks.
-
- These locks are used for protecting read/write access to the
- underlying storage backend when internally mutating operations occur.
- They ensure that we read and write data in a consistent manner when
- being used in a multithreaded situation.
- """
-
def _with_connection(self, functor, *args, **kwargs):
# NOTE(harlowja): Activate the given function with a backend
# connection, if a backend is provided in the first place, otherwise
@@ -771,13 +765,3 @@ class Storage(object):
histories.append((ad.name,
self._translate_into_history(ad)))
return histories
-
-
-class MultiThreadedStorage(Storage):
- """Storage that uses locks to protect against concurrent access."""
- _lock_cls = lock_utils.ReaderWriterLock
-
-
-class SingleThreadedStorage(Storage):
- """Storage that uses dummy locks when you really don't need locks."""
- _lock_cls = lock_utils.DummyReaderWriterLock
diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_runner.py
index 9b3bdb4..98ae0e2 100644
--- a/taskflow/tests/unit/action_engine/test_runner.py
+++ b/taskflow/tests/unit/action_engine/test_runner.py
@@ -35,7 +35,7 @@ class _RunnerTestMixin(object):
def _make_runtime(self, flow, initial_state=None):
compilation = compiler.PatternCompiler(flow).compile()
flow_detail = pu.create_flow_detail(flow)
- store = storage.SingleThreadedStorage(flow_detail)
+ store = storage.Storage(flow_detail)
# This ensures the tasks exist in storage...
for task in compilation.execution_graph:
store.ensure_atom(task)
diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py
index 5f521af..af6afb2 100644
--- a/taskflow/tests/unit/test_storage.py
+++ b/taskflow/tests/unit/test_storage.py
@@ -49,17 +49,14 @@ class StorageTestMixin(object):
for t in threads:
t.join()
- def _get_storage(self, flow_detail=None, threaded=False):
+ def _get_storage(self, flow_detail=None):
if flow_detail is None:
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
- storage_cls = storage.SingleThreadedStorage
- if threaded:
- storage_cls = storage.MultiThreadedStorage
- return storage_cls(flow_detail=flow_detail, backend=self.backend)
+ return storage.Storage(flow_detail=flow_detail, backend=self.backend)
def test_non_saving_storage(self):
_lb, flow_detail = p_utils.temporary_flow_detail(self.backend)
- s = storage.SingleThreadedStorage(flow_detail=flow_detail)
+ s = storage.Storage(flow_detail=flow_detail)
s.ensure_atom(test_utils.NoopTask('my_task'))
self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my_task')))
@@ -311,7 +308,7 @@ class StorageTestMixin(object):
})
def test_many_thread_ensure_same_task(self):
- s = self._get_storage(threaded=True)
+ s = self._get_storage()
def ensure_my_task():
s.ensure_atom(test_utils.NoopTask('my_task'))
@@ -325,7 +322,7 @@ class StorageTestMixin(object):
self.assertEqual(1, len(s._flowdetail))
def test_many_thread_inject(self):
- s = self._get_storage(threaded=True)
+ s = self._get_storage()
def inject_values(values):
s.inject(values)
diff --git a/taskflow/utils/lock_utils.py b/taskflow/utils/lock_utils.py
index b74931e..7671a42 100644
--- a/taskflow/utils/lock_utils.py
+++ b/taskflow/utils/lock_utils.py
@@ -241,36 +241,6 @@ class ReaderWriterLock(object):
self._cond.release()
-class DummyReaderWriterLock(object):
- """A dummy reader/writer lock.
-
- This dummy lock doesn't lock anything but provides the same functions as a
- normal reader/writer lock class and can be useful in unit tests or other
- similar scenarios (do *not* use it if locking is actually required).
- """
- @contextlib.contextmanager
- def write_lock(self):
- yield self
-
- @contextlib.contextmanager
- def read_lock(self):
- yield self
-
- @property
- def owner(self):
- return None
-
- def is_reader(self):
- return False
-
- def is_writer(self, check_pending=True):
- return False
-
- @property
- def has_pending_writers(self):
- return False
-
-
class MultiLock(object):
"""A class which attempts to obtain & release many locks at once.