summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@gmail.com>2014-08-29 02:09:36 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2014-09-12 22:04:09 +0000
commitf8fbb30412edc41a1df05c7938db9dc973357f5c (patch)
treee0553b53c847f4961855c4cfb44735d5cb75fa7c
parent494003283f864efe74df61cefae64fc87e35864d (diff)
downloadtaskflow-f8fbb30412edc41a1df05c7938db9dc973357f5c.tar.gz
Mention issue with more than one thread and reduce workers
Due to how it appears the filesystem transport in kombu is not thread-safe we will work around this by not having more than one worker active at the same time in this example. Oddly it appears the memory transport is unaffected (but from looking at the code it doesn't look safe either), this may just be due to how the python memory model works though. Part of blueprint more-examples Change-Id: Idaf04fb1a6a622af292511bbcf25329c9a5aab53
-rw-r--r--taskflow/examples/wbe_simple_linear.py13
1 files changed, 10 insertions, 3 deletions
diff --git a/taskflow/examples/wbe_simple_linear.py b/taskflow/examples/wbe_simple_linear.py
index e28579f..bfec2d8 100644
--- a/taskflow/examples/wbe_simple_linear.py
+++ b/taskflow/examples/wbe_simple_linear.py
@@ -53,7 +53,12 @@ USE_FILESYSTEM = False
BASE_SHARED_CONF = {
'exchange': 'taskflow',
}
-WORKERS = 2
+
+# Until https://github.com/celery/kombu/issues/398 is resolved it is not
+# recommended to run many worker threads in this example due to the types
+# of errors mentioned in that issue.
+MEMORY_WORKERS = 2
+FILE_WORKERS = 1
WORKER_CONF = {
# These are the tasks the worker can execute, they *must* be importable,
# typically this list is used to restrict what workers may execute to
@@ -90,6 +95,7 @@ if __name__ == "__main__":
tmp_path = None
if USE_FILESYSTEM:
+ worker_count = FILE_WORKERS
tmp_path = tempfile.mkdtemp(prefix='wbe-example-')
shared_conf.update({
'transport': 'filesystem',
@@ -100,6 +106,7 @@ if __name__ == "__main__":
},
})
else:
+ worker_count = MEMORY_WORKERS
shared_conf.update({
'transport': 'memory',
'transport_options': {
@@ -115,8 +122,8 @@ if __name__ == "__main__":
try:
# Create a set of workers to simulate actual remote workers.
- print('Running %s workers.' % (WORKERS))
- for i in range(0, WORKERS):
+ print('Running %s workers.' % (worker_count))
+ for i in range(0, worker_count):
worker_conf['topic'] = 'worker-%s' % (i + 1)
worker_topics.append(worker_conf['topic'])
w = worker.Worker(**worker_conf)