From 7a8a2eda8ecb7c36d789af49eab1b7c3c853e461 Mon Sep 17 00:00:00 2001 From: Sam Thursfield Date: Fri, 13 Feb 2015 17:25:31 +0000 Subject: Add script to run a distbuild network on the local machine Change-Id: I8d3f3ec6a1796b06b32e43dc4839360ff1cc2d86 --- scripts/run-distbuild | 523 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 523 insertions(+) create mode 100755 scripts/run-distbuild diff --git a/scripts/run-distbuild b/scripts/run-distbuild new file mode 100755 index 00000000..678fd20d --- /dev/null +++ b/scripts/run-distbuild @@ -0,0 +1,523 @@ +#!/usr/bin/env python +# Copyright (C) 2015 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +'''Run a local instance of a Morph distributed build network. + +To use: + + scripts/run-distbuild + +This will test your system-installed version of Morph. During development you +probably want to use `--source` to run Morph from a source checkout instead: + + scripts/run-distbuild --source=. + +If you want more info on what is going on, try: + + scripts/run-distbuild --log=/dev/stdout + +All distbuild subprocesses will be shut down on TERM signal or if any of them +crash. + +Logs and build artifacts will be stored in a newly created temporary directory. +You can specify a different location with the --datadir argument. The directory +will not be deleted when the process exits. + +You can set the number of workers with `--workers=N` (default is 1). + +** NOTE: there are currently some bugs in Morph which cause various errors +** when more than one worker is running on the system at once. + +''' + + +DISTBUILD_HELPER = 'distbuild-helper' +MORPH = 'morph' +MORPH_CACHE_SERVER = 'morph-cache-server' + + +import os +import pipes +import subprocess +import tempfile +import time +from logging import debug, info + +import cliapp + + +def subdir(workdir, *path_components): + '''Create a subdirectory and return the path to it.''' + path = os.path.join(workdir, *path_components) + os.makedirs(path) + return path + + +def add_to_environment(env=None, **new_values): + '''Extend 'env' with 'new_values'. + + If 'env' is None, the environment of the current process is used. + + If any of 'new_values' are present in 'env' already, an exception is + raised. + + ''' + env = env or os.environ.copy() + for k, v in new_values.iteritems(): + if k in env: + # Sorry ... there's no 'one size fits all' strategy for merging + # environment variables so just add code for whatever special + # case you need to handle, if you hit this. + raise NotImplementedError( + 'Unable to merge %s=%s with existing value of %s', + k, v, env[k]) + else: + env[k] = v + return env + + +class Process(subprocess.Popen): + '''A running subprocess.''' + + def __init__(self, name, argv, **kwargs): + '''Start a new subprocess, using subprocess.Popen. + + The 'name' parameter is only used internally. + + The 'argv' parameter specifies the commandline to be run. + + ''' + self.name = name + self.argv = argv + + info('%s commandline: %s' % (name, ' '.join(argv))) + super(Process, self).__init__(argv, **kwargs) + info('%s process ID: %s' % (name, self.pid)) + + +class CliappProcess(Process): + '''A running subprocess that supports 'cliapp'-style commandline arguments. + + The 'cliapp' library is used by most Baserock components to give a + consistent commandline interface. + + ''' + + def __init__(self, name, argv, settings, **kwargs): + '''Start a new subprocess, using subprocess.Popen. + + The 'settings' dict will be formatted as long-form commandline switches + and added to 'argv'. + + ''' + self.settings = settings + + assert 'config' not in settings, \ + "The configuration file %s may interfere with settings passed " \ + "through the commandline." + + full_argv = argv + self._format_settings(settings) + + super(CliappProcess, self).__init__(name, full_argv, **kwargs) + + def _format_settings(self, arg_dict): + def as_string(key, value): + if value is True: + return '--%s' % key + elif value is False: + return None + else: + return '--%s=%s' % (key, value) + + def not_null(item): + return (item is not None) + + strings = [as_string(k, v) for k, v in arg_dict.iteritems()] + return sorted(filter(not_null, strings)) + + +class MorphProcess(CliappProcess): + '''A running instance of Morph, morph-cache-server or distbuild-helper.''' + + def __init__(self, name, argv, settings, log_path=None, source_dir=None, + **kwargs): + '''Start an instance of Morph, morph-cache-server or distbuild-helper. + + The logs will be sent to a file called '$log_path/morph-$name.log', if + log_path is passed. + + If source_dir is passed, it is prepended to argv[0] and added to + PYTHONPATH. This is to allow running Morph from its source tree easily. + + ''' + if log_path is not None: + settings['log'] = os.path.join(log_path, 'morph-%s.log' % name) + + if source_dir is not None: + argv[0] = os.path.join(source_dir, argv[0]) + kwargs['env'] = add_to_environment( + kwargs.get('env'), PYTHONPATH=source_dir) + + settings['no-default-config'] = True + + super(MorphProcess, self).__init__(name, argv, settings, **kwargs) + + +class MorphListenerProcess(MorphProcess): + '''A running instance of a Morph or morph-cache-server daemon process.''' + + def __init__(self, name, port_names, argv, settings, **kwargs): + '''Start and wait for an instance of Morph or morph-cache-server. + + Using --port-file arguments, the constructor will wait for each port + listed in 'port_names' to become ready before returning. The subprocess + will pick a random available port number for each port. The numbers + be accessible as attributes on this class. + + For example, if you pass port_names=['worker-daemon-port'], the + subprocess will receive two extra commandline arguments: + --worker-daemon-port=0 and --worker-daemon-port-file=xxx + + After construction, self.worker_daemon_port will contain the + number of the port that it is listening on. + + ''' + for port_name in port_names: + self._setup_port_fifo(port_name, settings) + + super(MorphListenerProcess, self).__init__( + name, argv, settings, **kwargs) + + for port_name in port_names: + port_number = self._read_port_fifo(port_name, settings) + info('%s: %s port is %s' % (name, port_name, port_number)) + + port_attr = port_name.replace('-', '_') + setattr(self, port_attr, port_number) + + def _setup_port_fifo(self, port_name, settings): + tempdir = tempfile.mkdtemp() + port_file = os.path.join(tempdir, '%s.port' % (port_name)) + os.mkfifo(port_file) + + # Note that Python passes dicts by reference, so this modifies the + # dict that was passed in. + settings['%s' % port_name] = 0 + settings['%s-file' % port_name] = port_file + + def _read_port_fifo(self, port_name, settings): + port_file = settings['%s-file' % port_name] + + # The open() call will block until the daemon process opens the other + # end of this FIFO. It should do this only once it is listening for + # connections on this port. Thus, we block until the process is ready, + # which is very important for avoiding races. + debug('Read: %s' % port_file) + with open(port_file, 'r') as f: + port_number = int(f.readline()) + + os.unlink(port_file) + os.rmdir(os.path.dirname(port_file)) + + return port_number + + +class MorphCacheServerProcess(MorphListenerProcess): + '''A morph-cache-server process.''' + + def __init__(self, name, cache_path, log_path=None, source_dir=None, + enable_writes=False): + '''Start a morph-cache-server process.''' + + ports = ['port'] + argv = [MORPH_CACHE_SERVER] + + settings = { + 'artifact-dir': subdir(cache_path, 'artifacts'), + 'enable-writes': enable_writes, + 'no-fcgi': True, + 'repo-dir': subdir(cache_path, 'gits'), + } + + super(MorphCacheServerProcess, self).__init__( + name, ports, argv, settings, log_path=log_path, + source_dir=source_dir, stderr=subprocess.PIPE) + + +class MorphWorkerDaemonProcess(MorphListenerProcess): + '''A `morph worker-daemon` process.''' + def __init__(self, name, cache_server, log_path=None, source_dir=None): + '''Start a `morph worker-daemon` instance.''' + + ports = ['worker-daemon-port'] + argv = [MORPH, 'worker-daemon'] + + settings = { + 'artifact-cache-server': 'http://localhost:%s' % cache_server.port, + } + + super(MorphWorkerDaemonProcess, self).__init__( + name, ports, argv, settings, log_path=log_path, + source_dir=source_dir) + + +class ProcessMonitor(object): + '''A tool for managing a bunch of subprocesses.''' + + def __init__(self): + self.process = {} + + def watch(self, process): + '''Start monitoring a running process.''' + + assert process.name not in self.process + + self.process[process.name] = process + + def check_all(self): + '''Check all processes are running.''' + failed = [] + + for name, p in self.process.iteritems(): + if p.poll() != None: + failed.append(p) + + if len(failed) > 0: + def message(p): + return '%s: exited with code %s' % (p.name, p.returncode) + raise Exception('\n'.join(message(p) for p in failed)) + + def terminate_all(self): + '''Send TERM signal to all active subprocesses.''' + for p in self.process.itervalues(): + if p.poll() == None: + p.terminate() + info('Waiting for process %i' % p.pid) + p.wait() + + +class DistbuildTestHarness(cliapp.Application): + '''Harness for running a distbuild network on a single machine.''' + + def __init__(self): + super(DistbuildTestHarness, self).__init__() + + self.process_monitor = ProcessMonitor() + + def add_settings(self): + self.settings.string( + ['datadir'], + 'location to cache gits and artifacts, and write log files') + self.settings.string( + ['port-file'], + 'write port used by initiator to FILE, when ready') + self.settings.string( + ['source'], + 'Path to a checkout of morph.git, to be used instead of the ' + 'installed system version of Morph', + default=None) + # FIXME: I'd rather the default be more than one worker, as it is a + # better test for the controller code, but some bugs in Morph currently + # prevent having more than one worker on the same machine. + self.settings.integer( + ['workers'], + 'number of workers to start', + default=1) + + def process_args(self, args): + '''Run a distbuild network.''' + + if len(args) != 0: + raise cliapp.AppException( + 'This program does not take any arguments.') + + try: + datadir = self.settings['datadir'] or tempfile.mkdtemp() + + worker_cache, shared_cache = self.start_cache_servers(datadir) + + controller = self.start_distbuild_network( + datadir, worker_cache, shared_cache, + source_dir=self.settings['source'], + n_workers=self.settings['workers']) + + if self.settings['port-file']: + with open(self.settings['port-file'], 'w') as f: + f.write('%s' % controller.controller_initiator_port) + + print('Distbuild controller listening on port %i' % + controller.controller_initiator_port) + print('Data in %s' % datadir) + + print('\nTo use: morph distbuild ' + '--controller-initiator-address=localhost ' + '--controller-initiator-port=%i FILE' % + controller.controller_initiator_port) + + # Run until we get a TERM signal. It would be neater to use + # distbuild.SubprocessEventSource rather than time.sleep(1) in + # this main loop. But it would be more complicated to implement. + while True: + time.sleep(1) + self.process_monitor.check_all() + finally: + self.process_monitor.terminate_all() + + def start_cache_servers(self, workdir, source_dir=None): + '''Start necessary artifact cache servers. + + There needs to be a shared artifact cache server. In a normal distbuild + setup this is part of the Trove system. + + There is a separate cache server for all the workers. In a real + distbuild setup, each worker machine runs its own instance of + morph-cache-server. The controller uses the same port number for all + workers so in this test harness all workers will have to share one + cache-server process. + + It's not possible to use a single cache server process at present, + because when the /fetch method of the shared cache server is called, it + will break because it can't fetch stuff from itself. + + ''' + worker_cache = MorphCacheServerProcess( + name='worker-cache-server', + source_dir=source_dir, + cache_path=subdir(workdir, 'worker-cache'), + log_path=workdir, + enable_writes=False) + self.process_monitor.watch(worker_cache) + + shared_cache = MorphCacheServerProcess( + name='shared-cache-server', + source_dir=source_dir, + cache_path=subdir(workdir, 'shared-cache'), + log_path=workdir, + enable_writes=True) + self.process_monitor.watch(shared_cache) + + return worker_cache, shared_cache + + def create_worker_morph_wrapper_script(self, workdir, source_dir=None): + '''Create a wrapper script for Morph that distbuild processes will use. + + The distbuild controller and worker daemons coordinate the work being + done by distbuild, but the actual work of calculating the build graph + and running build commands is done by Morph subprocesses started by + the daemon helper processes. By default they will call the system + installed Morph (/usr/bin/morph) with fixed arguments. + + We can cause them to use a different Morph program by passing + `--morph-instance` to the controller. However, to pass extra arguments + we need to create a wrapper script and pass in the path to that. + + ''' + path = os.path.join(workdir, 'worker-morph') + + if source_dir: + morph = os.path.join(source_dir, MORPH) + else: + morph = MORPH + + cache_dir = os.path.join(workdir, 'worker-cache') + log = os.path.join(workdir, 'morph.log') + + with open(path, 'w') as f: + f.write('#!/bin/sh\n') + + if source_dir: + f.write('export PYTHONPATH="$PYTHONPATH":%s\n' % + pipes.quote(source_dir)) + + f.write('%s --cachedir=%s --log=%s "$@"\n' % ( + pipes.quote(morph), pipes.quote(cache_dir), pipes.quote(log))) + + os.chmod(path, 0755) + return path + + def start_distbuild_network(self, workdir, worker_cache, shared_cache, + source_dir=None, n_workers=4): + '''Start Morph distbuild daemons and helper processes. + + This starts a `morph controller-daemon` process, and one or more `morph + worker-daemon` processes. It also starts the helper process that these + need. It returns the controller process, which is the one you need to + talk to if you want to start a build. + + ''' + worker_morph = self.create_worker_morph_wrapper_script( + workdir, source_dir=source_dir) + + workers = [] + for n in range(0, n_workers): + worker = MorphWorkerDaemonProcess( + name='worker-%i' % n, + source_dir=source_dir, + cache_server=worker_cache, + log_path=workdir) + self.process_monitor.watch(worker) + + workers.append('localhost:%i' % worker.worker_daemon_port) + + worker_helper = MorphProcess( + name='worker-%i-helper' % n, + argv=[DISTBUILD_HELPER], + source_dir=source_dir, + settings={ + 'parent-port': worker.worker_daemon_port, + }, + log_path=workdir + ) + self.process_monitor.watch(worker_helper) + + shared_cache_url = 'http://localhost:%s' % shared_cache.port + + controller = MorphListenerProcess( + name='controller', + # Order is significant -- helper-port must be first! + port_names=['controller-helper-port', 'controller-initiator-port'], + argv=[MORPH, 'controller-daemon'], + source_dir=source_dir, + settings={ + 'artifact-cache-server': shared_cache_url, + 'controller-initiator-address': 'localhost', + 'morph-instance': worker_morph, + 'worker': ','.join(workers), + 'worker-cache-server-port': worker_cache.port, + 'writeable-cache-server': shared_cache_url, + }, + log_path=workdir) + self.process_monitor.watch(controller) + + controller_helper = MorphProcess( + name='controller-helper', + argv=[DISTBUILD_HELPER], + source_dir=source_dir, + settings={ + 'parent-port': controller.controller_helper_port, + }, + log_path=workdir + ) + self.process_monitor.watch(controller_helper) + + # Need to wait for controller-helper to connect to controller. + time.sleep(0.1) + self.process_monitor.check_all() + + return controller + + +DistbuildTestHarness().run() -- cgit v1.2.1