summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-07-13 11:33:12 -0700
committerJoshua Harlow <harlowja@gmail.com>2015-07-17 08:48:17 -0700
commit02c83d40612bbe3146a9f2ff212759ecdcdeb8ba (patch)
tree2c52a1a6a8d0243b8c576a36c96ca371ef490f4e
parent9f846d0475b9862da6af52bd959d15a2cd8f5ab0 (diff)
downloadtaskflow-02c83d40612bbe3146a9f2ff212759ecdcdeb8ba.tar.gz
Remove **most** usage of taskflow.utils in examples
It appears folks are using the taskflow.utils code in there own code-bases (likely taking it from the examples) which we do not want to encourage, so remove the usage of **most** of taskflow.utils code from the examples so that people are less likely to copy/paste/reference it. Change-Id: I0ce3c520de347e3e746e7912aa1366a515458424
-rw-r--r--taskflow/examples/dump_memory_backend.py14
-rw-r--r--taskflow/examples/hello_world.py14
-rw-r--r--taskflow/examples/parallel_table_multiply.py6
-rw-r--r--taskflow/examples/persistence_example.py3
-rw-r--r--taskflow/examples/resume_from_backend.py31
-rw-r--r--taskflow/examples/resume_vm_boot.py18
-rw-r--r--taskflow/examples/resume_volume_create.py13
-rw-r--r--taskflow/examples/run_by_iter.py7
-rw-r--r--taskflow/examples/run_by_iter_enumerate.py7
-rw-r--r--taskflow/examples/switch_graph_flow.py12
-rw-r--r--taskflow/persistence/models.py93
-rw-r--r--taskflow/utils/persistence_utils.py75
12 files changed, 159 insertions, 134 deletions
diff --git a/taskflow/examples/dump_memory_backend.py b/taskflow/examples/dump_memory_backend.py
index 6c6d548..d448667 100644
--- a/taskflow/examples/dump_memory_backend.py
+++ b/taskflow/examples/dump_memory_backend.py
@@ -29,9 +29,7 @@ sys.path.insert(0, self_dir)
from taskflow import engines
from taskflow.patterns import linear_flow as lf
-from taskflow.persistence import backends
from taskflow import task
-from taskflow.utils import persistence_utils as pu
# INTRO: in this example we create a dummy flow with a dummy task, and run
# it using a in-memory backend and pre/post run we dump out the contents
@@ -43,22 +41,18 @@ class PrintTask(task.Task):
def execute(self):
print("Running '%s'" % self.name)
-
-backend = backends.fetch({
- 'connection': 'memory://',
-})
-book, flow_detail = pu.temporary_flow_detail(backend=backend)
-
# Make a little flow and run it...
f = lf.Flow('root')
for alpha in ['a', 'b', 'c']:
f.add(PrintTask(alpha))
-e = engines.load(f, flow_detail=flow_detail,
- book=book, backend=backend)
+e = engines.load(f)
e.compile()
e.prepare()
+# After prepare the storage layer + backend can now be accessed safely...
+backend = e.storage.backend
+
print("----------")
print("Before run")
print("----------")
diff --git a/taskflow/examples/hello_world.py b/taskflow/examples/hello_world.py
index 38a6b38..2ec1c95 100644
--- a/taskflow/examples/hello_world.py
+++ b/taskflow/examples/hello_world.py
@@ -31,7 +31,6 @@ from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task
-from taskflow.utils import eventlet_utils
# INTRO: This is the defacto hello world equivalent for taskflow; it shows how
@@ -82,25 +81,34 @@ song.add(PrinterTask("conductor@begin",
show_name=False, inject={'output': "*dong*"}))
# Run in parallel using eventlet green threads...
-if eventlet_utils.EVENTLET_AVAILABLE:
- with futurist.GreenThreadPoolExecutor() as executor:
+try:
+ executor = futurist.GreenThreadPoolExecutor()
+except RuntimeError:
+ # No eventlet currently active, skip running with it...
+ pass
+else:
+ print("-- Running in parallel using eventlet --")
+ with executor:
e = engines.load(song, executor=executor, engine='parallel')
e.run()
# Run in parallel using real threads...
with futurist.ThreadPoolExecutor(max_workers=1) as executor:
+ print("-- Running in parallel using threads --")
e = engines.load(song, executor=executor, engine='parallel')
e.run()
# Run in parallel using external processes...
with futurist.ProcessPoolExecutor(max_workers=1) as executor:
+ print("-- Running in parallel using processes --")
e = engines.load(song, executor=executor, engine='parallel')
e.run()
# Run serially (aka, if the workflow could have been ran in parallel, it will
# not be when ran in this mode)...
+print("-- Running serially --")
e = engines.load(song, engine='serial')
e.run()
diff --git a/taskflow/examples/parallel_table_multiply.py b/taskflow/examples/parallel_table_multiply.py
index e06e36d..5cd8e9c 100644
--- a/taskflow/examples/parallel_table_multiply.py
+++ b/taskflow/examples/parallel_table_multiply.py
@@ -33,7 +33,6 @@ from six.moves import range as compat_range
from taskflow import engines
from taskflow.patterns import unordered_flow as uf
from taskflow import task
-from taskflow.utils import eventlet_utils
# INTRO: This example walks through a miniature workflow which does a parallel
# table modification where each row in the table gets adjusted by a thread, or
@@ -97,9 +96,10 @@ def main():
f = make_flow(tbl)
# Now run it (using the specified executor)...
- if eventlet_utils.EVENTLET_AVAILABLE:
+ try:
executor = futurist.GreenThreadPoolExecutor(max_workers=5)
- else:
+ except RuntimeError:
+ # No eventlet currently active, use real threads instead.
executor = futurist.ThreadPoolExecutor(max_workers=5)
try:
e = engines.load(f, engine='parallel', executor=executor)
diff --git a/taskflow/examples/persistence_example.py b/taskflow/examples/persistence_example.py
index de9b427..c7c0954 100644
--- a/taskflow/examples/persistence_example.py
+++ b/taskflow/examples/persistence_example.py
@@ -33,7 +33,6 @@ from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow.persistence import models
from taskflow import task
-from taskflow.utils import persistence_utils as p_utils
import example_utils as eu # noqa
@@ -110,4 +109,4 @@ with eu.get_backend(backend_uri) as backend:
traceback.print_exc(file=sys.stdout)
eu.print_wrapped("Book contents")
- print(p_utils.pformat(book))
+ print(book.pformat())
diff --git a/taskflow/examples/resume_from_backend.py b/taskflow/examples/resume_from_backend.py
index 677937d..1b8d160 100644
--- a/taskflow/examples/resume_from_backend.py
+++ b/taskflow/examples/resume_from_backend.py
@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import contextlib
import logging
import os
import sys
@@ -27,10 +28,12 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)
+from oslo_utils import uuidutils
+
import taskflow.engines
from taskflow.patterns import linear_flow as lf
+from taskflow.persistence import models
from taskflow import task
-from taskflow.utils import persistence_utils as p_utils
import example_utils as eu # noqa
@@ -99,19 +102,25 @@ def flow_factory():
# INITIALIZE PERSISTENCE ####################################
with eu.get_backend() as backend:
- logbook = p_utils.temporary_log_book(backend)
+
+ # Create a place where the persistence information will be stored.
+ book = models.LogBook("example")
+ flow_detail = models.FlowDetail("resume from backend example",
+ uuid=uuidutils.generate_uuid())
+ book.add(flow_detail)
+ with contextlib.closing(backend.get_connection()) as conn:
+ conn.save_logbook(book)
# CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################
flow = flow_factory()
- flowdetail = p_utils.create_flow_detail(flow, logbook, backend)
- engine = taskflow.engines.load(flow, flow_detail=flowdetail,
- backend=backend)
+ engine = taskflow.engines.load(flow, flow_detail=flow_detail,
+ book=book, backend=backend)
- print_task_states(flowdetail, "At the beginning, there is no state")
+ print_task_states(flow_detail, "At the beginning, there is no state")
eu.print_wrapped("Running")
engine.run()
- print_task_states(flowdetail, "After running")
+ print_task_states(flow_detail, "After running")
# RE-CREATE, RESUME, RUN ####################################
@@ -127,9 +136,9 @@ with eu.get_backend() as backend:
# start it again for situations where this is useful to-do (say the process
# running the above flow crashes).
flow2 = flow_factory()
- flowdetail2 = find_flow_detail(backend, logbook.uuid, flowdetail.uuid)
+ flow_detail_2 = find_flow_detail(backend, book.uuid, flow_detail.uuid)
engine2 = taskflow.engines.load(flow2,
- flow_detail=flowdetail2,
- backend=backend)
+ flow_detail=flow_detail_2,
+ backend=backend, book=book)
engine2.run()
- print_task_states(flowdetail2, "At the end")
+ print_task_states(flow_detail_2, "At the end")
diff --git a/taskflow/examples/resume_vm_boot.py b/taskflow/examples/resume_vm_boot.py
index ec2293b..70c8d28 100644
--- a/taskflow/examples/resume_vm_boot.py
+++ b/taskflow/examples/resume_vm_boot.py
@@ -38,9 +38,8 @@ from taskflow import engines
from taskflow import exceptions as exc
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
+from taskflow.persistence import models
from taskflow import task
-from taskflow.utils import eventlet_utils
-from taskflow.utils import persistence_utils as p_utils
import example_utils as eu # noqa
@@ -226,6 +225,8 @@ eu.print_wrapped("Initializing")
# Setup the persistence & resumption layer.
with eu.get_backend() as backend:
+
+ # Try to find a previously passed in tracking id...
try:
book_id, flow_id = sys.argv[2].split("+", 1)
if not uuidutils.is_uuid_like(book_id):
@@ -237,14 +238,17 @@ with eu.get_backend() as backend:
flow_id = None
# Set up how we want our engine to run, serial, parallel...
- executor = None
- if eventlet_utils.EVENTLET_AVAILABLE:
- executor = futurist.GreenThreadPoolExecutor(5)
+ try:
+ executor = futurist.GreenThreadPoolExecutor(max_workers=5)
+ except RuntimeError:
+ # No eventlet installed, just let the default be used instead.
+ executor = None
# Create/fetch a logbook that will track the workflows work.
book = None
flow_detail = None
if all([book_id, flow_id]):
+ # Try to find in a prior logbook and flow detail...
with contextlib.closing(backend.get_connection()) as conn:
try:
book = conn.get_logbook(book_id)
@@ -252,7 +256,9 @@ with eu.get_backend() as backend:
except exc.NotFound:
pass
if book is None and flow_detail is None:
- book = p_utils.temporary_log_book(backend)
+ book = models.LogBook("vm-boot")
+ with contextlib.closing(backend.get_connection()) as conn:
+ conn.save_logbook(book)
engine = engines.load_from_factory(create_flow,
backend=backend, book=book,
engine='parallel',
diff --git a/taskflow/examples/resume_volume_create.py b/taskflow/examples/resume_volume_create.py
index 93025d9..3c11812 100644
--- a/taskflow/examples/resume_volume_create.py
+++ b/taskflow/examples/resume_volume_create.py
@@ -31,11 +31,13 @@ top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)
+from oslo_utils import uuidutils
+
from taskflow import engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
+from taskflow.persistence import models
from taskflow import task
-from taskflow.utils import persistence_utils as p_utils
import example_utils # noqa
@@ -134,9 +136,12 @@ with example_utils.get_backend() as backend:
# potentially running (and which may have partially completed) back
# with taskflow so that those workflows can be resumed (or reverted)
# after a process/thread/engine has failed in someway.
- logbook = p_utils.temporary_log_book(backend)
- flow_detail = p_utils.create_flow_detail(flow, logbook, backend)
- print("!! Your tracking id is: '%s+%s'" % (logbook.uuid,
+ book = models.LogBook('resume-volume-create')
+ flow_detail = models.FlowDetail("root", uuid=uuidutils.generate_uuid())
+ book.add(flow_detail)
+ with contextlib.closing(backend.get_connection()) as conn:
+ conn.save_logbook(book)
+ print("!! Your tracking id is: '%s+%s'" % (book.uuid,
flow_detail.uuid))
print("!! Please submit this on later runs for tracking purposes")
else:
diff --git a/taskflow/examples/run_by_iter.py b/taskflow/examples/run_by_iter.py
index 3a00a10..37087ec 100644
--- a/taskflow/examples/run_by_iter.py
+++ b/taskflow/examples/run_by_iter.py
@@ -32,9 +32,7 @@ sys.path.insert(0, self_dir)
from taskflow import engines
from taskflow.patterns import linear_flow as lf
-from taskflow.persistence import backends as persistence_backends
from taskflow import task
-from taskflow.utils import persistence_utils
# INTRO: This example shows how to run a set of engines at the same time, each
@@ -73,12 +71,9 @@ flows = []
for i in range(0, flow_count):
f = make_alphabet_flow(i + 1)
flows.append(make_alphabet_flow(i + 1))
-be = persistence_backends.fetch(conf={'connection': 'memory'})
-book = persistence_utils.temporary_log_book(be)
engine_iters = []
for f in flows:
- fd = persistence_utils.create_flow_detail(f, book, be)
- e = engines.load(f, flow_detail=fd, backend=be, book=book)
+ e = engines.load(f)
e.compile()
e.storage.inject({'A': 'A'})
e.prepare()
diff --git a/taskflow/examples/run_by_iter_enumerate.py b/taskflow/examples/run_by_iter_enumerate.py
index 07334cc..37901b2 100644
--- a/taskflow/examples/run_by_iter_enumerate.py
+++ b/taskflow/examples/run_by_iter_enumerate.py
@@ -29,9 +29,7 @@ sys.path.insert(0, self_dir)
from taskflow import engines
from taskflow.patterns import linear_flow as lf
-from taskflow.persistence import backends as persistence_backends
from taskflow import task
-from taskflow.utils import persistence_utils
# INTRO: These examples show how to run an engine using the engine iteration
# capability, in between iterations other activities occur (in this case a
@@ -48,10 +46,7 @@ f = lf.Flow("counter")
for i in range(0, 10):
f.add(EchoNameTask("echo_%s" % (i + 1)))
-be = persistence_backends.fetch(conf={'connection': 'memory'})
-book = persistence_utils.temporary_log_book(be)
-fd = persistence_utils.create_flow_detail(f, book, be)
-e = engines.load(f, flow_detail=fd, backend=be, book=book)
+e = engines.load(f)
e.compile()
e.prepare()
diff --git a/taskflow/examples/switch_graph_flow.py b/taskflow/examples/switch_graph_flow.py
index 273763c..471e633 100644
--- a/taskflow/examples/switch_graph_flow.py
+++ b/taskflow/examples/switch_graph_flow.py
@@ -27,9 +27,7 @@ sys.path.insert(0, top_dir)
from taskflow import engines
from taskflow.patterns import graph_flow as gf
-from taskflow.persistence import backends
from taskflow import task
-from taskflow.utils import persistence_utils as pu
class DummyTask(task.Task):
@@ -42,18 +40,15 @@ def allow(history):
return False
+# Declare our work to be done...
r = gf.Flow("root")
r_a = DummyTask('r-a')
r_b = DummyTask('r-b')
r.add(r_a, r_b)
r.link(r_a, r_b, decider=allow)
-backend = backends.fetch({
- 'connection': 'memory://',
-})
-book, flow_detail = pu.temporary_flow_detail(backend=backend)
-
-e = engines.load(r, flow_detail=flow_detail, book=book, backend=backend)
+# Setup and run the engine layer.
+e = engines.load(r)
e.compile()
e.prepare()
e.run()
@@ -62,6 +57,7 @@ e.run()
print("---------")
print("After run")
print("---------")
+backend = e.storage.backend
entries = [os.path.join(backend.memory.root_path, child)
for child in backend.memory.ls(backend.memory.root_path)]
while entries:
diff --git a/taskflow/persistence/models.py b/taskflow/persistence/models.py
index c7a6eae..93314f0 100644
--- a/taskflow/persistence/models.py
+++ b/taskflow/persistence/models.py
@@ -17,6 +17,7 @@
import abc
import copy
+import os
from oslo_utils import timeutils
from oslo_utils import uuidutils
@@ -26,6 +27,7 @@ from taskflow import exceptions as exc
from taskflow import logging
from taskflow import states
from taskflow.types import failure as ft
+from taskflow.utils import misc
LOG = logging.getLogger(__name__)
@@ -33,6 +35,35 @@ LOG = logging.getLogger(__name__)
# Internal helpers...
+def _format_meta(metadata, indent):
+ """Format the common metadata dictionary in the same manner."""
+ if not metadata:
+ return []
+ lines = [
+ '%s- metadata:' % (" " * indent),
+ ]
+ for (k, v) in metadata.items():
+ # Progress for now is a special snowflake and will be formatted
+ # in percent format.
+ if k == 'progress' and isinstance(v, misc.NUMERIC_TYPES):
+ v = "%0.2f%%" % (v * 100.0)
+ lines.append("%s+ %s = %s" % (" " * (indent + 2), k, v))
+ return lines
+
+
+def _format_shared(obj, indent):
+ """Format the common shared attributes in the same manner."""
+ if obj is None:
+ return []
+ lines = []
+ for attr_name in ("uuid", "state"):
+ if not hasattr(obj, attr_name):
+ continue
+ lines.append("%s- %s = %s" % (" " * indent, attr_name,
+ getattr(obj, attr_name)))
+ return lines
+
+
def _copy_function(deep_copy):
if deep_copy:
return copy.deepcopy
@@ -96,6 +127,33 @@ class LogBook(object):
self.updated_at = None
self.meta = {}
+ def pformat(self, indent=0, linesep=os.linesep):
+ """Pretty formats this logbook into a string.
+
+ >>> from taskflow.persistence import models
+ >>> tmp = models.LogBook("example")
+ >>> print(tmp.pformat())
+ LogBook: 'example'
+ - uuid = ...
+ - created_at = ...
+ """
+ cls_name = self.__class__.__name__
+ lines = ["%s%s: '%s'" % (" " * indent, cls_name, self.name)]
+ lines.extend(_format_shared(self, indent=indent + 1))
+ lines.extend(_format_meta(self.meta, indent=indent + 1))
+ if self.created_at is not None:
+ lines.append("%s- created_at = %s"
+ % (" " * (indent + 1),
+ timeutils.isotime(self.created_at)))
+ if self.updated_at is not None:
+ lines.append("%s- updated_at = %s"
+ % (" " * (indent + 1),
+ timeutils.isotime(self.updated_at)))
+ for flow_detail in self:
+ lines.append(flow_detail.pformat(indent=indent + 1,
+ linesep=linesep))
+ return linesep.join(lines)
+
def add(self, fd):
"""Adds a new flow detail into this logbook.
@@ -267,6 +325,27 @@ class FlowDetail(object):
self.meta = fd.meta
return self
+ def pformat(self, indent=0, linesep=os.linesep):
+ """Pretty formats this flow detail into a string.
+
+ >>> from oslo_utils import uuidutils
+ >>> from taskflow.persistence import models
+ >>> flow_detail = models.FlowDetail("example",
+ ... uuid=uuidutils.generate_uuid())
+ >>> print(flow_detail.pformat())
+ FlowDetail: 'example'
+ - uuid = ...
+ - state = ...
+ """
+ cls_name = self.__class__.__name__
+ lines = ["%s%s: '%s'" % (" " * indent, cls_name, self.name)]
+ lines.extend(_format_shared(self, indent=indent + 1))
+ lines.extend(_format_meta(self.meta, indent=indent + 1))
+ for atom_detail in self:
+ lines.append(atom_detail.pformat(indent=indent + 1,
+ linesep=linesep))
+ return linesep.join(lines)
+
def merge(self, fd, deep_copy=False):
"""Merges the current object state with the given one's state.
@@ -572,6 +651,20 @@ class AtomDetail(object):
def copy(self):
"""Copies this atom detail."""
+ def pformat(self, indent=0, linesep=os.linesep):
+ """Pretty formats this atom detail into a string."""
+ cls_name = self.__class__.__name__
+ lines = ["%s%s: '%s'" % (" " * (indent), cls_name, self.name)]
+ lines.extend(_format_shared(self, indent=indent + 1))
+ lines.append("%s- version = %s"
+ % (" " * (indent + 1), misc.get_version_string(self)))
+ lines.append("%s- results = %s"
+ % (" " * (indent + 1), self.results))
+ lines.append("%s- failure = %s" % (" " * (indent + 1),
+ bool(self.failure)))
+ lines.extend(_format_meta(self.meta, indent=indent + 1))
+ return linesep.join(lines)
+
class TaskDetail(AtomDetail):
"""A task detail (an atom detail typically associated with a |tt| atom).
diff --git a/taskflow/utils/persistence_utils.py b/taskflow/utils/persistence_utils.py
index 0837afb..1d4dc26 100644
--- a/taskflow/utils/persistence_utils.py
+++ b/taskflow/utils/persistence_utils.py
@@ -15,14 +15,11 @@
# under the License.
import contextlib
-import os
-from oslo_utils import timeutils
from oslo_utils import uuidutils
from taskflow import logging
from taskflow.persistence import models
-from taskflow.utils import misc
LOG = logging.getLogger(__name__)
@@ -97,75 +94,3 @@ def create_flow_detail(flow, book=None, backend=None, meta=None):
return book.find(flow_id)
else:
return flow_detail
-
-
-def _format_meta(metadata, indent):
- """Format the common metadata dictionary in the same manner."""
- if not metadata:
- return []
- lines = [
- '%s- metadata:' % (" " * indent),
- ]
- for (k, v) in metadata.items():
- # Progress for now is a special snowflake and will be formatted
- # in percent format.
- if k == 'progress' and isinstance(v, misc.NUMERIC_TYPES):
- v = "%0.2f%%" % (v * 100.0)
- lines.append("%s+ %s = %s" % (" " * (indent + 2), k, v))
- return lines
-
-
-def _format_shared(obj, indent):
- """Format the common shared attributes in the same manner."""
- if obj is None:
- return []
- lines = []
- for attr_name in ("uuid", "state"):
- if not hasattr(obj, attr_name):
- continue
- lines.append("%s- %s = %s" % (" " * indent, attr_name,
- getattr(obj, attr_name)))
- return lines
-
-
-def pformat_atom_detail(atom_detail, indent=0):
- """Pretty formats a atom detail."""
- detail_type = models.atom_detail_type(atom_detail)
- lines = ["%s%s: '%s'" % (" " * (indent), detail_type, atom_detail.name)]
- lines.extend(_format_shared(atom_detail, indent=indent + 1))
- lines.append("%s- version = %s"
- % (" " * (indent + 1), misc.get_version_string(atom_detail)))
- lines.append("%s- results = %s"
- % (" " * (indent + 1), atom_detail.results))
- lines.append("%s- failure = %s" % (" " * (indent + 1),
- bool(atom_detail.failure)))
- lines.extend(_format_meta(atom_detail.meta, indent=indent + 1))
- return os.linesep.join(lines)
-
-
-def pformat_flow_detail(flow_detail, indent=0):
- """Pretty formats a flow detail."""
- lines = ["%sFlow: '%s'" % (" " * indent, flow_detail.name)]
- lines.extend(_format_shared(flow_detail, indent=indent + 1))
- lines.extend(_format_meta(flow_detail.meta, indent=indent + 1))
- for task_detail in flow_detail:
- lines.append(pformat_atom_detail(task_detail, indent=indent + 1))
- return os.linesep.join(lines)
-
-
-def pformat(book, indent=0):
- """Pretty formats a logbook."""
- lines = ["%sLogbook: '%s'" % (" " * indent, book.name)]
- lines.extend(_format_shared(book, indent=indent + 1))
- lines.extend(_format_meta(book.meta, indent=indent + 1))
- if book.created_at is not None:
- lines.append("%s- created_at = %s"
- % (" " * (indent + 1),
- timeutils.isotime(book.created_at)))
- if book.updated_at is not None:
- lines.append("%s- updated_at = %s"
- % (" " * (indent + 1),
- timeutils.isotime(book.updated_at)))
- for flow_detail in book:
- lines.append(pformat_flow_detail(flow_detail, indent=indent + 1))
- return os.linesep.join(lines)