summaryrefslogtreecommitdiff
path: root/taskflow/examples/simple_linear_listening.py
blob: d14c82c49895dba85ef2edd0064900a26f8f11a5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# -*- coding: utf-8 -*-

#    Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import logging
import os
import sys

# Configure the root logger to emit only ERROR (and more severe) records.
logging.basicConfig(level=logging.ERROR)

# Put the directory two levels above this file at the front of the module
# search path so the taskflow imports that follow resolve against this
# source tree.
top_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier

# Wildcard event type: a callback registered under it is notified on all
# state changes (this is how the watcher callbacks are registered below).
ANY = notifier.Notifier.ANY

# INTRO: In this example we create two tasks (this time as functions instead
# of task subclasses as in the simple_linear.py example), each of which ~calls~
# a given ~phone~ number (provided as a function input) in a linear fashion
# (one after the other).
#
# For a workflow which is serial this shows an extremely simple way
# of structuring your tasks (the code that does the work) into a linear
# sequence (the flow) and then passing the work off to an engine, with some
# initial data to be run in a reliable manner.
#
# This example shows a basic usage of the taskflow structures without involving
# the complexity of persistence. Using the structures that taskflow provides
# via tasks and flows makes it possible for you to easily at a later time
# hook in a persistence layer (and then gain the functionality that offers)
# when you decide the complexity of adding that layer in is 'worth it' for your
# application's usage pattern (which some applications may not need).
#
# It **also** adds on to the simple_linear.py example by adding a set of
# callback functions which the engine will call when a flow state transition
# or task state transition occurs. These types of functions are useful for
# updating task or flow progress, or for debugging, sending notifications to
# external systems, or for other yet unknown future usage that you may create!


def call_jim(context):
    """Pretend to call jim and print the context that was provided.

    :param context: mapping of initial data; its items are printed ordered
                    by key so that the output is stable between runs.
    """
    print("Calling jim.")
    ordered_items = list(context.items())
    ordered_items.sort(key=lambda pair: pair[0])
    print("Context = %s" % (ordered_items,))


def call_joe(context):
    """Pretend to call joe and print the context that was provided.

    :param context: mapping of initial data; its items are printed ordered
                    by key so that the output is stable between runs.
    """
    print("Calling joe.")
    ordered_items = list(context.items())
    ordered_items.sort(key=lambda pair: pair[0])
    print("Context = %s" % (ordered_items,))


def flow_watch(state, details):
    """Print the state reported by a flow notification.

    :param state: the state being reported.
    :param details: extra information supplied with the notification
                    (not used here).
    """
    message = 'Flow => %s' % state
    print(message)


def task_watch(state, details):
    """Print the state reported by a task notification with the task name.

    :param state: the state being reported.
    :param details: mapping supplied with the notification; its 'task_name'
                    entry (None when absent) is printed as the task's name.
    """
    task_name = details.get('task_name')
    print('Task %s => %s' % (task_name, state))


# Each plain function is wrapped in a FunctorTask, a task type that knows how
# to treat a function as a task. Passing bare functions directly was worked
# on previously, but in python 3.0 there is no easy way to capture an
# instance method, so this wrapping approach was chosen instead (it can also
# attach to instance methods, if that's desired). The tasks are added in the
# order they are meant to run: jim first, then joe.
flow = lf.Flow("Call-them")
flow.add(
    task.FunctorTask(execute=call_jim),
    task.FunctorTask(execute=call_joe),
)

# Load the flow into an engine (without running it yet), handing over the
# initial data: the 'context' value that the functions above accept.
engine = taskflow.engines.load(
    flow,
    store=dict(context=dict(joe_number=444, jim_number=555)),
)

# An engine exposes 2 different notification objects; this is where our
# callback functions get attached to them, one for task state changes and
# one for flow state changes. Registering with ANY (the '*' kleene star)
# means we want to be notified on all state changes; to restrict to a
# specific state change, just register that instead.
engine.task_notifier.register(ANY, task_watch)
engine.notifier.register(ANY, flow_watch)

# Everything is attached, so now run!
engine.run()