diff options
| author | Anastasia Karpinska <akarpinska@griddynamics.com> | 2013-09-04 11:27:39 +0400 |
|---|---|---|
| committer | Ivan A. Melnikov <imelnikov@griddynamics.com> | 2013-09-04 19:33:12 +0400 |
| commit | a638c9864f8d5f050fc59be1a84e728e30f981aa (patch) | |
| tree | 84c894061c8016110dc2e20e101fb16b23d9e6a2 /taskflow/examples/calculate_linear.py | |
| parent | 6ee4d32fc25a7343ef4802f8390e76ef608106b9 (diff) | |
| download | taskflow-a638c9864f8d5f050fc59be1a84e728e30f981aa.tar.gz | |
Converted some examples to use patterns/engines
Change-Id: If7154019f1cb5e723069ff35f6301fce048323b5
Diffstat (limited to 'taskflow/examples/calculate_linear.py')
| -rw-r--r-- | taskflow/examples/calculate_linear.py | 109 |
1 files changed, 37 insertions, 72 deletions
diff --git a/taskflow/examples/calculate_linear.py b/taskflow/examples/calculate_linear.py index 58b6a7e..ed45b09 100644 --- a/taskflow/examples/calculate_linear.py +++ b/taskflow/examples/calculate_linear.py @@ -8,93 +8,58 @@ my_dir_path = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, os.path.join(os.path.join(my_dir_path, os.pardir), os.pardir)) -from taskflow.patterns import linear_flow as lf +from taskflow import blocks +from taskflow.engines.action_engine import engine as eng from taskflow import task -def flow_notify(state, details): - print("'%s' entered state: %s" % (details['flow'], state)) +# In this example LinearFlow is used to group four tasks to +# calculate value. Added task is used twice. In the first case +# it uses default parameters ('x' and 'y') and in the second one +# arguments are binding with 'z' and 'd' keys from engine storage. +# Multiplier task uses binding too, but explicitly shows that 'z' +# parameter is binded with 'a' key from engine storage. -def task_notify(state, details): - print("'%s' entered state: %s" % (details['runner'], state)) - - -# This class is used to populate requirements that further tasks need to run -# by populating those tasks from a dictionary and returning said dictionary -# when the flow runs (so that further tasks can use those values). You can -# think of this a needed bootstrapping of a flow in a way. class Provider(task.Task): - def __init__(self, name, **kwargs): + + def __init__(self, name, *args): super(Provider, self).__init__(name) - self.provides.update(kwargs.keys()) - self._provide = kwargs + self._provide = args - def execute(self, context): + def execute(self): return self._provide class Adder(task.Task): - def __init__(self, name, x_name, y_name, provides_name): + + def __init__(self, name): super(Adder, self).__init__(name) - self.requires.update([x_name, y_name]) - self.provides.update([provides_name]) - self._provides_name = provides_name - def execute(self, context, **kwargs): - return { - self._provides_name: sum(kwargs.values()), - } + def execute(self, x, y): + return x + y class Multiplier(task.Task): - def __init__(self, name, z_name, by_how_much): + def __init__(self, name, multiplier): super(Multiplier, self).__init__(name) - self.requires.update([z_name]) - self._by_how_much = by_how_much - self._z_name = z_name - - def execute(self, context, **kwargs): - return kwargs.pop(self._z_name) * self._by_how_much - - -flow = lf.Flow("calc-them") -flow.add(Provider("provide-adder", x=2, y=3, d=5)) - -# Add x + y to produce z (5) -flow.add(Adder('add', 'x', 'y', 'z')) - -# Add z + d to produce a (5 + 5) -flow.add(Adder('add', 'z', 'd', 'a')) - -# Multiple a by 3 (30) -multi_uuid = flow.add(Multiplier('multi', 'a', 3)) - -# Get notified of the state changes the flow is going through. -flow.notifier.register('*', flow_notify) - -# Get notified of the state changes the flows tasks/runners are going through. -flow.task_notifier.register('*', task_notify) - -# Context is typically passed in openstack, it is not needed here. -print '-' * 7 -print 'Running' -print '-' * 7 -context = {} -flow.run(context) - -# This will have the last results and the task that produced that result, -# but we don't care about the task that produced it and just want the result -# itself. -print '-' * 11 -print 'All results' -print '-' * 11 -for (uuid, v) in flow.results.items(): - print '%s => %s' % (uuid, v) - -multi_results = flow.results[multi_uuid] -print '-' * 15 -print "Multiply result" -print '-' * 15 -print(multi_results) -assert multi_results == 30, "Example is broken" + self._multiplier = multiplier + + def execute(self, z): + return z * self._multiplier + + +flow = blocks.LinearFlow().add( + # x = 2, y = 3, d = 5 + blocks.Task(Provider("provide-adder", 2, 3, 5), save_as=('x', 'y', 'd')), + # z = x+y = 5 + blocks.Task(Adder("add"), save_as='z'), + # a = z+d = 10 + blocks.Task(Adder("add"), save_as='a', rebind_args=['z', 'd']), + # r = a*3 = 30 + blocks.Task(Multiplier("multi", 3), save_as='r', rebind_args={'z': 'a'})) + +engine = eng.SingleThreadedActionEngine(flow) +engine.run() + +print engine.storage.fetch_all() |
