summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-02-11 14:44:56 -0800
committerJoshua Harlow <harlowja@yahoo-inc.com>2015-02-11 15:52:51 -0800
commit4da581c168166f64ee076d32038a760d74bd2afa (patch)
tree94019b471dd03c4d201a203ea89c22fe5c05fa5c
parent761321dec705434befcc9005e16434a46d412c98 (diff)
downloadtaskflow-4da581c168166f64ee076d32038a760d74bd2afa.tar.gz
Improve upon/adjust/move around new optional example
Instead of having the optional requirements example be a example that is itself a unittest just move the example to be an actual unit test that gets tested using the various engine types and change the example to be something slightly different (but shows the same kind of usage information). Change-Id: Ia03a81a6be636c501a35e7e290f587f7d05f8b30
-rw-r--r--doc/source/examples.rst12
-rw-r--r--taskflow/examples/distance_calculator.py109
-rw-r--r--taskflow/examples/optional_arguments.py93
-rw-r--r--taskflow/tests/unit/test_engines.py76
-rw-r--r--taskflow/tests/unit/worker_based/test_worker.py2
-rw-r--r--taskflow/tests/utils.py6
6 files changed, 196 insertions, 102 deletions
diff --git a/doc/source/examples.rst b/doc/source/examples.rst
index d30bd85..f1ebdc7 100644
--- a/doc/source/examples.rst
+++ b/doc/source/examples.rst
@@ -94,6 +94,18 @@ Watching execution timing
:linenos:
:lines: 16-
+Distance calculator
+===================
+
+.. note::
+
+ Full source located at :example:`distance_calculator`
+
+.. literalinclude:: ../../taskflow/examples/distance_calculator.py
+ :language: python
+ :linenos:
+ :lines: 16-
+
Table multiplier (in parallel)
==============================
diff --git a/taskflow/examples/distance_calculator.py b/taskflow/examples/distance_calculator.py
new file mode 100644
index 0000000..5112dfb
--- /dev/null
+++ b/taskflow/examples/distance_calculator.py
@@ -0,0 +1,109 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2015 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import math
+import os
+import sys
+
+top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ os.pardir,
+ os.pardir))
+sys.path.insert(0, top_dir)
+
+from taskflow import engines
+from taskflow.patterns import linear_flow
+from taskflow import task
+
+# INTRO: This shows how to use a tasks/atoms ability to take requirements from
+# its execute functions default parameters and shows how to provide those
+# via different methods when needed, to influence those parameters to in
+# this case calculate the distance between two points in 2D space.
+
+# A 2D point.
+Point = collections.namedtuple("Point", "x,y")
+
+
+def is_near(val, expected, tolerance=0.001):
+ # Floats don't really provide equality...
+ if val > (expected + tolerance):
+ return False
+ if val < (expected - tolerance):
+ return False
+ return True
+
+
+class DistanceTask(task.Task):
+ # See: http://en.wikipedia.org/wiki/Distance#Distance_in_Euclidean_space
+
+ default_provides = 'distance'
+
+ def execute(self, a=Point(0, 0), b=Point(0, 0)):
+ return math.sqrt(math.pow(b.x - a.x, 2) + math.pow(b.y - a.y, 2))
+
+
+if __name__ == '__main__':
+ # For these we rely on the execute() methods points by default being
+ # at the origin (and we override it with store values when we want) at
+ # execution time (which then influences what is calculated).
+ any_distance = linear_flow.Flow("origin").add(DistanceTask())
+ results = engines.run(any_distance)
+ print(results)
+ print("%s is near-enough to %s: %s" % (results['distance'],
+ 0.0,
+ is_near(results['distance'], 0.0)))
+
+ results = engines.run(any_distance, store={'a': Point(1, 1)})
+ print(results)
+ print("%s is near-enough to %s: %s" % (results['distance'],
+ 1.4142,
+ is_near(results['distance'],
+ 1.4142)))
+
+ results = engines.run(any_distance, store={'a': Point(10, 10)})
+ print(results)
+ print("%s is near-enough to %s: %s" % (results['distance'],
+ 14.14199,
+ is_near(results['distance'],
+ 14.14199)))
+
+ results = engines.run(any_distance,
+ store={'a': Point(5, 5), 'b': Point(10, 10)})
+ print(results)
+ print("%s is near-enough to %s: %s" % (results['distance'],
+ 7.07106,
+ is_near(results['distance'],
+ 7.07106)))
+
+ # For this we use the ability to override at task creation time the
+ # optional arguments so that we don't need to continue to send them
+ # in via the 'store' argument like in the above (and we fix the new
+ # starting point 'a' at (10, 10) instead of (0, 0)...
+
+ ten_distance = linear_flow.Flow("ten")
+ ten_distance.add(DistanceTask(inject={'a': Point(10, 10)}))
+ results = engines.run(ten_distance, store={'b': Point(10, 10)})
+ print(results)
+ print("%s is near-enough to %s: %s" % (results['distance'],
+ 0.0,
+ is_near(results['distance'], 0.0)))
+
+ results = engines.run(ten_distance)
+ print(results)
+ print("%s is near-enough to %s: %s" % (results['distance'],
+ 14.14199,
+ is_near(results['distance'],
+ 14.14199)))
diff --git a/taskflow/examples/optional_arguments.py b/taskflow/examples/optional_arguments.py
deleted file mode 100644
index 66a4d38..0000000
--- a/taskflow/examples/optional_arguments.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright (C) 2015 Hewlett-Packard Development Company, L.P.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import unittest
-
-from taskflow import engines
-from taskflow.patterns import linear_flow
-from taskflow import task
-
-
-class TestTask(task.Task):
- def execute(self, a, b=5):
- result = a * b
- return result
-
-flow_no_inject = linear_flow.Flow("flow").add(TestTask(provides='result'))
-flow_inject_a = linear_flow.Flow("flow").add(TestTask(provides='result',
- inject={'a': 10}))
-flow_inject_b = linear_flow.Flow("flow").add(TestTask(provides='result',
- inject={'b': 1000}))
-
-ASSERT = True
-
-
-class MyTest(unittest.TestCase):
- def test_my_test(self):
- print("Expected result = 15")
- result = engines.run(flow_no_inject, store={'a': 3})
- print(result)
- if ASSERT:
- self.assertEqual(result,
- {'a': 3, 'result': 15}
- )
-
- print("Expected result = 39")
- result = engines.run(flow_no_inject, store={'a': 3, 'b': 7})
- print(result)
- if ASSERT:
- self.assertEqual(
- result,
- {'a': 3, 'b': 7, 'result': 21}
- )
-
- print("Expected result = 200")
- result = engines.run(flow_inject_a, store={'a': 3})
- print(result)
- if ASSERT:
- self.assertEqual(
- result,
- {'a': 3, 'result': 50}
- )
-
- print("Expected result = 400")
- result = engines.run(flow_inject_a, store={'a': 3, 'b': 7})
- print(result)
- if ASSERT:
- self.assertEqual(
- result,
- {'a': 3, 'b': 7, 'result': 70}
- )
-
- print("Expected result = 40")
- result = engines.run(flow_inject_b, store={'a': 3})
- print(result)
- if ASSERT:
- self.assertEqual(
- result,
- {'a': 3, 'result': 3000}
- )
-
- print("Expected result = 40")
- result = engines.run(flow_inject_b, store={'a': 3, 'b': 7})
- print(result)
- if ASSERT:
- self.assertEqual(
- result,
- {'a': 3, 'b': 7, 'result': 3000}
- )
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py
index 8762d38..9176b9d 100644
--- a/taskflow/tests/unit/test_engines.py
+++ b/taskflow/tests/unit/test_engines.py
@@ -104,6 +104,51 @@ class EngineTaskTest(object):
self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run)
+class EngineOptionalRequirementsTest(utils.EngineTestBase):
+ def test_expected_optional_multiplers(self):
+ flow_no_inject = lf.Flow("flow")
+ flow_no_inject.add(utils.OptionalTask(provides='result'))
+
+ flow_inject_a = lf.Flow("flow")
+ flow_inject_a.add(utils.OptionalTask(provides='result',
+ inject={'a': 10}))
+
+ flow_inject_b = lf.Flow("flow")
+ flow_inject_b.add(utils.OptionalTask(provides='result',
+ inject={'b': 1000}))
+
+ engine = self._make_engine(flow_no_inject, store={'a': 3})
+ engine.run()
+ result = engine.storage.fetch_all()
+ self.assertEqual(result, {'a': 3, 'result': 15})
+
+ engine = self._make_engine(flow_no_inject,
+ store={'a': 3, 'b': 7})
+ engine.run()
+ result = engine.storage.fetch_all()
+ self.assertEqual(result, {'a': 3, 'b': 7, 'result': 21})
+
+ engine = self._make_engine(flow_inject_a, store={'a': 3})
+ engine.run()
+ result = engine.storage.fetch_all()
+ self.assertEqual(result, {'a': 3, 'result': 50})
+
+ engine = self._make_engine(flow_inject_a, store={'a': 3, 'b': 7})
+ engine.run()
+ result = engine.storage.fetch_all()
+ self.assertEqual(result, {'a': 3, 'b': 7, 'result': 70})
+
+ engine = self._make_engine(flow_inject_b, store={'a': 3})
+ engine.run()
+ result = engine.storage.fetch_all()
+ self.assertEqual(result, {'a': 3, 'result': 3000})
+
+ engine = self._make_engine(flow_inject_b, store={'a': 3, 'b': 7})
+ engine.run()
+ result = engine.storage.fetch_all()
+ self.assertEqual(result, {'a': 3, 'b': 7, 'result': 3000})
+
+
class EngineLinearFlowTest(utils.EngineTestBase):
def test_run_empty_flow(self):
@@ -601,14 +646,17 @@ class SerialEngineTest(EngineTaskTest,
EngineLinearFlowTest,
EngineParallelFlowTest,
EngineLinearAndUnorderedExceptionsTest,
+ EngineOptionalRequirementsTest,
EngineGraphFlowTest,
EngineCheckingTaskTest,
test.TestCase):
- def _make_engine(self, flow, flow_detail=None):
+ def _make_engine(self, flow,
+ flow_detail=None, store=None):
return taskflow.engines.load(flow,
flow_detail=flow_detail,
engine='serial',
- backend=self.backend)
+ backend=self.backend,
+ store=store)
def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
@@ -623,18 +671,21 @@ class ParallelEngineWithThreadsTest(EngineTaskTest,
EngineLinearFlowTest,
EngineParallelFlowTest,
EngineLinearAndUnorderedExceptionsTest,
+ EngineOptionalRequirementsTest,
EngineGraphFlowTest,
EngineCheckingTaskTest,
test.TestCase):
_EXECUTOR_WORKERS = 2
- def _make_engine(self, flow, flow_detail=None, executor=None):
+ def _make_engine(self, flow,
+ flow_detail=None, executor=None, store=None):
if executor is None:
executor = 'threads'
return taskflow.engines.load(flow, flow_detail=flow_detail,
backend=self.backend,
executor=executor,
engine='parallel',
+ store=store,
max_workers=self._EXECUTOR_WORKERS)
def test_correct_load(self):
@@ -657,23 +708,27 @@ class ParallelEngineWithEventletTest(EngineTaskTest,
EngineLinearFlowTest,
EngineParallelFlowTest,
EngineLinearAndUnorderedExceptionsTest,
+ EngineOptionalRequirementsTest,
EngineGraphFlowTest,
EngineCheckingTaskTest,
test.TestCase):
- def _make_engine(self, flow, flow_detail=None, executor=None):
+ def _make_engine(self, flow,
+ flow_detail=None, executor=None, store=None):
if executor is None:
executor = futures.GreenThreadPoolExecutor()
self.addCleanup(executor.shutdown)
return taskflow.engines.load(flow, flow_detail=flow_detail,
backend=self.backend, engine='parallel',
- executor=executor)
+ executor=executor,
+ store=store)
class ParallelEngineWithProcessTest(EngineTaskTest,
EngineLinearFlowTest,
EngineParallelFlowTest,
EngineLinearAndUnorderedExceptionsTest,
+ EngineOptionalRequirementsTest,
EngineGraphFlowTest,
test.TestCase):
_EXECUTOR_WORKERS = 2
@@ -682,13 +737,15 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
self.assertIsInstance(engine, eng.ParallelActionEngine)
- def _make_engine(self, flow, flow_detail=None, executor=None):
+ def _make_engine(self, flow,
+ flow_detail=None, executor=None, store=None):
if executor is None:
executor = 'processes'
return taskflow.engines.load(flow, flow_detail=flow_detail,
backend=self.backend,
engine='parallel',
executor=executor,
+ store=store,
max_workers=self._EXECUTOR_WORKERS)
@@ -696,6 +753,7 @@ class WorkerBasedEngineTest(EngineTaskTest,
EngineLinearFlowTest,
EngineParallelFlowTest,
EngineLinearAndUnorderedExceptionsTest,
+ EngineOptionalRequirementsTest,
EngineGraphFlowTest,
test.TestCase):
def setUp(self):
@@ -740,9 +798,11 @@ class WorkerBasedEngineTest(EngineTaskTest,
self.worker_thread.join()
super(WorkerBasedEngineTest, self).tearDown()
- def _make_engine(self, flow, flow_detail=None):
+ def _make_engine(self, flow,
+ flow_detail=None, store=None):
return taskflow.engines.load(flow, flow_detail=flow_detail,
- backend=self.backend, **self.engine_conf)
+ backend=self.backend,
+ store=store, **self.engine_conf)
def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py
index cc4578c..7020a93 100644
--- a/taskflow/tests/unit/worker_based/test_worker.py
+++ b/taskflow/tests/unit/worker_based/test_worker.py
@@ -34,7 +34,7 @@ class TestWorker(test.MockTestCase):
self.exchange = 'test-exchange'
self.topic = 'test-topic'
self.threads_count = 5
- self.endpoint_count = 21
+ self.endpoint_count = 22
# patch classes
self.executor_mock, self.executor_inst_mock = self.patchClass(
diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py
index 2594798..fbbb83c 100644
--- a/taskflow/tests/utils.py
+++ b/taskflow/tests/utils.py
@@ -157,6 +157,12 @@ class FailingTask(ProgressingTask):
raise RuntimeError('Woot!')
+class OptionalTask(task.Task):
+ def execute(self, a, b=5):
+ result = a * b
+ return result
+
+
class TaskWithFailure(task.Task):
def execute(self, **kwargs):