summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-06-03 16:52:16 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2015-06-03 17:07:03 -0700
commitabf21c728914ac2c2390efcce7dec4f19eb926a5 (patch)
tree39305f0269f0955568a8d3e1d417603a73acf625
parent1e6b991b23c5e9f14562491915f0fa09b3ce8533 (diff)
downloadtaskflow-abf21c728914ac2c2390efcce7dec4f19eb926a5.tar.gz
Refactor machine builder + runner into single unit
There is really no need to seperate off the runner into a unit that builds a state-machine and then provides a tiny utility function, both of these can just be in the same class and code so that it is easier to understand/read. Change-Id: I18b97514e230451ef804a878a0edcea4d0b2ad20
-rw-r--r--taskflow/engines/action_engine/runner.py94
-rw-r--r--taskflow/tests/unit/action_engine/test_runner.py20
-rwxr-xr-xtools/state_graph.py2
3 files changed, 50 insertions, 66 deletions
diff --git a/taskflow/engines/action_engine/runner.py b/taskflow/engines/action_engine/runner.py
index d50de15..8d637c1 100644
--- a/taskflow/engines/action_engine/runner.py
+++ b/taskflow/engines/action_engine/runner.py
@@ -51,38 +51,48 @@ class _MachineMemory(object):
self.done = set()
-class _MachineBuilder(object):
- """State machine *builder* that the runner uses.
-
- NOTE(harlowja): the machine states that this build will for are::
-
- +--------------+------------------+------------+----------+---------+
- Start | Event | End | On Enter | On Exit
- +--------------+------------------+------------+----------+---------+
- ANALYZING | completed | GAME_OVER | |
- ANALYZING | schedule_next | SCHEDULING | |
- ANALYZING | wait_finished | WAITING | |
- FAILURE[$] | | | |
- GAME_OVER | failed | FAILURE | |
- GAME_OVER | reverted | REVERTED | |
- GAME_OVER | success | SUCCESS | |
- GAME_OVER | suspended | SUSPENDED | |
- RESUMING | schedule_next | SCHEDULING | |
- REVERTED[$] | | | |
- SCHEDULING | wait_finished | WAITING | |
- SUCCESS[$] | | | |
- SUSPENDED[$] | | | |
- UNDEFINED[^] | start | RESUMING | |
- WAITING | examine_finished | ANALYZING | |
- +--------------+------------------+------------+----------+---------+
+class Runner(object):
+ """State machine *builder* + *runner* that powers the engine components.
+
+ NOTE(harlowja): the machine (states and events that will trigger
+ transitions) that this builds is represented by the following
+ table::
+
+ +--------------+------------------+------------+----------+---------+
+ Start | Event | End | On Enter | On Exit
+ +--------------+------------------+------------+----------+---------+
+ ANALYZING | completed | GAME_OVER | |
+ ANALYZING | schedule_next | SCHEDULING | |
+ ANALYZING | wait_finished | WAITING | |
+ FAILURE[$] | | | |
+ GAME_OVER | failed | FAILURE | |
+ GAME_OVER | reverted | REVERTED | |
+ GAME_OVER | success | SUCCESS | |
+ GAME_OVER | suspended | SUSPENDED | |
+ RESUMING | schedule_next | SCHEDULING | |
+ REVERTED[$] | | | |
+ SCHEDULING | wait_finished | WAITING | |
+ SUCCESS[$] | | | |
+ SUSPENDED[$] | | | |
+ UNDEFINED[^] | start | RESUMING | |
+ WAITING | examine_finished | ANALYZING | |
+ +--------------+------------------+------------+----------+---------+
Between any of these yielded states (minus ``GAME_OVER`` and ``UNDEFINED``)
if the engine has been suspended or the engine has failed (due to a
non-resolveable task failure or scheduling failure) the machine will stop
executing new tasks (currently running tasks will be allowed to complete)
and this machines run loop will be broken.
+
+ NOTE(harlowja): If the runtimes scheduler component is able to schedule
+ tasks in parallel, this enables parallel running and/or reversion.
"""
+ # Informational states this action yields while running, not useful to
+ # have the engine record but useful to provide to end-users when doing
+ # execution iterations.
+ ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING)
+
def __init__(self, runtime, waiter):
self._analyzer = runtime.analyzer
self._completer = runtime.completer
@@ -91,9 +101,12 @@ class _MachineBuilder(object):
self._waiter = waiter
def runnable(self):
+ """Checks if the storage says the flow is still runnable/running."""
return self._storage.get_flow_state() == st.RUNNING
def build(self, timeout=None):
+ """Builds a state-machine (that can be/is used during running)."""
+
memory = _MachineMemory()
if timeout is None:
timeout = _WAITING_TIMEOUT
@@ -244,38 +257,9 @@ class _MachineBuilder(object):
m.freeze()
return (m, memory)
-
-class Runner(object):
- """Runner that iterates while executing nodes using the given runtime.
-
- This runner acts as the action engine run loop/state-machine, it resumes
- the workflow, schedules all task it can for execution using the runtimes
- scheduler and analyzer components, and than waits on returned futures and
- then activates the runtimes completion component to finish up those tasks
- and so on...
-
- NOTE(harlowja): If the runtimes scheduler component is able to schedule
- tasks in parallel, this enables parallel running and/or reversion.
- """
-
- # Informational states this action yields while running, not useful to
- # have the engine record but useful to provide to end-users when doing
- # execution iterations.
- ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING)
-
- def __init__(self, runtime, waiter):
- self._builder = _MachineBuilder(runtime, waiter)
-
- @property
- def builder(self):
- return self._builder
-
- def runnable(self):
- return self._builder.runnable()
-
def run_iter(self, timeout=None):
- """Runs the nodes using a built state machine."""
- machine, memory = self.builder.build(timeout=timeout)
+ """Runs iteratively using a locally built state machine."""
+ machine, memory = self.build(timeout=timeout)
for (_prior_state, new_state) in machine.run_iter(_START):
# NOTE(harlowja): skip over meta-states.
if new_state not in _META_STATES:
diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_runner.py
index 98ae0e2..cbc1e2e 100644
--- a/taskflow/tests/unit/action_engine/test_runner.py
+++ b/taskflow/tests/unit/action_engine/test_runner.py
@@ -174,7 +174,7 @@ class RunnerTest(test.TestCase, _RunnerTestMixin):
rt.storage.get_atom_state(sad_tasks[0].name))
-class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
+class RunnerBuildTest(test.TestCase, _RunnerTestMixin):
def test_builder_manual_process(self):
flow = lf.Flow("root")
tasks = test_utils.make_many(
@@ -182,8 +182,8 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- machine, memory = rt.runner.builder.build()
- self.assertTrue(rt.runner.builder.runnable())
+ machine, memory = rt.runner.build()
+ self.assertTrue(rt.runner.runnable())
self.assertRaises(fsm.NotInitialized, machine.process_event, 'poke')
# Should now be pending...
@@ -251,8 +251,8 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- machine, memory = rt.runner.builder.build()
- self.assertTrue(rt.runner.builder.runnable())
+ machine, memory = rt.runner.build()
+ self.assertTrue(rt.runner.runnable())
transitions = list(machine.run_iter('start'))
self.assertEqual((runner._UNDEFINED, st.RESUMING), transitions[0])
@@ -265,8 +265,8 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- machine, memory = rt.runner.builder.build()
- self.assertTrue(rt.runner.builder.runnable())
+ machine, memory = rt.runner.build()
+ self.assertTrue(rt.runner.runnable())
transitions = list(machine.run_iter('start'))
self.assertEqual((runner._GAME_OVER, st.FAILURE), transitions[-1])
@@ -278,8 +278,8 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- machine, memory = rt.runner.builder.build()
- self.assertTrue(rt.runner.builder.runnable())
+ machine, memory = rt.runner.build()
+ self.assertTrue(rt.runner.runnable())
transitions = list(machine.run_iter('start'))
self.assertEqual((runner._GAME_OVER, st.REVERTED), transitions[-1])
@@ -292,7 +292,7 @@ class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- machine, memory = rt.runner.builder.build()
+ machine, memory = rt.runner.build()
transitions = list(machine.run_iter('start'))
occurrences = dict((t, transitions.count(t)) for t in transitions)
diff --git a/tools/state_graph.py b/tools/state_graph.py
index 3196140..38bd6d9 100755
--- a/tools/state_graph.py
+++ b/tools/state_graph.py
@@ -134,7 +134,7 @@ def main():
elif options.engines:
source_type = "Engines"
r = runner.Runner(DummyRuntime(), None)
- source, memory = r.builder.build()
+ source, memory = r.build()
internal_states.extend(runner._META_STATES)
ordering = 'out'
elif options.wbe_requests: