summaryrefslogtreecommitdiff
path: root/yarnlib/scenario_multi_runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'yarnlib/scenario_multi_runner.py')
-rw-r--r--yarnlib/scenario_multi_runner.py164
1 files changed, 164 insertions, 0 deletions
diff --git a/yarnlib/scenario_multi_runner.py b/yarnlib/scenario_multi_runner.py
new file mode 100644
index 0000000..402e20c
--- /dev/null
+++ b/yarnlib/scenario_multi_runner.py
@@ -0,0 +1,164 @@
+# Copyright 2014 Richard Maw
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# =*= License: GPL-3+ =*=
+
+
+import asyncore
+import collections
+import errno
+import logging
+import os
+import socket
+import subprocess
+
+import yarnlib
+
+
+class StepOutputHandler(asyncore.file_dispatcher):
+ def __init__(self, fd, handle_closed, handle_output, map=None):
+ asyncore.file_dispatcher.__init__(self, fd=fd, map=map)
+ self.handle_closed = handle_closed
+ self.handle_output = handle_output
+ def handle_read(self):
+ d = self.read(4096)
+ if not d:
+ self.close()
+ self.handle_closed()
+ self.handle_output(d)
+
+
+class StepJob(object):
+ stdout_closed = False
+ stderr_closed = False
+ def __init__(self, step, script, env, scenario_queue, runner,
+ fd_map, scenarios, failed_scenarios):
+ self.step = step
+ self.scenario_queue = scenario_queue
+ self.runner = runner
+ self.fd_map = fd_map
+ self.scenarios = scenarios
+ self.failed_scenarios = failed_scenarios
+
+ self.outbuf = []
+ self.errbuf = []
+
+ self.sp = sp = subprocess.Popen(['sh', '-xeuc', script],
+ stdin=open(os.devnull, 'r'),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, env=env,
+ cwd=runner.srcdir)
+ StepOutputHandler(fd=sp.stdout, map=fd_map,
+ handle_output=self.outbuf.append,
+ handle_closed=self.handle_stdout_closed)
+ StepOutputHandler(fd=sp.stderr, map=fd_map,
+ handle_output=self.errbuf.append,
+ handle_closed=self.handle_stderr_closed)
+ def handle_stdout_closed(self):
+ self.stdout_closed = True
+ if self.stderr_closed:
+ self.terminate()
+ def handle_stderr_closed(self):
+ self.stderr_closed = True
+ if self.stdout_closed:
+ self.terminate()
+ def terminate(self):
+ exit_code = self.sp.poll()
+ if exit_code is None:
+ logging.warning('Step %s closed output before terminating, '
+ 'waiting for it to exit', self.step.text)
+ exit_code = self.sp.wait()
+ self.handle_terminated(exit_code, ''.join(self.outbuf),
+ ''.join(self.errbuf))
+ def handle_terminated(self, exit_code, stdout, stderr):
+ try:
+ self.scenario_queue.send((exit_code, stdout, stderr))
+ except StopIteration:
+ # end of scenario, queue another
+ if self.scenarios:
+ scenario = self.scenarios.pop(0)
+ s = self.runner.run_scenario(
+ fd_map=self.fd_map, scenario=scenario,
+ scenarios=self.scenarios,
+ failed_scenarios=self.failed_scenarios)
+ s.next()
+ s.send(s)
+
+
+class ScenarioMultiRunner(yarnlib.ScenarioRunner):
+ def __init__(self, *args, **kwargs):
+ self.max_jobs = kwargs.pop('max_jobs')
+ yarnlib.ScenarioRunner.__init__(self, *args, **kwargs)
+
+ def run_scenarios(self, scenarios):
+ jobs_to_run = min(self.max_jobs, len(scenarios))
+ remaining_scenarios = scenarios[jobs_to_run:]
+ fd_map = {}
+ failed_scenarios = []
+ for scenario in scenarios[:jobs_to_run]:
+ s = self.run_scenario(fd_map=fd_map,
+ scenario=scenario,
+ scenarios=remaining_scenarios,
+ failed_scenarios=failed_scenarios)
+ s.next() # need an initial next
+ s.send(s)
+ asyncore.loop(map=fd_map)
+ return failed_scenarios
+
+ def run_scenario(self, fd_map, scenario, scenarios, failed_scenarios):
+ scenario_dir, datadir, homedir, pre_scenario_ud = (
+ self.setup_scenario(scenario))
+ assuming, cleanup, normal = self.partition_steps(scenario)
+ scenario_env = dict(self.env, HOME=homedir, DATADIR=datadir)
+
+ ok = True
+ step_number = 1
+ scenario_queue = (yield)
+
+ # can't delegate common setup here, since it's more code to
+ # delegate a yield than it's worth without python3's yield from,
+ # so rather than using the nice for-loops in ScenarioRunner,
+ # we move the control flow logic to after the step.
+ queue = collections.deque(assuming + normal + cleanup)
+ while queue:
+ step = queue.popleft()
+
+ step_env, pre_step_userdata, shell_script = self.setup_step(
+ step=step, scenario_env=scenario_env,
+ scenario=scenario, step_number=step_number)
+ StepJob(step=step, script=shell_script, env=step_env,
+ runner=self, scenario_queue=scenario_queue,
+ fd_map=fd_map, scenarios=scenarios,
+ failed_scenarios=failed_scenarios)
+ exit, stdout, stderr = (yield)
+ self.post_step_cb(scenario=scenario, step=step,
+ step_number=step_number, step_env=step_env,
+ exit=exit, stdout=stdout, stderr=stderr,
+ pre_step_userdata=pre_step_userdata)
+ step_number += 1
+ if exit != 0:
+ if step in assuming:
+ break
+ elif step in normal:
+ ok = False
+ queue = collections.deque(cleanup)
+ else:
+ ok = False
+ break
+ if not ok:
+ failed_scenarios.append(scenario)
+ self.post_scenario_cb(scenario=scenario, datadir=datadir,
+ homedir=homedir,
+ pre_scenario_userdata=pre_scenario_ud)