diff options
| author | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-11-07 06:24:20 -0800 |
|---|---|---|
| committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-12-17 19:13:35 +0000 |
| commit | cafa3b2e256275d24d1c2a298580316448119740 (patch) | |
| tree | a9a8c309b9ca63ed8f967e76f2bb55753d749f69 /taskflow/examples/parallel_table_multiply.py | |
| parent | 74ebb43474e3ec2a5b9f832f2a44b67b8c83dc68 (diff) | |
| download | taskflow-cafa3b2e256275d24d1c2a298580316448119740.tar.gz | |
Add a parallel table mutation example
A new, simple, easy-to-follow example that performs an
embarrassingly parallel computation on an input table to
create a new output table (by multiplying each cell of the
source table by a constant).
Part of blueprint more-examples
Change-Id: I2684f39b3525ee2d43a03ab353d029fdc0e1b2a1
Diffstat (limited to 'taskflow/examples/parallel_table_multiply.py')
| -rw-r--r-- | taskflow/examples/parallel_table_multiply.py | 129 |
1 files changed, 129 insertions, 0 deletions
diff --git a/taskflow/examples/parallel_table_multiply.py b/taskflow/examples/parallel_table_multiply.py new file mode 100644 index 0000000..88562a2 --- /dev/null +++ b/taskflow/examples/parallel_table_multiply.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import csv +import logging +import os +import random +import sys + +logging.basicConfig(level=logging.ERROR) + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +from six.moves import range as compat_range + +from taskflow import engines +from taskflow.patterns import unordered_flow as uf +from taskflow import task +from taskflow.types import futures +from taskflow.utils import async_utils + +# INTRO: This example walks through a miniature workflow which does a parallel +# table modification where each row in the table gets adjusted by a thread, or +# green thread (if eventlet is available) in parallel and then the result +# is reformed into a new table and some verifications are performed on it +# to ensure everything went as expected. 
# NOTE: the spelling of this name is kept as-is; it is a module level name
# and renaming it could break anything that references it.
MULTIPLER = 10


class RowMultiplier(task.Task):
    """Performs a modification of an input row, creating an output row.

    :param name: name of this task (later used to look up the result of
                 this task in the engine's storage)
    :param index: position of ``row`` in the source table (used to put the
                  computed rows back in their original order)
    :param row: iterable of values that will each be multiplied
    :param multiplier: value that each cell of ``row`` is multiplied by
    """

    def __init__(self, name, index, row, multiplier):
        super(RowMultiplier, self).__init__(name=name)
        self.index = index
        self.multiplier = multiplier
        self.row = row

    def execute(self):
        """Returns a new list with every cell of the row multiplied."""
        return [r * self.multiplier for r in self.row]


def make_flow(table):
    """Creates an unordered flow with one ``RowMultiplier`` per table row.

    :param table: iterable of rows (each row being an iterable of numbers)
    :returns: the (not yet executed) flow
    """
    # This creation will allow for parallel computation (since the flow here
    # is specifically unordered; and when things are unordered they have
    # no dependencies and when things have no dependencies they can just be
    # run at the same time, limited in concurrency by the executor or max
    # workers of that executor...)
    f = uf.Flow("root")
    for i, row in enumerate(table):
        f.add(RowMultiplier("m-%s" % i, i, row, MULTIPLER))
    # NOTE(harlowja): at this point nothing has run, the above is just
    # defining what should be done (but not actually doing it) and associating
    # any ordering dependencies that should be enforced (the flow pattern used
    # determines this), the engine in the later main() function will actually
    # perform this work...
    return f


def _read_table(path):
    """Reads a csv file into a table of floats (empty cells become 0.0)."""
    # The csv module wants a binary mode file on python 2.x and a text mode
    # file (opened with newline='') on python 3.x; the builtin open() on
    # python 2.x does not accept a newline argument, hence the branching.
    if sys.version_info[0] >= 3:
        fh = open(path, 'r', newline='')
    else:
        fh = open(path, 'rb')
    with fh:
        return [[float(cell) if cell else 0.0 for cell in row]
                for row in csv.reader(fh)]


def _random_table():
    """Makes some random table (1-100 rows and columns) out of thin air."""
    cols = random.randint(1, 100)
    rows = random.randint(1, 100)
    return [[random.random() for _j in compat_range(0, cols)]
            for _i in compat_range(0, rows)]


def main():
    """Multiplies a table in parallel and validates the resulting table.

    The source table is read from the csv file named by the only command
    line argument (when exactly one is given), otherwise a random table is
    generated.

    :returns: 0 if the computed table has the same dimensions as the
              source table, otherwise 1 (usable as a process exit code)
    """
    if len(sys.argv) == 2:
        tbl = _read_table(sys.argv[1])
    else:
        tbl = _random_table()

    # Generate the work to be done.
    f = make_flow(tbl)

    # Now run it (using the specified executor)...
    if async_utils.EVENTLET_AVAILABLE:
        executor = futures.GreenThreadPoolExecutor(max_workers=5)
    else:
        executor = futures.ThreadPoolExecutor(max_workers=5)
    try:
        e = engines.load(f, engine='parallel', executor=executor)
        for st in e.run_iter():
            print(st)
    finally:
        # Always release the workers, even if loading or running failed.
        executor.shutdown()

    # Put the computed rows back into the order of the rows they came from
    # (sorting the tasks by their row index avoids searching the whole flow
    # once per row).
    computed_tbl = [e.storage.get(t.name)
                    for t in sorted(f, key=lambda t: t.index)]

    # Do some basic validation (which causes the return code of this process
    # to be different if things were not as expected...)
    if len(computed_tbl) != len(tbl):
        return 1
    for row, computed_row in zip(tbl, computed_tbl):
        if len(computed_row) != len(row):
            return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())
