summaryrefslogtreecommitdiff
path: root/yarnlib
diff options
context:
space:
mode:
Diffstat (limited to 'yarnlib')
-rw-r--r--yarnlib/__init__.py1
-rw-r--r--yarnlib/scenario_multi_runner.py164
-rw-r--r--yarnlib/scenario_runner.py82
3 files changed, 237 insertions, 10 deletions
diff --git a/yarnlib/__init__.py b/yarnlib/__init__.py
index 515082e..c2a5edd 100644
--- a/yarnlib/__init__.py
+++ b/yarnlib/__init__.py
@@ -26,6 +26,7 @@ from scenario_step_connector import (implements_matches_step,
StepNotImplementedError,
StepMultipleImplementationsError)
from scenario_runner import ScenarioRunner
+from scenario_multi_runner import ScenarioMultiRunner
from shell_libraries import load_shell_libraries
diff --git a/yarnlib/scenario_multi_runner.py b/yarnlib/scenario_multi_runner.py
new file mode 100644
index 0000000..402e20c
--- /dev/null
+++ b/yarnlib/scenario_multi_runner.py
@@ -0,0 +1,164 @@
+# Copyright 2014 Richard Maw
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# =*= License: GPL-3+ =*=
+
+
+import asyncore
+import collections
+import errno
+import logging
+import os
+import socket
+import subprocess
+
+import yarnlib
+
+
+class StepOutputHandler(asyncore.file_dispatcher):
+ def __init__(self, fd, handle_closed, handle_output, map=None):
+ asyncore.file_dispatcher.__init__(self, fd=fd, map=map)
+ self.handle_closed = handle_closed
+ self.handle_output = handle_output
+ def handle_read(self):
+ d = self.read(4096)
+ if not d:
+ self.close()
+ self.handle_closed()
+ self.handle_output(d)
+
+
+class StepJob(object):
+ stdout_closed = False
+ stderr_closed = False
+ def __init__(self, step, script, env, scenario_queue, runner,
+ fd_map, scenarios, failed_scenarios):
+ self.step = step
+ self.scenario_queue = scenario_queue
+ self.runner = runner
+ self.fd_map = fd_map
+ self.scenarios = scenarios
+ self.failed_scenarios = failed_scenarios
+
+ self.outbuf = []
+ self.errbuf = []
+
+ self.sp = sp = subprocess.Popen(['sh', '-xeuc', script],
+ stdin=open(os.devnull, 'r'),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, env=env,
+ cwd=runner.srcdir)
+ StepOutputHandler(fd=sp.stdout, map=fd_map,
+ handle_output=self.outbuf.append,
+ handle_closed=self.handle_stdout_closed)
+ StepOutputHandler(fd=sp.stderr, map=fd_map,
+ handle_output=self.errbuf.append,
+ handle_closed=self.handle_stderr_closed)
+ def handle_stdout_closed(self):
+ self.stdout_closed = True
+ if self.stderr_closed:
+ self.terminate()
+ def handle_stderr_closed(self):
+ self.stderr_closed = True
+ if self.stdout_closed:
+ self.terminate()
+ def terminate(self):
+ exit_code = self.sp.poll()
+ if exit_code is None:
+ logging.warning('Step %s closed output before terminating, '
+ 'waiting for it to exit', self.step.text)
+ exit_code = self.sp.wait()
+ self.handle_terminated(exit_code, ''.join(self.outbuf),
+ ''.join(self.errbuf))
+ def handle_terminated(self, exit_code, stdout, stderr):
+ try:
+ self.scenario_queue.send((exit_code, stdout, stderr))
+ except StopIteration:
+ # end of scenario, queue another
+ if self.scenarios:
+ scenario = self.scenarios.pop(0)
+ s = self.runner.run_scenario(
+ fd_map=self.fd_map, scenario=scenario,
+ scenarios=self.scenarios,
+ failed_scenarios=self.failed_scenarios)
+ s.next()
+ s.send(s)
+
+
+class ScenarioMultiRunner(yarnlib.ScenarioRunner):
+ def __init__(self, *args, **kwargs):
+ self.max_jobs = kwargs.pop('max_jobs')
+ yarnlib.ScenarioRunner.__init__(self, *args, **kwargs)
+
+ def run_scenarios(self, scenarios):
+ jobs_to_run = min(self.max_jobs, len(scenarios))
+ remaining_scenarios = scenarios[jobs_to_run:]
+ fd_map = {}
+ failed_scenarios = []
+ for scenario in scenarios[:jobs_to_run]:
+ s = self.run_scenario(fd_map=fd_map,
+ scenario=scenario,
+ scenarios=remaining_scenarios,
+ failed_scenarios=failed_scenarios)
+ s.next() # need an initial next
+ s.send(s)
+ asyncore.loop(map=fd_map)
+ return failed_scenarios
+
+ def run_scenario(self, fd_map, scenario, scenarios, failed_scenarios):
+ scenario_dir, datadir, homedir, pre_scenario_ud = (
+ self.setup_scenario(scenario))
+ assuming, cleanup, normal = self.partition_steps(scenario)
+ scenario_env = dict(self.env, HOME=homedir, DATADIR=datadir)
+
+ ok = True
+ step_number = 1
+ scenario_queue = (yield)
+
+ # can't delegate common setup here, since it's more code to
+ # delegate a yield than it's worth without python3's yield from,
+ # so rather than using the nice for-loops in ScenarioRunner,
+ # we move the control flow logic to after the step.
+ queue = collections.deque(assuming + normal + cleanup)
+ while queue:
+ step = queue.popleft()
+
+ step_env, pre_step_userdata, shell_script = self.setup_step(
+ step=step, scenario_env=scenario_env,
+ scenario=scenario, step_number=step_number)
+ StepJob(step=step, script=shell_script, env=step_env,
+ runner=self, scenario_queue=scenario_queue,
+ fd_map=fd_map, scenarios=scenarios,
+ failed_scenarios=failed_scenarios)
+ exit, stdout, stderr = (yield)
+ self.post_step_cb(scenario=scenario, step=step,
+ step_number=step_number, step_env=step_env,
+ exit=exit, stdout=stdout, stderr=stderr,
+ pre_step_userdata=pre_step_userdata)
+ step_number += 1
+ if exit != 0:
+ if step in assuming:
+ break
+ elif step in normal:
+ ok = False
+ queue = collections.deque(cleanup)
+ else:
+ ok = False
+ break
+ if not ok:
+ failed_scenarios.append(scenario)
+ self.post_scenario_cb(scenario=scenario, datadir=datadir,
+ homedir=homedir,
+ pre_scenario_userdata=pre_scenario_ud)
diff --git a/yarnlib/scenario_runner.py b/yarnlib/scenario_runner.py
index e0cdf62..cbe2fad 100644
--- a/yarnlib/scenario_runner.py
+++ b/yarnlib/scenario_runner.py
@@ -101,26 +101,54 @@ class ScenarioRunner(object):
def __init__(self, shell_prelude, srcdir, extra_env=(),
pre_step_cb=default_pre_step, post_step_cb=default_post_step,
- cmdrunner=cliapp.runcmd_unchecked):
+ pre_scenario_cb=lambda *x: None,
+ post_scenario_cb=lambda *x: None,
+ cmdrunner=cliapp.runcmd_unchecked, testdir=None):
self.shell_prelude = shell_prelude
self.srcdir = srcdir
self.env = self.clean_env(extra_env, SRCDIR=srcdir)
self.pre_step_cb = pre_step_cb
self.post_step_cb = post_step_cb
+ self.pre_scenario_cb = pre_scenario_cb
+ self.post_scenario_cb = post_scenario_cb
self.cmdrunner = cmdrunner
+ self.testdir = testdir
+
+ def setup_scenario(self, scenario): # pragma: no cover
+ scenario_dir = self.scenario_dir(self.testdir, scenario)
+ os.mkdir(scenario_dir)
+ datadir = self.datadir(scenario_dir)
+ os.mkdir(datadir)
+ homedir = self.homedir(datadir)
+ os.mkdir(homedir)
+
+ ud = self.pre_scenario_cb(scenario, datadir, homedir)
+ return scenario_dir, datadir, homedir, ud
+
+ def run_scenarios(self, scenarios): # pragma: no cover
+ failed = []
+ for scenario in scenarios:
+ scenario_dir, datadir, homedir, ud = self.setup_scenario(scenario)
+ if not self.run_scenario(scenario, datadir, homedir):
+ failed.append(scenario)
+ self.post_scenario_cb(scenario, datadir, homedir, ud)
+ return failed
- def run_scenario(self, scenario, datadir, homedir):
+ @staticmethod
+ def partition_steps(scenario):
assuming = [s for s in scenario.steps if s.what == 'ASSUMING']
cleanup = [s for s in scenario.steps if s.what == 'FINALLY']
- normal = [s for s in scenario.steps if s not in assuming + cleanup]
+ normal = [s for s in scenario.steps
+ if s.what not in ('ASSUMING', 'FINALLY')]
+ return assuming, cleanup, normal
+
+ def run_scenario(self, scenario, datadir, homedir):
+ assuming, cleanup, normal = self.partition_steps(scenario)
+ scenario_env = dict(self.env, HOME=homedir, DATADIR=datadir)
ok = True
step_number = 1
- scenario_env = dict(self.env)
- scenario_env['HOME'] = homedir
- scenario_env['DATADIR'] = datadir
-
for step in assuming:
exit = self.run_step(scenario, step, scenario_env, step_number)
step_number += 1
@@ -143,12 +171,12 @@ class ScenarioRunner(object):
return ok
- def run_step(self, scenario, step, scenario_env, step_number):
+ def setup_step(self, step, scenario_env, scenario, step_number):
m = yarnlib.implements_matches_step(step.implementation, step)
assert m is not None
step_env = dict(scenario_env)
- for i, match in enumerate(m.groups('')):
- step_env['MATCH_%d' % (i+1)] = match
+ step_env.update(('MATCH_%d' % i, match) for (i, match)
+ in enumerate(m.groups(''), 1))
# All parameters passed as keyword-arguments, so that the callback
# may declare parameters in any order, and ignore any parameters
@@ -159,6 +187,12 @@ class ScenarioRunner(object):
shell_script = '%s\n\n%s\n' % (
self.shell_prelude, step.implementation.shell)
+ return step_env, pre_step_userdata, shell_script
+
+ def run_step(self, scenario, step, scenario_env, step_number):
+ step_env, pre_step_userdata, shell_script = self.setup_step(
+ step=step, scenario_env=scenario_env,
+ scenario=scenario, step_number=step_number)
exit, stdout, stderr = self.cmdrunner(
['sh', '-xeuc', shell_script], env=step_env, cwd=self.srcdir)
@@ -200,3 +234,31 @@ class ScenarioRunner(object):
env.update(kwarg_env)
return env
+
+ @classmethod
+ def scenario_dir(cls, testdir, scenario): # pragma: no cover
+ return os.path.join(testdir, cls.nice(scenario.name))
+
+ @staticmethod
+ def datadir(scenario_dir): # pragma: no cover
+ return os.path.join(scenario_dir, 'datadir')
+
+ @staticmethod
+ def homedir(datadir): # pragma: no cover
+ return os.path.join(datadir, 'HOME')
+
+ @staticmethod
+ def nice(name): # pragma: no cover
+ # Quote a scenario or step name so it forms a nice filename.
+ nice_chars = "abcdefghijklmnopqrstuvwxyz"
+ nice_chars += nice_chars.upper()
+ nice_chars += "0123456789-."
+
+ nice = []
+ for c in name:
+ if c in nice_chars:
+ nice.append(c)
+ elif not nice or nice[-1] != '_':
+ nice.append('_')
+ nice = ''.join(nice)
+ return nice