diff options
Diffstat (limited to 'yarnlib/scenario_multi_runner.py')
-rw-r--r-- | yarnlib/scenario_multi_runner.py | 164 |
1 files changed, 164 insertions, 0 deletions
diff --git a/yarnlib/scenario_multi_runner.py b/yarnlib/scenario_multi_runner.py new file mode 100644 index 0000000..402e20c --- /dev/null +++ b/yarnlib/scenario_multi_runner.py @@ -0,0 +1,164 @@ +# Copyright 2014 Richard Maw +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# =*= License: GPL-3+ =*= + + +import asyncore +import collections +import errno +import logging +import os +import socket +import subprocess + +import yarnlib + + +class StepOutputHandler(asyncore.file_dispatcher): + def __init__(self, fd, handle_closed, handle_output, map=None): + asyncore.file_dispatcher.__init__(self, fd=fd, map=map) + self.handle_closed = handle_closed + self.handle_output = handle_output + def handle_read(self): + d = self.read(4096) + if not d: + self.close() + self.handle_closed() + self.handle_output(d) + + +class StepJob(object): + stdout_closed = False + stderr_closed = False + def __init__(self, step, script, env, scenario_queue, runner, + fd_map, scenarios, failed_scenarios): + self.step = step + self.scenario_queue = scenario_queue + self.runner = runner + self.fd_map = fd_map + self.scenarios = scenarios + self.failed_scenarios = failed_scenarios + + self.outbuf = [] + self.errbuf = [] + + self.sp = sp = subprocess.Popen(['sh', '-xeuc', script], + stdin=open(os.devnull, 'r'), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, env=env, + cwd=runner.srcdir) + StepOutputHandler(fd=sp.stdout, map=fd_map, + handle_output=self.outbuf.append, + handle_closed=self.handle_stdout_closed) + StepOutputHandler(fd=sp.stderr, map=fd_map, + handle_output=self.errbuf.append, + handle_closed=self.handle_stderr_closed) + def handle_stdout_closed(self): + self.stdout_closed = True + if self.stderr_closed: + self.terminate() + def handle_stderr_closed(self): + self.stderr_closed = True + if self.stdout_closed: + self.terminate() + def terminate(self): + exit_code = self.sp.poll() + if exit_code is None: + logging.warning('Step %s closed output before terminating, ' + 'waiting for it to exit', self.step.text) + exit_code = self.sp.wait() + self.handle_terminated(exit_code, ''.join(self.outbuf), + ''.join(self.errbuf)) + def handle_terminated(self, exit_code, stdout, stderr): + try: + self.scenario_queue.send((exit_code, stdout, stderr)) + except StopIteration: + # end of scenario, queue another + if self.scenarios: + scenario = self.scenarios.pop(0) + s = self.runner.run_scenario( + fd_map=self.fd_map, scenario=scenario, + scenarios=self.scenarios, + failed_scenarios=self.failed_scenarios) + s.next() + s.send(s) + + +class ScenarioMultiRunner(yarnlib.ScenarioRunner): + def __init__(self, *args, **kwargs): + self.max_jobs = kwargs.pop('max_jobs') + yarnlib.ScenarioRunner.__init__(self, *args, **kwargs) + + def run_scenarios(self, scenarios): + jobs_to_run = min(self.max_jobs, len(scenarios)) + remaining_scenarios = scenarios[jobs_to_run:] + fd_map = {} + failed_scenarios = [] + for scenario in scenarios[:jobs_to_run]: + s = self.run_scenario(fd_map=fd_map, + scenario=scenario, + scenarios=remaining_scenarios, + failed_scenarios=failed_scenarios) + s.next() # need an initial next + s.send(s) + asyncore.loop(map=fd_map) + return failed_scenarios + + def run_scenario(self, fd_map, scenario, scenarios, failed_scenarios): + scenario_dir, datadir, homedir, pre_scenario_ud = ( + self.setup_scenario(scenario)) + assuming, cleanup, normal = self.partition_steps(scenario) + scenario_env = dict(self.env, HOME=homedir, DATADIR=datadir) + + ok = True + step_number = 1 + scenario_queue = (yield) + + # can't delegate common setup here, since it's more code to + # delegate a yield than it's worth without python3's yield from, + # so rather than using the nice for-loops in ScenarioRunner, + # we move the control flow logic to after the step. + queue = collections.deque(assuming + normal + cleanup) + while queue: + step = queue.popleft() + + step_env, pre_step_userdata, shell_script = self.setup_step( + step=step, scenario_env=scenario_env, + scenario=scenario, step_number=step_number) + StepJob(step=step, script=shell_script, env=step_env, + runner=self, scenario_queue=scenario_queue, + fd_map=fd_map, scenarios=scenarios, + failed_scenarios=failed_scenarios) + exit, stdout, stderr = (yield) + self.post_step_cb(scenario=scenario, step=step, + step_number=step_number, step_env=step_env, + exit=exit, stdout=stdout, stderr=stderr, + pre_step_userdata=pre_step_userdata) + step_number += 1 + if exit != 0: + if step in assuming: + break + elif step in normal: + ok = False + queue = collections.deque(cleanup) + else: + ok = False + break + if not ok: + failed_scenarios.append(scenario) + self.post_scenario_cb(scenario=scenario, datadir=datadir, + homedir=homedir, + pre_scenario_userdata=pre_scenario_ud) |