diff options
author | Joshua Harlow <harlowja@yahoo-inc.com> | 2015-07-13 11:33:12 -0700 |
---|---|---|
committer | Joshua Harlow <harlowja@gmail.com> | 2015-07-17 08:48:17 -0700 |
commit | 02c83d40612bbe3146a9f2ff212759ecdcdeb8ba (patch) | |
tree | 2c52a1a6a8d0243b8c576a36c96ca371ef490f4e | |
parent | 9f846d0475b9862da6af52bd959d15a2cd8f5ab0 (diff) | |
download | taskflow-02c83d40612bbe3146a9f2ff212759ecdcdeb8ba.tar.gz |
Remove **most** usage of taskflow.utils in examples
It appears folks are using the taskflow.utils code in there own
code-bases (likely taking it from the examples) which we do not
want to encourage, so remove the usage of **most** of
taskflow.utils code from the examples so that people are less
likely to copy/paste/reference it.
Change-Id: I0ce3c520de347e3e746e7912aa1366a515458424
-rw-r--r-- | taskflow/examples/dump_memory_backend.py | 14 | ||||
-rw-r--r-- | taskflow/examples/hello_world.py | 14 | ||||
-rw-r--r-- | taskflow/examples/parallel_table_multiply.py | 6 | ||||
-rw-r--r-- | taskflow/examples/persistence_example.py | 3 | ||||
-rw-r--r-- | taskflow/examples/resume_from_backend.py | 31 | ||||
-rw-r--r-- | taskflow/examples/resume_vm_boot.py | 18 | ||||
-rw-r--r-- | taskflow/examples/resume_volume_create.py | 13 | ||||
-rw-r--r-- | taskflow/examples/run_by_iter.py | 7 | ||||
-rw-r--r-- | taskflow/examples/run_by_iter_enumerate.py | 7 | ||||
-rw-r--r-- | taskflow/examples/switch_graph_flow.py | 12 | ||||
-rw-r--r-- | taskflow/persistence/models.py | 93 | ||||
-rw-r--r-- | taskflow/utils/persistence_utils.py | 75 |
12 files changed, 159 insertions, 134 deletions
diff --git a/taskflow/examples/dump_memory_backend.py b/taskflow/examples/dump_memory_backend.py index 6c6d548..d448667 100644 --- a/taskflow/examples/dump_memory_backend.py +++ b/taskflow/examples/dump_memory_backend.py @@ -29,9 +29,7 @@ sys.path.insert(0, self_dir) from taskflow import engines from taskflow.patterns import linear_flow as lf -from taskflow.persistence import backends from taskflow import task -from taskflow.utils import persistence_utils as pu # INTRO: in this example we create a dummy flow with a dummy task, and run # it using a in-memory backend and pre/post run we dump out the contents @@ -43,22 +41,18 @@ class PrintTask(task.Task): def execute(self): print("Running '%s'" % self.name) - -backend = backends.fetch({ - 'connection': 'memory://', -}) -book, flow_detail = pu.temporary_flow_detail(backend=backend) - # Make a little flow and run it... f = lf.Flow('root') for alpha in ['a', 'b', 'c']: f.add(PrintTask(alpha)) -e = engines.load(f, flow_detail=flow_detail, - book=book, backend=backend) +e = engines.load(f) e.compile() e.prepare() +# After prepare the storage layer + backend can now be accessed safely... +backend = e.storage.backend + print("----------") print("Before run") print("----------") diff --git a/taskflow/examples/hello_world.py b/taskflow/examples/hello_world.py index 38a6b38..2ec1c95 100644 --- a/taskflow/examples/hello_world.py +++ b/taskflow/examples/hello_world.py @@ -31,7 +31,6 @@ from taskflow import engines from taskflow.patterns import linear_flow as lf from taskflow.patterns import unordered_flow as uf from taskflow import task -from taskflow.utils import eventlet_utils # INTRO: This is the defacto hello world equivalent for taskflow; it shows how @@ -82,25 +81,34 @@ song.add(PrinterTask("conductor@begin", show_name=False, inject={'output': "*dong*"})) # Run in parallel using eventlet green threads... -if eventlet_utils.EVENTLET_AVAILABLE: - with futurist.GreenThreadPoolExecutor() as executor: +try: + executor = futurist.GreenThreadPoolExecutor() +except RuntimeError: + # No eventlet currently active, skip running with it... + pass +else: + print("-- Running in parallel using eventlet --") + with executor: e = engines.load(song, executor=executor, engine='parallel') e.run() # Run in parallel using real threads... with futurist.ThreadPoolExecutor(max_workers=1) as executor: + print("-- Running in parallel using threads --") e = engines.load(song, executor=executor, engine='parallel') e.run() # Run in parallel using external processes... with futurist.ProcessPoolExecutor(max_workers=1) as executor: + print("-- Running in parallel using processes --") e = engines.load(song, executor=executor, engine='parallel') e.run() # Run serially (aka, if the workflow could have been ran in parallel, it will # not be when ran in this mode)... +print("-- Running serially --") e = engines.load(song, engine='serial') e.run() diff --git a/taskflow/examples/parallel_table_multiply.py b/taskflow/examples/parallel_table_multiply.py index e06e36d..5cd8e9c 100644 --- a/taskflow/examples/parallel_table_multiply.py +++ b/taskflow/examples/parallel_table_multiply.py @@ -33,7 +33,6 @@ from six.moves import range as compat_range from taskflow import engines from taskflow.patterns import unordered_flow as uf from taskflow import task -from taskflow.utils import eventlet_utils # INTRO: This example walks through a miniature workflow which does a parallel # table modification where each row in the table gets adjusted by a thread, or @@ -97,9 +96,10 @@ def main(): f = make_flow(tbl) # Now run it (using the specified executor)... - if eventlet_utils.EVENTLET_AVAILABLE: + try: executor = futurist.GreenThreadPoolExecutor(max_workers=5) - else: + except RuntimeError: + # No eventlet currently active, use real threads instead. executor = futurist.ThreadPoolExecutor(max_workers=5) try: e = engines.load(f, engine='parallel', executor=executor) diff --git a/taskflow/examples/persistence_example.py b/taskflow/examples/persistence_example.py index de9b427..c7c0954 100644 --- a/taskflow/examples/persistence_example.py +++ b/taskflow/examples/persistence_example.py @@ -33,7 +33,6 @@ from taskflow import engines from taskflow.patterns import linear_flow as lf from taskflow.persistence import models from taskflow import task -from taskflow.utils import persistence_utils as p_utils import example_utils as eu # noqa @@ -110,4 +109,4 @@ with eu.get_backend(backend_uri) as backend: traceback.print_exc(file=sys.stdout) eu.print_wrapped("Book contents") - print(p_utils.pformat(book)) + print(book.pformat()) diff --git a/taskflow/examples/resume_from_backend.py b/taskflow/examples/resume_from_backend.py index 677937d..1b8d160 100644 --- a/taskflow/examples/resume_from_backend.py +++ b/taskflow/examples/resume_from_backend.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib import logging import os import sys @@ -27,10 +28,12 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), sys.path.insert(0, top_dir) sys.path.insert(0, self_dir) +from oslo_utils import uuidutils + import taskflow.engines from taskflow.patterns import linear_flow as lf +from taskflow.persistence import models from taskflow import task -from taskflow.utils import persistence_utils as p_utils import example_utils as eu # noqa @@ -99,19 +102,25 @@ def flow_factory(): # INITIALIZE PERSISTENCE #################################### with eu.get_backend() as backend: - logbook = p_utils.temporary_log_book(backend) + + # Create a place where the persistence information will be stored. + book = models.LogBook("example") + flow_detail = models.FlowDetail("resume from backend example", + uuid=uuidutils.generate_uuid()) + book.add(flow_detail) + with contextlib.closing(backend.get_connection()) as conn: + conn.save_logbook(book) # CREATE AND RUN THE FLOW: FIRST ATTEMPT #################### flow = flow_factory() - flowdetail = p_utils.create_flow_detail(flow, logbook, backend) - engine = taskflow.engines.load(flow, flow_detail=flowdetail, - backend=backend) + engine = taskflow.engines.load(flow, flow_detail=flow_detail, + book=book, backend=backend) - print_task_states(flowdetail, "At the beginning, there is no state") + print_task_states(flow_detail, "At the beginning, there is no state") eu.print_wrapped("Running") engine.run() - print_task_states(flowdetail, "After running") + print_task_states(flow_detail, "After running") # RE-CREATE, RESUME, RUN #################################### @@ -127,9 +136,9 @@ with eu.get_backend() as backend: # start it again for situations where this is useful to-do (say the process # running the above flow crashes). flow2 = flow_factory() - flowdetail2 = find_flow_detail(backend, logbook.uuid, flowdetail.uuid) + flow_detail_2 = find_flow_detail(backend, book.uuid, flow_detail.uuid) engine2 = taskflow.engines.load(flow2, - flow_detail=flowdetail2, - backend=backend) + flow_detail=flow_detail_2, + backend=backend, book=book) engine2.run() - print_task_states(flowdetail2, "At the end") + print_task_states(flow_detail_2, "At the end") diff --git a/taskflow/examples/resume_vm_boot.py b/taskflow/examples/resume_vm_boot.py index ec2293b..70c8d28 100644 --- a/taskflow/examples/resume_vm_boot.py +++ b/taskflow/examples/resume_vm_boot.py @@ -38,9 +38,8 @@ from taskflow import engines from taskflow import exceptions as exc from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf +from taskflow.persistence import models from taskflow import task -from taskflow.utils import eventlet_utils -from taskflow.utils import persistence_utils as p_utils import example_utils as eu # noqa @@ -226,6 +225,8 @@ eu.print_wrapped("Initializing") # Setup the persistence & resumption layer. with eu.get_backend() as backend: + + # Try to find a previously passed in tracking id... try: book_id, flow_id = sys.argv[2].split("+", 1) if not uuidutils.is_uuid_like(book_id): @@ -237,14 +238,17 @@ with eu.get_backend() as backend: flow_id = None # Set up how we want our engine to run, serial, parallel... - executor = None - if eventlet_utils.EVENTLET_AVAILABLE: - executor = futurist.GreenThreadPoolExecutor(5) + try: + executor = futurist.GreenThreadPoolExecutor(max_workers=5) + except RuntimeError: + # No eventlet installed, just let the default be used instead. + executor = None # Create/fetch a logbook that will track the workflows work. book = None flow_detail = None if all([book_id, flow_id]): + # Try to find in a prior logbook and flow detail... with contextlib.closing(backend.get_connection()) as conn: try: book = conn.get_logbook(book_id) @@ -252,7 +256,9 @@ with eu.get_backend() as backend: except exc.NotFound: pass if book is None and flow_detail is None: - book = p_utils.temporary_log_book(backend) + book = models.LogBook("vm-boot") + with contextlib.closing(backend.get_connection()) as conn: + conn.save_logbook(book) engine = engines.load_from_factory(create_flow, backend=backend, book=book, engine='parallel', diff --git a/taskflow/examples/resume_volume_create.py b/taskflow/examples/resume_volume_create.py index 93025d9..3c11812 100644 --- a/taskflow/examples/resume_volume_create.py +++ b/taskflow/examples/resume_volume_create.py @@ -31,11 +31,13 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), sys.path.insert(0, top_dir) sys.path.insert(0, self_dir) +from oslo_utils import uuidutils + from taskflow import engines from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf +from taskflow.persistence import models from taskflow import task -from taskflow.utils import persistence_utils as p_utils import example_utils # noqa @@ -134,9 +136,12 @@ with example_utils.get_backend() as backend: # potentially running (and which may have partially completed) back # with taskflow so that those workflows can be resumed (or reverted) # after a process/thread/engine has failed in someway. - logbook = p_utils.temporary_log_book(backend) - flow_detail = p_utils.create_flow_detail(flow, logbook, backend) - print("!! Your tracking id is: '%s+%s'" % (logbook.uuid, + book = models.LogBook('resume-volume-create') + flow_detail = models.FlowDetail("root", uuid=uuidutils.generate_uuid()) + book.add(flow_detail) + with contextlib.closing(backend.get_connection()) as conn: + conn.save_logbook(book) + print("!! Your tracking id is: '%s+%s'" % (book.uuid, flow_detail.uuid)) print("!! Please submit this on later runs for tracking purposes") else: diff --git a/taskflow/examples/run_by_iter.py b/taskflow/examples/run_by_iter.py index 3a00a10..37087ec 100644 --- a/taskflow/examples/run_by_iter.py +++ b/taskflow/examples/run_by_iter.py @@ -32,9 +32,7 @@ sys.path.insert(0, self_dir) from taskflow import engines from taskflow.patterns import linear_flow as lf -from taskflow.persistence import backends as persistence_backends from taskflow import task -from taskflow.utils import persistence_utils # INTRO: This example shows how to run a set of engines at the same time, each @@ -73,12 +71,9 @@ flows = [] for i in range(0, flow_count): f = make_alphabet_flow(i + 1) flows.append(make_alphabet_flow(i + 1)) -be = persistence_backends.fetch(conf={'connection': 'memory'}) -book = persistence_utils.temporary_log_book(be) engine_iters = [] for f in flows: - fd = persistence_utils.create_flow_detail(f, book, be) - e = engines.load(f, flow_detail=fd, backend=be, book=book) + e = engines.load(f) e.compile() e.storage.inject({'A': 'A'}) e.prepare() diff --git a/taskflow/examples/run_by_iter_enumerate.py b/taskflow/examples/run_by_iter_enumerate.py index 07334cc..37901b2 100644 --- a/taskflow/examples/run_by_iter_enumerate.py +++ b/taskflow/examples/run_by_iter_enumerate.py @@ -29,9 +29,7 @@ sys.path.insert(0, self_dir) from taskflow import engines from taskflow.patterns import linear_flow as lf -from taskflow.persistence import backends as persistence_backends from taskflow import task -from taskflow.utils import persistence_utils # INTRO: These examples show how to run an engine using the engine iteration # capability, in between iterations other activities occur (in this case a @@ -48,10 +46,7 @@ f = lf.Flow("counter") for i in range(0, 10): f.add(EchoNameTask("echo_%s" % (i + 1))) -be = persistence_backends.fetch(conf={'connection': 'memory'}) -book = persistence_utils.temporary_log_book(be) -fd = persistence_utils.create_flow_detail(f, book, be) -e = engines.load(f, flow_detail=fd, backend=be, book=book) +e = engines.load(f) e.compile() e.prepare() diff --git a/taskflow/examples/switch_graph_flow.py b/taskflow/examples/switch_graph_flow.py index 273763c..471e633 100644 --- a/taskflow/examples/switch_graph_flow.py +++ b/taskflow/examples/switch_graph_flow.py @@ -27,9 +27,7 @@ sys.path.insert(0, top_dir) from taskflow import engines from taskflow.patterns import graph_flow as gf -from taskflow.persistence import backends from taskflow import task -from taskflow.utils import persistence_utils as pu class DummyTask(task.Task): @@ -42,18 +40,15 @@ def allow(history): return False +# Declare our work to be done... r = gf.Flow("root") r_a = DummyTask('r-a') r_b = DummyTask('r-b') r.add(r_a, r_b) r.link(r_a, r_b, decider=allow) -backend = backends.fetch({ - 'connection': 'memory://', -}) -book, flow_detail = pu.temporary_flow_detail(backend=backend) - -e = engines.load(r, flow_detail=flow_detail, book=book, backend=backend) +# Setup and run the engine layer. +e = engines.load(r) e.compile() e.prepare() e.run() @@ -62,6 +57,7 @@ e.run() print("---------") print("After run") print("---------") +backend = e.storage.backend entries = [os.path.join(backend.memory.root_path, child) for child in backend.memory.ls(backend.memory.root_path)] while entries: diff --git a/taskflow/persistence/models.py b/taskflow/persistence/models.py index c7a6eae..93314f0 100644 --- a/taskflow/persistence/models.py +++ b/taskflow/persistence/models.py @@ -17,6 +17,7 @@ import abc import copy +import os from oslo_utils import timeutils from oslo_utils import uuidutils @@ -26,6 +27,7 @@ from taskflow import exceptions as exc from taskflow import logging from taskflow import states from taskflow.types import failure as ft +from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -33,6 +35,35 @@ LOG = logging.getLogger(__name__) # Internal helpers... +def _format_meta(metadata, indent): + """Format the common metadata dictionary in the same manner.""" + if not metadata: + return [] + lines = [ + '%s- metadata:' % (" " * indent), + ] + for (k, v) in metadata.items(): + # Progress for now is a special snowflake and will be formatted + # in percent format. + if k == 'progress' and isinstance(v, misc.NUMERIC_TYPES): + v = "%0.2f%%" % (v * 100.0) + lines.append("%s+ %s = %s" % (" " * (indent + 2), k, v)) + return lines + + +def _format_shared(obj, indent): + """Format the common shared attributes in the same manner.""" + if obj is None: + return [] + lines = [] + for attr_name in ("uuid", "state"): + if not hasattr(obj, attr_name): + continue + lines.append("%s- %s = %s" % (" " * indent, attr_name, + getattr(obj, attr_name))) + return lines + + def _copy_function(deep_copy): if deep_copy: return copy.deepcopy @@ -96,6 +127,33 @@ class LogBook(object): self.updated_at = None self.meta = {} + def pformat(self, indent=0, linesep=os.linesep): + """Pretty formats this logbook into a string. + + >>> from taskflow.persistence import models + >>> tmp = models.LogBook("example") + >>> print(tmp.pformat()) + LogBook: 'example' + - uuid = ... + - created_at = ... + """ + cls_name = self.__class__.__name__ + lines = ["%s%s: '%s'" % (" " * indent, cls_name, self.name)] + lines.extend(_format_shared(self, indent=indent + 1)) + lines.extend(_format_meta(self.meta, indent=indent + 1)) + if self.created_at is not None: + lines.append("%s- created_at = %s" + % (" " * (indent + 1), + timeutils.isotime(self.created_at))) + if self.updated_at is not None: + lines.append("%s- updated_at = %s" + % (" " * (indent + 1), + timeutils.isotime(self.updated_at))) + for flow_detail in self: + lines.append(flow_detail.pformat(indent=indent + 1, + linesep=linesep)) + return linesep.join(lines) + def add(self, fd): """Adds a new flow detail into this logbook. @@ -267,6 +325,27 @@ class FlowDetail(object): self.meta = fd.meta return self + def pformat(self, indent=0, linesep=os.linesep): + """Pretty formats this flow detail into a string. + + >>> from oslo_utils import uuidutils + >>> from taskflow.persistence import models + >>> flow_detail = models.FlowDetail("example", + ... uuid=uuidutils.generate_uuid()) + >>> print(flow_detail.pformat()) + FlowDetail: 'example' + - uuid = ... + - state = ... + """ + cls_name = self.__class__.__name__ + lines = ["%s%s: '%s'" % (" " * indent, cls_name, self.name)] + lines.extend(_format_shared(self, indent=indent + 1)) + lines.extend(_format_meta(self.meta, indent=indent + 1)) + for atom_detail in self: + lines.append(atom_detail.pformat(indent=indent + 1, + linesep=linesep)) + return linesep.join(lines) + def merge(self, fd, deep_copy=False): """Merges the current object state with the given one's state. @@ -572,6 +651,20 @@ class AtomDetail(object): def copy(self): """Copies this atom detail.""" + def pformat(self, indent=0, linesep=os.linesep): + """Pretty formats this atom detail into a string.""" + cls_name = self.__class__.__name__ + lines = ["%s%s: '%s'" % (" " * (indent), cls_name, self.name)] + lines.extend(_format_shared(self, indent=indent + 1)) + lines.append("%s- version = %s" + % (" " * (indent + 1), misc.get_version_string(self))) + lines.append("%s- results = %s" + % (" " * (indent + 1), self.results)) + lines.append("%s- failure = %s" % (" " * (indent + 1), + bool(self.failure))) + lines.extend(_format_meta(self.meta, indent=indent + 1)) + return linesep.join(lines) + class TaskDetail(AtomDetail): """A task detail (an atom detail typically associated with a |tt| atom). diff --git a/taskflow/utils/persistence_utils.py b/taskflow/utils/persistence_utils.py index 0837afb..1d4dc26 100644 --- a/taskflow/utils/persistence_utils.py +++ b/taskflow/utils/persistence_utils.py @@ -15,14 +15,11 @@ # under the License. import contextlib -import os -from oslo_utils import timeutils from oslo_utils import uuidutils from taskflow import logging from taskflow.persistence import models -from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -97,75 +94,3 @@ def create_flow_detail(flow, book=None, backend=None, meta=None): return book.find(flow_id) else: return flow_detail - - -def _format_meta(metadata, indent): - """Format the common metadata dictionary in the same manner.""" - if not metadata: - return [] - lines = [ - '%s- metadata:' % (" " * indent), - ] - for (k, v) in metadata.items(): - # Progress for now is a special snowflake and will be formatted - # in percent format. - if k == 'progress' and isinstance(v, misc.NUMERIC_TYPES): - v = "%0.2f%%" % (v * 100.0) - lines.append("%s+ %s = %s" % (" " * (indent + 2), k, v)) - return lines - - -def _format_shared(obj, indent): - """Format the common shared attributes in the same manner.""" - if obj is None: - return [] - lines = [] - for attr_name in ("uuid", "state"): - if not hasattr(obj, attr_name): - continue - lines.append("%s- %s = %s" % (" " * indent, attr_name, - getattr(obj, attr_name))) - return lines - - -def pformat_atom_detail(atom_detail, indent=0): - """Pretty formats a atom detail.""" - detail_type = models.atom_detail_type(atom_detail) - lines = ["%s%s: '%s'" % (" " * (indent), detail_type, atom_detail.name)] - lines.extend(_format_shared(atom_detail, indent=indent + 1)) - lines.append("%s- version = %s" - % (" " * (indent + 1), misc.get_version_string(atom_detail))) - lines.append("%s- results = %s" - % (" " * (indent + 1), atom_detail.results)) - lines.append("%s- failure = %s" % (" " * (indent + 1), - bool(atom_detail.failure))) - lines.extend(_format_meta(atom_detail.meta, indent=indent + 1)) - return os.linesep.join(lines) - - -def pformat_flow_detail(flow_detail, indent=0): - """Pretty formats a flow detail.""" - lines = ["%sFlow: '%s'" % (" " * indent, flow_detail.name)] - lines.extend(_format_shared(flow_detail, indent=indent + 1)) - lines.extend(_format_meta(flow_detail.meta, indent=indent + 1)) - for task_detail in flow_detail: - lines.append(pformat_atom_detail(task_detail, indent=indent + 1)) - return os.linesep.join(lines) - - -def pformat(book, indent=0): - """Pretty formats a logbook.""" - lines = ["%sLogbook: '%s'" % (" " * indent, book.name)] - lines.extend(_format_shared(book, indent=indent + 1)) - lines.extend(_format_meta(book.meta, indent=indent + 1)) - if book.created_at is not None: - lines.append("%s- created_at = %s" - % (" " * (indent + 1), - timeutils.isotime(book.created_at))) - if book.updated_at is not None: - lines.append("%s- updated_at = %s" - % (" " * (indent + 1), - timeutils.isotime(book.updated_at))) - for flow_detail in book: - lines.append(pformat_flow_detail(flow_detail, indent=indent + 1)) - return os.linesep.join(lines) |