diff options
author | Richard Maw <richard.maw@codethink.co.uk> | 2014-10-24 14:12:13 +0000 |
---|---|---|
committer | Richard Maw <richard.maw@codethink.co.uk> | 2014-10-24 14:12:13 +0000 |
commit | ac91791842c6e7e6eda3213916af413255999c7b (patch) | |
tree | 5c6ca23f22fbc316355be42d543282a8831dddd2 | |
parent | b7de175d185948c1130a1036ecd11d113dbf1175 (diff) | |
parent | ee202bbaba6761d953c36ce193b5130f52da6b2f (diff) | |
download | cmdtest-ac91791842c6e7e6eda3213916af413255999c7b.tar.gz |
Merge branch 'baserock/richardmaw/parallel' into baserock/morphbaserock/morph
Reviewed-by: Sam Thursfield
Reviewed-by: Richard Ipsum
-rw-r--r-- | without-tests | 1 | ||||
-rwxr-xr-x | yarn | 133 | ||||
-rw-r--r-- | yarnlib/__init__.py | 1 | ||||
-rw-r--r-- | yarnlib/scenario_multi_runner.py | 164 | ||||
-rw-r--r-- | yarnlib/scenario_runner.py | 82 |
5 files changed, 302 insertions, 79 deletions
diff --git a/without-tests b/without-tests index 6aa40f1..991cd64 100644 --- a/without-tests +++ b/without-tests @@ -1,3 +1,4 @@ yarnlib/__init__.py setup.py yarnlib/elements.py +yarnlib/scenario_multi_runner.py @@ -19,6 +19,7 @@ import cliapp import collections +import functools import logging import os import re @@ -32,6 +33,16 @@ import cmdtestlib import yarnlib +def dry_scenario_runner_factory(info, ts): + class DryScenarioRunner(yarnlib.ScenarioRunner): + def run_scenario(self, scenario, datadir, homedir): + info('Pretending everything went OK') + for step in scenario.steps: + ts['steps_completed'] += 1 + return True + return DryScenarioRunner + + class YarnRunner(cliapp.Application): def add_settings(self): @@ -84,6 +95,12 @@ class YarnRunner(cliapp.Application): 'allow scenarios to reference steps that do not exist, ' 'by warning about them, but otherwise ignoring the scenarios') + self.settings.integer( + ['max-jobs', 'j'], + 'run as many as JOBS scenarios in parallel', + default=1, + metavar='JOBS') + def info(self, msg): if self.settings['verbose']: logging.info(msg) @@ -114,7 +131,8 @@ class YarnRunner(cliapp.Application): self.ts = ttystatus.TerminalStatus(period=0.001) if not self.settings['quiet'] and not self.settings['verbose']: self.ts.format( - '%ElapsedTime() %Index(current_step,all_steps): ' + '%ElapsedTime() ' + '[%Integer(steps_completed)/%Integer(total_steps)]: ' '%String(scenario_name): ' '%String(step_name)') @@ -136,23 +154,32 @@ class YarnRunner(cliapp.Application): all_steps = [] for scenario in scenarios: all_steps.extend(scenario.steps) - self.ts['all_steps'] = all_steps + self.ts['total_steps'] = len(all_steps) + self.ts['steps_completed'] = 0 self.scenarios_run = 0 self.skipped_for_assuming = 0 self.steps_run = 0 self.timings = [] - scenario_runner = yarnlib.ScenarioRunner(shell_prelude, os.getcwd(), - self.parse_env(), - pre_step_cb=self.pre_step, - post_step_cb=self.post_step) + if self.settings['no-act']: + runner_class = dry_scenario_runner_factory(self.info, self.ts) + elif self.settings['max-jobs'] > 1: + runner_class = functools.partial(yarnlib.ScenarioMultiRunner, + max_jobs=self.settings['max-jobs']) + else: + runner_class = yarnlib.ScenarioRunner + scenario_runner = runner_class(shell_prelude, os.getcwd(), + self.parse_env(), + pre_step_cb=self.pre_step, + post_step_cb=self.post_step, + pre_scenario_cb=self.pre_scenario, + post_scenario_cb=self.post_scenario, + testdir=self.tempdir) start_time = time.time() - failed_scenarios = [] - for scenario in self.select_scenarios(scenarios): - if not self.run_scenario(scenario_runner, scenario): - failed_scenarios.append(scenario) + failed_scenarios = ( + scenario_runner.run_scenarios(self.select_scenarios(scenarios))) duration = time.time() - start_time if not self.settings['snapshot']: @@ -234,56 +261,40 @@ class YarnRunner(cliapp.Application): return scenarios - def run_scenario(self, scenario_runner, scenario): + def parse_env(self): + for option_arg in self.settings['env']: + if '=' not in option_arg: + raise cliapp.AppException( + '--env argument must contain "=" ' + 'to separate environment variable name and value') + key, value = option_arg.split('=', 1) + yield key, value + + def pre_scenario(self, scenario, datadir, homedir): self.start_scenario_timing(scenario.name) started = time.time() self.info('Running scenario %s' % scenario.name) - self.ts['scenario_name'] = scenario.name - self.scenarios_run += 1 - - if self.settings['no-act']: - self.info('Pretending everything went OK') - for step in scenario.steps: - self.ts['current_step'] = step - self.remember_scenario_timing(time.time() - started) - return True - - os.mkdir(self.scenario_dir(self.tempdir, scenario)) - datadir = self.datadir(self.tempdir, scenario) - os.mkdir(datadir) self.info('DATADIR is %s' % datadir) - homedir = self.homedir(datadir) - os.mkdir(homedir) self.info('HOME for tests is %s' % homedir) + return (started,) - ok = scenario_runner.run_scenario(scenario, datadir, homedir) + def post_scenario(self, scenario, datadir, homedir, pre_scenario_userdata): + stopped = time.time() + started, = pre_scenario_userdata - self.remember_scenario_timing(time.time() - started) + self.scenarios_run += 1 + self.remember_scenario_timing(stopped - started) if not self.settings['snapshot']: shutil.rmtree(datadir, ignore_errors=True) - return ok - - def homedir(self, datadir): - return os.path.join(datadir, 'HOME') - def parse_env(self): - for option_arg in self.settings['env']: - if '=' not in option_arg: - raise cliapp.AppException( - '--env argument must contain "=" ' - 'to separate environment variable name and value') - key, value = option_arg.split('=', 1) - yield key, value - - def pre_step(self, step, **ignored): + def pre_step(self, scenario, step, **ignored): started = time.time() self.info('Running step "%s %s"' % (step.what, step.text)) - self.ts['current_step'] = step + self.ts['scenario_name'] = scenario.name self.ts['step_name'] = '%s %s' % (step.what, step.text) - self.steps_run += 1 return (started,) @@ -292,6 +303,9 @@ class YarnRunner(cliapp.Application): stopped = time.time() (started,) = pre_step_userdata + self.steps_run += 1 + self.ts['steps_completed'] += 1 + logging.debug('Exit code: %d' % exit) if stdout: logging.debug('Standard output:\n%s' % self.indent(stdout)) @@ -307,13 +321,15 @@ class YarnRunner(cliapp.Application): 'Skipping "%s" because "%s %s" failed' % (scenario.name, step.what, step.text)) self.skipped_for_assuming += 1 + skipped = len(scenario.steps) - scenario.steps.index(step) - 1 + self.ts['total_steps'] -= skipped else: self.error( 'ERROR: In scenario "%s"\nstep "%s %s" failed,\n' 'with exit code %d:\n' 'Standard output from shell command:\n%s' 'Standard error from shell command:\n%s' % - (scenario.name, step.what, step.text, exit, + (str(scenario.name), str(step.what), str(step.text), exit, self.indent(stdout), self.indent(stderr))) self.remember_step_timing( '%s %s' % (step.what, step.text), stopped - started) @@ -321,16 +337,10 @@ class YarnRunner(cliapp.Application): self.snapshot_datadir(self.tempdir, step_env['DATADIR'], scenario, step_number, step) - def scenario_dir(self, tempdir, scenario): - return os.path.join(tempdir, self.nice(scenario.name)) - - def datadir(self, tempdir, scenario): - sd = self.scenario_dir(tempdir, scenario) - return os.path.join(sd, 'datadir') - def snapshot_dir(self, tempdir, scenario, step, step_number): - sd = self.scenario_dir(tempdir, scenario) - base = '%03d-%s-%s' % (step_number, step.what, self.nice(step.text)) + sd = yarnlib.ScenarioRunner.scenario_dir(tempdir, scenario) + base = '%03d-%s-%s' % (step_number, step.what, + yarnlib.ScenarioRunner.nice(step.text)) return os.path.join(sd, base) def snapshot_datadir(self, tempdir, datadir, scenario, step_number, step): @@ -340,21 +350,6 @@ class YarnRunner(cliapp.Application): if exit != 0: logging.warning('Snapshot copy failed:\n%s\n%s' % (out, err)) - def nice(self, name): - # Quote a scenario or step name so it forms a nice filename. - nice_chars = "abcdefghijklmnopqrstuvwxyz" - nice_chars += nice_chars.upper() - nice_chars += "0123456789-." - - nice = [] - for c in name: - if c in nice_chars: - nice.append(c) - elif not nice or nice[-1] != '_': - nice.append('_') - nice = ''.join(nice) - return nice - def indent(self, s): return ''.join(' %s\n' % line for line in s.splitlines()) diff --git a/yarnlib/__init__.py b/yarnlib/__init__.py index 515082e..c2a5edd 100644 --- a/yarnlib/__init__.py +++ b/yarnlib/__init__.py @@ -26,6 +26,7 @@ from scenario_step_connector import (implements_matches_step, StepNotImplementedError, StepMultipleImplementationsError) from scenario_runner import ScenarioRunner +from scenario_multi_runner import ScenarioMultiRunner from shell_libraries import load_shell_libraries diff --git a/yarnlib/scenario_multi_runner.py b/yarnlib/scenario_multi_runner.py new file mode 100644 index 0000000..402e20c --- /dev/null +++ b/yarnlib/scenario_multi_runner.py @@ -0,0 +1,164 @@ +# Copyright 2014 Richard Maw +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# =*= License: GPL-3+ =*= + + +import asyncore +import collections +import errno +import logging +import os +import socket +import subprocess + +import yarnlib + + +class StepOutputHandler(asyncore.file_dispatcher): + def __init__(self, fd, handle_closed, handle_output, map=None): + asyncore.file_dispatcher.__init__(self, fd=fd, map=map) + self.handle_closed = handle_closed + self.handle_output = handle_output + def handle_read(self): + d = self.read(4096) + if not d: + self.close() + self.handle_closed() + self.handle_output(d) + + +class StepJob(object): + stdout_closed = False + stderr_closed = False + def __init__(self, step, script, env, scenario_queue, runner, + fd_map, scenarios, failed_scenarios): + self.step = step + self.scenario_queue = scenario_queue + self.runner = runner + self.fd_map = fd_map + self.scenarios = scenarios + self.failed_scenarios = failed_scenarios + + self.outbuf = [] + self.errbuf = [] + + self.sp = sp = subprocess.Popen(['sh', '-xeuc', script], + stdin=open(os.devnull, 'r'), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, env=env, + cwd=runner.srcdir) + StepOutputHandler(fd=sp.stdout, map=fd_map, + handle_output=self.outbuf.append, + handle_closed=self.handle_stdout_closed) + StepOutputHandler(fd=sp.stderr, map=fd_map, + handle_output=self.errbuf.append, + handle_closed=self.handle_stderr_closed) + def handle_stdout_closed(self): + self.stdout_closed = True + if self.stderr_closed: + self.terminate() + def handle_stderr_closed(self): + self.stderr_closed = True + if self.stdout_closed: + self.terminate() + def terminate(self): + exit_code = self.sp.poll() + if exit_code is None: + logging.warning('Step %s closed output before terminating, ' + 'waiting for it to exit', self.step.text) + exit_code = self.sp.wait() + self.handle_terminated(exit_code, ''.join(self.outbuf), + ''.join(self.errbuf)) + def handle_terminated(self, exit_code, stdout, stderr): + try: + self.scenario_queue.send((exit_code, stdout, stderr)) + except StopIteration: + # end of scenario, queue another + if self.scenarios: + scenario = self.scenarios.pop(0) + s = self.runner.run_scenario( + fd_map=self.fd_map, scenario=scenario, + scenarios=self.scenarios, + failed_scenarios=self.failed_scenarios) + s.next() + s.send(s) + + +class ScenarioMultiRunner(yarnlib.ScenarioRunner): + def __init__(self, *args, **kwargs): + self.max_jobs = kwargs.pop('max_jobs') + yarnlib.ScenarioRunner.__init__(self, *args, **kwargs) + + def run_scenarios(self, scenarios): + jobs_to_run = min(self.max_jobs, len(scenarios)) + remaining_scenarios = scenarios[jobs_to_run:] + fd_map = {} + failed_scenarios = [] + for scenario in scenarios[:jobs_to_run]: + s = self.run_scenario(fd_map=fd_map, + scenario=scenario, + scenarios=remaining_scenarios, + failed_scenarios=failed_scenarios) + s.next() # need an initial next + s.send(s) + asyncore.loop(map=fd_map) + return failed_scenarios + + def run_scenario(self, fd_map, scenario, scenarios, failed_scenarios): + scenario_dir, datadir, homedir, pre_scenario_ud = ( + self.setup_scenario(scenario)) + assuming, cleanup, normal = self.partition_steps(scenario) + scenario_env = dict(self.env, HOME=homedir, DATADIR=datadir) + + ok = True + step_number = 1 + scenario_queue = (yield) + + # can't delegate common setup here, since it's more code to + # delegate a yield than it's worth without python3's yield from, + # so rather than using the nice for-loops in ScenarioRunner, + # we move the control flow logic to after the step. + queue = collections.deque(assuming + normal + cleanup) + while queue: + step = queue.popleft() + + step_env, pre_step_userdata, shell_script = self.setup_step( + step=step, scenario_env=scenario_env, + scenario=scenario, step_number=step_number) + StepJob(step=step, script=shell_script, env=step_env, + runner=self, scenario_queue=scenario_queue, + fd_map=fd_map, scenarios=scenarios, + failed_scenarios=failed_scenarios) + exit, stdout, stderr = (yield) + self.post_step_cb(scenario=scenario, step=step, + step_number=step_number, step_env=step_env, + exit=exit, stdout=stdout, stderr=stderr, + pre_step_userdata=pre_step_userdata) + step_number += 1 + if exit != 0: + if step in assuming: + break + elif step in normal: + ok = False + queue = collections.deque(cleanup) + else: + ok = False + break + if not ok: + failed_scenarios.append(scenario) + self.post_scenario_cb(scenario=scenario, datadir=datadir, + homedir=homedir, + pre_scenario_userdata=pre_scenario_ud) diff --git a/yarnlib/scenario_runner.py b/yarnlib/scenario_runner.py index e0cdf62..cbe2fad 100644 --- a/yarnlib/scenario_runner.py +++ b/yarnlib/scenario_runner.py @@ -101,26 +101,54 @@ class ScenarioRunner(object): def __init__(self, shell_prelude, srcdir, extra_env=(), pre_step_cb=default_pre_step, post_step_cb=default_post_step, - cmdrunner=cliapp.runcmd_unchecked): + pre_scenario_cb=lambda *x: None, + post_scenario_cb=lambda *x: None, + cmdrunner=cliapp.runcmd_unchecked, testdir=None): self.shell_prelude = shell_prelude self.srcdir = srcdir self.env = self.clean_env(extra_env, SRCDIR=srcdir) self.pre_step_cb = pre_step_cb self.post_step_cb = post_step_cb + self.pre_scenario_cb = pre_scenario_cb + self.post_scenario_cb = post_scenario_cb self.cmdrunner = cmdrunner + self.testdir = testdir + + def setup_scenario(self, scenario): # pragma: no cover + scenario_dir = self.scenario_dir(self.testdir, scenario) + os.mkdir(scenario_dir) + datadir = self.datadir(scenario_dir) + os.mkdir(datadir) + homedir = self.homedir(datadir) + os.mkdir(homedir) + + ud = self.pre_scenario_cb(scenario, datadir, homedir) + return scenario_dir, datadir, homedir, ud + + def run_scenarios(self, scenarios): # pragma: no cover + failed = [] + for scenario in scenarios: + scenario_dir, datadir, homedir, ud = self.setup_scenario(scenario) + if not self.run_scenario(scenario, datadir, homedir): + failed.append(scenario) + self.post_scenario_cb(scenario, datadir, homedir, ud) + return failed - def run_scenario(self, scenario, datadir, homedir): + @staticmethod + def partition_steps(scenario): assuming = [s for s in scenario.steps if s.what == 'ASSUMING'] cleanup = [s for s in scenario.steps if s.what == 'FINALLY'] - normal = [s for s in scenario.steps if s not in assuming + cleanup] + normal = [s for s in scenario.steps + if s.what not in ('ASSUMING', 'FINALLY')] + return assuming, cleanup, normal + + def run_scenario(self, scenario, datadir, homedir): + assuming, cleanup, normal = self.partition_steps(scenario) + scenario_env = dict(self.env, HOME=homedir, DATADIR=datadir) ok = True step_number = 1 - scenario_env = dict(self.env) - scenario_env['HOME'] = homedir - scenario_env['DATADIR'] = datadir - for step in assuming: exit = self.run_step(scenario, step, scenario_env, step_number) step_number += 1 @@ -143,12 +171,12 @@ class ScenarioRunner(object): return ok - def run_step(self, scenario, step, scenario_env, step_number): + def setup_step(self, step, scenario_env, scenario, step_number): m = yarnlib.implements_matches_step(step.implementation, step) assert m is not None step_env = dict(scenario_env) - for i, match in enumerate(m.groups('')): - step_env['MATCH_%d' % (i+1)] = match + step_env.update(('MATCH_%d' % i, match) for (i, match) + in enumerate(m.groups(''), 1)) # All parameters passed as keyword-arguments, so that the callback # may declare parameters in any order, and ignore any parameters @@ -159,6 +187,12 @@ class ScenarioRunner(object): shell_script = '%s\n\n%s\n' % ( self.shell_prelude, step.implementation.shell) + return step_env, pre_step_userdata, shell_script + + def run_step(self, scenario, step, scenario_env, step_number): + step_env, pre_step_userdata, shell_script = self.setup_step( + step=step, scenario_env=scenario_env, + scenario=scenario, step_number=step_number) exit, stdout, stderr = self.cmdrunner( ['sh', '-xeuc', shell_script], env=step_env, cwd=self.srcdir) @@ -200,3 +234,31 @@ class ScenarioRunner(object): env.update(kwarg_env) return env + + @classmethod + def scenario_dir(cls, testdir, scenario): # pragma: no cover + return os.path.join(testdir, cls.nice(scenario.name)) + + @staticmethod + def datadir(scenario_dir): # pragma: no cover + return os.path.join(scenario_dir, 'datadir') + + @staticmethod + def homedir(datadir): # pragma: no cover + return os.path.join(datadir, 'HOME') + + @staticmethod + def nice(name): # pragma: no cover + # Quote a scenario or step name so it forms a nice filename. + nice_chars = "abcdefghijklmnopqrstuvwxyz" + nice_chars += nice_chars.upper() + nice_chars += "0123456789-." + + nice = [] + for c in name: + if c in nice_chars: + nice.append(c) + elif not nice or nice[-1] != '_': + nice.append('_') + nice = ''.join(nice) + return nice |