diff options
author | Joshua Harlow <harlowja@yahoo-inc.com> | 2015-02-05 16:31:33 -0800 |
---|---|---|
committer | Joshua Harlow <harlowja@gmail.com> | 2015-02-05 23:19:08 -0800 |
commit | 08a184639635c57735c5ce0ca60aa727777c9e69 (patch) | |
tree | 41b396b7d68c31dd5d428b5176099d1159853b55 | |
parent | 2cd90745ea173825e23a94ab31b0c3b2b5a302e5 (diff) | |
download | taskflow-08a184639635c57735c5ce0ca60aa727777c9e69.tar.gz |
Default to using a thread-safe storage unit
Instead of having both a nominally useful single-threaded storage
unit that uses a dummy r/w lock and a separate multi-threaded storage
unit, just have the storage unit protect itself by default from
multi-threaded calls being made on it (with the appropriate
reader/writer locks being activated to make this work
correctly).
Change-Id: Ib6879edb465156a8e54fd5b4002550d1cec49137
-rw-r--r-- | doc/source/engines.rst | 9 | ||||
-rw-r--r-- | doc/source/persistence.rst | 5 | ||||
-rw-r--r-- | taskflow/engines/action_engine/engine.py | 4 | ||||
-rw-r--r-- | taskflow/engines/base.py | 7 | ||||
-rw-r--r-- | taskflow/engines/worker_based/engine.py | 3 | ||||
-rw-r--r-- | taskflow/storage.py | 32 | ||||
-rw-r--r-- | taskflow/tests/unit/action_engine/test_runner.py | 2 | ||||
-rw-r--r-- | taskflow/tests/unit/test_storage.py | 13 | ||||
-rw-r--r-- | taskflow/utils/lock_utils.py | 30 |
9 files changed, 26 insertions, 79 deletions
diff --git a/doc/source/engines.rst b/doc/source/engines.rst index c9b0c5d..9e931ad 100644 --- a/doc/source/engines.rst +++ b/doc/source/engines.rst @@ -242,9 +242,9 @@ This stage starts by setting up the storage needed for all atoms in the previously created graph, ensuring that corresponding :py:class:`~taskflow.persistence.logbook.AtomDetail` (or subclass of) objects are created for each node in the graph. Once this is done final validation -occurs on the requirements that are needed to start execution and what storage -provides. If there is any atom or flow requirements not satisfied then -execution will not be allowed to continue. +occurs on the requirements that are needed to start execution and what +:py:class:`~taskflow.storage.Storage` provides. If there is any atom or flow +requirements not satisfied then execution will not be allowed to continue. Execution --------- @@ -311,7 +311,8 @@ atoms result will be examined and finalized using a :py:class:`~taskflow.engines.action_engine.completer.Completer` implementation. It typically will persist results to a provided persistence backend (saved into the corresponding :py:class:`~taskflow.persistence.logbook.AtomDetail` -and :py:class:`~taskflow.persistence.logbook.FlowDetail` objects) and reflect +and :py:class:`~taskflow.persistence.logbook.FlowDetail` objects via the +:py:class:`~taskflow.storage.Storage` helper) and reflect the new state of the atom. At this point what typically happens falls into two categories, one for if that atom failed and one for if it did not. If the atom failed it may be set to a new intention such as ``RETRY`` or diff --git a/doc/source/persistence.rst b/doc/source/persistence.rst index 9cb9989..311865f 100644 --- a/doc/source/persistence.rst +++ b/doc/source/persistence.rst @@ -279,6 +279,11 @@ Implementations .. automodule:: taskflow.persistence.backends.impl_sqlalchemy .. automodule:: taskflow.persistence.backends.impl_zookeeper +Storage +======= + +.. 
automodule:: taskflow.storage + Hierarchy ========= diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 51df569..b40c5fd 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -28,7 +28,6 @@ from taskflow.engines.action_engine import runtime from taskflow.engines import base from taskflow import exceptions as exc from taskflow import states -from taskflow import storage as atom_storage from taskflow.types import failure from taskflow.utils import lock_utils from taskflow.utils import misc @@ -218,7 +217,6 @@ class ActionEngine(base.Engine): class SerialActionEngine(ActionEngine): """Engine that runs tasks in serial manner.""" - _storage_factory = atom_storage.SingleThreadedStorage def __init__(self, flow, flow_detail, backend, options): super(SerialActionEngine, self).__init__(flow, flow_detail, @@ -276,8 +274,6 @@ String (case insensitive) Executor used .. |cf| replace:: concurrent.futures """ - _storage_factory = atom_storage.MultiThreadedStorage - # One of these types should match when a object (non-string) is provided # for the 'executor' option. 
# diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py index a97cf3b..7ea8e70 100644 --- a/taskflow/engines/base.py +++ b/taskflow/engines/base.py @@ -19,6 +19,7 @@ import abc import six +from taskflow import storage from taskflow.types import notifier from taskflow.utils import deprecation from taskflow.utils import misc @@ -74,11 +75,7 @@ class Engine(object): @misc.cachedproperty def storage(self): """The storage unit for this flow.""" - return self._storage_factory(self._flow_detail, self._backend) - - @abc.abstractproperty - def _storage_factory(self): - """Storage factory that will be used to generate storage objects.""" + return storage.Storage(self._flow_detail, backend=self._backend) @abc.abstractmethod def compile(self): diff --git a/taskflow/engines/worker_based/engine.py b/taskflow/engines/worker_based/engine.py index aee39e8..31dcade 100644 --- a/taskflow/engines/worker_based/engine.py +++ b/taskflow/engines/worker_based/engine.py @@ -17,7 +17,6 @@ from taskflow.engines.action_engine import engine from taskflow.engines.worker_based import executor from taskflow.engines.worker_based import protocol as pr -from taskflow import storage as t_storage class WorkerBasedActionEngine(engine.ActionEngine): @@ -46,8 +45,6 @@ class WorkerBasedActionEngine(engine.ActionEngine): (see: :py:attr:`~.proxy.Proxy.DEFAULT_RETRY_OPTIONS`) """ - _storage_factory = t_storage.SingleThreadedStorage - def __init__(self, flow, flow_detail, backend, options): super(WorkerBasedActionEngine, self).__init__(flow, flow_detail, backend, options) diff --git a/taskflow/storage.py b/taskflow/storage.py index df80148..8734b2a 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. 
-import abc import contextlib from oslo_utils import reflection @@ -107,9 +106,8 @@ def _item_from_first_of(providers, looking_for): " extraction" % (looking_for, providers)) -@six.add_metaclass(abc.ABCMeta) class Storage(object): - """Interface between engines and logbook. + """Interface between engines and logbook and its backend (if any). This class provides a simple interface to save atoms of a given flow and associated activity and results to persistence layer (logbook, @@ -119,15 +117,21 @@ class Storage(object): """ injector_name = '_TaskFlow_INJECTOR' + """Injector task detail name. + + This task detail is a **special** detail that will be automatically + created and saved to store **persistent** injected values (name conflicts + with it must be avoided) that are *global* to the flow being executed. + """ def __init__(self, flow_detail, backend=None): self._result_mappings = {} self._reverse_mapping = {} self._backend = backend self._flowdetail = flow_detail - self._lock = self._lock_cls() self._transients = {} self._injected_args = {} + self._lock = lock_utils.ReaderWriterLock() # NOTE(imelnikov): failure serialization looses information, # so we cache failures here, in atom name -> failure mapping. @@ -150,16 +154,6 @@ class Storage(object): self._set_result_mapping(injector_td.name, dict((name, name) for name in names)) - @abc.abstractproperty - def _lock_cls(self): - """Lock class used to generate reader/writer locks. - - These locks are used for protecting read/write access to the - underlying storage backend when internally mutating operations occur. - They ensure that we read and write data in a consistent manner when - being used in a multithreaded situation. 
- """ - def _with_connection(self, functor, *args, **kwargs): # NOTE(harlowja): Activate the given function with a backend # connection, if a backend is provided in the first place, otherwise @@ -771,13 +765,3 @@ class Storage(object): histories.append((ad.name, self._translate_into_history(ad))) return histories - - -class MultiThreadedStorage(Storage): - """Storage that uses locks to protect against concurrent access.""" - _lock_cls = lock_utils.ReaderWriterLock - - -class SingleThreadedStorage(Storage): - """Storage that uses dummy locks when you really don't need locks.""" - _lock_cls = lock_utils.DummyReaderWriterLock diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_runner.py index 9b3bdb4..98ae0e2 100644 --- a/taskflow/tests/unit/action_engine/test_runner.py +++ b/taskflow/tests/unit/action_engine/test_runner.py @@ -35,7 +35,7 @@ class _RunnerTestMixin(object): def _make_runtime(self, flow, initial_state=None): compilation = compiler.PatternCompiler(flow).compile() flow_detail = pu.create_flow_detail(flow) - store = storage.SingleThreadedStorage(flow_detail) + store = storage.Storage(flow_detail) # This ensures the tasks exist in storage... 
for task in compilation.execution_graph: store.ensure_atom(task) diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index 5f521af..af6afb2 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -49,17 +49,14 @@ class StorageTestMixin(object): for t in threads: t.join() - def _get_storage(self, flow_detail=None, threaded=False): + def _get_storage(self, flow_detail=None): if flow_detail is None: _lb, flow_detail = p_utils.temporary_flow_detail(self.backend) - storage_cls = storage.SingleThreadedStorage - if threaded: - storage_cls = storage.MultiThreadedStorage - return storage_cls(flow_detail=flow_detail, backend=self.backend) + return storage.Storage(flow_detail=flow_detail, backend=self.backend) def test_non_saving_storage(self): _lb, flow_detail = p_utils.temporary_flow_detail(self.backend) - s = storage.SingleThreadedStorage(flow_detail=flow_detail) + s = storage.Storage(flow_detail=flow_detail) s.ensure_atom(test_utils.NoopTask('my_task')) self.assertTrue(uuidutils.is_uuid_like(s.get_atom_uuid('my_task'))) @@ -311,7 +308,7 @@ class StorageTestMixin(object): }) def test_many_thread_ensure_same_task(self): - s = self._get_storage(threaded=True) + s = self._get_storage() def ensure_my_task(): s.ensure_atom(test_utils.NoopTask('my_task')) @@ -325,7 +322,7 @@ class StorageTestMixin(object): self.assertEqual(1, len(s._flowdetail)) def test_many_thread_inject(self): - s = self._get_storage(threaded=True) + s = self._get_storage() def inject_values(values): s.inject(values) diff --git a/taskflow/utils/lock_utils.py b/taskflow/utils/lock_utils.py index b74931e..7671a42 100644 --- a/taskflow/utils/lock_utils.py +++ b/taskflow/utils/lock_utils.py @@ -241,36 +241,6 @@ class ReaderWriterLock(object): self._cond.release() -class DummyReaderWriterLock(object): - """A dummy reader/writer lock. 
- - This dummy lock doesn't lock anything but provides the same functions as a - normal reader/writer lock class and can be useful in unit tests or other - similar scenarios (do *not* use it if locking is actually required). - """ - @contextlib.contextmanager - def write_lock(self): - yield self - - @contextlib.contextmanager - def read_lock(self): - yield self - - @property - def owner(self): - return None - - def is_reader(self): - return False - - def is_writer(self, check_pending=True): - return False - - @property - def has_pending_writers(self): - return False - - class MultiLock(object): """A class which attempts to obtain & release many locks at once. |