diff options
| author | Ivan A. Melnikov <imelnikov@griddynamics.com> | 2013-10-03 17:03:22 +0400 |
|---|---|---|
| committer | Ivan A. Melnikov <imelnikov@griddynamics.com> | 2013-10-09 19:24:27 +0400 |
| commit | c26fbb2387332268b7432d4d67f7df0a394df6a5 (patch) | |
| tree | 15708dff4deaf3d9ab4129a39f83391338822c37 /taskflow/examples/resume_from_backend.py | |
| parent | ce7e2ad38e9c136d5c2b43db8744e62605dd777c (diff) | |
| download | taskflow-c26fbb2387332268b7432d4d67f7df0a394df6a5.tar.gz | |
Resumption from backend for action engine
Simple refactoring and minor code adjustments to make resumption
from backend actually work:
- call engine.compile and check for missing dependencies
on every run;
- misc.Failure equality semantics adjusted;
- load failures from backend on every run.
Change-Id: I8a0462f2dec0ec66a19ee6a5ef10e4be48110e19
Diffstat (limited to 'taskflow/examples/resume_from_backend.py')
| -rw-r--r-- | taskflow/examples/resume_from_backend.py | 117 |
1 files changed, 117 insertions, 0 deletions
diff --git a/taskflow/examples/resume_from_backend.py b/taskflow/examples/resume_from_backend.py new file mode 100644 index 0000000..b9df1ca --- /dev/null +++ b/taskflow/examples/resume_from_backend.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import sys + +logging.basicConfig(level=logging.ERROR) + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +import taskflow.engines +from taskflow.patterns import linear_flow as lf +from taskflow.persistence import backends +from taskflow import task +from taskflow.utils import persistence_utils as p_utils + + +### UTILITY FUNCTIONS ######################################### + + +def print_task_states(flowdetail, msg): + print(msg) + print('Flow state: %s' % flowdetail.state) + items = sorted((td.name, td.version, td.state, td.results) + for td in flowdetail) + for item in items: + print("%s==%s: %s, result=%r" % item) + + +def get_backend(): + try: + backend_uri = sys.argv[1] + except Exception: + backend_uri = 'sqlite://' + + backend = backends.fetch({'connection': backend_uri}) + backend.get_connection().upgrade() + return backend + + +def find_flow_detail(backend, lb_id, fd_id): + conn = backend.get_connection() + lb = conn.get_logbook(lb_id) + return lb.find(fd_id) + + +### CREATE FLOW ############################################### + + +class InterruptTask(task.Task): + def execute(self): + # DO NOT TRY THIS AT HOME + engine.suspend() + + +class TestTask(task.Task): + def execute(self): + print 'executing %s' % self + return 'ok' + + +def flow_factory(): + return lf.Flow('resume from backend example').add( + TestTask(name='first'), + InterruptTask(name='boom'), + TestTask(name='second')) + + +### INITIALIZE PERSISTENCE #################################### + +backend = get_backend() +logbook = p_utils.temporary_log_book(backend) + + +### CREATE AND RUN THE FLOW: FIRST ATTEMPT #################### + +flow = flow_factory() +flowdetail = p_utils.create_flow_detail(flow, logbook, backend) +engine = taskflow.engines.load(flow, flow_detail=flowdetail, + backend=backend) + +print_task_states(flowdetail, "\nAt the beginning, there is no state:") +print("\nRunning:") +engine.run() +print_task_states(flowdetail, "\nAfter running:") + + +### RE-CREATE, RESUME, RUN #################################### + +print("\nResuming and running again:") +# reload flowdetail from backend +flowdetail2 = find_flow_detail(backend, logbook.uuid, + flowdetail.uuid) +engine2 = taskflow.engines.load(flow_factory(), + flow_detail=flowdetail, + backend=backend) +engine2.run() +print_task_states(flowdetail, "\nAt the end:") |
