summaryrefslogtreecommitdiff
path: root/taskflow/examples/build_a_car.py
blob: 02be020e3614cf7219b85b46843c1abd23906eec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# -*- coding: utf-8 -*-

#    Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import logging
import os
import sys


logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)


import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier

ANY = notifier.Notifier.ANY

import example_utils as eu  # noqa


# INTRO: This examples shows how a graph flow and linear flow can be used
# together to execute dependent & non-dependent tasks by going through the
# steps required to build a simplistic car (an assembly line if you will). It
# also shows how raw functions can be wrapped into a task object instead of
# being forced to use the more *heavy* task base class. This is useful in
# scenarios where pre-existing code has functions that you easily want to
# plug-in to taskflow, without requiring a large amount of code changes.


def build_frame():
    return 'steel'


def build_engine():
    return 'honda'


def build_doors():
    return '2'


def build_wheels():
    return '4'


# These just return true to indiciate success, they would in the real work
# do more than just that.

def install_engine(frame, engine):
    return True


def install_doors(frame, windows_installed, doors):
    return True


def install_windows(frame, doors):
    return True


def install_wheels(frame, engine, engine_installed, wheels):
    return True


def trash(**kwargs):
    eu.print_wrapped("Throwing away pieces of car!")


def startup(**kwargs):
    # If you want to see the rollback function being activated try uncommenting
    # the following line.
    #
    # raise ValueError("Car not verified")
    return True


def verify(spec, **kwargs):
    # If the car is not what we ordered throw away the car (trigger reversion).
    for key, value in kwargs.items():
        if spec[key] != value:
            raise Exception("Car doesn't match spec!")
    return True


# These two functions connect into the state transition notification emission
# points that the engine outputs, they can be used to log state transitions
# that are occurring, or they can be used to suspend the engine (or perform
# other useful activities).
def flow_watch(state, details):
    print('Flow => %s' % state)


def task_watch(state, details):
    print('Task %s => %s' % (details.get('task_name'), state))


flow = lf.Flow("make-auto").add(
    task.FunctorTask(startup, revert=trash, provides='ran'),
    # A graph flow allows automatic dependency based ordering, the ordering
    # is determined by analyzing the symbols required and provided and ordering
    # execution based on a functioning order (if one exists).
    gf.Flow("install-parts").add(
        task.FunctorTask(build_frame, provides='frame'),
        task.FunctorTask(build_engine, provides='engine'),
        task.FunctorTask(build_doors, provides='doors'),
        task.FunctorTask(build_wheels, provides='wheels'),
        # These *_installed outputs allow for other tasks to depend on certain
        # actions being performed (aka the components were installed), another
        # way to do this is to link() the tasks manually instead of creating
        # an 'artificial' data dependency that accomplishes the same goal the
        # manual linking would result in.
        task.FunctorTask(install_engine, provides='engine_installed'),
        task.FunctorTask(install_doors, provides='doors_installed'),
        task.FunctorTask(install_windows, provides='windows_installed'),
        task.FunctorTask(install_wheels, provides='wheels_installed')),
    task.FunctorTask(verify, requires=['frame',
                                       'engine',
                                       'doors',
                                       'wheels',
                                       'engine_installed',
                                       'doors_installed',
                                       'windows_installed',
                                       'wheels_installed']))

# This dictionary will be provided to the tasks as a specification for what
# the tasks should produce, in this example this specification will influence
# what those tasks do and what output they create. Different tasks depend on
# different information from this specification, all of which will be provided
# automatically by the engine to those tasks.
spec = {
    "frame": 'steel',
    "engine": 'honda',
    "doors": '2',
    "wheels": '4',
    # These are used to compare the result product, a car without the pieces
    # installed is not a car after all.
    "engine_installed": True,
    "doors_installed": True,
    "windows_installed": True,
    "wheels_installed": True,
}


engine = taskflow.engines.load(flow, store={'spec': spec.copy()})

# This registers all (ANY) state transitions to trigger a call to the
# flow_watch function for flow state transitions, and registers the
# same all (ANY) state transitions for task state transitions.
engine.notifier.register(ANY, flow_watch)
engine.task_notifier.register(ANY, task_watch)

eu.print_wrapped("Building a car")
engine.run()

# Alter the specification and ensure that the reverting logic gets triggered
# since the resultant car that will be built by the build_wheels function will
# build a car with 4 doors only (not 5), this will cause the verification
# task to mark the car that is produced as not matching the desired spec.
spec['doors'] = 5

engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
engine.notifier.register(ANY, flow_watch)
engine.task_notifier.register(ANY, task_watch)

eu.print_wrapped("Building a wrong car that doesn't match specification")
try:
    engine.run()
except Exception as e:
    eu.print_wrapped("Flow failed: %s" % e)