author     Sam Thursfield <sam.thursfield@codethink.co.uk>   2015-02-16 11:59:01 +0000
committer  Sam Thursfield <sam.thursfield@codethink.co.uk>   2015-02-16 11:59:01 +0000
commit     b831ab16fb49ad56aa1091f1c820325a33832d53 (patch)
tree       12b53f6543e103b70e470fb740f1c22e92b0b044
parent     0cb996b43cba9376e4defba1a4065ad9b080db61 (diff)
parent     626d62feccefda2a57abd5f71da9c4fb164c604c (diff)
download   morph-b831ab16fb49ad56aa1091f1c820325a33832d53.tar.gz
Merge branch 'sam/distbuild-test-harness' into sam/distbuild-build-logs
Conflicts: scripts/distbuild
-rwxr-xr-x  scripts/distbuild      | 294
-rw-r--r--  yarns/morph.shell-lib  |   2
2 files changed, 164 insertions, 132 deletions
diff --git a/scripts/distbuild b/scripts/distbuild
index 285b1524..97798831 100755
--- a/scripts/distbuild
+++ b/scripts/distbuild
@@ -14,7 +14,29 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-# Run a local instance of a Morph distributed build network.
+'''Run a local instance of a Morph distributed build network.
+
+To use:
+
+ scripts/distbuild run
+
+If you want more info on what is going on, try:
+
+ scripts/distbuild run --log=/dev/stdout
+
+All distbuild subprocesses will be shut down on TERM signal or if any of them
+crash.
+
+Logs and build artifacts will be stored in a newly created temporary directory.
+You can specify a different location with the --datadir argument. The directory
+will not be deleted when the process exits.
+
+'''
+
+
+DISTBUILD_HELPER = 'distbuild-helper'
+MORPH = 'morph'
+MORPH_CACHE_SERVER = 'morph-cache-server'
import os
@@ -22,42 +44,39 @@ import select
import subprocess
import tempfile
import time
+from logging import debug, info
import cliapp
-
-# Some hackz if you are running Morph from the source tree.
-os.environ['PYTHONPATH'] = '/src/morph'
-DISTBUILD_HELPER = '/src/morph/distbuild-helper'
-MORPH = '/src/morph/morph'
-MORPH_CACHE_SERVER = '/src/morph/morph-cache-server'
-
-
-
-def subdir(workdir, *args):
- path = os.path.join(workdir, *args)
+def subdir(workdir, *path_components):
+ '''Create a subdirectory and return the path to it.'''
+ path = os.path.join(workdir, *path_components)
os.makedirs(path)
return path
class Process(subprocess.Popen):
- '''Helper for subprocess.Popen with neater commandline argument passing.
+ '''A running subprocess.'''
- This allows you to pass a dict of settings, which will be converted to
- commandline arguments internally.
+ def __init__(self, name, argv, settings, **kwargs):
+ '''Start a new subprocess, using subprocess.Popen.
- '''
+ The 'name' parameter is only used internally.
- def __init__(self, name, argv, settings):
+ The 'argv' parameter specifies the commandline to be run. The
+ 'settings' dict will be formatted as long-form commandline switches and
+ added to 'argv'.
+
+ '''
self.name = name
self.argv = argv
self.settings = settings
full_argv = argv + self._format_settings(settings)
- print('%s commandline: %s' % (name, ' '.join(full_argv)))
- subprocess.Popen.__init__(self, full_argv)
- print('%s process ID: %s' % (name, self.pid))
+ info('%s commandline: %s' % (name, ' '.join(full_argv)))
+ super(Process, self).__init__(full_argv, **kwargs)
+ info('%s process ID: %s' % (name, self.pid))
def _format_settings(self, arg_dict):
def as_string(key, value):
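(Editor's note: the body of _format_settings is cut off by the hunk boundary above. As a rough illustration of the behaviour the Process docstring promises, a settings dict rendered as long-form commandline switches, the conversion could look something like the sketch below; the helper name and the boolean handling are assumptions, not the committed code.)

def format_settings(arg_dict):
    # Render {'no-fcgi': True, 'artifact-dir': '/tmp/a'} as
    # ['--no-fcgi', '--artifact-dir=/tmp/a']. How the real helper spells
    # boolean flags is a guess.
    args = []
    for key, value in sorted(arg_dict.items()):
        if isinstance(value, bool):
            if value:
                args.append('--%s' % key)
        else:
            args.append('--%s=%s' % (key, value))
    return args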
@@ -71,33 +90,50 @@ class Process(subprocess.Popen):
class MorphProcess(Process):
- '''Process helper with special defaults for Morph and related programs.
+ '''A running instance of Morph, morph-cache-server or distbuild-helper.'''
- Morph and its helper tools all use 'cliapp', which provides a standard
- interface for configuration and logging.
+ def __init__(self, name, argv, settings, log_path=None, **kwargs):
+ '''Start an instance of Morph, morph-cache-server or distbuild-helper.
- '''
+ The logs will be sent to a file called '$log_path/morph-$name.log', if
+ log_path is passed.
- def __init__(self, name, argv, settings, log_path=None):
+ '''
if log_path:
settings['log'] = os.path.join(log_path, 'morph-%s.log' % name)
settings['no-default-config'] = True
- Process.__init__(self, name, argv, settings)
+ super(MorphProcess, self).__init__(name, argv, settings, **kwargs)
class MorphListenerProcess(MorphProcess):
- '''Process helper for listening processes.'''
+ '''A running instance of a Morph or morph-cache-server daemon process.'''
+
+ def __init__(self, name, port_names, argv, settings, **kwargs):
+ '''Start and wait for an instance of Morph or morph-cache-server.
- def __init__(self, name, port_names, argv, settings, log_path=None):
+ Using --port-file arguments, the constructor will wait for each port
+ listed in 'port_names' to become ready before returning. The subprocess
+ will pick a random available port number for each port. The numbers
+ will be accessible as attributes on this class.
+
+ For example, if you pass port_names=['worker-daemon-port'], the
+ subprocess will receive two extra commandline arguments:
+ --worker-daemon-port=0 and --worker-daemon-port-file=xxx
+
+ Once the process starts, self.worker_daemon_port will contain the
+ number of the port that it is listening on.
+
+ '''
for port_name in port_names:
self._setup_port_fifo(port_name, settings)
- MorphProcess.__init__(self, name, argv, settings, log_path=log_path)
+ super(MorphListenerProcess, self).__init__(
+ name, argv, settings, **kwargs)
for port_name in port_names:
port_number = self._read_port_fifo(port_name, settings)
- print('%s: %s port is %s' % (name, port_name, port_number))
+ info('%s: %s port is %s' % (name, port_name, port_number))
port_attr = port_name.replace('-', '_')
setattr(self, port_attr, port_number)
@@ -115,12 +151,12 @@ class MorphListenerProcess(MorphProcess):
def _read_port_fifo(self, port_name, settings):
port_file = settings['%s-file' % port_name]
- print 'Read: %s' % port_file
+ debug('Read: %s' % port_file)
with open(port_file, 'r') as f:
# The readline() call will block until a line of data is written.
# The process we are starting should only write out the port number
# once the process is listening on that port. Thus, we block until
- # the process is ready, which is very important.
+ # the process is ready, which is very important for avoiding races.
port_number = int(f.readline())
os.unlink(port_file)
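(Editor's note: the blocking readline above is the whole startup handshake, so a standalone sketch of the same pattern may help. It assumes the port file is a named pipe, which the _setup_port_fifo name suggests but this hunk does not show, and uses a dummy shell child in place of a Morph daemon.)

import os
import subprocess
import tempfile

workdir = tempfile.mkdtemp()
port_file = os.path.join(workdir, 'worker-daemon-port')
os.mkfifo(port_file)  # open()/readline() on a FIFO block until data arrives

# Stand-in for a daemon given --worker-daemon-port=0 and
# --worker-daemon-port-file=...: it reports its port only once it is
# actually listening, which is what makes the wait race-free.
child = subprocess.Popen(['sh', '-c', 'sleep 1; echo 12345 > %s' % port_file])

with open(port_file, 'r') as f:
    port_number = int(f.readline())  # blocks until the port is reported
os.unlink(port_file)
print('daemon is listening on port %d' % port_number)
child.wait()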
@@ -130,23 +166,31 @@ class MorphListenerProcess(MorphProcess):
class MorphCacheServerProcess(MorphListenerProcess):
+ '''A morph-cache-server process.'''
+
def __init__(self, name, cache_path, log_path=None, enable_writes=False):
+ '''Start a morph-cache-server process.'''
+
ports = ['port']
argv = [MORPH_CACHE_SERVER]
settings = {
'artifact-dir': subdir(cache_path, 'artifacts'),
- 'repo-dir': subdir(cache_path, 'gits'),
'enable-writes': enable_writes,
'no-fcgi': True,
+ 'repo-dir': subdir(cache_path, 'gits'),
}
- MorphListenerProcess.__init__(
- self, name, ports, argv, settings, log_path=log_path)
+ super(MorphCacheServerProcess, self).__init__(
+ name, ports, argv, settings, log_path=log_path,
+ stderr=subprocess.PIPE)
class MorphWorkerDaemonProcess(MorphListenerProcess):
+ '''A `morph worker-daemon` process.'''
def __init__(self, name, cache_server, log_path=None):
+ '''Start a `morph worker-daemon` instance.'''
+
ports = ['worker-daemon-port']
argv = [MORPH, 'worker-daemon']
@@ -154,17 +198,44 @@ class MorphWorkerDaemonProcess(MorphListenerProcess):
'artifact-cache-server': 'http://localhost:%s' % cache_server.port,
}
- MorphListenerProcess.__init__(
- self, name, ports, argv, settings, log_path=log_path)
+ super(MorphWorkerDaemonProcess, self).__init__(
+ name, ports, argv, settings, log_path=log_path)
+
+
+class ProcessMonitor(object):
+ '''A tool for managing a bunch of subprocesses.'''
+
+ def __init__(self):
+ self.process = {}
+
+ def watch(self, process):
+ '''Start monitoring a running process.'''
+ self.process[process.name] = process
+
+ def check_all(self):
+ '''Check all processes are running.'''
+ for name, p in self.process.iteritems():
+ if p.poll() != None:
+ raise Exception(
+ '%s: exited with code %s' % (name, p.returncode))
+
+ def terminate_all(self):
+ '''Send TERM signal to all active subprocesses.'''
+ for p in self.process.itervalues():
+ if p.poll() == None:
+ p.terminate()
+ info('Waiting for process %i' % p.pid)
+ p.wait()
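(Editor's note: ProcessMonitor is driven by cmd_run further down; condensed, the usage pattern is roughly the following. This is an illustrative fragment that assumes the classes defined in this script, with a made-up cache path.)

monitor = ProcessMonitor()
try:
    # Register each daemon as soon as it has been started.
    monitor.watch(MorphCacheServerProcess(
        name='shared-cache-server', cache_path='/tmp/shared-cache',
        enable_writes=True))

    while True:
        select.select([], [], [], 1)  # wake up roughly once a second
        monitor.check_all()           # raises if any child has exited
finally:
    monitor.terminate_all()           # TERM and reap whatever is still running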
class DistbuildTestHarness(cliapp.Application):
'''Harness for running a distbuild network on a single machine.'''
def __init__(self):
- self.process = dict()
super(DistbuildTestHarness, self).__init__()
+ self.process_monitor = ProcessMonitor()
+
def add_settings(self):
self.settings.string(
['datadir'],
@@ -176,6 +247,10 @@ class DistbuildTestHarness(cliapp.Application):
self.settings.string(
['port-file'],
'write port used by initiator to FILE, when ready')
+ self.settings.integer(
+ ['workers'],
+ 'number of workers to start',
+ default=4)
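(Editor's note: the --port-file setting described above is how external tooling, such as the yarn suite, discovers the harness once it is ready. A hedged sketch of such a client follows; the polling loop, paths, and error handling are assumptions rather than part of the patch.)

import os
import subprocess
import time

port_file = '/tmp/distbuild-port'
harness = subprocess.Popen(
    ['scripts/distbuild', 'run', '--port-file=%s' % port_file])

# The harness writes the controller's initiator port once the whole
# network is up, so poll until the file appears.
while not os.path.exists(port_file):
    if harness.poll() is not None:
        raise RuntimeError('distbuild harness exited early')
    time.sleep(0.5)

with open(port_file) as f:
    port = int(f.read())

# A build can now be submitted with:
#   morph distbuild --controller-initiator-address=localhost \
#       --controller-initiator-port=<port> FILE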
def cmd_run(self, args):
'''Run a distbuild network.'''
@@ -188,116 +263,85 @@ class DistbuildTestHarness(cliapp.Application):
controller = self.start_distbuild_network(
datadir, worker_cache, shared_cache,
morph_instance=self.settings['morph-instance'],
- n_workers=1)
+ n_workers=self.settings['workers'])
if self.settings['port-file']:
with open(self.settings['port-file'], 'w') as f:
f.write('%s' % controller.controller_initiator_port)
- print('Distbuild initiator listening on port %i' %
- controller.controller_initiator_port)
+ print('Distbuild controller listening on port %i' %
+ controller.controller_initiator_port)
print('Data in %s' % datadir)
+ print('\nTo use: morph distbuild '
+ '--controller-initiator-address=localhost '
+ '--controller-initiator-port=%i FILE' %
+ controller.controller_initiator_port)
+
# Run until we get a TERM signal.
while True:
select.select([], [], [], 1)
- self.check_processes_are_running()
+ self.process_monitor.check_all()
finally:
- self.terminate_processes()
-
- def cmd_build(self):
- '''Start a distbuild network and run a single build on it.'''
- workdir = tempfile.mkdtemp()
- print('Working directory: %s' % workdir)
-
- to_build = [
- 'baserock:baserock/definitions',
- 'c7292b7c81cdd7e5b9e85722406371748453c44f',
- 'systems/base-system-x86_64-generic.morph',
- ]
-
-
- try:
- worker_cache, shared_cache = self.start_cache_servers(workdir)
- controller = self.start_distbuild_network(
- workdir, worker_cache, shared_cache, n_workers=7)
-
- initiator1 = self.start_build(controller, to_build,
- log_path=subdir(workdir, '1'))
+ self.process_monitor.terminate_all()
- initiator2 = self.start_build(controller, to_build,
- log_path=subdir(workdir, '2'))
-
- while initiator1.poll() == None or initiator2.poll() == None:
- time.sleep(0.1) # FIXME: use select!
- self.check_processes_are_running()
-
- if initiator1.returncode != 0 or initiator2.returncode != 0:
- #raise Exception('Initiator fail')
- print('Initiator fail')
- else:
- print('Success!')
-
- finally:
- self.terminate_processes()
- print('Test state kept in %s' % workdir)
+ def start_cache_servers(self, workdir):
+ '''Start necessary artifact cache servers.
- def watch_process(self, process):
- self.process[process.name] = process
+ There needs to be a shared artifact cache server. In a normal distbuild
+ setup this is part of the Trove system.
- def check_processes_are_running(self):
- '''Check all processes are running.'''
- for name, p in self.process.iteritems():
- if p.poll() != None:
- raise Exception(
- '%s: exited with code %s' % (name, p.returncode))
+ There is a separate cache server for all the workers. In a real
+ distbuild setup, each worker machine runs its own instance of
+ morph-cache-server. The controller uses the same port number for all
+ workers so in this test harness all workers will have to share one
+ cache-server process.
- def terminate_processes(self):
- '''Send TERM signal to all active subprocesses.'''
- for p in self.process.itervalues():
- if p.poll() == None:
- p.terminate()
- print('Waiting for process %i' % p.pid)
- p.wait()
+ It's not possible to use a single cache server process at present: the
+ /fetch method of the shared cache server would break because it cannot
+ fetch artifacts from itself.
- def start_cache_servers(self, workdir):
- # We have to make a bit of a kludge here. In a real distbuild setup,
- # each worker machine runs its own instance of morph-cache-server on
- # port 8080. The controller uses the same port number for all workers
- # so in this test harness all workers will have to share one
- # cache-server process.
- #
- # That's fine, but we still need a separate process for the *shared*
- # artifact cache: artifacts are copied from workers to the shared cache
- # using the /fetch method, which just wouldn't work if it had to fetch
- # from itself.
+ '''
worker_cache = MorphCacheServerProcess(
name='worker-cache-server',
cache_path=subdir(workdir, 'worker-cache'),
log_path=workdir,
enable_writes=False)
- self.watch_process(worker_cache)
+ self.process_monitor.watch(worker_cache)
shared_cache = MorphCacheServerProcess(
name='shared-cache-server',
cache_path=subdir(workdir, 'shared-cache'),
log_path=workdir,
enable_writes=True)
- self.watch_process(shared_cache)
+ self.process_monitor.watch(shared_cache)
return worker_cache, shared_cache
def start_distbuild_network(self, workdir, worker_cache, shared_cache,
morph_instance=None, n_workers=4):
- if morph_instance is None:
- # This is a hack so we can pass arguments to the Morph used to run
- # worker-build and serialise-artifact.
+ '''Start Morph distbuild daemons and helper processes.
+
+ This starts a `morph controller-daemon` process, and one or more `morph
+ worker-daemon` processes. It also starts the helper process that these
+ need. It returns the controller process, which is the one you need to
+ talk to if you want to start a build.
+
+ '''
+
+ if not morph_instance:
+ # Create a wrapper script for the Morph that will be used to run
+ # `serialise-artifact` and `worker-build` commands, so we can pass
+ it the commandline arguments it needs.
worker_morph = os.path.join(workdir, 'worker-morph')
+
with open(worker_morph, 'w') as f:
cache_dir = os.path.join(workdir, 'worker-cache')
log = os.path.join(workdir, 'morph.log')
f.write('#!/bin/sh\n')
- f.write('%s --cachedir=%s --log=%s $@\n' % (MORPH, cache_dir, log))
+ f.write('%s --cachedir=%s --log=%s $@\n' % (
+ MORPH, cache_dir, log))
+
os.chmod(worker_morph, 0755)
morph_instance = worker_morph
@@ -307,7 +351,7 @@ class DistbuildTestHarness(cliapp.Application):
name='worker-%i' % n,
cache_server=worker_cache,
log_path=workdir)
- self.watch_process(worker)
+ self.process_monitor.watch(worker)
workers.append('localhost:%i' % worker.worker_daemon_port)
@@ -319,7 +363,9 @@ class DistbuildTestHarness(cliapp.Application):
},
log_path=workdir
)
- self.watch_process(worker_helper)
+ self.process_monitor.watch(worker_helper)
+
+ shared_cache_url = 'http://localhost:%s' % shared_cache.port
controller = MorphListenerProcess(
name='controller',
@@ -331,10 +377,10 @@ class DistbuildTestHarness(cliapp.Application):
'morph-instance': morph_instance,
'worker': ','.join(workers),
'worker-cache-server-port': worker_cache.port,
- 'writeable-cache-server': 'http://localhost:%s' % shared_cache.port,
+ 'writeable-cache-server': shared_cache_url,
},
log_path=workdir)
- self.watch_process(controller)
+ self.process_monitor.watch(controller)
controller_helper = MorphProcess(
name='controller-helper',
@@ -344,27 +390,13 @@ class DistbuildTestHarness(cliapp.Application):
},
log_path=workdir
)
-
- self.watch_process(controller_helper)
+ self.process_monitor.watch(controller_helper)
# Need to wait for controller-helper to connect to controller.
time.sleep(0.1)
-
- self.check_processes_are_running()
+ self.process_monitor.check_all()
return controller
- def start_build(self, controller, to_build, log_path):
- port = controller.controller_initiator_port
- return MorphProcess(
- name='initiator',
- argv=[MORPH, 'distbuild-morphology'] + to_build,
- settings={
- 'controller-initiator-address': 'localhost',
- 'controller-initiator-port': port,
- 'initiator-step-output-dir': log_path,
- },
- log_path=log_path)
-
DistbuildTestHarness().run()
diff --git a/yarns/morph.shell-lib b/yarns/morph.shell-lib
index 206f0e38..c5de80df 100644
--- a/yarns/morph.shell-lib
+++ b/yarns/morph.shell-lib
@@ -3,7 +3,7 @@
# The shell functions in this library are meant to make writing IMPLEMENTS
# sections for yarn scenario tests easier.
-# Copyright (C) 2013-2014 Codethink Limited
+# Copyright (C) 2013-2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by