summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Thursfield <sam.thursfield@codethink.co.uk>2015-02-13 17:25:31 +0000
committerAdam Coldrick <adam.coldrick@codethink.co.uk>2015-03-10 15:13:34 +0000
commit61596fe3e0e305bf95c3c5c99213cceca2c7a6e3 (patch)
tree0a216ce9e1fdb9f13a6321cec64cb7a393879cb1
parent8fcf5f3c68e24b96347aee0819ab18cf97b48721 (diff)
downloadmorph-61596fe3e0e305bf95c3c5c99213cceca2c7a6e3.tar.gz
Add script to run a distbuild network on the local machine
This extends the work Richard Maw did on getting distbuild to run locally. It's now possible to interact with the local distbuild network, where before it could only be used as part of Yarn tests. It's also easy to have more than a single worker now.
-rwxr-xr-xscripts/distbuild402
1 files changed, 402 insertions, 0 deletions
diff --git a/scripts/distbuild b/scripts/distbuild
new file mode 100755
index 00000000..97798831
--- /dev/null
+++ b/scripts/distbuild
@@ -0,0 +1,402 @@
+#!/usr/bin/env python
+# Copyright (C) 2015 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+'''Run a local instance of a Morph distributed build network.
+
+To use:
+
+ scripts/distbuild run
+
+If you want more info on what is going on, try:
+
+ scripts/distbuild run --log=/dev/stdout
+
+All distbuild subprocesses will be shut down on TERM signal or if any of them
+crash.
+
+Logs and build artifacts will be stored in a newly created temporary directory.
+You can specify a different location with the --datadir argument. The directory
+will not be deleted when the process exits.
+
+'''
+
+
+DISTBUILD_HELPER = 'distbuild-helper'
+MORPH = 'morph'
+MORPH_CACHE_SERVER = 'morph-cache-server'
+
+
+import os
+import select
+import subprocess
+import tempfile
+import time
+from logging import debug, info
+
+import cliapp
+
+
+def subdir(workdir, *path_components):
+ '''Create a subdirectory and return the path to it.'''
+ path = os.path.join(workdir, *path_components)
+ os.makedirs(path)
+ return path
+
+
+class Process(subprocess.Popen):
+ '''A running subprocess.'''
+
+ def __init__(self, name, argv, settings, **kwargs):
+ '''Start a new subprocess, using subprocess.Popen.
+
+ The 'name' parameter is only used internally.
+
+ The 'argv' parameter specifies the commandline to be run. The
+ 'settings' dict will be formatted as long-form commandline switches and
+ added to 'argv'.
+
+ '''
+ self.name = name
+ self.argv = argv
+ self.settings = settings
+
+ full_argv = argv + self._format_settings(settings)
+ info('%s commandline: %s' % (name, ' '.join(full_argv)))
+ super(Process, self).__init__(full_argv, **kwargs)
+ info('%s process ID: %s' % (name, self.pid))
+
+ def _format_settings(self, arg_dict):
+ def as_string(key, value):
+ if value is True:
+ return '--%s' % key
+ elif value is False:
+ return ''
+ else:
+ return '--%s=%s' % (key, value)
+ return [as_string(k, v) for k, v in arg_dict.iteritems()]
+
+
+class MorphProcess(Process):
+ '''A running instance of Morph, morph-cache-server or distbuild-helper.'''
+
+ def __init__(self, name, argv, settings, log_path=None, **kwargs):
+ '''Start an instance of Morph, morph-cache-server or distbuild-helper.
+
+ The logs will be sent to a file called '$log_path/morph-$name.log', if
+ log_path is passed.
+
+ '''
+ if log_path:
+ settings['log'] = os.path.join(log_path, 'morph-%s.log' % name)
+ settings['no-default-config'] = True
+
+ super(MorphProcess, self).__init__(name, argv, settings, **kwargs)
+
+
+class MorphListenerProcess(MorphProcess):
+ '''A running instance of a Morph or morph-cache-server daemon process.'''
+
+ def __init__(self, name, port_names, argv, settings, **kwargs):
+ '''Start and wait for an instance of Morph or morph-cache-server.
+
+ Using --port-file arguments, the constructor will wait for each port
+ listed in 'port_names' to become ready before returning. The subprocess
+ will pick a random available port number for each port. The numbers
+ be accessible as attributes on this class.
+
+ For example, if you pass port_names=['worker-daemon-port'], the
+ subprocess will receive two extra commandline arguments:
+ --worker-daemon-port=0 and --worker-daemon-port-file=xxx
+
+ Once the process starts, self.worker_daemon_port will contain the
+ number of the port that it is listening on.
+
+ '''
+ for port_name in port_names:
+ self._setup_port_fifo(port_name, settings)
+
+ super(MorphListenerProcess, self).__init__(
+ name, argv, settings, **kwargs)
+
+ for port_name in port_names:
+ port_number = self._read_port_fifo(port_name, settings)
+ info('%s: %s port is %s' % (name, port_name, port_number))
+
+ port_attr = port_name.replace('-', '_')
+ setattr(self, port_attr, port_number)
+
+ def _setup_port_fifo(self, port_name, settings):
+ tempdir = tempfile.mkdtemp()
+ port_file = os.path.join(tempdir, '%s.port' % (port_name))
+ os.mkfifo(port_file)
+
+ # Note that Python passes dicts by reference, so this modifies the
+ # dict that was passed in.
+ settings['%s' % port_name] = 0
+ settings['%s-file' % port_name] = port_file
+
+ def _read_port_fifo(self, port_name, settings):
+ port_file = settings['%s-file' % port_name]
+
+ debug('Read: %s' % port_file)
+ with open(port_file, 'r') as f:
+ # The readline() call will block until a line of data is written.
+ # The process we are starting should only write out the port number
+ # once the process is listening on that port. Thus, we block until
+ # the process is ready, which is very important for avoiding races.
+ port_number = int(f.readline())
+
+ os.unlink(port_file)
+ os.rmdir(os.path.dirname(port_file))
+
+ return port_number
+
+
+class MorphCacheServerProcess(MorphListenerProcess):
+ '''A morph-cache-server process.'''
+
+ def __init__(self, name, cache_path, log_path=None, enable_writes=False):
+ '''Start a morph-cache-server process.'''
+
+ ports = ['port']
+ argv = [MORPH_CACHE_SERVER]
+
+ settings = {
+ 'artifact-dir': subdir(cache_path, 'artifacts'),
+ 'enable-writes': enable_writes,
+ 'no-fcgi': True,
+ 'repo-dir': subdir(cache_path, 'gits'),
+ }
+
+ super(MorphCacheServerProcess, self).__init__(
+ name, ports, argv, settings, log_path=log_path,
+ stderr=subprocess.PIPE)
+
+
+class MorphWorkerDaemonProcess(MorphListenerProcess):
+ '''A `morph worker-daemon` process.'''
+ def __init__(self, name, cache_server, log_path=None):
+ '''Start a `morph worker-daemon` instance.'''
+
+ ports = ['worker-daemon-port']
+ argv = [MORPH, 'worker-daemon']
+
+ settings = {
+ 'artifact-cache-server': 'http://localhost:%s' % cache_server.port,
+ }
+
+ super(MorphWorkerDaemonProcess, self).__init__(
+ name, ports, argv, settings, log_path=log_path)
+
+
+class ProcessMonitor(object):
+ '''A tool for managing a bunch of subprocesses.'''
+
+ def __init__(self):
+ self.process = {}
+
+ def watch(self, process):
+ '''Start monitoring a running process.'''
+ self.process[process.name] = process
+
+ def check_all(self):
+ '''Check all processes are running.'''
+ for name, p in self.process.iteritems():
+ if p.poll() != None:
+ raise Exception(
+ '%s: exited with code %s' % (name, p.returncode))
+
+ def terminate_all(self):
+ '''Send TERM signal to all active subprocesses.'''
+ for p in self.process.itervalues():
+ if p.poll() == None:
+ p.terminate()
+ info('Waiting for process %i' % p.pid)
+ p.wait()
+
+
+class DistbuildTestHarness(cliapp.Application):
+ '''Harness for running a distbuild network on a single machine.'''
+
+ def __init__(self):
+ super(DistbuildTestHarness, self).__init__()
+
+ self.process_monitor = ProcessMonitor()
+
+ def add_settings(self):
+ self.settings.string(
+ ['datadir'],
+ 'location to cache gits and artifacts, and write log files')
+ self.settings.string(
+ ['morph-instance'],
+ 'Path to Morph program that will run worker-build and '
+ 'serialise-artifact commands')
+ self.settings.string(
+ ['port-file'],
+ 'write port used by initiator to FILE, when ready')
+ self.settings.integer(
+ ['workers'],
+ 'number of workers to start',
+ default=4)
+
+ def cmd_run(self, args):
+ '''Run a distbuild network.'''
+
+ try:
+ datadir = self.settings['datadir'] or tempfile.mkdtemp()
+
+ worker_cache, shared_cache = self.start_cache_servers(datadir)
+
+ controller = self.start_distbuild_network(
+ datadir, worker_cache, shared_cache,
+ morph_instance=self.settings['morph-instance'],
+ n_workers=self.settings['workers'])
+
+ if self.settings['port-file']:
+ with open(self.settings['port-file'], 'w') as f:
+ f.write('%s' % controller.controller_initiator_port)
+
+ print('Distbuild controller listening on port %i' %
+ controller.controller_initiator_port)
+ print('Data in %s' % datadir)
+
+ print('\nTo use: morph distbuild '
+ '--controller-initiator-address=localhost '
+ '--controller-initiator-port=%i FILE' %
+ controller.controller_initiator_port)
+
+ # Run until we get a TERM signal.
+ while True:
+ select.select([], [], [], 1)
+ self.process_monitor.check_all()
+ finally:
+ self.process_monitor.terminate_all()
+
+ def start_cache_servers(self, workdir):
+ '''Start necessary artifact cache servers.
+
+ There needs to be a shared artifact cache server. In a normal distbuild
+ setup this is part of the Trove system.
+
+ There is a separate cache server for all the workers. In a real
+ distbuild setup, each worker machine runs its own instance of
+ morph-cache-server. The controller uses the same port number for all
+ workers so in this test harness all workers will have to share one
+ cache-server process.
+
+ It's not possible to use a single cache server process at present,
+ because when the /fetch method of the shared cache server is called, it
+ will break because it can't fetch stuff from itself.
+
+ '''
+ worker_cache = MorphCacheServerProcess(
+ name='worker-cache-server',
+ cache_path=subdir(workdir, 'worker-cache'),
+ log_path=workdir,
+ enable_writes=False)
+ self.process_monitor.watch(worker_cache)
+
+ shared_cache = MorphCacheServerProcess(
+ name='shared-cache-server',
+ cache_path=subdir(workdir, 'shared-cache'),
+ log_path=workdir,
+ enable_writes=True)
+ self.process_monitor.watch(shared_cache)
+
+ return worker_cache, shared_cache
+
+ def start_distbuild_network(self, workdir, worker_cache, shared_cache,
+ morph_instance=None, n_workers=4):
+ '''Start Morph distbuild daemons and helper processes.
+
+ This starts a `morph controller-daemon` process, and one or more `morph
+ worker-daemon` processes. It also starts the helper process that these
+ need. It returns the controller process, which is the one you need to
+ talk to if you want to start a build.
+
+ '''
+
+ if not morph_instance:
+ # Create a wrapper script for the Morph that will be used to run
+ # `serialise-artifact` and `worker-build` commands, so we can pass
+ # it the commandline arguments it neds.
+ worker_morph = os.path.join(workdir, 'worker-morph')
+
+ with open(worker_morph, 'w') as f:
+ cache_dir = os.path.join(workdir, 'worker-cache')
+ log = os.path.join(workdir, 'morph.log')
+ f.write('#!/bin/sh\n')
+ f.write('%s --cachedir=%s --log=%s $@\n' % (
+ MORPH, cache_dir, log))
+
+ os.chmod(worker_morph, 0755)
+ morph_instance = worker_morph
+
+ workers = []
+ for n in range(0, n_workers):
+ worker = MorphWorkerDaemonProcess(
+ name='worker-%i' % n,
+ cache_server=worker_cache,
+ log_path=workdir)
+ self.process_monitor.watch(worker)
+
+ workers.append('localhost:%i' % worker.worker_daemon_port)
+
+ worker_helper = MorphProcess(
+ name='worker-%i-helper' % n,
+ argv=[DISTBUILD_HELPER],
+ settings={
+ 'parent-port': worker.worker_daemon_port,
+ },
+ log_path=workdir
+ )
+ self.process_monitor.watch(worker_helper)
+
+ shared_cache_url = 'http://localhost:%s' % shared_cache.port
+
+ controller = MorphListenerProcess(
+ name='controller',
+ # Order is significant -- helper-port must be first!
+ port_names=['controller-helper-port', 'controller-initiator-port'],
+ argv=[MORPH, 'controller-daemon'],
+ settings={
+ 'controller-initiator-address': 'localhost',
+ 'morph-instance': morph_instance,
+ 'worker': ','.join(workers),
+ 'worker-cache-server-port': worker_cache.port,
+ 'writeable-cache-server': shared_cache_url,
+ },
+ log_path=workdir)
+ self.process_monitor.watch(controller)
+
+ controller_helper = MorphProcess(
+ name='controller-helper',
+ argv=[DISTBUILD_HELPER],
+ settings={
+ 'parent-port': controller.controller_helper_port,
+ },
+ log_path=workdir
+ )
+ self.process_monitor.watch(controller_helper)
+
+ # Need to wait for controller-helper to connect to controller.
+ time.sleep(0.1)
+ self.process_monitor.check_all()
+
+ return controller
+
+
+DistbuildTestHarness().run()