summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-09-27 20:34:06 +0000
committerGerrit Code Review <review@openstack.org>2014-09-27 20:34:06 +0000
commit0578bac0d6b0f3facf39d2dc61b317d77a47153f (patch)
tree15946828a787759891cfe68fa44df52af0082ce6
parent2990d98bf354ac366a39a229476e1062cea8b6b9 (diff)
parentadf417cda526f4551079a0ae3fdcdeb8b803be09 (diff)
downloadtaskflow-0578bac0d6b0f3facf39d2dc61b317d77a47153f.tar.gz
Merge "Add a mandelbrot parallel calculation WBE example"
-rw-r--r--doc/source/examples.rst23
-rw-r--r--doc/source/img/mandelbrot.pngbin0 -> 21971 bytes
-rw-r--r--taskflow/examples/wbe_mandelbrot.out.txt6
-rw-r--r--taskflow/examples/wbe_mandelbrot.py254
4 files changed, 283 insertions, 0 deletions
diff --git a/doc/source/examples.rst b/doc/source/examples.rst
index 1085019..89731d8 100644
--- a/doc/source/examples.rst
+++ b/doc/source/examples.rst
@@ -175,3 +175,26 @@ Distributed execution (simple)
:language: python
:linenos:
:lines: 16-
+
+Distributed mandelbrot (complex)
+================================
+
+.. note::
+
+ Full source located at :example:`wbe_mandelbrot`
+
+Output
+------
+
+.. image:: img/mandelbrot.png
+ :height: 128px
+ :align: right
+ :alt: Generated mandelbrot fractal
+
+Code
+----
+
+.. literalinclude:: ../../taskflow/examples/wbe_mandelbrot.py
+ :language: python
+ :linenos:
+ :lines: 16-
diff --git a/doc/source/img/mandelbrot.png b/doc/source/img/mandelbrot.png
new file mode 100644
index 0000000..6dc26ee
--- /dev/null
+++ b/doc/source/img/mandelbrot.png
Binary files differ
diff --git a/taskflow/examples/wbe_mandelbrot.out.txt b/taskflow/examples/wbe_mandelbrot.out.txt
new file mode 100644
index 0000000..3b52641
--- /dev/null
+++ b/taskflow/examples/wbe_mandelbrot.out.txt
@@ -0,0 +1,6 @@
+Calculating your mandelbrot fractal of size 512x512.
+Running 2 workers.
+Execution finished.
+Stopping workers.
+Writing image...
+Gathered 262144 results that represents a mandelbrot image (using 8 chunks that are computed jointly by 2 workers).
diff --git a/taskflow/examples/wbe_mandelbrot.py b/taskflow/examples/wbe_mandelbrot.py
new file mode 100644
index 0000000..55ca6e1
--- /dev/null
+++ b/taskflow/examples/wbe_mandelbrot.py
@@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import math
+import os
+import sys
+import threading
+
+import six
+
+top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ os.pardir,
+ os.pardir))
+sys.path.insert(0, top_dir)
+
+from taskflow import engines
+from taskflow.engines.worker_based import worker
+from taskflow.patterns import unordered_flow as uf
+from taskflow import task
+
+# INTRO: This example walks through a workflow that will in parallel compute
+# a mandelbrot result set (using X 'remote' workers) and then combine their
+# results together to form a final mandelbrot fractal image. It shows a usage
+# of taskflow to perform a well-known embarrassingly parallel problem that has
+# the added benefit of also being an elegant visualization.
+#
+# NOTE(harlowja): this example simulates the expected larger number of workers
+# by using a set of threads (which in this example simulate the remote workers
+# that would typically be running on other external machines).
+#
+# NOTE(harlowja): to have it produce an image run (after installing pillow):
+#
+# $ python taskflow/examples/wbe_mandelbrot.py output.png
+
+BASE_SHARED_CONF = {
+ 'exchange': 'taskflow',
+}
+WORKERS = 2
+WORKER_CONF = {
+ # These are the tasks the worker can execute, they *must* be importable,
+ # typically this list is used to restrict what workers may execute to
+ # a smaller set of *allowed* tasks that are known to be safe (one would
+ # not want to allow all python code to be executed).
+ 'tasks': [
+ '%s:MandelCalculator' % (__name__),
+ ],
+}
+ENGINE_CONF = {
+ 'engine': 'worker-based',
+}
+
+# Mandelbrot & image settings...
+IMAGE_SIZE = (512, 512)
+CHUNK_COUNT = 8
+MAX_ITERATIONS = 25
+
+
+class MandelCalculator(task.Task):
+ def execute(self, image_config, mandelbrot_config, chunk):
+ """Returns the number of iterations before the computation "escapes".
+
+ Given the real and imaginary parts of a complex number, determine if it
+ is a candidate for membership in the mandelbrot set given a fixed
+ number of iterations.
+ """
+
+ # Parts borrowed from (credit to mark harris and benoƮt mandelbrot).
+ #
+ # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
+ def mandelbrot(x, y, max_iters):
+ c = complex(x, y)
+ z = 0.0j
+ for i in six.moves.xrange(max_iters):
+ z = z * z + c
+ if (z.real * z.real + z.imag * z.imag) >= 4:
+ return i
+ return max_iters
+
+ min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
+ height, width = image_config['size']
+ pixel_size_x = (max_x - min_x) / width
+ pixel_size_y = (max_y - min_y) / height
+ block = []
+ for y in six.moves.xrange(chunk[0], chunk[1]):
+ row = []
+ imag = min_y + y * pixel_size_y
+ for x in six.moves.xrange(0, width):
+ real = min_x + x * pixel_size_x
+ row.append(mandelbrot(real, imag, max_iters))
+ block.append(row)
+ return block
+
+
+def calculate(engine_conf):
+ # Subdivide the work into X pieces, then request each worker to calculate
+ # one of those chunks and then later we will write these chunks out to
+ # an image bitmap file.
+
+ # And unordered flow is used here since the mandelbrot calculation is an
+ # example of a embarrassingly parallel computation that we can scatter
+ # across as many workers as possible.
+ flow = uf.Flow("mandelbrot")
+
+ # These symbols will be automatically given to tasks as input to there
+ # execute method, in this case these are constants used in the mandelbrot
+ # calculation.
+ store = {
+ 'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
+ 'image_config': {
+ 'size': IMAGE_SIZE,
+ }
+ }
+
+ # We need the task names to be in the right order so that we can extract
+ # the final results in the right order (we don't care about the order when
+ # executing).
+ task_names = []
+
+ # Compose our workflow.
+ height, width = IMAGE_SIZE
+ chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
+ for i in six.moves.xrange(0, CHUNK_COUNT):
+ chunk_name = 'chunk_%s' % i
+ task_name = "calculation_%s" % i
+ # Break the calculation up into chunk size pieces.
+ rows = [i * chunk_size, i * chunk_size + chunk_size]
+ flow.add(
+ MandelCalculator(task_name,
+ # This ensures the storage symbol with name
+ # 'chunk_name' is sent into the tasks local
+ # symbol 'chunk'. This is how we give each
+ # calculator its own correct sequence of rows
+ # to work on.
+ rebind={'chunk': chunk_name}))
+ store[chunk_name] = rows
+ task_names.append(task_name)
+
+ # Now execute it.
+ eng = engines.load(flow, store=store, engine_conf=engine_conf)
+ eng.run()
+
+ # Gather all the results and order them for further processing.
+ gather = []
+ for name in task_names:
+ gather.extend(eng.storage.get(name))
+ points = []
+ for y, row in enumerate(gather):
+ for x, color in enumerate(row):
+ points.append(((x, y), color))
+ return points
+
+
+def write_image(results, output_filename=None):
+ print("Gathered %s results that represents a mandelbrot"
+ " image (using %s chunks that are computed jointly"
+ " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
+ if not output_filename:
+ return
+
+ # Pillow (the PIL fork) saves us from writing our own image writer...
+ try:
+ from PIL import Image
+ except ImportError as e:
+ # To currently get this (may change in the future),
+ # $ pip install Pillow
+ raise RuntimeError("Pillow is required to write image files: %s" % e)
+
+ # Limit to 255, find the max and normalize to that...
+ color_max = 0
+ for _point, color in results:
+ color_max = max(color, color_max)
+
+ # Use gray scale since we don't really have other colors.
+ img = Image.new('L', IMAGE_SIZE, "black")
+ pixels = img.load()
+ for (x, y), color in results:
+ if color_max == 0:
+ color = 0
+ else:
+ color = int((float(color) / color_max) * 255.0)
+ pixels[x, y] = color
+ img.save(output_filename)
+
+
+def create_fractal():
+ logging.basicConfig(level=logging.ERROR)
+
+ # Setup our transport configuration and merge it into the worker and
+ # engine configuration so that both of those use it correctly.
+ shared_conf = dict(BASE_SHARED_CONF)
+ shared_conf.update({
+ 'transport': 'memory',
+ 'transport_options': {
+ 'polling_interval': 0.1,
+ },
+ })
+
+ if len(sys.argv) >= 2:
+ output_filename = sys.argv[1]
+ else:
+ output_filename = None
+
+ worker_conf = dict(WORKER_CONF)
+ worker_conf.update(shared_conf)
+ engine_conf = dict(ENGINE_CONF)
+ engine_conf.update(shared_conf)
+ workers = []
+ worker_topics = []
+
+ print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
+ try:
+ # Create a set of workers to simulate actual remote workers.
+ print('Running %s workers.' % (WORKERS))
+ for i in range(0, WORKERS):
+ worker_conf['topic'] = 'calculator_%s' % (i + 1)
+ worker_topics.append(worker_conf['topic'])
+ w = worker.Worker(**worker_conf)
+ runner = threading.Thread(target=w.run)
+ runner.daemon = True
+ runner.start()
+ w.wait()
+ workers.append((runner, w.stop))
+
+ # Now use those workers to do something.
+ engine_conf['topics'] = worker_topics
+ results = calculate(engine_conf)
+ print('Execution finished.')
+ finally:
+ # And cleanup.
+ print('Stopping workers.')
+ while workers:
+ r, stopper = workers.pop()
+ stopper()
+ r.join()
+ print("Writing image...")
+ write_image(results, output_filename=output_filename)
+
+
+if __name__ == "__main__":
+ create_fractal()