summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2014-07-10 17:23:42 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2014-09-12 22:03:26 +0000
commitadf417cda526f4551079a0ae3fdcdeb8b803be09 (patch)
treef3926794fa1ca274d01fbfe7c85fedf93f0be32f
parent3ea627169767ecce8b17fa7be9e1cd51426d3469 (diff)
downloadtaskflow-adf417cda526f4551079a0ae3fdcdeb8b803be09.tar.gz
Add a mandelbrot parallel calculation WBE example
The mandelbrot calculation is neat example to provide to show how the WBE engine can be used to compute various calculations (the mandelbrot set in this example). This will also create an image from the output if a filename is provided and the pillow library is installed (that library does the actual image writing). See: http://en.wikipedia.org/wiki/Mandelbrot_set Part of blueprint more-examples Change-Id: I12b9b7a2ce61b17ddaa2930cc9cf266ae0c60932
-rw-r--r--doc/source/examples.rst23
-rw-r--r--doc/source/img/mandelbrot.pngbin0 -> 21971 bytes
-rw-r--r--taskflow/examples/wbe_mandelbrot.out.txt6
-rw-r--r--taskflow/examples/wbe_mandelbrot.py254
4 files changed, 283 insertions, 0 deletions
diff --git a/doc/source/examples.rst b/doc/source/examples.rst
index 9199bc1..a6f0dd9 100644
--- a/doc/source/examples.rst
+++ b/doc/source/examples.rst
@@ -163,3 +163,26 @@ Distributed execution (simple)
:language: python
:linenos:
:lines: 16-
+
+Distributed mandelbrot (complex)
+================================
+
+.. note::
+
+ Full source located at :example:`wbe_mandelbrot`
+
+Output
+------
+
+.. image:: img/mandelbrot.png
+ :height: 128px
+ :align: right
+ :alt: Generated mandelbrot fractal
+
+Code
+----
+
+.. literalinclude:: ../../taskflow/examples/wbe_mandelbrot.py
+ :language: python
+ :linenos:
+ :lines: 16-
diff --git a/doc/source/img/mandelbrot.png b/doc/source/img/mandelbrot.png
new file mode 100644
index 0000000..6dc26ee
--- /dev/null
+++ b/doc/source/img/mandelbrot.png
Binary files differ
diff --git a/taskflow/examples/wbe_mandelbrot.out.txt b/taskflow/examples/wbe_mandelbrot.out.txt
new file mode 100644
index 0000000..3b52641
--- /dev/null
+++ b/taskflow/examples/wbe_mandelbrot.out.txt
@@ -0,0 +1,6 @@
+Calculating your mandelbrot fractal of size 512x512.
+Running 2 workers.
+Execution finished.
+Stopping workers.
+Writing image...
+Gathered 262144 results that represents a mandelbrot image (using 8 chunks that are computed jointly by 2 workers).
diff --git a/taskflow/examples/wbe_mandelbrot.py b/taskflow/examples/wbe_mandelbrot.py
new file mode 100644
index 0000000..55ca6e1
--- /dev/null
+++ b/taskflow/examples/wbe_mandelbrot.py
@@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import math
+import os
+import sys
+import threading
+
+import six
+
+top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ os.pardir,
+ os.pardir))
+sys.path.insert(0, top_dir)
+
+from taskflow import engines
+from taskflow.engines.worker_based import worker
+from taskflow.patterns import unordered_flow as uf
+from taskflow import task
+
+# INTRO: This example walks through a workflow that will in parallel compute
+# a mandelbrot result set (using X 'remote' workers) and then combine their
+# results together to form a final mandelbrot fractal image. It shows a usage
+# of taskflow to perform a well-known embarrassingly parallel problem that has
+# the added benefit of also being an elegant visualization.
+#
+# NOTE(harlowja): this example simulates the expected larger number of workers
+# by using a set of threads (which in this example simulate the remote workers
+# that would typically be running on other external machines).
+#
+# NOTE(harlowja): to have it produce an image run (after installing pillow):
+#
+# $ python taskflow/examples/wbe_mandelbrot.py output.png
+
+BASE_SHARED_CONF = {
+ 'exchange': 'taskflow',
+}
+WORKERS = 2
+WORKER_CONF = {
+ # These are the tasks the worker can execute, they *must* be importable,
+ # typically this list is used to restrict what workers may execute to
+ # a smaller set of *allowed* tasks that are known to be safe (one would
+ # not want to allow all python code to be executed).
+ 'tasks': [
+ '%s:MandelCalculator' % (__name__),
+ ],
+}
+ENGINE_CONF = {
+ 'engine': 'worker-based',
+}
+
+# Mandelbrot & image settings...
+IMAGE_SIZE = (512, 512)
+CHUNK_COUNT = 8
+MAX_ITERATIONS = 25
+
+
+class MandelCalculator(task.Task):
+ def execute(self, image_config, mandelbrot_config, chunk):
+ """Returns the number of iterations before the computation "escapes".
+
+ Given the real and imaginary parts of a complex number, determine if it
+ is a candidate for membership in the mandelbrot set given a fixed
+ number of iterations.
+ """
+
+ # Parts borrowed from (credit to mark harris and benoƮt mandelbrot).
+ #
+ # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
+ def mandelbrot(x, y, max_iters):
+ c = complex(x, y)
+ z = 0.0j
+ for i in six.moves.xrange(max_iters):
+ z = z * z + c
+ if (z.real * z.real + z.imag * z.imag) >= 4:
+ return i
+ return max_iters
+
+ min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
+ height, width = image_config['size']
+ pixel_size_x = (max_x - min_x) / width
+ pixel_size_y = (max_y - min_y) / height
+ block = []
+ for y in six.moves.xrange(chunk[0], chunk[1]):
+ row = []
+ imag = min_y + y * pixel_size_y
+ for x in six.moves.xrange(0, width):
+ real = min_x + x * pixel_size_x
+ row.append(mandelbrot(real, imag, max_iters))
+ block.append(row)
+ return block
+
+
+def calculate(engine_conf):
+ # Subdivide the work into X pieces, then request each worker to calculate
+ # one of those chunks and then later we will write these chunks out to
+ # an image bitmap file.
+
+ # And unordered flow is used here since the mandelbrot calculation is an
+ # example of a embarrassingly parallel computation that we can scatter
+ # across as many workers as possible.
+ flow = uf.Flow("mandelbrot")
+
+ # These symbols will be automatically given to tasks as input to there
+ # execute method, in this case these are constants used in the mandelbrot
+ # calculation.
+ store = {
+ 'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
+ 'image_config': {
+ 'size': IMAGE_SIZE,
+ }
+ }
+
+ # We need the task names to be in the right order so that we can extract
+ # the final results in the right order (we don't care about the order when
+ # executing).
+ task_names = []
+
+ # Compose our workflow.
+ height, width = IMAGE_SIZE
+ chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
+ for i in six.moves.xrange(0, CHUNK_COUNT):
+ chunk_name = 'chunk_%s' % i
+ task_name = "calculation_%s" % i
+ # Break the calculation up into chunk size pieces.
+ rows = [i * chunk_size, i * chunk_size + chunk_size]
+ flow.add(
+ MandelCalculator(task_name,
+ # This ensures the storage symbol with name
+ # 'chunk_name' is sent into the tasks local
+ # symbol 'chunk'. This is how we give each
+ # calculator its own correct sequence of rows
+ # to work on.
+ rebind={'chunk': chunk_name}))
+ store[chunk_name] = rows
+ task_names.append(task_name)
+
+ # Now execute it.
+ eng = engines.load(flow, store=store, engine_conf=engine_conf)
+ eng.run()
+
+ # Gather all the results and order them for further processing.
+ gather = []
+ for name in task_names:
+ gather.extend(eng.storage.get(name))
+ points = []
+ for y, row in enumerate(gather):
+ for x, color in enumerate(row):
+ points.append(((x, y), color))
+ return points
+
+
+def write_image(results, output_filename=None):
+ print("Gathered %s results that represents a mandelbrot"
+ " image (using %s chunks that are computed jointly"
+ " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
+ if not output_filename:
+ return
+
+ # Pillow (the PIL fork) saves us from writing our own image writer...
+ try:
+ from PIL import Image
+ except ImportError as e:
+ # To currently get this (may change in the future),
+ # $ pip install Pillow
+ raise RuntimeError("Pillow is required to write image files: %s" % e)
+
+ # Limit to 255, find the max and normalize to that...
+ color_max = 0
+ for _point, color in results:
+ color_max = max(color, color_max)
+
+ # Use gray scale since we don't really have other colors.
+ img = Image.new('L', IMAGE_SIZE, "black")
+ pixels = img.load()
+ for (x, y), color in results:
+ if color_max == 0:
+ color = 0
+ else:
+ color = int((float(color) / color_max) * 255.0)
+ pixels[x, y] = color
+ img.save(output_filename)
+
+
+def create_fractal():
+ logging.basicConfig(level=logging.ERROR)
+
+ # Setup our transport configuration and merge it into the worker and
+ # engine configuration so that both of those use it correctly.
+ shared_conf = dict(BASE_SHARED_CONF)
+ shared_conf.update({
+ 'transport': 'memory',
+ 'transport_options': {
+ 'polling_interval': 0.1,
+ },
+ })
+
+ if len(sys.argv) >= 2:
+ output_filename = sys.argv[1]
+ else:
+ output_filename = None
+
+ worker_conf = dict(WORKER_CONF)
+ worker_conf.update(shared_conf)
+ engine_conf = dict(ENGINE_CONF)
+ engine_conf.update(shared_conf)
+ workers = []
+ worker_topics = []
+
+ print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
+ try:
+ # Create a set of workers to simulate actual remote workers.
+ print('Running %s workers.' % (WORKERS))
+ for i in range(0, WORKERS):
+ worker_conf['topic'] = 'calculator_%s' % (i + 1)
+ worker_topics.append(worker_conf['topic'])
+ w = worker.Worker(**worker_conf)
+ runner = threading.Thread(target=w.run)
+ runner.daemon = True
+ runner.start()
+ w.wait()
+ workers.append((runner, w.stop))
+
+ # Now use those workers to do something.
+ engine_conf['topics'] = worker_topics
+ results = calculate(engine_conf)
+ print('Execution finished.')
+ finally:
+ # And cleanup.
+ print('Stopping workers.')
+ while workers:
+ r, stopper = workers.pop()
+ stopper()
+ r.join()
+ print("Writing image...")
+ write_image(results, output_filename=output_filename)
+
+
+if __name__ == "__main__":
+ create_fractal()