summaryrefslogtreecommitdiff
path: root/taskflow/examples/calculate_linear.py
diff options
context:
space:
mode:
authorAnastasia Karpinska <akarpinska@griddynamics.com>2013-09-04 11:27:39 +0400
committerIvan A. Melnikov <imelnikov@griddynamics.com>2013-09-04 19:33:12 +0400
commita638c9864f8d5f050fc59be1a84e728e30f981aa (patch)
tree84c894061c8016110dc2e20e101fb16b23d9e6a2 /taskflow/examples/calculate_linear.py
parent6ee4d32fc25a7343ef4802f8390e76ef608106b9 (diff)
downloadtaskflow-a638c9864f8d5f050fc59be1a84e728e30f981aa.tar.gz
Converted some examples to use patterns/engines
Change-Id: If7154019f1cb5e723069ff35f6301fce048323b5
Diffstat (limited to 'taskflow/examples/calculate_linear.py')
-rw-r--r--taskflow/examples/calculate_linear.py109
1 files changed, 37 insertions, 72 deletions
diff --git a/taskflow/examples/calculate_linear.py b/taskflow/examples/calculate_linear.py
index 58b6a7e..ed45b09 100644
--- a/taskflow/examples/calculate_linear.py
+++ b/taskflow/examples/calculate_linear.py
@@ -8,93 +8,58 @@ my_dir_path = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir),
os.pardir))
-from taskflow.patterns import linear_flow as lf
+from taskflow import blocks
+from taskflow.engines.action_engine import engine as eng
from taskflow import task
-def flow_notify(state, details):
- print("'%s' entered state: %s" % (details['flow'], state))
+# In this example LinearFlow is used to group four tasks to
+# calculate value. Added task is used twice. In the first case
+# it uses default parameters ('x' and 'y') and in the second one
+# arguments are binding with 'z' and 'd' keys from engine storage.
+# Multiplier task uses binding too, but explicitly shows that 'z'
+# parameter is binded with 'a' key from engine storage.
-def task_notify(state, details):
- print("'%s' entered state: %s" % (details['runner'], state))
-
-
-# This class is used to populate requirements that further tasks need to run
-# by populating those tasks from a dictionary and returning said dictionary
-# when the flow runs (so that further tasks can use those values). You can
-# think of this a needed bootstrapping of a flow in a way.
class Provider(task.Task):
- def __init__(self, name, **kwargs):
+
+ def __init__(self, name, *args):
super(Provider, self).__init__(name)
- self.provides.update(kwargs.keys())
- self._provide = kwargs
+ self._provide = args
- def execute(self, context):
+ def execute(self):
return self._provide
class Adder(task.Task):
- def __init__(self, name, x_name, y_name, provides_name):
+
+ def __init__(self, name):
super(Adder, self).__init__(name)
- self.requires.update([x_name, y_name])
- self.provides.update([provides_name])
- self._provides_name = provides_name
- def execute(self, context, **kwargs):
- return {
- self._provides_name: sum(kwargs.values()),
- }
+ def execute(self, x, y):
+ return x + y
class Multiplier(task.Task):
- def __init__(self, name, z_name, by_how_much):
+ def __init__(self, name, multiplier):
super(Multiplier, self).__init__(name)
- self.requires.update([z_name])
- self._by_how_much = by_how_much
- self._z_name = z_name
-
- def execute(self, context, **kwargs):
- return kwargs.pop(self._z_name) * self._by_how_much
-
-
-flow = lf.Flow("calc-them")
-flow.add(Provider("provide-adder", x=2, y=3, d=5))
-
-# Add x + y to produce z (5)
-flow.add(Adder('add', 'x', 'y', 'z'))
-
-# Add z + d to produce a (5 + 5)
-flow.add(Adder('add', 'z', 'd', 'a'))
-
-# Multiple a by 3 (30)
-multi_uuid = flow.add(Multiplier('multi', 'a', 3))
-
-# Get notified of the state changes the flow is going through.
-flow.notifier.register('*', flow_notify)
-
-# Get notified of the state changes the flows tasks/runners are going through.
-flow.task_notifier.register('*', task_notify)
-
-# Context is typically passed in openstack, it is not needed here.
-print '-' * 7
-print 'Running'
-print '-' * 7
-context = {}
-flow.run(context)
-
-# This will have the last results and the task that produced that result,
-# but we don't care about the task that produced it and just want the result
-# itself.
-print '-' * 11
-print 'All results'
-print '-' * 11
-for (uuid, v) in flow.results.items():
- print '%s => %s' % (uuid, v)
-
-multi_results = flow.results[multi_uuid]
-print '-' * 15
-print "Multiply result"
-print '-' * 15
-print(multi_results)
-assert multi_results == 30, "Example is broken"
+ self._multiplier = multiplier
+
+ def execute(self, z):
+ return z * self._multiplier
+
+
+flow = blocks.LinearFlow().add(
+ # x = 2, y = 3, d = 5
+ blocks.Task(Provider("provide-adder", 2, 3, 5), save_as=('x', 'y', 'd')),
+ # z = x+y = 5
+ blocks.Task(Adder("add"), save_as='z'),
+ # a = z+d = 10
+ blocks.Task(Adder("add"), save_as='a', rebind_args=['z', 'd']),
+ # r = a*3 = 30
+ blocks.Task(Multiplier("multi", 3), save_as='r', rebind_args={'z': 'a'}))
+
+engine = eng.SingleThreadedActionEngine(flow)
+engine.run()
+
+print engine.storage.fetch_all()