summaryrefslogtreecommitdiff
path: root/taskflow/examples/calculate_in_parallel.py
blob: 0d800a6048af4d737eea3e1cfe02da014eabe360 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# -*- coding: utf-8 -*-

#    Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: These examples show how a linear flow and an unordered flow can be
# used together to execute calculations in parallel and then use the
# result for the next task/s. The adder task is used for all calculations
# and argument bindings are used to set correct parameters for each task.


# This task provides some values from as a result of execution, this can be
# useful when you want to provide values from a static set to other tasks that
# depend on those values existing before those tasks can run.
#
# NOTE(harlowja): this usage is *depreciated* in favor of a simpler mechanism
# that provides those values on engine running by prepopulating the storage
# backend before your tasks are ran (which accomplishes a similar goal in a
# more uniform manner).
class Provider(task.Task):
    def __init__(self, name, *args, **kwargs):
        super(Provider, self).__init__(name=name, **kwargs)
        self._provide = args

    def execute(self):
        return self._provide


# This task adds two input variables and returns the result of that addition.
#
# Note that since this task does not have a revert() function (since addition
# is a stateless operation) there are no side-effects that this function needs
# to undo if some later operation fails.
class Adder(task.Task):
    def execute(self, x, y):
        return x + y


flow = lf.Flow('root').add(
    # Provide the initial values for other tasks to depend on.
    #
    # x1 = 2, y1 = 3, x2 = 5, x3 = 8
    Provider("provide-adder", 2, 3, 5, 8,
             provides=('x1', 'y1', 'x2', 'y2')),
    # Note here that we define the flow that contains the 2 adders to be an
    # unordered flow since the order in which these execute does not matter,
    # another way to solve this would be to use a graph_flow pattern, which
    # also can run in parallel (since they have no ordering dependencies).
    uf.Flow('adders').add(
        # Calculate 'z1 = x1+y1 = 5'
        #
        # Rebind here means that the execute() function x argument will be
        # satisfied from a previous output named 'x1', and the y argument
        # of execute() will be populated from the previous output named 'y1'
        #
        # The output (result of adding) will be mapped into a variable named
        # 'z1' which can then be refereed to and depended on by other tasks.
        Adder(name="add", provides='z1', rebind=['x1', 'y1']),
        # z2 = x2+y2 = 13
        Adder(name="add-2", provides='z2', rebind=['x2', 'y2']),
    ),
    # r = z1+z2 = 18
    Adder(name="sum-1", provides='r', rebind=['z1', 'z2']))


# The result here will be all results (from all tasks) which is stored in an
# in-memory storage location that backs this engine since it is not configured
# with persistence storage.
result = taskflow.engines.run(flow, engine='parallel')
print(result)