summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Thursfield <sam.thursfield@codethink.co.uk>2015-02-13 17:25:31 +0000
committerBaserock Gerrit <gerrit@baserock.org>2015-06-17 10:36:09 +0000
commit7a8a2eda8ecb7c36d789af49eab1b7c3c853e461 (patch)
tree97c6d1bfe9f9721ee1e056718a5a8f66e5db9a63
parentc933c6a5791b814996f64d09ba6589818ffab6e3 (diff)
downloadmorph-7a8a2eda8ecb7c36d789af49eab1b7c3c853e461.tar.gz
Add script to run a distbuild network on the local machine
Change-Id: I8d3f3ec6a1796b06b32e43dc4839360ff1cc2d86
-rwxr-xr-xscripts/run-distbuild523
1 files changed, 523 insertions, 0 deletions
diff --git a/scripts/run-distbuild b/scripts/run-distbuild
new file mode 100755
index 00000000..678fd20d
--- /dev/null
+++ b/scripts/run-distbuild
@@ -0,0 +1,523 @@
+#!/usr/bin/env python
+# Copyright (C) 2015 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+'''Run a local instance of a Morph distributed build network.
+
+To use:
+
+ scripts/run-distbuild
+
+This will test your system-installed version of Morph. During development you
+probably want to use `--source` to run Morph from a source checkout instead:
+
+ scripts/run-distbuild --source=.
+
+If you want more info on what is going on, try:
+
+ scripts/run-distbuild --log=/dev/stdout
+
+All distbuild subprocesses will be shut down on TERM signal or if any of them
+crash.
+
+Logs and build artifacts will be stored in a newly created temporary directory.
+You can specify a different location with the --datadir argument. The directory
+will not be deleted when the process exits.
+
+You can set the number of workers with `--workers=N` (default is 1).
+
+** NOTE: there are currently some bugs in Morph which cause various errors
+** when more than one worker is running on the system at once.
+
+'''
+
+
+DISTBUILD_HELPER = 'distbuild-helper'
+MORPH = 'morph'
+MORPH_CACHE_SERVER = 'morph-cache-server'
+
+
+import os
+import pipes
+import subprocess
+import tempfile
+import time
+from logging import debug, info
+
+import cliapp
+
+
+def subdir(workdir, *path_components):
+ '''Create a subdirectory and return the path to it.'''
+ path = os.path.join(workdir, *path_components)
+ os.makedirs(path)
+ return path
+
+
+def add_to_environment(env=None, **new_values):
+ '''Extend 'env' with 'new_values'.
+
+ If 'env' is None, the environment of the current process is used.
+
+ If any of 'new_values' are present in 'env' already, an exception is
+ raised.
+
+ '''
+ env = env or os.environ.copy()
+ for k, v in new_values.iteritems():
+ if k in env:
+ # Sorry ... there's no 'one size fits all' strategy for merging
+ # environment variables so just add code for whatever special
+ # case you need to handle, if you hit this.
+ raise NotImplementedError(
+ 'Unable to merge %s=%s with existing value of %s',
+ k, v, env[k])
+ else:
+ env[k] = v
+ return env
+
+
+class Process(subprocess.Popen):
+ '''A running subprocess.'''
+
+ def __init__(self, name, argv, **kwargs):
+ '''Start a new subprocess, using subprocess.Popen.
+
+ The 'name' parameter is only used internally.
+
+ The 'argv' parameter specifies the commandline to be run.
+
+ '''
+ self.name = name
+ self.argv = argv
+
+ info('%s commandline: %s' % (name, ' '.join(argv)))
+ super(Process, self).__init__(argv, **kwargs)
+ info('%s process ID: %s' % (name, self.pid))
+
+
+class CliappProcess(Process):
+ '''A running subprocess that supports 'cliapp'-style commandline arguments.
+
+ The 'cliapp' library is used by most Baserock components to give a
+ consistent commandline interface.
+
+ '''
+
+ def __init__(self, name, argv, settings, **kwargs):
+ '''Start a new subprocess, using subprocess.Popen.
+
+ The 'settings' dict will be formatted as long-form commandline switches
+ and added to 'argv'.
+
+ '''
+ self.settings = settings
+
+ assert 'config' not in settings, \
+ "The configuration file %s may interfere with settings passed " \
+ "through the commandline."
+
+ full_argv = argv + self._format_settings(settings)
+
+ super(CliappProcess, self).__init__(name, full_argv, **kwargs)
+
+ def _format_settings(self, arg_dict):
+ def as_string(key, value):
+ if value is True:
+ return '--%s' % key
+ elif value is False:
+ return None
+ else:
+ return '--%s=%s' % (key, value)
+
+ def not_null(item):
+ return (item is not None)
+
+ strings = [as_string(k, v) for k, v in arg_dict.iteritems()]
+ return sorted(filter(not_null, strings))
+
+
+class MorphProcess(CliappProcess):
+ '''A running instance of Morph, morph-cache-server or distbuild-helper.'''
+
+ def __init__(self, name, argv, settings, log_path=None, source_dir=None,
+ **kwargs):
+ '''Start an instance of Morph, morph-cache-server or distbuild-helper.
+
+ The logs will be sent to a file called '$log_path/morph-$name.log', if
+ log_path is passed.
+
+ If source_dir is passed, it is prepended to argv[0] and added to
+ PYTHONPATH. This is to allow running Morph from its source tree easily.
+
+ '''
+ if log_path is not None:
+ settings['log'] = os.path.join(log_path, 'morph-%s.log' % name)
+
+ if source_dir is not None:
+ argv[0] = os.path.join(source_dir, argv[0])
+ kwargs['env'] = add_to_environment(
+ kwargs.get('env'), PYTHONPATH=source_dir)
+
+ settings['no-default-config'] = True
+
+ super(MorphProcess, self).__init__(name, argv, settings, **kwargs)
+
+
+class MorphListenerProcess(MorphProcess):
+ '''A running instance of a Morph or morph-cache-server daemon process.'''
+
+ def __init__(self, name, port_names, argv, settings, **kwargs):
+ '''Start and wait for an instance of Morph or morph-cache-server.
+
+ Using --port-file arguments, the constructor will wait for each port
+ listed in 'port_names' to become ready before returning. The subprocess
+ will pick a random available port number for each port. The numbers
+ be accessible as attributes on this class.
+
+ For example, if you pass port_names=['worker-daemon-port'], the
+ subprocess will receive two extra commandline arguments:
+ --worker-daemon-port=0 and --worker-daemon-port-file=xxx
+
+ After construction, self.worker_daemon_port will contain the
+ number of the port that it is listening on.
+
+ '''
+ for port_name in port_names:
+ self._setup_port_fifo(port_name, settings)
+
+ super(MorphListenerProcess, self).__init__(
+ name, argv, settings, **kwargs)
+
+ for port_name in port_names:
+ port_number = self._read_port_fifo(port_name, settings)
+ info('%s: %s port is %s' % (name, port_name, port_number))
+
+ port_attr = port_name.replace('-', '_')
+ setattr(self, port_attr, port_number)
+
+ def _setup_port_fifo(self, port_name, settings):
+ tempdir = tempfile.mkdtemp()
+ port_file = os.path.join(tempdir, '%s.port' % (port_name))
+ os.mkfifo(port_file)
+
+ # Note that Python passes dicts by reference, so this modifies the
+ # dict that was passed in.
+ settings['%s' % port_name] = 0
+ settings['%s-file' % port_name] = port_file
+
+ def _read_port_fifo(self, port_name, settings):
+ port_file = settings['%s-file' % port_name]
+
+ # The open() call will block until the daemon process opens the other
+ # end of this FIFO. It should do this only once it is listening for
+ # connections on this port. Thus, we block until the process is ready,
+ # which is very important for avoiding races.
+ debug('Read: %s' % port_file)
+ with open(port_file, 'r') as f:
+ port_number = int(f.readline())
+
+ os.unlink(port_file)
+ os.rmdir(os.path.dirname(port_file))
+
+ return port_number
+
+
+class MorphCacheServerProcess(MorphListenerProcess):
+ '''A morph-cache-server process.'''
+
+ def __init__(self, name, cache_path, log_path=None, source_dir=None,
+ enable_writes=False):
+ '''Start a morph-cache-server process.'''
+
+ ports = ['port']
+ argv = [MORPH_CACHE_SERVER]
+
+ settings = {
+ 'artifact-dir': subdir(cache_path, 'artifacts'),
+ 'enable-writes': enable_writes,
+ 'no-fcgi': True,
+ 'repo-dir': subdir(cache_path, 'gits'),
+ }
+
+ super(MorphCacheServerProcess, self).__init__(
+ name, ports, argv, settings, log_path=log_path,
+ source_dir=source_dir, stderr=subprocess.PIPE)
+
+
+class MorphWorkerDaemonProcess(MorphListenerProcess):
+ '''A `morph worker-daemon` process.'''
+ def __init__(self, name, cache_server, log_path=None, source_dir=None):
+ '''Start a `morph worker-daemon` instance.'''
+
+ ports = ['worker-daemon-port']
+ argv = [MORPH, 'worker-daemon']
+
+ settings = {
+ 'artifact-cache-server': 'http://localhost:%s' % cache_server.port,
+ }
+
+ super(MorphWorkerDaemonProcess, self).__init__(
+ name, ports, argv, settings, log_path=log_path,
+ source_dir=source_dir)
+
+
+class ProcessMonitor(object):
+ '''A tool for managing a bunch of subprocesses.'''
+
+ def __init__(self):
+ self.process = {}
+
+ def watch(self, process):
+ '''Start monitoring a running process.'''
+
+ assert process.name not in self.process
+
+ self.process[process.name] = process
+
+ def check_all(self):
+ '''Check all processes are running.'''
+ failed = []
+
+ for name, p in self.process.iteritems():
+ if p.poll() != None:
+ failed.append(p)
+
+ if len(failed) > 0:
+ def message(p):
+ return '%s: exited with code %s' % (p.name, p.returncode)
+ raise Exception('\n'.join(message(p) for p in failed))
+
+ def terminate_all(self):
+ '''Send TERM signal to all active subprocesses.'''
+ for p in self.process.itervalues():
+ if p.poll() == None:
+ p.terminate()
+ info('Waiting for process %i' % p.pid)
+ p.wait()
+
+
+class DistbuildTestHarness(cliapp.Application):
+ '''Harness for running a distbuild network on a single machine.'''
+
+ def __init__(self):
+ super(DistbuildTestHarness, self).__init__()
+
+ self.process_monitor = ProcessMonitor()
+
+ def add_settings(self):
+ self.settings.string(
+ ['datadir'],
+ 'location to cache gits and artifacts, and write log files')
+ self.settings.string(
+ ['port-file'],
+ 'write port used by initiator to FILE, when ready')
+ self.settings.string(
+ ['source'],
+ 'Path to a checkout of morph.git, to be used instead of the '
+ 'installed system version of Morph',
+ default=None)
+ # FIXME: I'd rather the default be more than one worker, as it is a
+ # better test for the controller code, but some bugs in Morph currently
+ # prevent having more than one worker on the same machine.
+ self.settings.integer(
+ ['workers'],
+ 'number of workers to start',
+ default=1)
+
+ def process_args(self, args):
+ '''Run a distbuild network.'''
+
+ if len(args) != 0:
+ raise cliapp.AppException(
+ 'This program does not take any arguments.')
+
+ try:
+ datadir = self.settings['datadir'] or tempfile.mkdtemp()
+
+ worker_cache, shared_cache = self.start_cache_servers(datadir)
+
+ controller = self.start_distbuild_network(
+ datadir, worker_cache, shared_cache,
+ source_dir=self.settings['source'],
+ n_workers=self.settings['workers'])
+
+ if self.settings['port-file']:
+ with open(self.settings['port-file'], 'w') as f:
+ f.write('%s' % controller.controller_initiator_port)
+
+ print('Distbuild controller listening on port %i' %
+ controller.controller_initiator_port)
+ print('Data in %s' % datadir)
+
+ print('\nTo use: morph distbuild '
+ '--controller-initiator-address=localhost '
+ '--controller-initiator-port=%i FILE' %
+ controller.controller_initiator_port)
+
+ # Run until we get a TERM signal. It would be neater to use
+ # distbuild.SubprocessEventSource rather than time.sleep(1) in
+ # this main loop. But it would be more complicated to implement.
+ while True:
+ time.sleep(1)
+ self.process_monitor.check_all()
+ finally:
+ self.process_monitor.terminate_all()
+
+ def start_cache_servers(self, workdir, source_dir=None):
+ '''Start necessary artifact cache servers.
+
+ There needs to be a shared artifact cache server. In a normal distbuild
+ setup this is part of the Trove system.
+
+ There is a separate cache server for all the workers. In a real
+ distbuild setup, each worker machine runs its own instance of
+ morph-cache-server. The controller uses the same port number for all
+ workers so in this test harness all workers will have to share one
+ cache-server process.
+
+ It's not possible to use a single cache server process at present,
+ because when the /fetch method of the shared cache server is called, it
+ will break because it can't fetch stuff from itself.
+
+ '''
+ worker_cache = MorphCacheServerProcess(
+ name='worker-cache-server',
+ source_dir=source_dir,
+ cache_path=subdir(workdir, 'worker-cache'),
+ log_path=workdir,
+ enable_writes=False)
+ self.process_monitor.watch(worker_cache)
+
+ shared_cache = MorphCacheServerProcess(
+ name='shared-cache-server',
+ source_dir=source_dir,
+ cache_path=subdir(workdir, 'shared-cache'),
+ log_path=workdir,
+ enable_writes=True)
+ self.process_monitor.watch(shared_cache)
+
+ return worker_cache, shared_cache
+
+ def create_worker_morph_wrapper_script(self, workdir, source_dir=None):
+ '''Create a wrapper script for Morph that distbuild processes will use.
+
+ The distbuild controller and worker daemons coordinate the work being
+ done by distbuild, but the actual work of calculating the build graph
+ and running build commands is done by Morph subprocesses started by
+ the daemon helper processes. By default they will call the system
+ installed Morph (/usr/bin/morph) with fixed arguments.
+
+ We can cause them to use a different Morph program by passing
+ `--morph-instance` to the controller. However, to pass extra arguments
+ we need to create a wrapper script and pass in the path to that.
+
+ '''
+ path = os.path.join(workdir, 'worker-morph')
+
+ if source_dir:
+ morph = os.path.join(source_dir, MORPH)
+ else:
+ morph = MORPH
+
+ cache_dir = os.path.join(workdir, 'worker-cache')
+ log = os.path.join(workdir, 'morph.log')
+
+ with open(path, 'w') as f:
+ f.write('#!/bin/sh\n')
+
+ if source_dir:
+ f.write('export PYTHONPATH="$PYTHONPATH":%s\n' %
+ pipes.quote(source_dir))
+
+ f.write('%s --cachedir=%s --log=%s "$@"\n' % (
+ pipes.quote(morph), pipes.quote(cache_dir), pipes.quote(log)))
+
+ os.chmod(path, 0755)
+ return path
+
+ def start_distbuild_network(self, workdir, worker_cache, shared_cache,
+ source_dir=None, n_workers=4):
+ '''Start Morph distbuild daemons and helper processes.
+
+ This starts a `morph controller-daemon` process, and one or more `morph
+ worker-daemon` processes. It also starts the helper process that these
+ need. It returns the controller process, which is the one you need to
+ talk to if you want to start a build.
+
+ '''
+ worker_morph = self.create_worker_morph_wrapper_script(
+ workdir, source_dir=source_dir)
+
+ workers = []
+ for n in range(0, n_workers):
+ worker = MorphWorkerDaemonProcess(
+ name='worker-%i' % n,
+ source_dir=source_dir,
+ cache_server=worker_cache,
+ log_path=workdir)
+ self.process_monitor.watch(worker)
+
+ workers.append('localhost:%i' % worker.worker_daemon_port)
+
+ worker_helper = MorphProcess(
+ name='worker-%i-helper' % n,
+ argv=[DISTBUILD_HELPER],
+ source_dir=source_dir,
+ settings={
+ 'parent-port': worker.worker_daemon_port,
+ },
+ log_path=workdir
+ )
+ self.process_monitor.watch(worker_helper)
+
+ shared_cache_url = 'http://localhost:%s' % shared_cache.port
+
+ controller = MorphListenerProcess(
+ name='controller',
+ # Order is significant -- helper-port must be first!
+ port_names=['controller-helper-port', 'controller-initiator-port'],
+ argv=[MORPH, 'controller-daemon'],
+ source_dir=source_dir,
+ settings={
+ 'artifact-cache-server': shared_cache_url,
+ 'controller-initiator-address': 'localhost',
+ 'morph-instance': worker_morph,
+ 'worker': ','.join(workers),
+ 'worker-cache-server-port': worker_cache.port,
+ 'writeable-cache-server': shared_cache_url,
+ },
+ log_path=workdir)
+ self.process_monitor.watch(controller)
+
+ controller_helper = MorphProcess(
+ name='controller-helper',
+ argv=[DISTBUILD_HELPER],
+ source_dir=source_dir,
+ settings={
+ 'parent-port': controller.controller_helper_port,
+ },
+ log_path=workdir
+ )
+ self.process_monitor.watch(controller_helper)
+
+ # Need to wait for controller-helper to connect to controller.
+ time.sleep(0.1)
+ self.process_monitor.check_all()
+
+ return controller
+
+
+DistbuildTestHarness().run()