diff options
Diffstat (limited to 'yarnlib')
-rw-r--r-- | yarnlib/__init__.py | 1 | ||||
-rw-r--r-- | yarnlib/scenario_multi_runner.py | 164 | ||||
-rw-r--r-- | yarnlib/scenario_runner.py | 82 |
3 files changed, 237 insertions, 10 deletions
diff --git a/yarnlib/__init__.py b/yarnlib/__init__.py index 515082e..c2a5edd 100644 --- a/yarnlib/__init__.py +++ b/yarnlib/__init__.py @@ -26,6 +26,7 @@ from scenario_step_connector import (implements_matches_step, StepNotImplementedError, StepMultipleImplementationsError) from scenario_runner import ScenarioRunner +from scenario_multi_runner import ScenarioMultiRunner from shell_libraries import load_shell_libraries diff --git a/yarnlib/scenario_multi_runner.py b/yarnlib/scenario_multi_runner.py new file mode 100644 index 0000000..402e20c --- /dev/null +++ b/yarnlib/scenario_multi_runner.py @@ -0,0 +1,164 @@ +# Copyright 2014 Richard Maw +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# =*= License: GPL-3+ =*= + + +import asyncore +import collections +import errno +import logging +import os +import socket +import subprocess + +import yarnlib + + +class StepOutputHandler(asyncore.file_dispatcher): + def __init__(self, fd, handle_closed, handle_output, map=None): + asyncore.file_dispatcher.__init__(self, fd=fd, map=map) + self.handle_closed = handle_closed + self.handle_output = handle_output + def handle_read(self): + d = self.read(4096) + if not d: + self.close() + self.handle_closed() + self.handle_output(d) + + +class StepJob(object): + stdout_closed = False + stderr_closed = False + def __init__(self, step, script, env, scenario_queue, runner, + fd_map, scenarios, failed_scenarios): + self.step = step + self.scenario_queue = scenario_queue + self.runner = runner + self.fd_map = fd_map + self.scenarios = scenarios + self.failed_scenarios = failed_scenarios + + self.outbuf = [] + self.errbuf = [] + + self.sp = sp = subprocess.Popen(['sh', '-xeuc', script], + stdin=open(os.devnull, 'r'), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, env=env, + cwd=runner.srcdir) + StepOutputHandler(fd=sp.stdout, map=fd_map, + handle_output=self.outbuf.append, + handle_closed=self.handle_stdout_closed) + StepOutputHandler(fd=sp.stderr, map=fd_map, + handle_output=self.errbuf.append, + handle_closed=self.handle_stderr_closed) + def handle_stdout_closed(self): + self.stdout_closed = True + if self.stderr_closed: + self.terminate() + def handle_stderr_closed(self): + self.stderr_closed = True + if self.stdout_closed: + self.terminate() + def terminate(self): + exit_code = self.sp.poll() + if exit_code is None: + logging.warning('Step %s closed output before terminating, ' + 'waiting for it to exit', self.step.text) + exit_code = self.sp.wait() + self.handle_terminated(exit_code, ''.join(self.outbuf), + ''.join(self.errbuf)) + def handle_terminated(self, exit_code, stdout, stderr): + try: + self.scenario_queue.send((exit_code, stdout, stderr)) + except StopIteration: + # end of scenario, queue another + if self.scenarios: + scenario = self.scenarios.pop(0) + s = self.runner.run_scenario( + fd_map=self.fd_map, scenario=scenario, + scenarios=self.scenarios, + failed_scenarios=self.failed_scenarios) + s.next() + s.send(s) + + +class ScenarioMultiRunner(yarnlib.ScenarioRunner): + def __init__(self, *args, **kwargs): + self.max_jobs = kwargs.pop('max_jobs') + yarnlib.ScenarioRunner.__init__(self, *args, **kwargs) + + def run_scenarios(self, scenarios): + jobs_to_run = min(self.max_jobs, len(scenarios)) + remaining_scenarios = scenarios[jobs_to_run:] + fd_map = {} + failed_scenarios = [] + for scenario in scenarios[:jobs_to_run]: + s = self.run_scenario(fd_map=fd_map, + scenario=scenario, + scenarios=remaining_scenarios, + failed_scenarios=failed_scenarios) + s.next() # need an initial next + s.send(s) + asyncore.loop(map=fd_map) + return failed_scenarios + + def run_scenario(self, fd_map, scenario, scenarios, failed_scenarios): + scenario_dir, datadir, homedir, pre_scenario_ud = ( + self.setup_scenario(scenario)) + assuming, cleanup, normal = self.partition_steps(scenario) + scenario_env = dict(self.env, HOME=homedir, DATADIR=datadir) + + ok = True + step_number = 1 + scenario_queue = (yield) + + # can't delegate common setup here, since it's more code to + # delegate a yield than it's worth without python3's yield from, + # so rather than using the nice for-loops in ScenarioRunner, + # we move the control flow logic to after the step. + queue = collections.deque(assuming + normal + cleanup) + while queue: + step = queue.popleft() + + step_env, pre_step_userdata, shell_script = self.setup_step( + step=step, scenario_env=scenario_env, + scenario=scenario, step_number=step_number) + StepJob(step=step, script=shell_script, env=step_env, + runner=self, scenario_queue=scenario_queue, + fd_map=fd_map, scenarios=scenarios, + failed_scenarios=failed_scenarios) + exit, stdout, stderr = (yield) + self.post_step_cb(scenario=scenario, step=step, + step_number=step_number, step_env=step_env, + exit=exit, stdout=stdout, stderr=stderr, + pre_step_userdata=pre_step_userdata) + step_number += 1 + if exit != 0: + if step in assuming: + break + elif step in normal: + ok = False + queue = collections.deque(cleanup) + else: + ok = False + break + if not ok: + failed_scenarios.append(scenario) + self.post_scenario_cb(scenario=scenario, datadir=datadir, + homedir=homedir, + pre_scenario_userdata=pre_scenario_ud) diff --git a/yarnlib/scenario_runner.py b/yarnlib/scenario_runner.py index e0cdf62..cbe2fad 100644 --- a/yarnlib/scenario_runner.py +++ b/yarnlib/scenario_runner.py @@ -101,26 +101,54 @@ class ScenarioRunner(object): def __init__(self, shell_prelude, srcdir, extra_env=(), pre_step_cb=default_pre_step, post_step_cb=default_post_step, - cmdrunner=cliapp.runcmd_unchecked): + pre_scenario_cb=lambda *x: None, + post_scenario_cb=lambda *x: None, + cmdrunner=cliapp.runcmd_unchecked, testdir=None): self.shell_prelude = shell_prelude self.srcdir = srcdir self.env = self.clean_env(extra_env, SRCDIR=srcdir) self.pre_step_cb = pre_step_cb self.post_step_cb = post_step_cb + self.pre_scenario_cb = pre_scenario_cb + self.post_scenario_cb = post_scenario_cb self.cmdrunner = cmdrunner + self.testdir = testdir + + def setup_scenario(self, scenario): # pragma: no cover + scenario_dir = self.scenario_dir(self.testdir, scenario) + os.mkdir(scenario_dir) + datadir = self.datadir(scenario_dir) + os.mkdir(datadir) + homedir = self.homedir(datadir) + os.mkdir(homedir) + + ud = self.pre_scenario_cb(scenario, datadir, homedir) + return scenario_dir, datadir, homedir, ud + + def run_scenarios(self, scenarios): # pragma: no cover + failed = [] + for scenario in scenarios: + scenario_dir, datadir, homedir, ud = self.setup_scenario(scenario) + if not self.run_scenario(scenario, datadir, homedir): + failed.append(scenario) + self.post_scenario_cb(scenario, datadir, homedir, ud) + return failed - def run_scenario(self, scenario, datadir, homedir): + @staticmethod + def partition_steps(scenario): assuming = [s for s in scenario.steps if s.what == 'ASSUMING'] cleanup = [s for s in scenario.steps if s.what == 'FINALLY'] - normal = [s for s in scenario.steps if s not in assuming + cleanup] + normal = [s for s in scenario.steps + if s.what not in ('ASSUMING', 'FINALLY')] + return assuming, cleanup, normal + + def run_scenario(self, scenario, datadir, homedir): + assuming, cleanup, normal = self.partition_steps(scenario) + scenario_env = dict(self.env, HOME=homedir, DATADIR=datadir) ok = True step_number = 1 - scenario_env = dict(self.env) - scenario_env['HOME'] = homedir - scenario_env['DATADIR'] = datadir - for step in assuming: exit = self.run_step(scenario, step, scenario_env, step_number) step_number += 1 @@ -143,12 +171,12 @@ class ScenarioRunner(object): return ok - def run_step(self, scenario, step, scenario_env, step_number): + def setup_step(self, step, scenario_env, scenario, step_number): m = yarnlib.implements_matches_step(step.implementation, step) assert m is not None step_env = dict(scenario_env) - for i, match in enumerate(m.groups('')): - step_env['MATCH_%d' % (i+1)] = match + step_env.update(('MATCH_%d' % i, match) for (i, match) + in enumerate(m.groups(''), 1)) # All parameters passed as keyword-arguments, so that the callback # may declare parameters in any order, and ignore any parameters @@ -159,6 +187,12 @@ class ScenarioRunner(object): shell_script = '%s\n\n%s\n' % ( self.shell_prelude, step.implementation.shell) + return step_env, pre_step_userdata, shell_script + + def run_step(self, scenario, step, scenario_env, step_number): + step_env, pre_step_userdata, shell_script = self.setup_step( + step=step, scenario_env=scenario_env, + scenario=scenario, step_number=step_number) exit, stdout, stderr = self.cmdrunner( ['sh', '-xeuc', shell_script], env=step_env, cwd=self.srcdir) @@ -200,3 +234,31 @@ class ScenarioRunner(object): env.update(kwarg_env) return env + + @classmethod + def scenario_dir(cls, testdir, scenario): # pragma: no cover + return os.path.join(testdir, cls.nice(scenario.name)) + + @staticmethod + def datadir(scenario_dir): # pragma: no cover + return os.path.join(scenario_dir, 'datadir') + + @staticmethod + def homedir(datadir): # pragma: no cover + return os.path.join(datadir, 'HOME') + + @staticmethod + def nice(name): # pragma: no cover + # Quote a scenario or step name so it forms a nice filename. + nice_chars = "abcdefghijklmnopqrstuvwxyz" + nice_chars += nice_chars.upper() + nice_chars += "0123456789-." + + nice = [] + for c in name: + if c in nice_chars: + nice.append(c) + elif not nice or nice[-1] != '_': + nice.append('_') + nice = ''.join(nice) + return nice |