diff options
| author | Anastasia Karpinska <akarpinska@griddynamics.com> | 2013-09-18 18:21:15 +0300 |
|---|---|---|
| committer | Anastasia Karpinska <akarpinska@griddynamics.com> | 2013-09-18 19:04:18 +0300 |
| commit | 1623dbb01ed5f1f9e6e6c595a8993f3776f285ef (patch) | |
| tree | ae7054d4241841ae2747045991e099ea7cad9355 /taskflow/examples/graph_flow.py | |
| parent | b07ee63f78e9c200c3a117f148b78599f8b71628 (diff) | |
| download | taskflow-1623dbb01ed5f1f9e6e6c595a8993f3776f285ef.tar.gz | |
Graph flow, sequential graph action
Change-Id: I07cc820aa2f37d0f9599f34efab07b28cf47ca48
Diffstat (limited to 'taskflow/examples/graph_flow.py')
| -rw-r--r-- | taskflow/examples/graph_flow.py | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/taskflow/examples/graph_flow.py b/taskflow/examples/graph_flow.py new file mode 100644 index 0000000..5c2b68f --- /dev/null +++ b/taskflow/examples/graph_flow.py @@ -0,0 +1,75 @@ +import logging +import os +import sys + +logging.basicConfig(level=logging.ERROR) + +my_dir_path = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), + os.pardir)) + +from taskflow.engines.action_engine import engine as eng +from taskflow.patterns import graph_flow as gf +from taskflow.patterns import linear_flow as lf +from taskflow import task + + +# In this example there are complex dependencies between +# tasks. User shouldn't care about ordering the Tasks. +# GraphFlow resolves dependencies automatically using tasks' +# requirements and provided values. +# Flows of any types can be nested into Graph flow. Subflows +# dependencies will be resolved too. + + +class Adder(task.Task): + + def execute(self, x, y): + return x + y + + +flow = gf.Flow('root').add( + lf.Flow('nested_linear').add( + # x2 = y3+y4 = 12 + Adder("add2", provides='x2', rebind=['y3', 'y4']), + # x1 = y1+y2 = 4 + Adder("add1", provides='x1', rebind=['y1', 'y2']) + ), + # x5 = x1+x3 = 20 + Adder("add5", provides='x5', rebind=['x1', 'x3']), + # x3 = x1+x2 = 16 + Adder("add3", provides='x3', rebind=['x1', 'x2']), + # x4 = x2+y5 = 21 + Adder("add4", provides='x4', rebind=['x2', 'y5']), + # x6 = x5+x4 = 41 + Adder("add6", provides='x6', rebind=['x5', 'x4']), + # x7 = x6+x6 = 82 + Adder("add7", provides='x7', rebind=['x6', 'x6'])) + +single_threaded_engine = eng.SingleThreadedActionEngine(flow) +single_threaded_engine.storage.inject({ + "y1": 1, + "y2": 3, + "y3": 5, + "y4": 7, + "y5": 9, +}) + +single_threaded_engine.run() + +print ("Single threaded engine result %s" % + single_threaded_engine.storage.fetch_all()) + +multi_threaded_engine = eng.MultiThreadedActionEngine(flow) +multi_threaded_engine.storage.inject({ + "y1": 1, + "y2": 3, + "y3": 5, + "y4": 7, + "y5": 9, +}) + +multi_threaded_engine.run() + +print ("Multi threaded engine result %s" % + multi_threaded_engine.storage.fetch_all()) |
