author    | Stefan Metzmacher <metze@samba.org>  | 2018-11-15 19:35:27 +0100
committer | Andrew Bartlett <abartlet@samba.org> | 2018-11-20 01:33:34 +0100
commit    | 1cb782119a314dd5a4e0344695b87e26c1d322c7 (patch)
tree      | 9b3daa5ce4611851dbabbb802fd39edb910e4bfc /buildtools
parent    | 5bb48cb650cce8331976051123027a359f1feac8 (diff)
download  | samba-1cb782119a314dd5a4e0344695b87e26c1d322c7.tar.gz
wafsamba: add a fix for broken python threading if just one job is forced
This fixes random failures during (at least) configure on AIX.
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Diffstat (limited to 'buildtools')
-rw-r--r-- | buildtools/wafsamba/nothreads.py   | 219
-rw-r--r-- | buildtools/wafsamba/samba_utils.py |  18
-rw-r--r-- | buildtools/wafsamba/wafsamba.py    |   4

3 files changed, 18 insertions, 223 deletions
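The whole fix boils down to a small monkey-patch: when only one job is requested, the Spawner class in waf's Runner module is replaced with a do-nothing stand-in, so the single background Consumer thread is never started and tasks are simply processed inline. A minimal standalone sketch of that technique follows; the guard on options.jobs, the NoOpSpawner class and the waflib import mirror the samba_utils.py hunk below, while the disable_spawner_thread() wrapper is only an illustrative name, not part of the patch.

    # Sketch of the approach taken by the samba_utils.py hunk below.
    # The wrapper function is hypothetical; its body mirrors the patch.
    def disable_spawner_thread(options):
        if options.jobs != 1:
            return
        from waflib import Runner

        class NoOpSpawner(object):
            # Runner.Parallel instantiates Spawner(master); accepting and
            # ignoring the argument means no Consumer thread is ever created.
            def __init__(self, master):
                return

        Runner.Spawner = NoOpSpawner

Because wafsamba wraps Options.OptionsContext.parse_cmd_args, the override runs right after the command line and MAKEFLAGS have been parsed, before any task group is executed.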
diff --git a/buildtools/wafsamba/nothreads.py b/buildtools/wafsamba/nothreads.py
deleted file mode 100644
index f873d7d5de1..00000000000
--- a/buildtools/wafsamba/nothreads.py
+++ /dev/null
@@ -1,219 +0,0 @@
-# encoding: utf-8
-# Thomas Nagy, 2005-2008 (ita)
-
-# this replaces the core of Runner.py in waf with a varient that works
-# on systems with completely broken threading (such as Python 2.5.x on
-# AIX). For simplicity we enable this when JOBS=1, which is triggered
-# by the compatibility makefile used for the waf build. That also ensures
-# this code is tested, as it means it is used in the build farm, and by
-# anyone using 'make' to build Samba with waf
-
-"Execute the tasks"
-
-import sys, random, threading
-try: from Queue import Queue
-except ImportError: from queue import Queue
-from waflib import Utils, Options, Errors
-from waflib.TaskGen import EXCEPTION, CRASHED, MAXJOBS, ASK_LATER, SKIPPED, SKIP_ME, SUCCESS
-
-GAP = 15
-
-run_old = threading.Thread.run
-def run(*args, **kwargs):
-    try:
-        run_old(*args, **kwargs)
-    except (KeyboardInterrupt, SystemExit):
-        raise
-    except:
-        sys.excepthook(*sys.exc_info())
-threading.Thread.run = run
-
-
-class TaskConsumer(object):
-    consumers = 1
-
-def process(tsk):
-    m = tsk.master
-    if m.stop:
-        m.out.put(tsk)
-        return
-
-    try:
-        tsk.generator.bld.printout(tsk.display())
-        if tsk.__class__.stat: ret = tsk.__class__.stat(tsk)
-        # actual call to task's run() function
-        else: ret = tsk.call_run()
-    except Exception as e:
-        tsk.err_msg = Utils.ex_stack()
-        tsk.hasrun = EXCEPTION
-
-        # TODO cleanup
-        m.error_handler(tsk)
-        m.out.put(tsk)
-        return
-
-    if ret:
-        tsk.err_code = ret
-        tsk.hasrun = CRASHED
-    else:
-        try:
-            tsk.post_run()
-        except Errors.WafError:
-            pass
-        except Exception:
-            tsk.err_msg = Utils.ex_stack()
-            tsk.hasrun = EXCEPTION
-        else:
-            tsk.hasrun = SUCCESS
-    if tsk.hasrun != SUCCESS:
-        m.error_handler(tsk)
-
-    m.out.put(tsk)
-
-class Parallel(object):
-    """
-    keep the consumer threads busy, and avoid consuming cpu cycles
-    when no more tasks can be added (end of the build, etc)
-    """
-    def __init__(self, bld, j=2):
-
-        # number of consumers
-        self.numjobs = j
-
-        self.manager = bld.task_manager
-        self.manager.current_group = 0
-
-        self.total = self.manager.total()
-
-        # tasks waiting to be processed - IMPORTANT
-        self.outstanding = []
-        self.maxjobs = MAXJOBS
-
-        # tasks that are awaiting for another task to complete
-        self.frozen = []
-
-        # tasks returned by the consumers
-        self.out = Queue(0)
-
-        self.count = 0 # tasks not in the producer area
-
-        self.processed = 1 # progress indicator
-
-        self.stop = False # error condition to stop the build
-        self.error = False # error flag
-
-    def get_next(self):
-        "override this method to schedule the tasks in a particular order"
-        if not self.outstanding:
-            return None
-        return self.outstanding.pop(0)
-
-    def postpone(self, tsk):
-        "override this method to schedule the tasks in a particular order"
-        # TODO consider using a deque instead
-        if random.randint(0, 1):
-            self.frozen.insert(0, tsk)
-        else:
-            self.frozen.append(tsk)
-
-    def refill_task_list(self):
-        "called to set the next group of tasks"
-
-        while self.count > self.numjobs + GAP or self.count >= self.maxjobs:
-            self.get_out()
-
-        while not self.outstanding:
-            if self.count:
-                self.get_out()
-
-            if self.frozen:
-                self.outstanding += self.frozen
-                self.frozen = []
-            elif not self.count:
-                (jobs, tmp) = self.manager.get_next_set()
-                if jobs is not None:
-                    self.maxjobs = jobs
-                if tmp:
-                    self.outstanding += tmp
-                break
-
-    def get_out(self):
-        "the tasks that are put to execute are all collected using get_out"
-        ret = self.out.get()
-        self.manager.add_finished(ret)
-        if not self.stop and getattr(ret, 'more_tasks', None):
-            self.outstanding += ret.more_tasks
-            self.total += len(ret.more_tasks)
-        self.count -= 1
-
-    def error_handler(self, tsk):
-        "by default, errors make the build stop (not thread safe so be careful)"
-        if not Options.options.keep:
-            self.stop = True
-        self.error = True
-
-    def start(self):
-        "execute the tasks"
-
-        while not self.stop:
-
-            self.refill_task_list()
-
-            # consider the next task
-            tsk = self.get_next()
-            if not tsk:
-                if self.count:
-                    # tasks may add new ones after they are run
-                    continue
-                else:
-                    # no tasks to run, no tasks running, time to exit
-                    break
-
-            if tsk.hasrun:
-                # if the task is marked as "run", just skip it
-                self.processed += 1
-                self.manager.add_finished(tsk)
-                continue
-
-            try:
-                st = tsk.runnable_status()
-            except Exception as e:
-                self.processed += 1
-                if self.stop and not Options.options.keep:
-                    tsk.hasrun = SKIPPED
-                    self.manager.add_finished(tsk)
-                    continue
-                self.error_handler(tsk)
-                self.manager.add_finished(tsk)
-                tsk.hasrun = EXCEPTION
-                tsk.err_msg = Utils.ex_stack()
-                continue
-
-            if st == ASK_LATER:
-                self.postpone(tsk)
-            elif st == SKIP_ME:
-                self.processed += 1
-                tsk.hasrun = SKIPPED
-                self.manager.add_finished(tsk)
-            else:
-                # run me: put the task in ready queue
-                tsk.position = (self.processed, self.total)
-                self.count += 1
-                self.processed += 1
-                tsk.master = self
-
-                process(tsk)
-
-        # self.count represents the tasks that have been made available to the consumer threads
-        # collect all the tasks after an error else the message may be incomplete
-        while self.error and self.count:
-            self.get_out()
-
-        #print loop
-        assert (self.count == 0 or self.stop)
-
-
-# enable nothreads
-import Runner
-Runner.process = process
-Runner.Parallel = Parallel
diff --git a/buildtools/wafsamba/samba_utils.py b/buildtools/wafsamba/samba_utils.py
index afcf54c4e60..8e312773462 100644
--- a/buildtools/wafsamba/samba_utils.py
+++ b/buildtools/wafsamba/samba_utils.py
@@ -496,6 +496,24 @@ def wafsamba_options_parse_cmd_args(self, _args=None, cwd=None, allow_unknown=Fa
                                                        cwd=cwd,
                                                        allow_unknown=allow_unknown)
     CHECK_MAKEFLAGS(options)
+    if options.jobs == 1:
+        #
+        # waflib.Runner.Parallel processes jobs inline if the possible number
+        # of jobs is just 1. But (at least in waf <= 2.0.12) it still calls
+        # create a waflib.Runner.Spawner() which creates a single
+        # waflib.Runner.Consumer() thread that tries to process jobs from the
+        # queue.
+        #
+        # This has strange effects, which are not noticed typically,
+        # but at least on AIX python has broken threading and fails
+        # in random ways.
+        #
+        # So we just add a dummy Spawner class.
+        class NoOpSpawner(object):
+            def __init__(self, master):
+                return
+        from waflib import Runner
+        Runner.Spawner = NoOpSpawner
     return options, commands, envvars
 
 Options.OptionsContext.parse_cmd_args = wafsamba_options_parse_cmd_args
diff --git a/buildtools/wafsamba/wafsamba.py b/buildtools/wafsamba/wafsamba.py
index 59a4eacb305..906ae9e75b6 100644
--- a/buildtools/wafsamba/wafsamba.py
+++ b/buildtools/wafsamba/wafsamba.py
@@ -33,10 +33,6 @@ import pkgconfig
 import configure_file
 import samba_waf18
 
-# some systems have broken threading in python
-if os.environ.get('WAF_NOTHREADS') == '1':
-    import nothreads
-
 LIB_PATH="shared"
 
 os.environ['PYTHONUNBUFFERED'] = '1'
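For contrast with the removed mechanism: nothreads.py was only imported when WAF_NOTHREADS=1 was set in the environment and replaced Runner.process and Runner.Parallel wholesale, whereas the new code path triggers automatically on options.jobs == 1 and swaps only the Spawner class. A hypothetical sanity check, not part of the patch, to confirm the dummy class is active after options were parsed with a single job:

    # Hypothetical check only, not from the patch: after parse_cmd_args() has
    # run with jobs forced to 1, the Spawner registered in waflib.Runner
    # should be the dummy class installed by the samba_utils.py hunk above.
    from waflib import Runner
    assert Runner.Spawner.__name__ == 'NoOpSpawner'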