summaryrefslogtreecommitdiff
path: root/taskflow/examples
diff options
context:
space:
mode:
authorIvan A. Melnikov <imelnikov@griddynamics.com>2013-09-13 12:22:27 +0400
committerIvan A. Melnikov <imelnikov@griddynamics.com>2013-10-03 10:08:55 +0400
commitcde0dee14e0e93018f5d9a5a32a98623ec29bbc4 (patch)
treec71cabcd182ac62aadc2e2b8139bb3ce815ed44d /taskflow/examples
parent568c79494c9652f337f32182fcb1214165e0d3f8 (diff)
downloadtaskflow-cde0dee14e0e93018f5d9a5a32a98623ec29bbc4.tar.gz
Simpler API to load flows into engines
Previously to run a flow client code had to put together the flow, an engine, logbook, flowdetail, and storage backend. This commit adds two helper functions, run() and load(), so that simplest usecase now looks like taskflow.engines.run(flow) Client code may also provide configuration for storage and engine if needed, but if not needed it just works with defaults. Engines are loaded via stevedore, as drivers in 'taskflow.engines' backend. Now three entry points are defined in that namespace: - 'default', for SingleThreadedActionEngine, used by default; - 'serial', as another synonym for SingleThreadedActionEngine; - 'parallel', for MultiThreadedActionEngine. Closes-bug: #1224726 Change-Id: I7f4cb5c8ff7f5f12831ddd0952c202d2fd8cd6ef
Diffstat (limited to 'taskflow/examples')
-rw-r--r--taskflow/examples/calculate_in_parallel.py21
-rw-r--r--taskflow/examples/calculate_linear.py15
-rw-r--r--taskflow/examples/complex_graph.py46
-rw-r--r--taskflow/examples/fake_boot_vm.py28
-rw-r--r--taskflow/examples/graph_flow.py35
-rw-r--r--taskflow/examples/reverting_linear.py21
-rw-r--r--taskflow/examples/simple_linear.py20
-rw-r--r--taskflow/examples/simple_linear_listening.py24
8 files changed, 93 insertions, 117 deletions
diff --git a/taskflow/examples/calculate_in_parallel.py b/taskflow/examples/calculate_in_parallel.py
index 7513e34..7b8f8a3 100644
--- a/taskflow/examples/calculate_in_parallel.py
+++ b/taskflow/examples/calculate_in_parallel.py
@@ -4,11 +4,13 @@ import sys
logging.basicConfig(level=logging.ERROR)
-my_dir_path = os.path.dirname(os.path.abspath(__file__))
-sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir),
- os.pardir))
+top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ os.pardir,
+ os.pardir))
+sys.path.insert(0, top_dir)
+
+import taskflow.engines
-from taskflow.engines.action_engine import engine as eng
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task
@@ -20,7 +22,6 @@ from taskflow import task
class Provider(task.Task):
-
def __init__(self, name, *args, **kwargs):
super(Provider, self).__init__(name=name, **kwargs)
self._provide = args
@@ -30,11 +31,6 @@ class Provider(task.Task):
class Adder(task.Task):
-
- def __init__(self, name, provides, rebind):
- super(Adder, self).__init__(name=name, provides=provides,
- rebind=rebind)
-
def execute(self, x, y):
return x + y
@@ -52,7 +48,6 @@ flow = lf.Flow('root').add(
# r = z1+z2 = 18
Adder(name="sum-1", provides='r', rebind=['z1', 'z2']))
-engine = eng.MultiThreadedActionEngine(flow)
-engine.run()
-print engine.storage.fetch_all()
+result = taskflow.engines.run(flow, engine_conf='parallel')
+print result
diff --git a/taskflow/examples/calculate_linear.py b/taskflow/examples/calculate_linear.py
index ca946f6..d5cb24c 100644
--- a/taskflow/examples/calculate_linear.py
+++ b/taskflow/examples/calculate_linear.py
@@ -4,11 +4,12 @@ import sys
logging.basicConfig(level=logging.ERROR)
-my_dir_path = os.path.dirname(os.path.abspath(__file__))
-sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir),
- os.pardir))
+top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ os.pardir,
+ os.pardir))
+sys.path.insert(0, top_dir)
-from taskflow.engines.action_engine import engine as eng
+import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
@@ -62,7 +63,5 @@ flow = lf.Flow('root').add(
Multiplier("multi", 3, provides='r', rebind={'z': 'a'})
)
-engine = eng.SingleThreadedActionEngine(flow)
-engine.run()
-
-print engine.storage.fetch_all()
+results = taskflow.engines.run(flow)
+print results
diff --git a/taskflow/examples/complex_graph.py b/taskflow/examples/complex_graph.py
index fb31ab2..9dc9126 100644
--- a/taskflow/examples/complex_graph.py
+++ b/taskflow/examples/complex_graph.py
@@ -7,11 +7,13 @@ import sys
logging.basicConfig(level=logging.ERROR)
-my_dir_path = os.path.dirname(os.path.abspath(__file__))
-sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir),
- os.pardir))
+top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ os.pardir,
+ os.pardir))
+sys.path.insert(0, top_dir)
-from taskflow.engines.action_engine import engine as eng
+
+import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
@@ -54,7 +56,6 @@ def trash(**kwargs):
def startup(**kwargs):
- pass
# TODO(harlowja): try triggering reversion here!
# raise ValueError("Car not verified")
return True
@@ -95,11 +96,7 @@ flow = lf.Flow("make-auto").add(
'windows_installed',
'wheels_installed']))
-engine = eng.SingleThreadedActionEngine(flow)
-engine.notifier.register('*', flow_watch)
-engine.task_notifier.register('*', task_watch)
-
-engine.storage.inject({'spec': {
+spec = {
"frame": 'steel',
"engine": 'honda',
"doors": '2',
@@ -108,28 +105,25 @@ engine.storage.inject({'spec': {
"doors_installed": True,
"windows_installed": True,
"wheels_installed": True,
-}})
+}
-print "Build a car"
-engine.run()
-engine = eng.SingleThreadedActionEngine(flow)
+engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
engine.notifier.register('*', flow_watch)
engine.task_notifier.register('*', task_watch)
-engine.storage.inject({'spec': {
- "frame": 'steel',
- "engine": 'honda',
- "doors": '5',
- "wheels": '4',
- "engine_installed": True,
- "doors_installed": True,
- "windows_installed": True,
- "wheels_installed": True,
-}})
+print("Build a car")
+engine.run()
+
+
+spec['doors'] = 5
+
+engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
+engine.notifier.register('*', flow_watch)
+engine.task_notifier.register('*', task_watch)
try:
- print "Build a wrong car that doesn't match specification"
+ print("Build a wrong car that doesn't match specification")
engine.run()
except Exception as e:
- print e
+ print("Flow failed: %s" % e)
diff --git a/taskflow/examples/fake_boot_vm.py b/taskflow/examples/fake_boot_vm.py
index e14e63a..eba7334 100644
--- a/taskflow/examples/fake_boot_vm.py
+++ b/taskflow/examples/fake_boot_vm.py
@@ -8,11 +8,13 @@ import uuid
logging.basicConfig(level=logging.ERROR)
-my_dir_path = os.path.dirname(os.path.abspath(__file__))
-sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir),
- os.pardir))
+top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ os.pardir,
+ os.pardir))
+sys.path.insert(0, top_dir)
-from taskflow.engines.action_engine import engine as eng
+
+import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow import task
@@ -141,14 +143,6 @@ flow = gf.Flow("Boot-Fake-Vm").add(
ScheduleVM(),
BootVM())
-engine = eng.SingleThreadedActionEngine(flow)
-
-# Get notified of the state changes the flow is going through.
-engine.notifier.register('*', flow_notify)
-
-# Get notified of the state changes the flows tasks/runners are going through.
-engine.task_notifier.register('*', task_notify)
-
# Simulates what nova/glance/keystone... calls a context
context = {
'user_id': 'xyz',
@@ -157,7 +151,15 @@ context = {
}
context = Context(**context)
-engine.storage.inject({'context': context})
+# Load the flow
+engine = taskflow.engines.load(flow, store={'context': context})
+
+# Get notified of the state changes the flow is going through.
+engine.notifier.register('*', flow_notify)
+
+# Get notified of the state changes the flows tasks/runners are going through.
+engine.task_notifier.register('*', task_notify)
+
print '-' * 7
print 'Running'
diff --git a/taskflow/examples/graph_flow.py b/taskflow/examples/graph_flow.py
index 5c2b68f..56041b2 100644
--- a/taskflow/examples/graph_flow.py
+++ b/taskflow/examples/graph_flow.py
@@ -4,11 +4,12 @@ import sys
logging.basicConfig(level=logging.ERROR)
-my_dir_path = os.path.dirname(os.path.abspath(__file__))
-sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir),
- os.pardir))
+top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ os.pardir,
+ os.pardir))
+sys.path.insert(0, top_dir)
-from taskflow.engines.action_engine import engine as eng
+import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
@@ -46,30 +47,20 @@ flow = gf.Flow('root').add(
# x7 = x6+x6 = 82
Adder("add7", provides='x7', rebind=['x6', 'x6']))
-single_threaded_engine = eng.SingleThreadedActionEngine(flow)
-single_threaded_engine.storage.inject({
+store = {
"y1": 1,
"y2": 3,
"y3": 5,
"y4": 7,
"y5": 9,
-})
+}
-single_threaded_engine.run()
+result = taskflow.engines.run(
+ flow, engine_conf='serial', store=store)
-print ("Single threaded engine result %s" %
- single_threaded_engine.storage.fetch_all())
+print("Single threaded engine result %s" % result)
-multi_threaded_engine = eng.MultiThreadedActionEngine(flow)
-multi_threaded_engine.storage.inject({
- "y1": 1,
- "y2": 3,
- "y3": 5,
- "y4": 7,
- "y5": 9,
-})
-
-multi_threaded_engine.run()
+result = taskflow.engines.run(
+ flow, engine_conf='parallel', store=store)
-print ("Multi threaded engine result %s" %
- multi_threaded_engine.storage.fetch_all())
+print("Multi threaded engine result %s" % result)
diff --git a/taskflow/examples/reverting_linear.py b/taskflow/examples/reverting_linear.py
index b16209a..b1f45a2 100644
--- a/taskflow/examples/reverting_linear.py
+++ b/taskflow/examples/reverting_linear.py
@@ -4,11 +4,13 @@ import sys
logging.basicConfig(level=logging.ERROR)
-my_dir_path = os.path.dirname(os.path.abspath(__file__))
-sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir),
- os.pardir))
+top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ os.pardir,
+ os.pardir))
+sys.path.insert(0, top_dir)
+
+import taskflow.engines
-from taskflow.engines.action_engine import engine as eng
from taskflow.patterns import linear_flow as lf
from taskflow import task
@@ -43,15 +45,10 @@ flow = lf.Flow('simple-linear').add(
CallJoe(),
CallSuzzie()
)
-engine = eng.SingleThreadedActionEngine(flow)
-
-engine.storage.inject({
- "joe_number": 444,
- "jim_number": 555,
- "suzzie_number": 666
-})
try:
- engine.run()
+ taskflow.engines.run(flow, store=dict(joe_number=444,
+ jim_number=555,
+ suzzie_number=666))
except Exception as e:
print "Flow failed: %r" % e
diff --git a/taskflow/examples/simple_linear.py b/taskflow/examples/simple_linear.py
index bde8c64..fb2a7c1 100644
--- a/taskflow/examples/simple_linear.py
+++ b/taskflow/examples/simple_linear.py
@@ -4,11 +4,12 @@ import sys
logging.basicConfig(level=logging.ERROR)
-my_dir_path = os.path.dirname(os.path.abspath(__file__))
-sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir),
- os.pardir))
+top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ os.pardir,
+ os.pardir))
+sys.path.insert(0, top_dir)
-from taskflow.engines.action_engine import engine as eng
+import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
@@ -30,16 +31,11 @@ class CallJoe(task.Task):
def execute(self, joe_number, *args, **kwargs):
print("Calling joe %s." % joe_number)
+
flow = lf.Flow('simple-linear').add(
CallJim(),
CallJoe()
)
-engine = eng.SingleThreadedActionEngine(flow)
-
-engine.storage.inject({
- "joe_number": 444,
- "jim_number": 555,
-})
-
-engine.run()
+taskflow.engines.run(flow, store=dict(joe_number=444,
+ jim_number=555))
diff --git a/taskflow/examples/simple_linear_listening.py b/taskflow/examples/simple_linear_listening.py
index 090b4d7..0fbe261 100644
--- a/taskflow/examples/simple_linear_listening.py
+++ b/taskflow/examples/simple_linear_listening.py
@@ -4,11 +4,13 @@ import sys
logging.basicConfig(level=logging.ERROR)
-my_dir_path = os.path.dirname(os.path.abspath(__file__))
-sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir),
- os.pardir))
+top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ os.pardir,
+ os.pardir))
+sys.path.insert(0, top_dir)
+
+import taskflow.engines
-from taskflow.engines.action_engine import engine as eng
from taskflow.patterns import linear_flow as lf
from taskflow import task
@@ -35,13 +37,13 @@ flow = lf.Flow("Call-them")
flow.add(task.FunctorTask(execute=call_jim))
flow.add(task.FunctorTask(execute=call_joe))
-engine = eng.SingleThreadedActionEngine(flow)
+engine = taskflow.engines.load(flow, store={
+ 'context': {
+ "joe_number": 444,
+ "jim_number": 555,
+ }
+})
+
engine.notifier.register('*', flow_watch)
engine.task_notifier.register('*', task_watch)
-
-context = {
- "joe_number": 444,
- "jim_number": 555,
-}
-engine.storage.inject({'context': context})
engine.run()