#!/usr/bin/env python # Copyright (C) 2015 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; version 2 of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. '''Run a local instance of a Morph distributed build network. To use: scripts/run-distbuild This will test your system-installed version of Morph. During development you probably want to use `--source` to run Morph from a source checkout instead: scripts/run-distbuild --source=. If you want more info on what is going on, try: scripts/run-distbuild --log=/dev/stdout All distbuild subprocesses will be shut down on TERM signal or if any of them crash. Logs and build artifacts will be stored in a newly created temporary directory. You can specify a different location with the --datadir argument. The directory will not be deleted when the process exits. You can set the number of workers with `--workers=N` (default is 1). ** NOTE: there are currently some bugs in Morph which cause various errors ** when more than one worker is running on the system at once. ''' DISTBUILD_HELPER = 'distbuild-helper' MORPH = 'morph' MORPH_CACHE_SERVER = 'morph-cache-server' import os import pipes import subprocess import tempfile import time from logging import debug, info import cliapp def subdir(workdir, *path_components): '''Create a subdirectory and return the path to it.''' path = os.path.join(workdir, *path_components) os.makedirs(path) return path def add_to_environment(env=None, **new_values): '''Extend 'env' with 'new_values'. If 'env' is None, the environment of the current process is used. If any of 'new_values' are present in 'env' already, an exception is raised. ''' env = env or os.environ.copy() for k, v in new_values.iteritems(): if k in env: # Sorry ... there's no 'one size fits all' strategy for merging # environment variables so just add code for whatever special # case you need to handle, if you hit this. raise NotImplementedError( 'Unable to merge %s=%s with existing value of %s', k, v, env[k]) else: env[k] = v return env class Process(subprocess.Popen): '''A running subprocess.''' def __init__(self, name, argv, **kwargs): '''Start a new subprocess, using subprocess.Popen. The 'name' parameter is only used internally. The 'argv' parameter specifies the commandline to be run. ''' self.name = name self.argv = argv info('%s commandline: %s' % (name, ' '.join(argv))) super(Process, self).__init__(argv, **kwargs) info('%s process ID: %s' % (name, self.pid)) class CliappProcess(Process): '''A running subprocess that supports 'cliapp'-style commandline arguments. The 'cliapp' library is used by most Baserock components to give a consistent commandline interface. ''' def __init__(self, name, argv, settings, **kwargs): '''Start a new subprocess, using subprocess.Popen. The 'settings' dict will be formatted as long-form commandline switches and added to 'argv'. ''' self.settings = settings assert 'config' not in settings, \ "The configuration file %s may interfere with settings passed " \ "through the commandline." full_argv = argv + self._format_settings(settings) super(CliappProcess, self).__init__(name, full_argv, **kwargs) def _format_settings(self, arg_dict): def as_string(key, value): if value is True: return '--%s' % key elif value is False: return None else: return '--%s=%s' % (key, value) def not_null(item): return (item is not None) strings = [as_string(k, v) for k, v in arg_dict.iteritems()] return sorted(filter(not_null, strings)) class MorphProcess(CliappProcess): '''A running instance of Morph, morph-cache-server or distbuild-helper.''' def __init__(self, name, argv, settings, log_path=None, source_dir=None, **kwargs): '''Start an instance of Morph, morph-cache-server or distbuild-helper. The logs will be sent to a file called '$log_path/morph-$name.log', if log_path is passed. If source_dir is passed, it is prepended to argv[0] and added to PYTHONPATH. This is to allow running Morph from its source tree easily. ''' if log_path is not None: settings['log'] = os.path.join(log_path, 'morph-%s.log' % name) if source_dir is not None: argv[0] = os.path.join(source_dir, argv[0]) kwargs['env'] = add_to_environment( kwargs.get('env'), PYTHONPATH=source_dir) settings['no-default-config'] = True super(MorphProcess, self).__init__(name, argv, settings, **kwargs) class MorphListenerProcess(MorphProcess): '''A running instance of a Morph or morph-cache-server daemon process.''' def __init__(self, name, port_names, argv, settings, **kwargs): '''Start and wait for an instance of Morph or morph-cache-server. Using --port-file arguments, the constructor will wait for each port listed in 'port_names' to become ready before returning. The subprocess will pick a random available port number for each port. The numbers be accessible as attributes on this class. For example, if you pass port_names=['worker-daemon-port'], the subprocess will receive two extra commandline arguments: --worker-daemon-port=0 and --worker-daemon-port-file=xxx After construction, self.worker_daemon_port will contain the number of the port that it is listening on. ''' for port_name in port_names: self._setup_port_fifo(port_name, settings) super(MorphListenerProcess, self).__init__( name, argv, settings, **kwargs) for port_name in port_names: port_number = self._read_port_fifo(port_name, settings) info('%s: %s port is %s' % (name, port_name, port_number)) port_attr = port_name.replace('-', '_') setattr(self, port_attr, port_number) def _setup_port_fifo(self, port_name, settings): tempdir = tempfile.mkdtemp() port_file = os.path.join(tempdir, '%s.port' % (port_name)) os.mkfifo(port_file) # Note that Python passes dicts by reference, so this modifies the # dict that was passed in. settings['%s' % port_name] = 0 settings['%s-file' % port_name] = port_file def _read_port_fifo(self, port_name, settings): port_file = settings['%s-file' % port_name] # The open() call will block until the daemon process opens the other # end of this FIFO. It should do this only once it is listening for # connections on this port. Thus, we block until the process is ready, # which is very important for avoiding races. debug('Read: %s' % port_file) with open(port_file, 'r') as f: port_number = int(f.readline()) os.unlink(port_file) os.rmdir(os.path.dirname(port_file)) return port_number class MorphCacheServerProcess(MorphListenerProcess): '''A morph-cache-server process.''' def __init__(self, name, cache_path, log_path=None, source_dir=None, enable_writes=False): '''Start a morph-cache-server process.''' ports = ['port'] argv = [MORPH_CACHE_SERVER] settings = { 'artifact-dir': subdir(cache_path, 'artifacts'), 'enable-writes': enable_writes, 'no-fcgi': True, 'repo-dir': subdir(cache_path, 'gits'), } super(MorphCacheServerProcess, self).__init__( name, ports, argv, settings, log_path=log_path, source_dir=source_dir, stderr=subprocess.PIPE) class MorphWorkerDaemonProcess(MorphListenerProcess): '''A `morph worker-daemon` process.''' def __init__(self, name, cache_server, log_path=None, source_dir=None): '''Start a `morph worker-daemon` instance.''' ports = ['worker-daemon-port'] argv = [MORPH, 'worker-daemon'] settings = { 'artifact-cache-server': 'http://localhost:%s' % cache_server.port, } super(MorphWorkerDaemonProcess, self).__init__( name, ports, argv, settings, log_path=log_path, source_dir=source_dir) class ProcessMonitor(object): '''A tool for managing a bunch of subprocesses.''' def __init__(self): self.process = {} def watch(self, process): '''Start monitoring a running process.''' assert process.name not in self.process self.process[process.name] = process def check_all(self): '''Check all processes are running.''' failed = [] for name, p in self.process.iteritems(): if p.poll() != None: failed.append(p) if len(failed) > 0: def message(p): return '%s: exited with code %s' % (p.name, p.returncode) raise Exception('\n'.join(message(p) for p in failed)) def terminate_all(self): '''Send TERM signal to all active subprocesses.''' for p in self.process.itervalues(): if p.poll() == None: p.terminate() info('Waiting for process %i' % p.pid) p.wait() class DistbuildTestHarness(cliapp.Application): '''Harness for running a distbuild network on a single machine.''' def __init__(self): super(DistbuildTestHarness, self).__init__() self.process_monitor = ProcessMonitor() def add_settings(self): self.settings.string( ['datadir'], 'location to cache gits and artifacts, and write log files') self.settings.string( ['port-file'], 'write port used by initiator to FILE, when ready') self.settings.string( ['source'], 'Path to a checkout of morph.git, to be used instead of the ' 'installed system version of Morph', default=None) # FIXME: I'd rather the default be more than one worker, as it is a # better test for the controller code, but some bugs in Morph currently # prevent having more than one worker on the same machine. self.settings.integer( ['workers'], 'number of workers to start', default=1) def process_args(self, args): '''Run a distbuild network.''' if len(args) != 0: raise cliapp.AppException( 'This program does not take any arguments.') try: datadir = self.settings['datadir'] or tempfile.mkdtemp() worker_cache, shared_cache = self.start_cache_servers(datadir) controller = self.start_distbuild_network( datadir, worker_cache, shared_cache, source_dir=self.settings['source'], n_workers=self.settings['workers']) if self.settings['port-file']: with open(self.settings['port-file'], 'w') as f: f.write('%s' % controller.controller_initiator_port) print('Distbuild controller listening on port %i' % controller.controller_initiator_port) print('Data in %s' % datadir) print('\nTo use: morph distbuild ' '--controller-initiator-address=localhost ' '--controller-initiator-port=%i FILE' % controller.controller_initiator_port) # Run until we get a TERM signal. It would be neater to use # distbuild.SubprocessEventSource rather than time.sleep(1) in # this main loop. But it would be more complicated to implement. while True: time.sleep(1) self.process_monitor.check_all() finally: self.process_monitor.terminate_all() def start_cache_servers(self, workdir, source_dir=None): '''Start necessary artifact cache servers. There needs to be a shared artifact cache server. In a normal distbuild setup this is part of the Trove system. There is a separate cache server for all the workers. In a real distbuild setup, each worker machine runs its own instance of morph-cache-server. The controller uses the same port number for all workers so in this test harness all workers will have to share one cache-server process. It's not possible to use a single cache server process at present, because when the /fetch method of the shared cache server is called, it will break because it can't fetch stuff from itself. ''' worker_cache = MorphCacheServerProcess( name='worker-cache-server', source_dir=source_dir, cache_path=subdir(workdir, 'worker-cache'), log_path=workdir, enable_writes=False) self.process_monitor.watch(worker_cache) shared_cache = MorphCacheServerProcess( name='shared-cache-server', source_dir=source_dir, cache_path=subdir(workdir, 'shared-cache'), log_path=workdir, enable_writes=True) self.process_monitor.watch(shared_cache) return worker_cache, shared_cache def create_worker_morph_wrapper_script(self, workdir, source_dir=None): '''Create a wrapper script for Morph that distbuild processes will use. The distbuild controller and worker daemons coordinate the work being done by distbuild, but the actual work of calculating the build graph and running build commands is done by Morph subprocesses started by the daemon helper processes. By default they will call the system installed Morph (/usr/bin/morph) with fixed arguments. We can cause them to use a different Morph program by passing `--morph-instance` to the controller. However, to pass extra arguments we need to create a wrapper script and pass in the path to that. ''' path = os.path.join(workdir, 'worker-morph') if source_dir: morph = os.path.join(source_dir, MORPH) else: morph = MORPH cache_dir = os.path.join(workdir, 'worker-cache') log = os.path.join(workdir, 'morph.log') with open(path, 'w') as f: f.write('#!/bin/sh\n') if source_dir: f.write('export PYTHONPATH="$PYTHONPATH":%s\n' % pipes.quote(source_dir)) f.write('%s --cachedir=%s --log=%s "$@"\n' % ( pipes.quote(morph), pipes.quote(cache_dir), pipes.quote(log))) os.chmod(path, 0755) return path def start_distbuild_network(self, workdir, worker_cache, shared_cache, source_dir=None, n_workers=4): '''Start Morph distbuild daemons and helper processes. This starts a `morph controller-daemon` process, and one or more `morph worker-daemon` processes. It also starts the helper process that these need. It returns the controller process, which is the one you need to talk to if you want to start a build. ''' worker_morph = self.create_worker_morph_wrapper_script( workdir, source_dir=source_dir) workers = [] for n in range(0, n_workers): worker = MorphWorkerDaemonProcess( name='worker-%i' % n, source_dir=source_dir, cache_server=worker_cache, log_path=workdir) self.process_monitor.watch(worker) workers.append('localhost:%i' % worker.worker_daemon_port) worker_helper = MorphProcess( name='worker-%i-helper' % n, argv=[DISTBUILD_HELPER], source_dir=source_dir, settings={ 'parent-port': worker.worker_daemon_port, }, log_path=workdir ) self.process_monitor.watch(worker_helper) shared_cache_url = 'http://localhost:%s' % shared_cache.port controller = MorphListenerProcess( name='controller', # Order is significant -- helper-port must be first! port_names=['controller-helper-port', 'controller-initiator-port'], argv=[MORPH, 'controller-daemon'], source_dir=source_dir, settings={ 'artifact-cache-server': shared_cache_url, 'controller-initiator-address': 'localhost', 'morph-instance': worker_morph, 'worker': ','.join(workers), 'worker-cache-server-port': worker_cache.port, 'writeable-cache-server': shared_cache_url, }, log_path=workdir) self.process_monitor.watch(controller) controller_helper = MorphProcess( name='controller-helper', argv=[DISTBUILD_HELPER], source_dir=source_dir, settings={ 'parent-port': controller.controller_helper_port, }, log_path=workdir ) self.process_monitor.watch(controller_helper) # Need to wait for controller-helper to connect to controller. time.sleep(0.1) self.process_monitor.check_all() return controller DistbuildTestHarness().run()