summaryrefslogtreecommitdiff
path: root/taskflow/tests
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2016-01-25 15:56:07 -0800
committerChangBo Guo(gcb) <eric.guo@easystack.cn>2017-07-11 11:14:04 +0800
commit84c7a7b2c7dcbade1bc802cac7c93ccd9b746cb3 (patch)
tree5e45f394d2720a55a02ed5c41ad2fff129b175a1 /taskflow/tests
parent1229eb2e8e990ad78d45ba5f64ee05d5ef521d78 (diff)
downloadtaskflow-84c7a7b2c7dcbade1bc802cac7c93ccd9b746cb3.tar.gz
Fix process based executor task proxying-back events
Let's dive into what the problem is here. First a description of what happens to a task that is to be executed in a external (but local) process via the process executor mechanism. When a task is about to be sent to execute in the external (but local) process its first cloned, this is mainly done so that its notification callbacks can be altered in a safe manner (ie not altering the original task object to do this) and that clone has its notifier emptied out. What replaces the clone's notifier callbacks though is a new object (that has a __call__ method so it looks like just another callback) that will send messages to the parent process (the one that has the engine in it) over a secure(ish) channel whenever the local task triggers its notifier notify() method. This allows for callbacks in the parent process to get triggered because once the messages recieved the original tasks notifier object has its notify() method called (therefore those callbacks do not really know the task they are getting messages from is executing out of process). The issue though is that if the ANY(*) event type is registered due to how it works in the notifier is that if the child/cloned notifier has the ANY event type registered and the cloned task calls notify() with a specific event this will cause the ANY callback (in the clone) to transmit a message *and* it will cause the *specific* event callback to also transmit a message back to the parent process. On the engine process side it will get 2 messages and trigger the callbacks 3 times (twice for the specific event callback because how the local notifier has the ANY callback registered and one more time when the local process also sends the same event based on its registration of the ANY event in the child process). This is not what is expected (the message rcved on the engine process should only trigger one callback to get triggered if the engine process task has no ANY callback registered or two engine process callbacks to get triggered if the engine process task has the ANY callback registered). Closes-Bug: #1537948 Change-Id: I271bf1f23ad73df6c177cf00fd902c4881ba44ae
Diffstat (limited to 'taskflow/tests')
-rw-r--r--taskflow/tests/unit/test_engines.py42
-rw-r--r--taskflow/tests/utils.py10
2 files changed, 52 insertions, 0 deletions
diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py
index 6fd86f2..89a1a49 100644
--- a/taskflow/tests/unit/test_engines.py
+++ b/taskflow/tests/unit/test_engines.py
@@ -1527,6 +1527,48 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
max_workers=self._EXECUTOR_WORKERS,
**kwargs)
+ def test_update_progress_notifications_proxied(self):
+ captured = collections.defaultdict(list)
+
+ def notify_me(event_type, details):
+ captured[event_type].append(details)
+
+ a = utils.MultiProgressingTask('a')
+ a.notifier.register(a.notifier.ANY, notify_me)
+ progress_chunks = list(x / 10.0 for x in range(1, 10))
+ e = self._make_engine(a, store={'progress_chunks': progress_chunks})
+ e.run()
+
+ self.assertEqual(11, len(captured[task.EVENT_UPDATE_PROGRESS]))
+
+ def test_custom_notifications_proxied(self):
+ captured = collections.defaultdict(list)
+
+ def notify_me(event_type, details):
+ captured[event_type].append(details)
+
+ a = utils.EmittingTask('a')
+ a.notifier.register(a.notifier.ANY, notify_me)
+ e = self._make_engine(a)
+ e.run()
+
+ self.assertEqual(1, len(captured['hi']))
+ self.assertEqual(2, len(captured[task.EVENT_UPDATE_PROGRESS]))
+
+ def test_just_custom_notifications_proxied(self):
+ captured = collections.defaultdict(list)
+
+ def notify_me(event_type, details):
+ captured[event_type].append(details)
+
+ a = utils.EmittingTask('a')
+ a.notifier.register('hi', notify_me)
+ e = self._make_engine(a)
+ e.run()
+
+ self.assertEqual(1, len(captured['hi']))
+ self.assertEqual(0, len(captured[task.EVENT_UPDATE_PROGRESS]))
+
class WorkerBasedEngineTest(EngineTaskTest,
EngineMultipleResultsTest,
diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py
index 471da9b..58cd9ab 100644
--- a/taskflow/tests/utils.py
+++ b/taskflow/tests/utils.py
@@ -19,6 +19,7 @@ import string
import threading
import time
+from oslo_utils import timeutils
import redis
import six
@@ -104,6 +105,15 @@ class DummyTask(task.Task):
pass
+class EmittingTask(task.Task):
+ TASK_EVENTS = (task.EVENT_UPDATE_PROGRESS, 'hi')
+
+ def execute(self, *args, **kwargs):
+ self.notifier.notify('hi',
+ details={'sent_on': timeutils.utcnow(),
+ 'args': args, 'kwargs': kwargs})
+
+
class AddOneSameProvidesRequires(task.Task):
default_provides = 'value'