#!/usr/bin/env python
# Copyright (C) 2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.


'''Run a local instance of a Morph distributed build network.

To use:

    scripts/distbuild run

If you want more info on what is going on, try:

    scripts/distbuild run --log=/dev/stdout

All distbuild subprocesses will be shut down on TERM signal or if any of them
crash. Logs and build artifacts will be stored in a newly created temporary
directory. You can specify a different location with the --datadir argument.
The directory will not be deleted when the process exits.

'''


# Some hackz if you are running Morph from the source tree.
import os
os.environ['PYTHONPATH'] = '/src/morph'
DISTBUILD_HELPER = '/src/morph/distbuild-helper'
MORPH = '/src/morph/morph'
MORPH_CACHE_SERVER = '/src/morph/morph-cache-server'


import errno
import select
import signal
import subprocess
import tempfile
import time
from logging import debug, info

import cliapp


def subdir(workdir, *path_components):
    '''Create a subdirectory (if it does not already exist) and return its path.

    An existing directory is accepted so that a --datadir left behind by a
    previous run can be reused.

    '''
    path = os.path.join(workdir, *path_components)
    try:
        os.makedirs(path)
    except OSError as e:
        # Python 2's os.makedirs() has no exist_ok parameter.
        if e.errno != errno.EEXIST or not os.path.isdir(path):
            raise
    return path


class Process(subprocess.Popen):
    '''A running subprocess.'''

    def __init__(self, name, argv, settings, manual_start=False, **kwargs):
        '''Start a new subprocess, using subprocess.Popen.

        The 'name' parameter is only used internally.

        The 'argv' parameter specifies the commandline to be run.

        The 'settings' dict will be formatted as long-form commandline
        switches and added to 'argv'.

        If 'manual_start' is True, no process is started. Instead the full
        commandline is printed so that it can be run by hand, and poll()
        always returns None (i.e. the process looks like it is still
        running).

        Any other keyword arguments are passed on to subprocess.Popen.

        '''
        self.name = name
        self.argv = argv
        self.settings = settings
        self.manual_start = manual_start

        full_argv = argv + self._format_settings(settings)

        if manual_start:
            # This is useful if you want to interactively debug one of the
            # processes.
            print('Start process manually: %s' % ' '.join(full_argv))
            self.poll = lambda: None
            self.returncode = None
        else:
            info('%s commandline: %s', name, ' '.join(full_argv))
            super(Process, self).__init__(full_argv, **kwargs)
            info('%s process ID: %s', name, self.pid)

    def _format_settings(self, arg_dict):
        '''Turn 'arg_dict' into a list of long-form commandline switches.

        A value of True becomes '--key'. A value of False is left out
        entirely. Any other value becomes '--key=value'.

        '''
        def as_string(key, value):
            if value is True:
                return '--%s' % key
            return '--%s=%s' % (key, value)

        # False values must be skipped rather than rendered as '': an empty
        # string in argv reaches the child as a stray positional argument.
        # Sorting keeps the logged commandline stable from run to run.
        return [as_string(k, v) for k, v in sorted(arg_dict.items())
                if v is not False]


class MorphProcess(Process):
    '''A running instance of Morph, morph-cache-server or distbuild-helper.'''

    def __init__(self, name, argv, settings, log_path=None, **kwargs):
        '''Start an instance of Morph, morph-cache-server or distbuild-helper.

        The logs will be sent to a file called '$log_path/morph-$name.log', if
        log_path is passed. --no-default-config is always added.

        Note that the 'settings' dict passed in is modified in place.

        '''
        if log_path:
            settings['log'] = os.path.join(log_path, 'morph-%s.log' % name)
        settings['no-default-config'] = True
        super(MorphProcess, self).__init__(name, argv, settings, **kwargs)


class MorphListenerProcess(MorphProcess):
    '''A running instance of a Morph or morph-cache-server daemon process.'''

    def __init__(self, name, port_names, argv, settings, **kwargs):
        '''Start and wait for an instance of Morph or morph-cache-server.

        Using --port-file arguments, the constructor will wait for each port
        listed in 'port_names' to become ready before returning. The
        subprocess will pick a random available port number for each port.
        The numbers will be accessible as attributes on this class.

        For example, if you pass port_names=['worker-daemon-port'], the
        subprocess will receive two extra commandline arguments:

            --worker-daemon-port=0 and --worker-daemon-port-file=xxx

        Once the process starts, self.worker_daemon_port will contain the
        number of the port that it is listening on.

        The ports are waited for one at a time, in the order they appear in
        'port_names'.

        '''
        for port_name in port_names:
            self._setup_port_fifo(port_name, settings)

        super(MorphListenerProcess, self).__init__(
            name, argv, settings, **kwargs)

        for port_name in port_names:
            port_number = self._read_port_fifo(port_name, settings)
            info('%s: %s port is %s', name, port_name, port_number)

            port_attr = port_name.replace('-', '_')
            setattr(self, port_attr, port_number)

    def _setup_port_fifo(self, port_name, settings):
        '''Create a FIFO for 'port_name' and add the matching settings.'''
        tempdir = tempfile.mkdtemp()
        port_file = os.path.join(tempdir, '%s.port' % port_name)
        os.mkfifo(port_file)

        # Note that Python passes dicts by reference, so this modifies the
        # dict that was passed in. Port 0 lets the subprocess pick a port.
        settings[port_name] = 0
        settings['%s-file' % port_name] = port_file

    def _read_port_fifo(self, port_name, settings):
        '''Block until the subprocess reports 'port_name'; return the number.

        The FIFO and its temporary directory are removed afterwards, even if
        the read fails.

        '''
        port_file = settings['%s-file' % port_name]
        debug('Read: %s', port_file)
        try:
            # NOTE(review): opening a FIFO for reading blocks until a writer
            # opens it, so this hangs if the subprocess dies before it
            # writes its port.
            with open(port_file, 'r') as f:
                # The readline() call will block until a line of data is
                # written. The process we are starting should only write out
                # the port number once the process is listening on that port.
                # Thus, we block until the process is ready, which is very
                # important for avoiding races.
                port_number = int(f.readline())
        finally:
            os.unlink(port_file)
            os.rmdir(os.path.dirname(port_file))
        return port_number


class MorphCacheServerProcess(MorphListenerProcess):
    '''A morph-cache-server process.'''

    def __init__(self, name, cache_path, log_path=None, enable_writes=False):
        '''Start a morph-cache-server process.

        Artifacts go in the 'artifacts' subdirectory of 'cache_path' and git
        repos go in its 'gits' subdirectory. If 'log_path' is given, the
        server's stderr is written to '$log_path/morph-$name.stderr'.
        Otherwise stderr is inherited from this process.

        '''
        ports = ['port']
        argv = [MORPH_CACHE_SERVER]
        settings = {
            'artifact-dir': subdir(cache_path, 'artifacts'),
            'enable-writes': enable_writes,
            'no-fcgi': True,
            'repo-dir': subdir(cache_path, 'gits'),
        }

        stderr_file = None
        if log_path:
            stderr_file = open(
                os.path.join(log_path, 'morph-%s.stderr' % name), 'w')
        try:
            super(MorphCacheServerProcess, self).__init__(
                name, ports, argv, settings, log_path=log_path,
                stderr=stderr_file)
        finally:
            # The child holds its own copy of the descriptor, so the parent's
            # handle can be closed now. Keeping it in self.stderr would not
            # work: Popen.__init__() resets self.stderr to None unless
            # stderr=PIPE.
            if stderr_file is not None:
                stderr_file.close()


class MorphWorkerDaemonProcess(MorphListenerProcess):
    '''A `morph worker-daemon` process.'''

    def __init__(self, name, cache_server, log_path=None):
        '''Start a `morph worker-daemon` instance.

        'cache_server' must already be listening: its 'port' attribute is
        used to build the --artifact-cache-server URL.

        '''
        ports = ['worker-daemon-port']
        argv = [MORPH, 'worker-daemon']
        settings = {
            'artifact-cache-server': 'http://localhost:%s' % cache_server.port,
        }
        super(MorphWorkerDaemonProcess, self).__init__(
            name, ports, argv, settings, log_path=log_path)


class ProcessMonitor(object):
    '''A tool for managing a bunch of subprocesses.'''

    def __init__(self):
        # Maps process name -> Process instance.
        self.process = {}

    def watch(self, process):
        '''Start monitoring a running process.

        Raises KeyError if a process with the same name is already watched.

        '''
        if process.name in self.process:
            raise KeyError('Already watching a process named %s.' %
                           process.name)
        self.process[process.name] = process

    def check_all(self):
        '''Check all processes are running.

        Raises Exception for the first watched process found to have exited.

        '''
        for name, p in self.process.items():
            if p.poll() is not None:
                raise Exception(
                    '%s: exited with code %s' % (name, p.returncode))

    def terminate_all(self):
        '''Send TERM signal to all active subprocesses and wait for them.

        Processes created with manual_start=True are skipped.

        '''
        for p in self.process.values():
            if p.manual_start:
                continue
            if p.poll() is None:
                p.terminate()
                info('%s: Waiting for process %i', p.name, p.pid)
                p.wait()
            else:
                info('%s: Process %i already exited with code %i',
                     p.name, p.pid, p.returncode)


class DistbuildTestHarness(cliapp.Application):
    '''Harness for running a distbuild network on a single machine.'''

    def __init__(self):
        super(DistbuildTestHarness, self).__init__()
        self.process_monitor = ProcessMonitor()

    def add_settings(self):
        self.settings.string(
            ['datadir'],
            'location to cache gits and artifacts, and write log files')
        self.settings.string(
            ['morph-instance'],
            'Path to Morph program that will run worker-build and '
            'serialise-artifact commands')
        self.settings.string(
            ['port-file'],
            'write port used by initiator to FILE, when ready')
        self.settings.integer(
            ['workers'],
            'number of workers to start',
            default=4)

    def cmd_run(self, args):
        '''Run a distbuild network.'''
        datadir = self.settings['datadir'] or tempfile.mkdtemp()
        try:
            worker_cache, shared_cache = self.start_cache_servers(datadir)

            controller = self.start_distbuild_network(
                datadir, worker_cache, shared_cache,
                morph_instance=self.settings['morph-instance'],
                n_workers=self.settings['workers'])

            if self.settings['port-file']:
                with open(self.settings['port-file'], 'w') as f:
                    f.write('%s' % controller.controller_initiator_port)

            print('Distbuild controller listening on port %i' %
                  controller.controller_initiator_port)
            print('Data in %s' % datadir)

            print('\nTo use:  morph distbuild '
                  '--controller-initiator-address=localhost '
                  '--controller-initiator-port=%i FILE' %
                  controller.controller_initiator_port)

            # Run until we get a TERM signal, or until one of the watched
            # processes exits (check_all() raises).
            while True:
                select.select([], [], [], 1)
                self.process_monitor.check_all()
        finally:
            self.process_monitor.terminate_all()

    def cmd_build(self, args):
        '''Start a distbuild network and run builds on it.

        One "main" build is run to completion. While it runs, twelve other
        builds are started one after another, each cancelled after 90
        seconds.

        '''
        to_build = [
            ['baserock:baserock/definitions',
             'c7292b7c81cdd7e5b9e85722406371748453c44f',
             'systems/base-system-x86_64-generic.morph'],
            ['baserock:baserock/definitions',
             'c7292b7c81cdd7e5b9e85722406371748453c44f',
             'systems/weston-system-x86_64-generic.morph'],
        ]

        # Chosen before 'try' so that the 'finally' clause can always report
        # it.
        datadir = self.settings['datadir'] or tempfile.mkdtemp()
        try:
            worker_cache, shared_cache = self.start_cache_servers(datadir)

            controller = self.start_distbuild_network(
                datadir, worker_cache, shared_cache,
                morph_instance=self.settings['morph-instance'],
                n_workers=self.settings['workers'])

            print('Data in %s' % datadir)

            initiators = []
            initiator_monitor = ProcessMonitor()

            try:
                main_initiator = self.start_build(
                    'build-main', controller, to_build[1],
                    log_path=subdir(datadir, 'main'))
                initiators.append(main_initiator)
                initiator_monitor.watch(main_initiator)

                # Start a bunch of builds, cancelling them after 90 seconds.
                for i in range(12):
                    cancelled = self.start_build(
                        'build-%i' % i, controller, to_build[0],
                        log_path=subdir(datadir, str(i)))
                    # Watched so that it is cleaned up if we are interrupted
                    # during the sleep.
                    initiator_monitor.watch(cancelled)
                    time.sleep(90)
                    print('Cancel initiator %s' % cancelled.name)
                    cancelled.terminate()
                    cancelled.wait()

                # poll() returns the exit code once a process has finished,
                # and that is 0 (falsy) on success, so compare with None.
                while not all(initiator.poll() is not None
                              for initiator in initiators):
                    select.select([], [], [], 0.1)
                    self.process_monitor.check_all()

                if any(initiator.returncode != 0 for initiator in initiators):
                    print('Initiator fail')
                else:
                    print('Success!')
            finally:
                initiator_monitor.terminate_all()
        finally:
            self.process_monitor.terminate_all()
            print('Test state kept in %s' % datadir)

    def start_cache_servers(self, workdir):
        '''Start the worker and shared cache servers; return both.'''
        # We have to make a bit of a kludge here. In a real distbuild setup,
        # each worker machine runs its own instance of morph-cache-server on
        # port 8080. The controller uses the same port number for all workers
        # so in this test harness all workers will have to share one
        # cache-server process.
        #
        # That's fine, but we still need a separate process for the *shared*
        # artifact cache: artifacts are copied from workers to the shared cache
        # using the /fetch method, which just wouldn't work if it had to fetch
        # from itself.
        worker_cache = MorphCacheServerProcess(
            name='worker-cache-server',
            cache_path=subdir(workdir, 'worker-cache'),
            log_path=workdir,
            enable_writes=False)
        self.process_monitor.watch(worker_cache)

        shared_cache = MorphCacheServerProcess(
            name='shared-cache-server',
            cache_path=subdir(workdir, 'shared-cache'),
            log_path=workdir,
            enable_writes=True)
        self.process_monitor.watch(shared_cache)

        return worker_cache, shared_cache

    def start_distbuild_network(self, workdir, worker_cache, shared_cache,
                                morph_instance=None, n_workers=4):
        '''Start Morph distbuild daemons and helper processes.

        This starts a `morph controller-daemon` process, and one or more
        `morph worker-daemon` processes. It also starts the helper process
        that these need.

        It returns the controller process, which is the one you need to talk
        to if you want to start a build.

        '''
        if not morph_instance:
            # Create a wrapper script for the Morph that will be used to run
            # `serialise-artifact` and `worker-build` commands, so we can pass
            # it the commandline arguments it needs.
            worker_morph = os.path.join(workdir, 'worker-morph')
            with open(worker_morph, 'w') as f:
                cache_dir = os.path.join(workdir, 'worker-cache')
                log = os.path.join(workdir, 'morph.log')
                f.write('#!/bin/sh\n')
                # A quoted "$@" passes arguments through unchanged, even when
                # they contain spaces. 'exec' replaces the shell with Morph,
                # so the wrapper's PID becomes Morph's PID.
                f.write('exec %s --cachedir=%s --log=%s "$@"\n' % (
                    MORPH, cache_dir, log))
            os.chmod(worker_morph, 0o755)
            morph_instance = worker_morph

        workers = []
        for n in range(n_workers):
            worker = MorphWorkerDaemonProcess(
                name='worker-%i' % n,
                cache_server=worker_cache,
                log_path=workdir)
            self.process_monitor.watch(worker)

            workers.append('localhost:%i' % worker.worker_daemon_port)

            worker_helper = MorphProcess(
                name='worker-%i-helper' % n,
                argv=[DISTBUILD_HELPER],
                settings={
                    'parent-port': worker.worker_daemon_port,
                },
                log_path=workdir
            )
            self.process_monitor.watch(worker_helper)

        shared_cache_url = 'http://localhost:%s' % shared_cache.port

        controller = MorphListenerProcess(
            name='controller',
            # Order is significant -- helper-port must be first!
            port_names=['controller-helper-port', 'controller-initiator-port'],
            argv=[MORPH, 'controller-daemon'],
            settings={
                'controller-initiator-address': 'localhost',
                'morph-instance': morph_instance,
                'worker': ','.join(workers),
                'worker-cache-server-port': worker_cache.port,
                'artifact-cache-server': shared_cache_url,
                'writeable-cache-server': shared_cache_url,
            },
            log_path=workdir)

        self.process_monitor.watch(controller)

        controller_helper = MorphProcess(
            name='controller-helper',
            argv=[DISTBUILD_HELPER],
            settings={
                'parent-port': controller.controller_helper_port,
            },
            log_path=workdir
        )
        self.process_monitor.watch(controller_helper)

        # Need to wait for controller-helper to connect to controller.
        # NOTE(review): a fixed delay is a heuristic, not a guarantee.
        time.sleep(0.1)
        self.process_monitor.check_all()

        return controller

    def start_build(self, name, controller, to_build, log_path):
        '''Start a `morph distbuild-morphology` initiator for 'to_build'.

        'to_build' is appended to the commandline as-is. The returned process
        is not added to any ProcessMonitor.

        '''
        port = controller.controller_initiator_port
        return MorphProcess(
            name=name,
            argv=[MORPH, 'distbuild-morphology'] + to_build,
            settings={
                'controller-initiator-address': 'localhost',
                'controller-initiator-port': port,
                'initiator-step-output-dir': log_path,
            },
            log_path=log_path)


def _exit_on_sigterm(signum, frame):
    '''Signal handler that turns SIGTERM into SystemExit.

    Python leaves SIGTERM at its default disposition, which ends the
    interpreter without running any 'finally' clauses. The subprocesses
    started by this harness would then be left running. Raising SystemExit
    instead unwinds the stack, so the cleanup code runs.

    '''
    raise SystemExit(128 + signum)


if __name__ == '__main__':
    signal.signal(signal.SIGTERM, _exit_on_sigterm)
    DistbuildTestHarness().run()