path: root/taskflow/examples/parallel_table_multiply.py
author     Joshua Harlow <harlowja@yahoo-inc.com>    2014-11-07 06:24:20 -0800
committer  Joshua Harlow <harlowja@yahoo-inc.com>    2014-12-17 19:13:35 +0000
commit     cafa3b2e256275d24d1c2a298580316448119740 (patch)
tree       a9a8c309b9ca63ed8f967e76f2bb55753d749f69 /taskflow/examples/parallel_table_multiply.py
parent     74ebb43474e3ec2a5b9f832f2a44b67b8c83dc68 (diff)
download   taskflow-cafa3b2e256275d24d1c2a298580316448119740.tar.gz
Add a parallel table mutation example
A new simple example that is pretty easy to follow: it performs an embarrassingly parallel computation on an input table to create a new output table (by multiplying each cell of the source table).

Part of blueprint more-examples

Change-Id: I2684f39b3525ee2d43a03ab353d029fdc0e1b2a1
Diffstat (limited to 'taskflow/examples/parallel_table_multiply.py')
-rw-r--r--  taskflow/examples/parallel_table_multiply.py  129
1 file changed, 129 insertions, 0 deletions
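
Before the full diff, here is a minimal sketch of the pattern the commit message describes: independent tasks collected into an unordered flow and run by a parallel engine. It uses the same taskflow calls as the example below; the Doubler task and the tiny input table are purely illustrative and not part of the commit.

    from taskflow import engines
    from taskflow import task
    from taskflow.patterns import unordered_flow as uf


    class Doubler(task.Task):
        """Illustrative task: doubles every value in its row."""

        def __init__(self, name, row):
            super(Doubler, self).__init__(name=name)
            self.row = row

        def execute(self):
            return [2 * v for v in self.row]


    flow = uf.Flow("demo")
    for i, row in enumerate([[1, 2], [3, 4]]):
        flow.add(Doubler("d-%s" % i, row))

    # Unordered flows have no inter-task dependencies, so a parallel engine
    # is free to execute the tasks concurrently.
    engine = engines.load(flow, engine='parallel')
    engine.run()
    print([engine.storage.get(t.name) for t in flow])
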
diff --git a/taskflow/examples/parallel_table_multiply.py b/taskflow/examples/parallel_table_multiply.py
new file mode 100644
index 0000000..88562a2
--- /dev/null
+++ b/taskflow/examples/parallel_table_multiply.py
@@ -0,0 +1,129 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import csv
+import logging
+import os
+import random
+import sys
+
+logging.basicConfig(level=logging.ERROR)
+
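+# The sys.path manipulation below lets this example run directly from a
+# source checkout (without installing taskflow) by putting the project
+# root first on the import path.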
+top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
+                                       os.pardir,
+                                       os.pardir))
+sys.path.insert(0, top_dir)
+
+from six.moves import range as compat_range
+
+from taskflow import engines
+from taskflow.patterns import unordered_flow as uf
+from taskflow import task
+from taskflow.types import futures
+from taskflow.utils import async_utils
+
+# INTRO: This example walks through a miniature workflow that performs a
+# parallel table modification: each row in the table is adjusted by a thread
+# (or green thread, if eventlet is available) in parallel, the results are
+# then reassembled into a new table, and some verifications are performed on
+# that table to ensure everything went as expected.
+
+
+MULTIPLIER = 10
+
+
+class RowMultiplier(task.Task):
+ """Performs a modification of an input row, creating a output row."""
+
+    def __init__(self, name, index, row, multiplier):
+        super(RowMultiplier, self).__init__(name=name)
+        self.index = index
+        self.multiplier = multiplier
+        self.row = row
+
+    def execute(self):
+        return [r * self.multiplier for r in self.row]
+
+
+def make_flow(table):
+    # This creation will allow for parallel computation (since the flow here
+    # is specifically unordered; unordered flows have no dependencies between
+    # their tasks, and tasks with no dependencies can be run at the same
+    # time, limited in concurrency by the executor or the max workers of
+    # that executor...)
+    f = uf.Flow("root")
+    for i, row in enumerate(table):
+        f.add(RowMultiplier("m-%s" % i, i, row, MULTIPLIER))
+    # NOTE(harlowja): at this point nothing has run; the above just defines
+    # what should be done (without actually doing it) and associates the
+    # ordering dependencies that should be enforced (the flow pattern used
+    # forces this). The engine in the later main() function will actually
+    # perform this work...
+    return f
+
+
+def main():
+    if len(sys.argv) == 2:
+        tbl = []
+        with open(sys.argv[1], 'rb') as fh:
+            reader = csv.reader(fh)
+            for row in reader:
+                tbl.append([float(r) if r else 0.0 for r in row])
+    else:
+        # Make some random table out of thin air...
+        tbl = []
+        cols = random.randint(1, 100)
+        rows = random.randint(1, 100)
+        for _i in compat_range(0, rows):
+            row = []
+            for _j in compat_range(0, cols):
+                row.append(random.random())
+            tbl.append(row)
+
+    # Generate the work to be done.
+    f = make_flow(tbl)
+
+    # Now run it (using the specified executor)...
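+    # The green thread pool executor (backed by eventlet) is only usable
+    # when eventlet is installed; otherwise a regular thread pool is used.
+    # Either way, at most five tasks will run at once.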
+    if async_utils.EVENTLET_AVAILABLE:
+        executor = futures.GreenThreadPoolExecutor(max_workers=5)
+    else:
+        executor = futures.ThreadPoolExecutor(max_workers=5)
+    try:
+        e = engines.load(f, engine='parallel', executor=executor)
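+        # run_iter() yields the engine's state transitions as it runs, so
+        # printing them gives a simple trace of the execution progress
+        # (engine.run() could be used instead if no trace is wanted).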
+        for st in e.run_iter():
+            print(st)
+    finally:
+        executor.shutdown()
+
+    # Put the computed rows back into their original (input) order...
+    #
+    # TODO(harlowja): probably easier just to sort instead of search (see
+    # the sketch after the diff)...
+    computed_tbl = []
+    for i in compat_range(0, len(tbl)):
+        for t in f:
+            if t.index == i:
+                computed_tbl.append(e.storage.get(t.name))
+
+    # Do some basic validation (which causes this process's return code to
+    # differ if things were not as expected...)
+    if len(computed_tbl) != len(tbl):
+        return 1
+    else:
+        return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())
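
A note on the TODO in main(): the nested-loop reassembly could indeed be replaced by a sort. A possible sketch, reusing the example's own names (f, e, tbl, MULTIPLIER) but not part of the commit and untested against it:

    # Reassemble results in input order by sorting tasks on their saved index.
    computed_tbl = [e.storage.get(t.name)
                    for t in sorted(f, key=lambda t: t.index)]

    # A stricter final check than comparing lengths: verify each computed
    # cell (exact equality is acceptable here since both sides perform the
    # same floating point multiplication).
    assert computed_tbl == [[r * MULTIPLIER for r in row] for row in tbl]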