summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRichard Maw <richard.maw@codethink.co.uk>2014-10-24 14:12:13 +0000
committerRichard Maw <richard.maw@codethink.co.uk>2014-10-24 14:12:13 +0000
commitac91791842c6e7e6eda3213916af413255999c7b (patch)
tree5c6ca23f22fbc316355be42d543282a8831dddd2
parentb7de175d185948c1130a1036ecd11d113dbf1175 (diff)
parentee202bbaba6761d953c36ce193b5130f52da6b2f (diff)
downloadcmdtest-ac91791842c6e7e6eda3213916af413255999c7b.tar.gz
Merge branch 'baserock/richardmaw/parallel' into baserock/morphbaserock/morph
Reviewed-by: Sam Thursfield Reviewed-by: Richard Ipsum
-rw-r--r--without-tests1
-rwxr-xr-xyarn133
-rw-r--r--yarnlib/__init__.py1
-rw-r--r--yarnlib/scenario_multi_runner.py164
-rw-r--r--yarnlib/scenario_runner.py82
5 files changed, 302 insertions, 79 deletions
diff --git a/without-tests b/without-tests
index 6aa40f1..991cd64 100644
--- a/without-tests
+++ b/without-tests
@@ -1,3 +1,4 @@
yarnlib/__init__.py
setup.py
yarnlib/elements.py
+yarnlib/scenario_multi_runner.py
diff --git a/yarn b/yarn
index ffec41a..1cfa801 100755
--- a/yarn
+++ b/yarn
@@ -19,6 +19,7 @@
import cliapp
import collections
+import functools
import logging
import os
import re
@@ -32,6 +33,16 @@ import cmdtestlib
import yarnlib
+def dry_scenario_runner_factory(info, ts):
+ class DryScenarioRunner(yarnlib.ScenarioRunner):
+ def run_scenario(self, scenario, datadir, homedir):
+ info('Pretending everything went OK')
+ for step in scenario.steps:
+ ts['steps_completed'] += 1
+ return True
+ return DryScenarioRunner
+
+
class YarnRunner(cliapp.Application):
def add_settings(self):
@@ -84,6 +95,12 @@ class YarnRunner(cliapp.Application):
'allow scenarios to reference steps that do not exist, '
'by warning about them, but otherwise ignoring the scenarios')
+ self.settings.integer(
+ ['max-jobs', 'j'],
+ 'run as many as JOBS scenarios in parallel',
+ default=1,
+ metavar='JOBS')
+
def info(self, msg):
if self.settings['verbose']:
logging.info(msg)
@@ -114,7 +131,8 @@ class YarnRunner(cliapp.Application):
self.ts = ttystatus.TerminalStatus(period=0.001)
if not self.settings['quiet'] and not self.settings['verbose']:
self.ts.format(
- '%ElapsedTime() %Index(current_step,all_steps): '
+ '%ElapsedTime() '
+ '[%Integer(steps_completed)/%Integer(total_steps)]: '
'%String(scenario_name): '
'%String(step_name)')
@@ -136,23 +154,32 @@ class YarnRunner(cliapp.Application):
all_steps = []
for scenario in scenarios:
all_steps.extend(scenario.steps)
- self.ts['all_steps'] = all_steps
+ self.ts['total_steps'] = len(all_steps)
+ self.ts['steps_completed'] = 0
self.scenarios_run = 0
self.skipped_for_assuming = 0
self.steps_run = 0
self.timings = []
- scenario_runner = yarnlib.ScenarioRunner(shell_prelude, os.getcwd(),
- self.parse_env(),
- pre_step_cb=self.pre_step,
- post_step_cb=self.post_step)
+ if self.settings['no-act']:
+ runner_class = dry_scenario_runner_factory(self.info, self.ts)
+ elif self.settings['max-jobs'] > 1:
+ runner_class = functools.partial(yarnlib.ScenarioMultiRunner,
+ max_jobs=self.settings['max-jobs'])
+ else:
+ runner_class = yarnlib.ScenarioRunner
+ scenario_runner = runner_class(shell_prelude, os.getcwd(),
+ self.parse_env(),
+ pre_step_cb=self.pre_step,
+ post_step_cb=self.post_step,
+ pre_scenario_cb=self.pre_scenario,
+ post_scenario_cb=self.post_scenario,
+ testdir=self.tempdir)
start_time = time.time()
- failed_scenarios = []
- for scenario in self.select_scenarios(scenarios):
- if not self.run_scenario(scenario_runner, scenario):
- failed_scenarios.append(scenario)
+ failed_scenarios = (
+ scenario_runner.run_scenarios(self.select_scenarios(scenarios)))
duration = time.time() - start_time
if not self.settings['snapshot']:
@@ -234,56 +261,40 @@ class YarnRunner(cliapp.Application):
return scenarios
- def run_scenario(self, scenario_runner, scenario):
+ def parse_env(self):
+ for option_arg in self.settings['env']:
+ if '=' not in option_arg:
+ raise cliapp.AppException(
+ '--env argument must contain "=" '
+ 'to separate environment variable name and value')
+ key, value = option_arg.split('=', 1)
+ yield key, value
+
+ def pre_scenario(self, scenario, datadir, homedir):
self.start_scenario_timing(scenario.name)
started = time.time()
self.info('Running scenario %s' % scenario.name)
- self.ts['scenario_name'] = scenario.name
- self.scenarios_run += 1
-
- if self.settings['no-act']:
- self.info('Pretending everything went OK')
- for step in scenario.steps:
- self.ts['current_step'] = step
- self.remember_scenario_timing(time.time() - started)
- return True
-
- os.mkdir(self.scenario_dir(self.tempdir, scenario))
- datadir = self.datadir(self.tempdir, scenario)
- os.mkdir(datadir)
self.info('DATADIR is %s' % datadir)
- homedir = self.homedir(datadir)
- os.mkdir(homedir)
self.info('HOME for tests is %s' % homedir)
+ return (started,)
- ok = scenario_runner.run_scenario(scenario, datadir, homedir)
+ def post_scenario(self, scenario, datadir, homedir, pre_scenario_userdata):
+ stopped = time.time()
+ started, = pre_scenario_userdata
- self.remember_scenario_timing(time.time() - started)
+ self.scenarios_run += 1
+ self.remember_scenario_timing(stopped - started)
if not self.settings['snapshot']:
shutil.rmtree(datadir, ignore_errors=True)
- return ok
-
- def homedir(self, datadir):
- return os.path.join(datadir, 'HOME')
- def parse_env(self):
- for option_arg in self.settings['env']:
- if '=' not in option_arg:
- raise cliapp.AppException(
- '--env argument must contain "=" '
- 'to separate environment variable name and value')
- key, value = option_arg.split('=', 1)
- yield key, value
-
- def pre_step(self, step, **ignored):
+ def pre_step(self, scenario, step, **ignored):
started = time.time()
self.info('Running step "%s %s"' % (step.what, step.text))
- self.ts['current_step'] = step
+ self.ts['scenario_name'] = scenario.name
self.ts['step_name'] = '%s %s' % (step.what, step.text)
- self.steps_run += 1
return (started,)
@@ -292,6 +303,9 @@ class YarnRunner(cliapp.Application):
stopped = time.time()
(started,) = pre_step_userdata
+ self.steps_run += 1
+ self.ts['steps_completed'] += 1
+
logging.debug('Exit code: %d' % exit)
if stdout:
logging.debug('Standard output:\n%s' % self.indent(stdout))
@@ -307,13 +321,15 @@ class YarnRunner(cliapp.Application):
'Skipping "%s" because "%s %s" failed' %
(scenario.name, step.what, step.text))
self.skipped_for_assuming += 1
+ skipped = len(scenario.steps) - scenario.steps.index(step) - 1
+ self.ts['total_steps'] -= skipped
else:
self.error(
'ERROR: In scenario "%s"\nstep "%s %s" failed,\n'
'with exit code %d:\n'
'Standard output from shell command:\n%s'
'Standard error from shell command:\n%s' %
- (scenario.name, step.what, step.text, exit,
+ (str(scenario.name), str(step.what), str(step.text), exit,
self.indent(stdout), self.indent(stderr)))
self.remember_step_timing(
'%s %s' % (step.what, step.text), stopped - started)
@@ -321,16 +337,10 @@ class YarnRunner(cliapp.Application):
self.snapshot_datadir(self.tempdir, step_env['DATADIR'],
scenario, step_number, step)
- def scenario_dir(self, tempdir, scenario):
- return os.path.join(tempdir, self.nice(scenario.name))
-
- def datadir(self, tempdir, scenario):
- sd = self.scenario_dir(tempdir, scenario)
- return os.path.join(sd, 'datadir')
-
def snapshot_dir(self, tempdir, scenario, step, step_number):
- sd = self.scenario_dir(tempdir, scenario)
- base = '%03d-%s-%s' % (step_number, step.what, self.nice(step.text))
+ sd = yarnlib.ScenarioRunner.scenario_dir(tempdir, scenario)
+ base = '%03d-%s-%s' % (step_number, step.what,
+ yarnlib.ScenarioRunner.nice(step.text))
return os.path.join(sd, base)
def snapshot_datadir(self, tempdir, datadir, scenario, step_number, step):
@@ -340,21 +350,6 @@ class YarnRunner(cliapp.Application):
if exit != 0:
logging.warning('Snapshot copy failed:\n%s\n%s' % (out, err))
- def nice(self, name):
- # Quote a scenario or step name so it forms a nice filename.
- nice_chars = "abcdefghijklmnopqrstuvwxyz"
- nice_chars += nice_chars.upper()
- nice_chars += "0123456789-."
-
- nice = []
- for c in name:
- if c in nice_chars:
- nice.append(c)
- elif not nice or nice[-1] != '_':
- nice.append('_')
- nice = ''.join(nice)
- return nice
-
def indent(self, s):
return ''.join(' %s\n' % line for line in s.splitlines())
diff --git a/yarnlib/__init__.py b/yarnlib/__init__.py
index 515082e..c2a5edd 100644
--- a/yarnlib/__init__.py
+++ b/yarnlib/__init__.py
@@ -26,6 +26,7 @@ from scenario_step_connector import (implements_matches_step,
StepNotImplementedError,
StepMultipleImplementationsError)
from scenario_runner import ScenarioRunner
+from scenario_multi_runner import ScenarioMultiRunner
from shell_libraries import load_shell_libraries
diff --git a/yarnlib/scenario_multi_runner.py b/yarnlib/scenario_multi_runner.py
new file mode 100644
index 0000000..402e20c
--- /dev/null
+++ b/yarnlib/scenario_multi_runner.py
@@ -0,0 +1,164 @@
+# Copyright 2014 Richard Maw
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# =*= License: GPL-3+ =*=
+
+
+import asyncore
+import collections
+import errno
+import logging
+import os
+import socket
+import subprocess
+
+import yarnlib
+
+
+class StepOutputHandler(asyncore.file_dispatcher):
+ def __init__(self, fd, handle_closed, handle_output, map=None):
+ asyncore.file_dispatcher.__init__(self, fd=fd, map=map)
+ self.handle_closed = handle_closed
+ self.handle_output = handle_output
+ def handle_read(self):
+ d = self.read(4096)
+ if not d:
+ self.close()
+ self.handle_closed()
+ self.handle_output(d)
+
+
+class StepJob(object):
+ stdout_closed = False
+ stderr_closed = False
+ def __init__(self, step, script, env, scenario_queue, runner,
+ fd_map, scenarios, failed_scenarios):
+ self.step = step
+ self.scenario_queue = scenario_queue
+ self.runner = runner
+ self.fd_map = fd_map
+ self.scenarios = scenarios
+ self.failed_scenarios = failed_scenarios
+
+ self.outbuf = []
+ self.errbuf = []
+
+ self.sp = sp = subprocess.Popen(['sh', '-xeuc', script],
+ stdin=open(os.devnull, 'r'),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, env=env,
+ cwd=runner.srcdir)
+ StepOutputHandler(fd=sp.stdout, map=fd_map,
+ handle_output=self.outbuf.append,
+ handle_closed=self.handle_stdout_closed)
+ StepOutputHandler(fd=sp.stderr, map=fd_map,
+ handle_output=self.errbuf.append,
+ handle_closed=self.handle_stderr_closed)
+ def handle_stdout_closed(self):
+ self.stdout_closed = True
+ if self.stderr_closed:
+ self.terminate()
+ def handle_stderr_closed(self):
+ self.stderr_closed = True
+ if self.stdout_closed:
+ self.terminate()
+ def terminate(self):
+ exit_code = self.sp.poll()
+ if exit_code is None:
+ logging.warning('Step %s closed output before terminating, '
+ 'waiting for it to exit', self.step.text)
+ exit_code = self.sp.wait()
+ self.handle_terminated(exit_code, ''.join(self.outbuf),
+ ''.join(self.errbuf))
+ def handle_terminated(self, exit_code, stdout, stderr):
+ try:
+ self.scenario_queue.send((exit_code, stdout, stderr))
+ except StopIteration:
+ # end of scenario, queue another
+ if self.scenarios:
+ scenario = self.scenarios.pop(0)
+ s = self.runner.run_scenario(
+ fd_map=self.fd_map, scenario=scenario,
+ scenarios=self.scenarios,
+ failed_scenarios=self.failed_scenarios)
+ s.next()
+ s.send(s)
+
+
+class ScenarioMultiRunner(yarnlib.ScenarioRunner):
+ def __init__(self, *args, **kwargs):
+ self.max_jobs = kwargs.pop('max_jobs')
+ yarnlib.ScenarioRunner.__init__(self, *args, **kwargs)
+
+ def run_scenarios(self, scenarios):
+ jobs_to_run = min(self.max_jobs, len(scenarios))
+ remaining_scenarios = scenarios[jobs_to_run:]
+ fd_map = {}
+ failed_scenarios = []
+ for scenario in scenarios[:jobs_to_run]:
+ s = self.run_scenario(fd_map=fd_map,
+ scenario=scenario,
+ scenarios=remaining_scenarios,
+ failed_scenarios=failed_scenarios)
+ s.next() # need an initial next
+ s.send(s)
+ asyncore.loop(map=fd_map)
+ return failed_scenarios
+
+ def run_scenario(self, fd_map, scenario, scenarios, failed_scenarios):
+ scenario_dir, datadir, homedir, pre_scenario_ud = (
+ self.setup_scenario(scenario))
+ assuming, cleanup, normal = self.partition_steps(scenario)
+ scenario_env = dict(self.env, HOME=homedir, DATADIR=datadir)
+
+ ok = True
+ step_number = 1
+ scenario_queue = (yield)
+
+ # can't delegate common setup here, since it's more code to
+ # delegate a yield than it's worth without python3's yield from,
+ # so rather than using the nice for-loops in ScenarioRunner,
+ # we move the control flow logic to after the step.
+ queue = collections.deque(assuming + normal + cleanup)
+ while queue:
+ step = queue.popleft()
+
+ step_env, pre_step_userdata, shell_script = self.setup_step(
+ step=step, scenario_env=scenario_env,
+ scenario=scenario, step_number=step_number)
+ StepJob(step=step, script=shell_script, env=step_env,
+ runner=self, scenario_queue=scenario_queue,
+ fd_map=fd_map, scenarios=scenarios,
+ failed_scenarios=failed_scenarios)
+ exit, stdout, stderr = (yield)
+ self.post_step_cb(scenario=scenario, step=step,
+ step_number=step_number, step_env=step_env,
+ exit=exit, stdout=stdout, stderr=stderr,
+ pre_step_userdata=pre_step_userdata)
+ step_number += 1
+ if exit != 0:
+ if step in assuming:
+ break
+ elif step in normal:
+ ok = False
+ queue = collections.deque(cleanup)
+ else:
+ ok = False
+ break
+ if not ok:
+ failed_scenarios.append(scenario)
+ self.post_scenario_cb(scenario=scenario, datadir=datadir,
+ homedir=homedir,
+ pre_scenario_userdata=pre_scenario_ud)
diff --git a/yarnlib/scenario_runner.py b/yarnlib/scenario_runner.py
index e0cdf62..cbe2fad 100644
--- a/yarnlib/scenario_runner.py
+++ b/yarnlib/scenario_runner.py
@@ -101,26 +101,54 @@ class ScenarioRunner(object):
def __init__(self, shell_prelude, srcdir, extra_env=(),
pre_step_cb=default_pre_step, post_step_cb=default_post_step,
- cmdrunner=cliapp.runcmd_unchecked):
+ pre_scenario_cb=lambda *x: None,
+ post_scenario_cb=lambda *x: None,
+ cmdrunner=cliapp.runcmd_unchecked, testdir=None):
self.shell_prelude = shell_prelude
self.srcdir = srcdir
self.env = self.clean_env(extra_env, SRCDIR=srcdir)
self.pre_step_cb = pre_step_cb
self.post_step_cb = post_step_cb
+ self.pre_scenario_cb = pre_scenario_cb
+ self.post_scenario_cb = post_scenario_cb
self.cmdrunner = cmdrunner
+ self.testdir = testdir
+
+ def setup_scenario(self, scenario): # pragma: no cover
+ scenario_dir = self.scenario_dir(self.testdir, scenario)
+ os.mkdir(scenario_dir)
+ datadir = self.datadir(scenario_dir)
+ os.mkdir(datadir)
+ homedir = self.homedir(datadir)
+ os.mkdir(homedir)
+
+ ud = self.pre_scenario_cb(scenario, datadir, homedir)
+ return scenario_dir, datadir, homedir, ud
+
+ def run_scenarios(self, scenarios): # pragma: no cover
+ failed = []
+ for scenario in scenarios:
+ scenario_dir, datadir, homedir, ud = self.setup_scenario(scenario)
+ if not self.run_scenario(scenario, datadir, homedir):
+ failed.append(scenario)
+ self.post_scenario_cb(scenario, datadir, homedir, ud)
+ return failed
- def run_scenario(self, scenario, datadir, homedir):
+ @staticmethod
+ def partition_steps(scenario):
assuming = [s for s in scenario.steps if s.what == 'ASSUMING']
cleanup = [s for s in scenario.steps if s.what == 'FINALLY']
- normal = [s for s in scenario.steps if s not in assuming + cleanup]
+ normal = [s for s in scenario.steps
+ if s.what not in ('ASSUMING', 'FINALLY')]
+ return assuming, cleanup, normal
+
+ def run_scenario(self, scenario, datadir, homedir):
+ assuming, cleanup, normal = self.partition_steps(scenario)
+ scenario_env = dict(self.env, HOME=homedir, DATADIR=datadir)
ok = True
step_number = 1
- scenario_env = dict(self.env)
- scenario_env['HOME'] = homedir
- scenario_env['DATADIR'] = datadir
-
for step in assuming:
exit = self.run_step(scenario, step, scenario_env, step_number)
step_number += 1
@@ -143,12 +171,12 @@ class ScenarioRunner(object):
return ok
- def run_step(self, scenario, step, scenario_env, step_number):
+ def setup_step(self, step, scenario_env, scenario, step_number):
m = yarnlib.implements_matches_step(step.implementation, step)
assert m is not None
step_env = dict(scenario_env)
- for i, match in enumerate(m.groups('')):
- step_env['MATCH_%d' % (i+1)] = match
+ step_env.update(('MATCH_%d' % i, match) for (i, match)
+ in enumerate(m.groups(''), 1))
# All parameters passed as keyword-arguments, so that the callback
# may declare parameters in any order, and ignore any parameters
@@ -159,6 +187,12 @@ class ScenarioRunner(object):
shell_script = '%s\n\n%s\n' % (
self.shell_prelude, step.implementation.shell)
+ return step_env, pre_step_userdata, shell_script
+
+ def run_step(self, scenario, step, scenario_env, step_number):
+ step_env, pre_step_userdata, shell_script = self.setup_step(
+ step=step, scenario_env=scenario_env,
+ scenario=scenario, step_number=step_number)
exit, stdout, stderr = self.cmdrunner(
['sh', '-xeuc', shell_script], env=step_env, cwd=self.srcdir)
@@ -200,3 +234,31 @@ class ScenarioRunner(object):
env.update(kwarg_env)
return env
+
+ @classmethod
+ def scenario_dir(cls, testdir, scenario): # pragma: no cover
+ return os.path.join(testdir, cls.nice(scenario.name))
+
+ @staticmethod
+ def datadir(scenario_dir): # pragma: no cover
+ return os.path.join(scenario_dir, 'datadir')
+
+ @staticmethod
+ def homedir(datadir): # pragma: no cover
+ return os.path.join(datadir, 'HOME')
+
+ @staticmethod
+ def nice(name): # pragma: no cover
+ # Quote a scenario or step name so it forms a nice filename.
+ nice_chars = "abcdefghijklmnopqrstuvwxyz"
+ nice_chars += nice_chars.upper()
+ nice_chars += "0123456789-."
+
+ nice = []
+ for c in name:
+ if c in nice_chars:
+ nice.append(c)
+ elif not nice or nice[-1] != '_':
+ nice.append('_')
+ nice = ''.join(nice)
+ return nice