summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Thursfield <sam.thursfield@codethink.co.uk>2015-03-11 10:41:49 +0000
committerSam Thursfield <sam.thursfield@codethink.co.uk>2015-03-11 10:41:49 +0000
commit68395a12c79c22e266b4a06095533e38da6e29d0 (patch)
treef004fb3122bac502b127faf70e868bd56a0cb409
parent11559bbbd24c218d512d503df27157668b37bdc9 (diff)
parenteba2e42855e9413f035e5093d64543184dce6fae (diff)
downloadmorph-68395a12c79c22e266b4a06095533e38da6e29d0.tar.gz
Merge branch 'sam/distbuild-build-logs'
Reviewed-By: Adam Coldrick <adam.coldrick@codethink.co.uk> Reviewed-By: Richard Maw <richard.maw@codethink.co.uk>
-rw-r--r--distbuild/__init__.py7
-rw-r--r--distbuild/ansible/hosts1
-rw-r--r--distbuild/build_controller.py22
-rw-r--r--distbuild/initiator.py61
-rw-r--r--distbuild/initiator_connection.py24
-rw-r--r--distbuild/mainloop.py5
-rw-r--r--distbuild/protocol.py6
-rw-r--r--distbuild/worker_build_scheduler.py135
-rw-r--r--morphlib/buildcommand.py10
-rw-r--r--morphlib/plugins/distbuild_plugin.py1
10 files changed, 143 insertions, 129 deletions
diff --git a/distbuild/__init__.py b/distbuild/__init__.py
index 52ad2cc2..62a904d4 100644
--- a/distbuild/__init__.py
+++ b/distbuild/__init__.py
@@ -1,6 +1,6 @@
# distbuild/__init__.py -- library for Morph's distributed build plugin
#
-# Copyright (C) 2012, 2014 Codethink Limited
+# Copyright (C) 2012, 2014-2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -51,9 +51,8 @@ from worker_build_scheduler import (WorkerBuildQueuer,
WorkerBuildFailed,
WorkerBuildStepStarted)
from build_controller import (BuildController, BuildFailed, BuildProgress,
- BuildSteps, BuildStepStarted,
- BuildStepAlreadyStarted, BuildOutput,
- BuildStepFinished, BuildStepFailed,
+ BuildStepStarted, BuildStepAlreadyStarted,
+ BuildOutput, BuildStepFinished, BuildStepFailed,
BuildFinished, BuildCancel,
build_step_name, map_build_graph)
from initiator import Initiator
diff --git a/distbuild/ansible/hosts b/distbuild/ansible/hosts
new file mode 100644
index 00000000..2fbb50c4
--- /dev/null
+++ b/distbuild/ansible/hosts
@@ -0,0 +1 @@
+localhost
diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py
index aa11ae8f..fd4b2105 100644
--- a/distbuild/build_controller.py
+++ b/distbuild/build_controller.py
@@ -71,13 +71,6 @@ class BuildProgress(object):
self.message_text = message_text
-class BuildSteps(object):
-
- def __init__(self, request_id, artifact):
- self.id = request_id
- self.artifact = artifact
-
-
class BuildStepStarted(object):
def __init__(self, request_id, step_name, worker_name):
@@ -121,7 +114,7 @@ class _Abort(object):
def build_step_name(artifact):
'''Return user-comprehensible name for a given artifact.'''
- return artifact.name
+ return artifact.source.name
def map_build_graph(artifact, callback):
@@ -295,9 +288,6 @@ class BuildController(distbuild.StateMachine):
progress = BuildProgress(
self._request['id'], 'Finished computing build graph')
self.mainloop.queue_event(BuildController, progress)
-
- build_steps = BuildSteps(self._request['id'], artifact)
- self.mainloop.queue_event(BuildController, build_steps)
self.mainloop.queue_event(self, _GotGraph(artifact))
@@ -348,7 +338,6 @@ class BuildController(distbuild.StateMachine):
'(helper id: %s)' % self._helper_id)
def _maybe_handle_cache_response(self, event_source, event):
-
def set_status(artifact):
is_in_cache = cache_state[artifact.basename()]
artifact.state = BUILT if is_in_cache else UNBUILT
@@ -370,15 +359,14 @@ class BuildController(distbuild.StateMachine):
map_build_graph(self._artifact, set_status)
self.mainloop.queue_event(self, _Annotated())
- count = sum(map_build_graph(self._artifact,
- lambda a: 1 if a.state == UNBUILT else 0))
-
+ unbuilt = len([a for a in self._artifact.walk() if a.state == UNBUILT])
+ total = len([a for _ in self._artifact.walk()])
progress = BuildProgress(
self._request['id'],
- 'Need to build %d artifacts' % count)
+ 'Need to build %d artifacts, of %d total' % (unbuilt, total))
self.mainloop.queue_event(BuildController, progress)
- if count == 0:
+ if total == 0:
logging.info('There seems to be nothing to build')
self.mainloop.queue_event(self, _Built())
diff --git a/distbuild/initiator.py b/distbuild/initiator.py
index 7f82827c..549df66b 100644
--- a/distbuild/initiator.py
+++ b/distbuild/initiator.py
@@ -17,6 +17,7 @@
import cliapp
+import itertools
import logging
import os
import random
@@ -37,6 +38,20 @@ class _Failed(object):
self.msg = msg
+def create_build_directory(prefix='build'):
+ '''Create a new directory to store build logs.
+
+ The directory will be named build-0, unless that directory already exists,
+ in which case it will be named build-1, and so on.
+
+ '''
+ for i in itertools.count():
+ path = '%s-%02i' % (prefix, i)
+ if not os.path.exists(path):
+ os.mkdir(path)
+ return path
+
+
class Initiator(distbuild.StateMachine):
def __init__(self, cm, conn, app, repo_name, ref, morphology,
@@ -49,11 +64,14 @@ class Initiator(distbuild.StateMachine):
self._ref = ref
self._morphology = morphology
self._original_ref = original_ref
- self._steps = None
self._step_outputs = {}
- self._step_output_dir = app.settings['initiator-step-output-dir']
self.debug_transitions = False
+ if app.settings['initiator-step-output-dir'] == '':
+ self._step_output_dir = create_build_directory()
+ else:
+ self._step_output_dir = app.settings['initiator-step-output-dir']
+
def setup(self):
distbuild.crash_point()
@@ -98,7 +116,6 @@ class Initiator(distbuild.StateMachine):
'build-finished': self._handle_build_finished_message,
'build-failed': self._handle_build_failed_message,
'build-progress': self._handle_build_progress_message,
- 'build-steps': self._handle_build_steps_message,
'step-started': self._handle_step_started_message,
'step-already-started': self._handle_step_already_started_message,
'step-output': self._handle_step_output_message,
@@ -118,12 +135,6 @@ class Initiator(distbuild.StateMachine):
def _handle_build_progress_message(self, msg):
self._app.status(msg='Progress: %(msgtext)s', msgtext=msg['message'])
- def _handle_build_steps_message(self, msg):
- self._steps = msg['steps']
- self._app.status(
- msg='Build steps in total: %(steps)d',
- steps=len(self._steps))
-
def _open_output(self, msg):
assert msg['step_name'] not in self._step_outputs
if self._step_output_dir:
@@ -141,16 +152,17 @@ class Initiator(distbuild.StateMachine):
def _get_output(self, msg):
return self._step_outputs[msg['step_name']]
+ def _write_status_to_build_log(self, f, status):
+ f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n')
+ f.flush()
+
def _handle_step_already_started_message(self, msg):
status = '%s is already building on %s' % (
msg['step_name'], msg['worker_name'])
self._app.status(msg=status)
self._open_output(msg)
-
- f = self._get_output(msg)
- f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n')
- f.flush()
+ self._write_status_to_build_log(self._get_output(msg), status)
def _handle_step_started_message(self, msg):
status = 'Started building %s on %s' % (
@@ -158,10 +170,7 @@ class Initiator(distbuild.StateMachine):
self._app.status(msg=status)
self._open_output(msg)
-
- f = self._get_output(msg)
- f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n')
- f.flush()
+ self._write_status_to_build_log(self._get_output(msg), status)
def _handle_step_output_message(self, msg):
step_name = msg['step_name']
@@ -180,9 +189,7 @@ class Initiator(distbuild.StateMachine):
status = 'Finished building %s' % step_name
self._app.status(msg=status)
- f = self._get_output(msg)
- f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n')
-
+ self._write_status_to_build_log(self._get_output(msg), status)
self._close_output(msg)
else:
logging.warning(
@@ -194,9 +201,7 @@ class Initiator(distbuild.StateMachine):
status = 'Build of %s failed.' % step_name
self._app.status(msg=status)
- f = self._get_output(msg)
- f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n')
-
+ self._write_status_to_build_log(self._get_output(msg), status)
self._close_output(msg)
else:
logging.warning(
@@ -227,3 +232,13 @@ class Initiator(distbuild.StateMachine):
self.mainloop.queue_event(self._cm, distbuild.StopConnecting())
self._jm.close()
+ def handle_cancel(self):
+ # Note in each build-step.log file that the initiator cancelled: this
+ # makes it easier to tell whether a build was aborted due to a bug or
+ # dropped connection, or if the user cancelled with CTRL+C / SIGINT.
+
+ for f in self._step_outputs.itervalues():
+ self._write_status_to_build_log(f, 'Initiator cancelled')
+ f.close()
+
+ self._step_outputs = {}
diff --git a/distbuild/initiator_connection.py b/distbuild/initiator_connection.py
index 86df28f1..8b68fda3 100644
--- a/distbuild/initiator_connection.py
+++ b/distbuild/initiator_connection.py
@@ -1,6 +1,6 @@
# distbuild/initiator_connection.py -- communicate with initiator
#
-# Copyright (C) 2012, 2014 - 2015 Codethink Limited
+# Copyright (C) 2012, 2014-2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -77,8 +77,6 @@ class InitiatorConnection(distbuild.StateMachine):
'idle', self._send_build_failed_message),
('idle', distbuild.BuildController, distbuild.BuildProgress,
'idle', self._send_build_progress_message),
- ('idle', distbuild.BuildController, distbuild.BuildSteps,
- 'idle', self._send_build_steps_message),
('idle', distbuild.BuildController, distbuild.BuildStepStarted,
'idle', self._send_build_step_started_message),
('idle', distbuild.BuildController,
@@ -180,26 +178,6 @@ class InitiatorConnection(distbuild.StateMachine):
self.jm.send(msg)
self._log_send(msg)
- def _send_build_steps_message(self, event_source, event):
-
- def make_step_dict(artifact):
- return {
- 'name': distbuild.build_step_name(artifact),
- 'build-depends': [
- distbuild.build_step_name(x)
- for x in artifact.source.dependencies
- ]
- }
-
- if event.id in self.our_ids:
- step_names = distbuild.map_build_graph(
- event.artifact, make_step_dict)
- msg = distbuild.message('build-steps',
- id=self._route_map.get_incoming_id(event.id),
- steps=step_names)
- self.jm.send(msg)
- self._log_send(msg)
-
def _send_build_step_started_message(self, event_source, event):
logging.debug('InitiatorConnection: build_step_started: '
'id=%s step_name=%s worker_name=%s' %
diff --git a/distbuild/mainloop.py b/distbuild/mainloop.py
index 97e439f3..f15a3ac1 100644
--- a/distbuild/mainloop.py
+++ b/distbuild/mainloop.py
@@ -57,8 +57,11 @@ class MainLoop(object):
logging.debug('MainLoop.remove_state_machine: %s' % machine)
self._machines.remove(machine)
+ def state_machines_of_type(self, machine_type):
+ return [m for m in self._machines if isinstance(m, machine_type)]
+
def n_state_machines_of_type(self, machine_type):
- return len([m for m in self._machines if isinstance(m, machine_type)])
+ return len(self.state_machines_of_type(machine_type))
def add_event_source(self, event_source):
logging.debug('MainLoop.add_event_source: %s' % event_source)
diff --git a/distbuild/protocol.py b/distbuild/protocol.py
index f2c74819..141df742 100644
--- a/distbuild/protocol.py
+++ b/distbuild/protocol.py
@@ -1,6 +1,6 @@
# distbuild/protocol.py -- abstractions for the JSON messages
#
-# Copyright (C) 2012, 2014 - 2015 Codethink Limited
+# Copyright (C) 2012, 2014-2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -38,10 +38,6 @@ _required_fields = {
'id',
'message',
],
- 'build-steps': [
- 'id',
- 'steps',
- ],
'step-started': [
'id',
'step_name',
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index 4f7ff98f..81c961e1 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -332,7 +332,7 @@ class WorkerBuildQueuer(distbuild.StateMachine):
distbuild.crash_point()
who = event.who
- last_job = who.job() # the job this worker's just completed
+ last_job = who.current_job() # the job this worker's just completed
if last_job:
logging.debug('%s wants new job, just did %s',
@@ -395,9 +395,12 @@ class WorkerConnection(distbuild.StateMachine):
self._writeable_cache_server = writeable_cache_server
self._worker_cache_server_port = worker_cache_server_port
self._morph_instance = morph_instance
- self._helper_id = None
- self._job = None
- self._exec_response_msg = None
+
+ self._active_jobs = dict()
+ self._current_job = None
+ self._current_job_exec_response = None
+ self._current_job_cache_request = None
+
self._debug_json = False
addr, port = self._conn.getpeername()
@@ -407,8 +410,8 @@ class WorkerConnection(distbuild.StateMachine):
def name(self):
return self._worker_name
- def job(self):
- return self._job
+ def current_job(self):
+ return self._current_job
def setup(self):
distbuild.crash_point()
@@ -448,31 +451,30 @@ class WorkerConnection(distbuild.StateMachine):
def _maybe_cancel(self, event_source, build_cancel):
- if build_cancel.id not in self._job.initiators:
+ if build_cancel.id not in self._current_job.initiators:
return # event not relevant
logging.debug('WC: BuildController %r requested a cancel',
event_source)
- if (len(self._job.initiators) == 1):
+ job = self._current_job
+ if (len(job.initiators) == 1):
logging.debug('WC: Cancelling running job %s '
'with job id %s running on %s',
- self._job.artifact.basename(),
- self._job.id,
+ job.artifact.basename(), job.id,
self.name())
- msg = distbuild.message('exec-cancel', id=self._job.id)
+ msg = distbuild.message('exec-cancel', id=job.id)
self._jm.send(msg)
self.mainloop.queue_event(self, _BuildCancelled())
else:
logging.debug('WC: Not cancelling running job %s with job id %s, '
'other initiators want it done: %s',
- self._job.artifact.basename(),
- self._job.id,
- [i for i in self._job.initiators
- if i != build_cancel.id])
+ job.artifact.basename(),
+ job.id,
+ [i for i in job.initiators if i != build_cancel.id])
- self._job.initiators.remove(build_cancel.id)
+ job.initiators.remove(build_cancel.id)
def _disconnected(self, event_source, event):
distbuild.crash_point()
@@ -485,23 +487,30 @@ class WorkerConnection(distbuild.StateMachine):
def _start_build(self, event_source, event):
distbuild.crash_point()
- self._job = event.job
- self._helper_id = None
- self._exec_response_msg = None
+ job = event.job
+
+ if job.id in self._active_jobs:
+ logging.warn('Duplicate job %s for worker %s', job.id, self.name())
+
+ if self._current_job_exec_response or self._current_job_cache_request:
+ logging.warn('Caching not finished for %s', self._current_job.id)
+
+ self._active_jobs[job.id] = job
+ self._current_job = job
logging.debug('WC: starting build: %s for %s' %
- (self._job.artifact.name, self._job.initiators))
+ (job.artifact.name, job.initiators))
argv = [
self._morph_instance,
'worker-build',
'--build-log-on-stdout',
- self._job.artifact.name,
+ job.artifact.name,
]
msg = distbuild.message('exec-request',
- id=self._job.id,
+ id=job.id,
argv=argv,
- stdin_contents=distbuild.serialise_artifact(self._job.artifact),
+ stdin_contents=distbuild.serialise_artifact(job.artifact),
)
self._jm.send(msg)
@@ -509,10 +518,10 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('WC: sent to worker %s: %r'
% (self._worker_name, msg))
- started = WorkerBuildStepStarted(self._job.initiators,
- self._job.artifact.source.cache_key, self.name())
+ started = WorkerBuildStepStarted(job.initiators,
+ job.artifact.source.cache_key, self.name())
- self.mainloop.queue_event(WorkerConnection, _JobStarted(self._job))
+ self.mainloop.queue_event(WorkerConnection, _JobStarted(job))
self.mainloop.queue_event(WorkerConnection, started)
def _handle_json_message(self, event_source, event):
@@ -527,37 +536,50 @@ class WorkerConnection(distbuild.StateMachine):
'exec-output': self._handle_exec_output,
'exec-response': self._handle_exec_response,
}
-
+
handler = handlers[event.msg['type']]
- handler(event.msg)
+ job = self._active_jobs.get(event.msg['id'])
+
+ if job:
+ handler(event.msg, job)
+ else:
+ logging.warn('Received %s for unknown job %s',
+ event.msg['type'], event.msg['id'])
+
+ def _handle_exec_output(self, msg, job):
+ '''Handle output from a job that the worker is or was running.'''
- def _handle_exec_output(self, msg):
new = dict(msg)
- new['ids'] = self._job.initiators
+ new['ids'] = job.initiators
+
logging.debug('WC: emitting: %s', repr(new))
self.mainloop.queue_event(
WorkerConnection,
- WorkerBuildOutput(new, self._job.artifact.source.cache_key))
+ WorkerBuildOutput(new, job.artifact.source.cache_key))
- def _handle_exec_response(self, msg):
- logging.debug('WC: finished building: %s' % self._job.artifact.name)
- logging.debug('initiators that need to know: %s'
- % self._job.initiators)
+ def _handle_exec_response(self, msg, job):
+ '''Handle completion of a job that the worker is or was running.'''
+
+ logging.debug('WC: finished building: %s' % job.artifact.name)
+ logging.debug('initiators that need to know: %s' % job.initiators)
new = dict(msg)
- new['ids'] = self._job.initiators
+ new['ids'] = job.initiators
if new['exit'] != 0:
# Build failed.
- new_event = WorkerBuildFailed(new,
- self._job.artifact.source.cache_key)
+ new_event = WorkerBuildFailed(new, job.artifact.source.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
- self.mainloop.queue_event(WorkerConnection, _JobFailed(self._job))
+ self.mainloop.queue_event(WorkerConnection, _JobFailed(job))
self.mainloop.queue_event(self, _BuildFailed())
else:
# Build succeeded. We have more work to do: caching the result.
self.mainloop.queue_event(self, _BuildFinished())
- self._exec_response_msg = new
+ self._current_job_exec_response = new
+
+ # The job is no longer considered active, because the worker is
+ # finished with it so we won't receive any more messages about it.
+ del self._active_jobs[job.id]
def _request_job(self, event_source, event):
distbuild.crash_point()
@@ -571,15 +593,16 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('Requesting shared artifact cache to get artifacts')
- kind = self._job.artifact.source.morphology['kind']
+ job = self._current_job
+ kind = job.artifact.source.morphology['kind']
if kind == 'chunk':
- source_artifacts = self._job.artifact.source.artifacts
+ source_artifacts = job.artifact.source.artifacts
suffixes = ['%s.%s' % (kind, name) for name in source_artifacts]
suffixes.append('build-log')
else:
- filename = '%s.%s' % (kind, self._job.artifact.name)
+ filename = '%s.%s' % (kind, job.artifact.name)
suffixes = [filename]
if kind == 'stratum':
@@ -595,22 +618,22 @@ class WorkerConnection(distbuild.StateMachine):
'/1.0/fetch?host=%s:%d&cacheid=%s&artifacts=%s' %
(urllib.quote(worker_host),
self._worker_cache_server_port,
- urllib.quote(self._job.artifact.source.cache_key),
+ urllib.quote(job.artifact.source.cache_key),
suffixes))
msg = distbuild.message(
'http-request', id=self._request_ids.next(), url=url,
method='GET', body=None, headers=None)
- self._helper_id = msg['id']
+ self._current_job_cache_request = msg['id']
req = distbuild.HelperRequest(msg)
self.mainloop.queue_event(distbuild.HelperRouter, req)
- progress = WorkerBuildCaching(self._job.initiators,
- self._job.artifact.source.cache_key)
+ progress = WorkerBuildCaching(job.initiators,
+ job.artifact.source.cache_key)
self.mainloop.queue_event(WorkerConnection, progress)
def _maybe_handle_helper_result(self, event_source, event):
- if event.msg['id'] == self._helper_id:
+ if event.msg['id'] == self._current_job_cache_request:
distbuild.crash_point()
logging.debug('caching: event.msg: %s' % repr(event.msg))
@@ -618,8 +641,8 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('Shared artifact cache population done')
new_event = WorkerBuildFinished(
- self._exec_response_msg,
- self._job.artifact.source.cache_key)
+ self._current_job_exec_response,
+ self._current_job.artifact.source.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
self.mainloop.queue_event(self, _Cached())
else:
@@ -634,13 +657,17 @@ class WorkerConnection(distbuild.StateMachine):
# The BuildController will not try to cancel jobs that have
# been marked as failed.
self.mainloop.queue_event(WorkerConnection,
- _JobFailed(self._job))
+ _JobFailed(self._current_job))
new_event = WorkerBuildFailed(
- self._exec_response_msg,
- self._job.artifact.source.cache_key)
+ self._current_job_exec_response,
+ self._current_job.artifact.source.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
self.mainloop.queue_event(self, _BuildFailed())
- self.mainloop.queue_event(WorkerConnection, _JobFinished(self._job))
+ self.mainloop.queue_event(WorkerConnection,
+ _JobFinished(self._current_job))
+
+ self._current_job_exec_response = None
+ self._current_job_cache_request = None
diff --git a/morphlib/buildcommand.py b/morphlib/buildcommand.py
index 8572450d..6e7c9bbd 100644
--- a/morphlib/buildcommand.py
+++ b/morphlib/buildcommand.py
@@ -544,4 +544,12 @@ class InitiatorBuildCommand(BuildCommand):
self.MAX_RETRIES)
loop.add_state_machine(cm)
- loop.run()
+ try:
+ loop.run()
+ except KeyboardInterrupt:
+ # This will run if the user presses Ctrl+C or sends SIGINT during
+ # the build. It won't trigger on SIGTERM, SIGKILL or unhandled
+ # Python exceptions.
+ logging.info('Received KeyboardInterrupt, aborting.')
+ for initiator in loop.state_machines_of_type(distbuild.Initiator):
+ initiator.handle_cancel()
diff --git a/morphlib/plugins/distbuild_plugin.py b/morphlib/plugins/distbuild_plugin.py
index a7d69472..24d5584c 100644
--- a/morphlib/plugins/distbuild_plugin.py
+++ b/morphlib/plugins/distbuild_plugin.py
@@ -180,7 +180,6 @@ class ControllerDaemon(cliapp.Plugin):
self.app.settings.string(
['initiator-step-output-dir'],
'write build output to files in DIR',
- default='.',
group=group_distbuild)
self.app.settings.string(