diff options
author | Sam Thursfield <sam.thursfield@codethink.co.uk> | 2015-02-16 11:59:01 +0000 |
---|---|---|
committer | Sam Thursfield <sam.thursfield@codethink.co.uk> | 2015-02-16 11:59:01 +0000 |
commit | b831ab16fb49ad56aa1091f1c820325a33832d53 (patch) | |
tree | 12b53f6543e103b70e470fb740f1c22e92b0b044 | |
parent | 0cb996b43cba9376e4defba1a4065ad9b080db61 (diff) | |
parent | 626d62feccefda2a57abd5f71da9c4fb164c604c (diff) | |
download | morph-b831ab16fb49ad56aa1091f1c820325a33832d53.tar.gz |
Merge branch 'sam/distbuild-test-harness' into sam/distbuild-build-logs
Conflicts:
scripts/distbuild
-rwxr-xr-x | scripts/distbuild | 294 | ||||
-rw-r--r-- | yarns/morph.shell-lib | 2 |
2 files changed, 164 insertions, 132 deletions
diff --git a/scripts/distbuild b/scripts/distbuild index 285b1524..97798831 100755 --- a/scripts/distbuild +++ b/scripts/distbuild @@ -14,7 +14,29 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -# Run a local instance of a Morph distributed build network. +'''Run a local instance of a Morph distributed build network. + +To use: + + scripts/distbuild run + +If you want more info on what is going on, try: + + scripts/distbuild run --log=/dev/stdout + +All distbuild subprocesses will be shut down on TERM signal or if any of them +crash. + +Logs and build artifacts will be stored in a newly created temporary directory. +You can specify a different location with the --datadir argument. The directory +will not be deleted when the process exits. + +''' + + +DISTBUILD_HELPER = 'distbuild-helper' +MORPH = 'morph' +MORPH_CACHE_SERVER = 'morph-cache-server' import os @@ -22,42 +44,39 @@ import select import subprocess import tempfile import time +from logging import debug, info import cliapp - -# Some hackz if you are running Morph from the source tree. -os.environ['PYTHONPATH'] = '/src/morph' -DISTBUILD_HELPER = '/src/morph/distbuild-helper' -MORPH = '/src/morph/morph' -MORPH_CACHE_SERVER = '/src/morph/morph-cache-server' - - - -def subdir(workdir, *args): - path = os.path.join(workdir, *args) +def subdir(workdir, *path_components): + '''Create a subdirectory and return the path to it.''' + path = os.path.join(workdir, *path_components) os.makedirs(path) return path class Process(subprocess.Popen): - '''Helper for subprocess.Popen with neater commandline argument passing. + '''A running subprocess.''' - This allows you to pass a dict of settings, which will be converted to - commandline arguments internally. + def __init__(self, name, argv, settings, **kwargs): + '''Start a new subprocess, using subprocess.Popen. - ''' + The 'name' parameter is only used internally. - def __init__(self, name, argv, settings): + The 'argv' parameter specifies the commandline to be run. The + 'settings' dict will be formatted as long-form commandline switches and + added to 'argv'. + + ''' self.name = name self.argv = argv self.settings = settings full_argv = argv + self._format_settings(settings) - print('%s commandline: %s' % (name, ' '.join(full_argv))) - subprocess.Popen.__init__(self, full_argv) - print('%s process ID: %s' % (name, self.pid)) + info('%s commandline: %s' % (name, ' '.join(full_argv))) + super(Process, self).__init__(full_argv, **kwargs) + info('%s process ID: %s' % (name, self.pid)) def _format_settings(self, arg_dict): def as_string(key, value): @@ -71,33 +90,50 @@ class Process(subprocess.Popen): class MorphProcess(Process): - '''Process helper with special defaults for Morph and related programs. + '''A running instance of Morph, morph-cache-server or distbuild-helper.''' - Morph and its helper tools all use 'cliapp', which provides a standard - interface for configuration and logging. + def __init__(self, name, argv, settings, log_path=None, **kwargs): + '''Start an instance of Morph, morph-cache-server or distbuild-helper. - ''' + The logs will be sent to a file called '$log_path/morph-$name.log', if + log_path is passed. - def __init__(self, name, argv, settings, log_path=None): + ''' if log_path: settings['log'] = os.path.join(log_path, 'morph-%s.log' % name) settings['no-default-config'] = True - Process.__init__(self, name, argv, settings) + super(MorphProcess, self).__init__(name, argv, settings, **kwargs) class MorphListenerProcess(MorphProcess): - '''Process helper for listening processes.''' + '''A running instance of a Morph or morph-cache-server daemon process.''' + + def __init__(self, name, port_names, argv, settings, **kwargs): + '''Start and wait for an instance of Morph or morph-cache-server. - def __init__(self, name, port_names, argv, settings, log_path=None): + Using --port-file arguments, the constructor will wait for each port + listed in 'port_names' to become ready before returning. The subprocess + will pick a random available port number for each port. The numbers + be accessible as attributes on this class. + + For example, if you pass port_names=['worker-daemon-port'], the + subprocess will receive two extra commandline arguments: + --worker-daemon-port=0 and --worker-daemon-port-file=xxx + + Once the process starts, self.worker_daemon_port will contain the + number of the port that it is listening on. + + ''' for port_name in port_names: self._setup_port_fifo(port_name, settings) - MorphProcess.__init__(self, name, argv, settings, log_path=log_path) + super(MorphListenerProcess, self).__init__( + name, argv, settings, **kwargs) for port_name in port_names: port_number = self._read_port_fifo(port_name, settings) - print('%s: %s port is %s' % (name, port_name, port_number)) + info('%s: %s port is %s' % (name, port_name, port_number)) port_attr = port_name.replace('-', '_') setattr(self, port_attr, port_number) @@ -115,12 +151,12 @@ class MorphListenerProcess(MorphProcess): def _read_port_fifo(self, port_name, settings): port_file = settings['%s-file' % port_name] - print 'Read: %s' % port_file + debug('Read: %s' % port_file) with open(port_file, 'r') as f: # The readline() call will block until a line of data is written. # The process we are starting should only write out the port number # once the process is listening on that port. Thus, we block until - # the process is ready, which is very important. + # the process is ready, which is very important for avoiding races. port_number = int(f.readline()) os.unlink(port_file) @@ -130,23 +166,31 @@ class MorphListenerProcess(MorphProcess): class MorphCacheServerProcess(MorphListenerProcess): + '''A morph-cache-server process.''' + def __init__(self, name, cache_path, log_path=None, enable_writes=False): + '''Start a morph-cache-server process.''' + ports = ['port'] argv = [MORPH_CACHE_SERVER] settings = { 'artifact-dir': subdir(cache_path, 'artifacts'), - 'repo-dir': subdir(cache_path, 'gits'), 'enable-writes': enable_writes, 'no-fcgi': True, + 'repo-dir': subdir(cache_path, 'gits'), } - MorphListenerProcess.__init__( - self, name, ports, argv, settings, log_path=log_path) + super(MorphCacheServerProcess, self).__init__( + name, ports, argv, settings, log_path=log_path, + stderr=subprocess.PIPE) class MorphWorkerDaemonProcess(MorphListenerProcess): + '''A `morph worker-daemon` process.''' def __init__(self, name, cache_server, log_path=None): + '''Start a `morph worker-daemon` instance.''' + ports = ['worker-daemon-port'] argv = [MORPH, 'worker-daemon'] @@ -154,17 +198,44 @@ class MorphWorkerDaemonProcess(MorphListenerProcess): 'artifact-cache-server': 'http://localhost:%s' % cache_server.port, } - MorphListenerProcess.__init__( - self, name, ports, argv, settings, log_path=log_path) + super(MorphWorkerDaemonProcess, self).__init__( + name, ports, argv, settings, log_path=log_path) + + +class ProcessMonitor(object): + '''A tool for managing a bunch of subprocesses.''' + + def __init__(self): + self.process = {} + + def watch(self, process): + '''Start monitoring a running process.''' + self.process[process.name] = process + + def check_all(self): + '''Check all processes are running.''' + for name, p in self.process.iteritems(): + if p.poll() != None: + raise Exception( + '%s: exited with code %s' % (name, p.returncode)) + + def terminate_all(self): + '''Send TERM signal to all active subprocesses.''' + for p in self.process.itervalues(): + if p.poll() == None: + p.terminate() + info('Waiting for process %i' % p.pid) + p.wait() class DistbuildTestHarness(cliapp.Application): '''Harness for running a distbuild network on a single machine.''' def __init__(self): - self.process = dict() super(DistbuildTestHarness, self).__init__() + self.process_monitor = ProcessMonitor() + def add_settings(self): self.settings.string( ['datadir'], @@ -176,6 +247,10 @@ class DistbuildTestHarness(cliapp.Application): self.settings.string( ['port-file'], 'write port used by initiator to FILE, when ready') + self.settings.integer( + ['workers'], + 'number of workers to start', + default=4) def cmd_run(self, args): '''Run a distbuild network.''' @@ -188,116 +263,85 @@ class DistbuildTestHarness(cliapp.Application): controller = self.start_distbuild_network( datadir, worker_cache, shared_cache, morph_instance=self.settings['morph-instance'], - n_workers=1) + n_workers=self.settings['workers']) if self.settings['port-file']: with open(self.settings['port-file'], 'w') as f: f.write('%s' % controller.controller_initiator_port) - print('Distbuild initiator listening on port %i' % - controller.controller_initiator_port) + print('Distbuild controller listening on port %i' % + controller.controller_initiator_port) print('Data in %s' % datadir) + print('\nTo use: morph distbuild ' + '--controller-initiator-address=localhost ' + '--controller-initiator-port=%i FILE' % + controller.controller_initiator_port) + # Run until we get a TERM signal. while True: select.select([], [], [], 1) - self.check_processes_are_running() + self.process_monitor.check_all() finally: - self.terminate_processes() - - def cmd_build(self): - '''Start a distbuild network and run a single build on it.''' - workdir = tempfile.mkdtemp() - print('Working directory: %s' % workdir) - - to_build = [ - 'baserock:baserock/definitions', - 'c7292b7c81cdd7e5b9e85722406371748453c44f', - 'systems/base-system-x86_64-generic.morph', - ] - - - try: - worker_cache, shared_cache = self.start_cache_servers(workdir) - controller = self.start_distbuild_network( - workdir, worker_cache, shared_cache, n_workers=7) - - initiator1 = self.start_build(controller, to_build, - log_path=subdir(workdir, '1')) + self.process_monitor.terminate_all() - initiator2 = self.start_build(controller, to_build, - log_path=subdir(workdir, '2')) - - while initiator1.poll() == None or initiator2.poll() == None: - time.sleep(0.1) # FIXME: use select! - self.check_processes_are_running() - - if initiator1.returncode != 0 or initiator2.returncode != 0: - #raise Exception('Initiator fail') - print('Initiator fail') - else: - print('Success!') - - finally: - self.terminate_processes() - print('Test state kept in %s' % workdir) + def start_cache_servers(self, workdir): + '''Start necessary artifact cache servers. - def watch_process(self, process): - self.process[process.name] = process + There needs to be a shared artifact cache server. In a normal distbuild + setup this is part of the Trove system. - def check_processes_are_running(self): - '''Check all processes are running.''' - for name, p in self.process.iteritems(): - if p.poll() != None: - raise Exception( - '%s: exited with code %s' % (name, p.returncode)) + There is a separate cache server for all the workers. In a real + distbuild setup, each worker machine runs its own instance of + morph-cache-server. The controller uses the same port number for all + workers so in this test harness all workers will have to share one + cache-server process. - def terminate_processes(self): - '''Send TERM signal to all active subprocesses.''' - for p in self.process.itervalues(): - if p.poll() == None: - p.terminate() - print('Waiting for process %i' % p.pid) - p.wait() + It's not possible to use a single cache server process at present, + because when the /fetch method of the shared cache server is called, it + will break because it can't fetch stuff from itself. - def start_cache_servers(self, workdir): - # We have to make a bit of a kludge here. In a real distbuild setup, - # each worker machine runs its own instance of morph-cache-server on - # port 8080. The controller uses the same port number for all workers - # so in this test harness all workers will have to share one - # cache-server process. - # - # That's fine, but we still need a separate process for the *shared* - # artifact cache: artifacts are copied from workers to the shared cache - # using the /fetch method, which just wouldn't work if it had to fetch - # from itself. + ''' worker_cache = MorphCacheServerProcess( name='worker-cache-server', cache_path=subdir(workdir, 'worker-cache'), log_path=workdir, enable_writes=False) - self.watch_process(worker_cache) + self.process_monitor.watch(worker_cache) shared_cache = MorphCacheServerProcess( name='shared-cache-server', cache_path=subdir(workdir, 'shared-cache'), log_path=workdir, enable_writes=True) - self.watch_process(shared_cache) + self.process_monitor.watch(shared_cache) return worker_cache, shared_cache def start_distbuild_network(self, workdir, worker_cache, shared_cache, morph_instance=None, n_workers=4): - if morph_instance is None: - # This is a hack so we can pass arguments to the Morph used to run - # worker-build and serialise-artifact. + '''Start Morph distbuild daemons and helper processes. + + This starts a `morph controller-daemon` process, and one or more `morph + worker-daemon` processes. It also starts the helper process that these + need. It returns the controller process, which is the one you need to + talk to if you want to start a build. + + ''' + + if not morph_instance: + # Create a wrapper script for the Morph that will be used to run + # `serialise-artifact` and `worker-build` commands, so we can pass + # it the commandline arguments it neds. worker_morph = os.path.join(workdir, 'worker-morph') + with open(worker_morph, 'w') as f: cache_dir = os.path.join(workdir, 'worker-cache') log = os.path.join(workdir, 'morph.log') f.write('#!/bin/sh\n') - f.write('%s --cachedir=%s --log=%s $@\n' % (MORPH, cache_dir, log)) + f.write('%s --cachedir=%s --log=%s $@\n' % ( + MORPH, cache_dir, log)) + os.chmod(worker_morph, 0755) morph_instance = worker_morph @@ -307,7 +351,7 @@ class DistbuildTestHarness(cliapp.Application): name='worker-%i' % n, cache_server=worker_cache, log_path=workdir) - self.watch_process(worker) + self.process_monitor.watch(worker) workers.append('localhost:%i' % worker.worker_daemon_port) @@ -319,7 +363,9 @@ class DistbuildTestHarness(cliapp.Application): }, log_path=workdir ) - self.watch_process(worker_helper) + self.process_monitor.watch(worker_helper) + + shared_cache_url = 'http://localhost:%s' % shared_cache.port controller = MorphListenerProcess( name='controller', @@ -331,10 +377,10 @@ class DistbuildTestHarness(cliapp.Application): 'morph-instance': morph_instance, 'worker': ','.join(workers), 'worker-cache-server-port': worker_cache.port, - 'writeable-cache-server': 'http://localhost:%s' % shared_cache.port, + 'writeable-cache-server': shared_cache_url, }, log_path=workdir) - self.watch_process(controller) + self.process_monitor.watch(controller) controller_helper = MorphProcess( name='controller-helper', @@ -344,27 +390,13 @@ class DistbuildTestHarness(cliapp.Application): }, log_path=workdir ) - - self.watch_process(controller_helper) + self.process_monitor.watch(controller_helper) # Need to wait for controller-helper to connect to controller. time.sleep(0.1) - - self.check_processes_are_running() + self.process_monitor.check_all() return controller - def start_build(self, controller, to_build, log_path): - port = controller.controller_initiator_port - return MorphProcess( - name='initiator', - argv=[MORPH, 'distbuild-morphology'] + to_build, - settings={ - 'controller-initiator-address': 'localhost', - 'controller-initiator-port': port, - 'initiator-step-output-dir': log_path, - }, - log_path=log_path) - DistbuildTestHarness().run() diff --git a/yarns/morph.shell-lib b/yarns/morph.shell-lib index 206f0e38..c5de80df 100644 --- a/yarns/morph.shell-lib +++ b/yarns/morph.shell-lib @@ -3,7 +3,7 @@ # The shell functions in this library are meant to make writing IMPLEMENTS # sections for yarn scenario tests easier. -# Copyright (C) 2013-2014 Codethink Limited +# Copyright (C) 2013-2015 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by |