diff options
author | Sam Thursfield <sam.thursfield@codethink.co.uk> | 2015-03-11 10:41:49 +0000 |
---|---|---|
committer | Sam Thursfield <sam.thursfield@codethink.co.uk> | 2015-03-11 10:41:49 +0000 |
commit | 68395a12c79c22e266b4a06095533e38da6e29d0 (patch) | |
tree | f004fb3122bac502b127faf70e868bd56a0cb409 /distbuild | |
parent | 11559bbbd24c218d512d503df27157668b37bdc9 (diff) | |
parent | eba2e42855e9413f035e5093d64543184dce6fae (diff) | |
download | morph-68395a12c79c22e266b4a06095533e38da6e29d0.tar.gz |
Merge branch 'sam/distbuild-build-logs'
Reviewed-By: Adam Coldrick <adam.coldrick@codethink.co.uk>
Reviewed-By: Richard Maw <richard.maw@codethink.co.uk>
Diffstat (limited to 'distbuild')
-rw-r--r-- | distbuild/__init__.py | 7 | ||||
-rw-r--r-- | distbuild/ansible/hosts | 1 | ||||
-rw-r--r-- | distbuild/build_controller.py | 22 | ||||
-rw-r--r-- | distbuild/initiator.py | 61 | ||||
-rw-r--r-- | distbuild/initiator_connection.py | 24 | ||||
-rw-r--r-- | distbuild/mainloop.py | 5 | ||||
-rw-r--r-- | distbuild/protocol.py | 6 | ||||
-rw-r--r-- | distbuild/worker_build_scheduler.py | 135 |
8 files changed, 134 insertions, 127 deletions
diff --git a/distbuild/__init__.py b/distbuild/__init__.py index 52ad2cc2..62a904d4 100644 --- a/distbuild/__init__.py +++ b/distbuild/__init__.py @@ -1,6 +1,6 @@ # distbuild/__init__.py -- library for Morph's distributed build plugin # -# Copyright (C) 2012, 2014 Codethink Limited +# Copyright (C) 2012, 2014-2015 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -51,9 +51,8 @@ from worker_build_scheduler import (WorkerBuildQueuer, WorkerBuildFailed, WorkerBuildStepStarted) from build_controller import (BuildController, BuildFailed, BuildProgress, - BuildSteps, BuildStepStarted, - BuildStepAlreadyStarted, BuildOutput, - BuildStepFinished, BuildStepFailed, + BuildStepStarted, BuildStepAlreadyStarted, + BuildOutput, BuildStepFinished, BuildStepFailed, BuildFinished, BuildCancel, build_step_name, map_build_graph) from initiator import Initiator diff --git a/distbuild/ansible/hosts b/distbuild/ansible/hosts new file mode 100644 index 00000000..2fbb50c4 --- /dev/null +++ b/distbuild/ansible/hosts @@ -0,0 +1 @@ +localhost diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py index aa11ae8f..fd4b2105 100644 --- a/distbuild/build_controller.py +++ b/distbuild/build_controller.py @@ -71,13 +71,6 @@ class BuildProgress(object): self.message_text = message_text -class BuildSteps(object): - - def __init__(self, request_id, artifact): - self.id = request_id - self.artifact = artifact - - class BuildStepStarted(object): def __init__(self, request_id, step_name, worker_name): @@ -121,7 +114,7 @@ class _Abort(object): def build_step_name(artifact): '''Return user-comprehensible name for a given artifact.''' - return artifact.name + return artifact.source.name def map_build_graph(artifact, callback): @@ -295,9 +288,6 @@ class BuildController(distbuild.StateMachine): progress = BuildProgress( self._request['id'], 'Finished computing build graph') self.mainloop.queue_event(BuildController, progress) - - build_steps = BuildSteps(self._request['id'], artifact) - self.mainloop.queue_event(BuildController, build_steps) self.mainloop.queue_event(self, _GotGraph(artifact)) @@ -348,7 +338,6 @@ class BuildController(distbuild.StateMachine): '(helper id: %s)' % self._helper_id) def _maybe_handle_cache_response(self, event_source, event): - def set_status(artifact): is_in_cache = cache_state[artifact.basename()] artifact.state = BUILT if is_in_cache else UNBUILT @@ -370,15 +359,14 @@ class BuildController(distbuild.StateMachine): map_build_graph(self._artifact, set_status) self.mainloop.queue_event(self, _Annotated()) - count = sum(map_build_graph(self._artifact, - lambda a: 1 if a.state == UNBUILT else 0)) - + unbuilt = len([a for a in self._artifact.walk() if a.state == UNBUILT]) + total = len([a for _ in self._artifact.walk()]) progress = BuildProgress( self._request['id'], - 'Need to build %d artifacts' % count) + 'Need to build %d artifacts, of %d total' % (unbuilt, total)) self.mainloop.queue_event(BuildController, progress) - if count == 0: + if total == 0: logging.info('There seems to be nothing to build') self.mainloop.queue_event(self, _Built()) diff --git a/distbuild/initiator.py b/distbuild/initiator.py index 7f82827c..549df66b 100644 --- a/distbuild/initiator.py +++ b/distbuild/initiator.py @@ -17,6 +17,7 @@ import cliapp +import itertools import logging import os import random @@ -37,6 +38,20 @@ class _Failed(object): self.msg = msg +def create_build_directory(prefix='build'): + '''Create a new directory to store build logs. + + The directory will be named build-0, unless that directory already exists, + in which case it will be named build-1, and so on. + + ''' + for i in itertools.count(): + path = '%s-%02i' % (prefix, i) + if not os.path.exists(path): + os.mkdir(path) + return path + + class Initiator(distbuild.StateMachine): def __init__(self, cm, conn, app, repo_name, ref, morphology, @@ -49,11 +64,14 @@ class Initiator(distbuild.StateMachine): self._ref = ref self._morphology = morphology self._original_ref = original_ref - self._steps = None self._step_outputs = {} - self._step_output_dir = app.settings['initiator-step-output-dir'] self.debug_transitions = False + if app.settings['initiator-step-output-dir'] == '': + self._step_output_dir = create_build_directory() + else: + self._step_output_dir = app.settings['initiator-step-output-dir'] + def setup(self): distbuild.crash_point() @@ -98,7 +116,6 @@ class Initiator(distbuild.StateMachine): 'build-finished': self._handle_build_finished_message, 'build-failed': self._handle_build_failed_message, 'build-progress': self._handle_build_progress_message, - 'build-steps': self._handle_build_steps_message, 'step-started': self._handle_step_started_message, 'step-already-started': self._handle_step_already_started_message, 'step-output': self._handle_step_output_message, @@ -118,12 +135,6 @@ class Initiator(distbuild.StateMachine): def _handle_build_progress_message(self, msg): self._app.status(msg='Progress: %(msgtext)s', msgtext=msg['message']) - def _handle_build_steps_message(self, msg): - self._steps = msg['steps'] - self._app.status( - msg='Build steps in total: %(steps)d', - steps=len(self._steps)) - def _open_output(self, msg): assert msg['step_name'] not in self._step_outputs if self._step_output_dir: @@ -141,16 +152,17 @@ class Initiator(distbuild.StateMachine): def _get_output(self, msg): return self._step_outputs[msg['step_name']] + def _write_status_to_build_log(self, f, status): + f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n') + f.flush() + def _handle_step_already_started_message(self, msg): status = '%s is already building on %s' % ( msg['step_name'], msg['worker_name']) self._app.status(msg=status) self._open_output(msg) - - f = self._get_output(msg) - f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n') - f.flush() + self._write_status_to_build_log(self._get_output(msg), status) def _handle_step_started_message(self, msg): status = 'Started building %s on %s' % ( @@ -158,10 +170,7 @@ class Initiator(distbuild.StateMachine): self._app.status(msg=status) self._open_output(msg) - - f = self._get_output(msg) - f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n') - f.flush() + self._write_status_to_build_log(self._get_output(msg), status) def _handle_step_output_message(self, msg): step_name = msg['step_name'] @@ -180,9 +189,7 @@ class Initiator(distbuild.StateMachine): status = 'Finished building %s' % step_name self._app.status(msg=status) - f = self._get_output(msg) - f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n') - + self._write_status_to_build_log(self._get_output(msg), status) self._close_output(msg) else: logging.warning( @@ -194,9 +201,7 @@ class Initiator(distbuild.StateMachine): status = 'Build of %s failed.' % step_name self._app.status(msg=status) - f = self._get_output(msg) - f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n') - + self._write_status_to_build_log(self._get_output(msg), status) self._close_output(msg) else: logging.warning( @@ -227,3 +232,13 @@ class Initiator(distbuild.StateMachine): self.mainloop.queue_event(self._cm, distbuild.StopConnecting()) self._jm.close() + def handle_cancel(self): + # Note in each build-step.log file that the initiator cancelled: this + # makes it easier to tell whether a build was aborted due to a bug or + # dropped connection, or if the user cancelled with CTRL+C / SIGINT. + + for f in self._step_outputs.itervalues(): + self._write_status_to_build_log(f, 'Initiator cancelled') + f.close() + + self._step_outputs = {} diff --git a/distbuild/initiator_connection.py b/distbuild/initiator_connection.py index 86df28f1..8b68fda3 100644 --- a/distbuild/initiator_connection.py +++ b/distbuild/initiator_connection.py @@ -1,6 +1,6 @@ # distbuild/initiator_connection.py -- communicate with initiator # -# Copyright (C) 2012, 2014 - 2015 Codethink Limited +# Copyright (C) 2012, 2014-2015 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -77,8 +77,6 @@ class InitiatorConnection(distbuild.StateMachine): 'idle', self._send_build_failed_message), ('idle', distbuild.BuildController, distbuild.BuildProgress, 'idle', self._send_build_progress_message), - ('idle', distbuild.BuildController, distbuild.BuildSteps, - 'idle', self._send_build_steps_message), ('idle', distbuild.BuildController, distbuild.BuildStepStarted, 'idle', self._send_build_step_started_message), ('idle', distbuild.BuildController, @@ -180,26 +178,6 @@ class InitiatorConnection(distbuild.StateMachine): self.jm.send(msg) self._log_send(msg) - def _send_build_steps_message(self, event_source, event): - - def make_step_dict(artifact): - return { - 'name': distbuild.build_step_name(artifact), - 'build-depends': [ - distbuild.build_step_name(x) - for x in artifact.source.dependencies - ] - } - - if event.id in self.our_ids: - step_names = distbuild.map_build_graph( - event.artifact, make_step_dict) - msg = distbuild.message('build-steps', - id=self._route_map.get_incoming_id(event.id), - steps=step_names) - self.jm.send(msg) - self._log_send(msg) - def _send_build_step_started_message(self, event_source, event): logging.debug('InitiatorConnection: build_step_started: ' 'id=%s step_name=%s worker_name=%s' % diff --git a/distbuild/mainloop.py b/distbuild/mainloop.py index 97e439f3..f15a3ac1 100644 --- a/distbuild/mainloop.py +++ b/distbuild/mainloop.py @@ -57,8 +57,11 @@ class MainLoop(object): logging.debug('MainLoop.remove_state_machine: %s' % machine) self._machines.remove(machine) + def state_machines_of_type(self, machine_type): + return [m for m in self._machines if isinstance(m, machine_type)] + def n_state_machines_of_type(self, machine_type): - return len([m for m in self._machines if isinstance(m, machine_type)]) + return len(self.state_machines_of_type(machine_type)) def add_event_source(self, event_source): logging.debug('MainLoop.add_event_source: %s' % event_source) diff --git a/distbuild/protocol.py b/distbuild/protocol.py index f2c74819..141df742 100644 --- a/distbuild/protocol.py +++ b/distbuild/protocol.py @@ -1,6 +1,6 @@ # distbuild/protocol.py -- abstractions for the JSON messages # -# Copyright (C) 2012, 2014 - 2015 Codethink Limited +# Copyright (C) 2012, 2014-2015 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -38,10 +38,6 @@ _required_fields = { 'id', 'message', ], - 'build-steps': [ - 'id', - 'steps', - ], 'step-started': [ 'id', 'step_name', diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 4f7ff98f..81c961e1 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -332,7 +332,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): distbuild.crash_point() who = event.who - last_job = who.job() # the job this worker's just completed + last_job = who.current_job() # the job this worker's just completed if last_job: logging.debug('%s wants new job, just did %s', @@ -395,9 +395,12 @@ class WorkerConnection(distbuild.StateMachine): self._writeable_cache_server = writeable_cache_server self._worker_cache_server_port = worker_cache_server_port self._morph_instance = morph_instance - self._helper_id = None - self._job = None - self._exec_response_msg = None + + self._active_jobs = dict() + self._current_job = None + self._current_job_exec_response = None + self._current_job_cache_request = None + self._debug_json = False addr, port = self._conn.getpeername() @@ -407,8 +410,8 @@ class WorkerConnection(distbuild.StateMachine): def name(self): return self._worker_name - def job(self): - return self._job + def current_job(self): + return self._current_job def setup(self): distbuild.crash_point() @@ -448,31 +451,30 @@ class WorkerConnection(distbuild.StateMachine): def _maybe_cancel(self, event_source, build_cancel): - if build_cancel.id not in self._job.initiators: + if build_cancel.id not in self._current_job.initiators: return # event not relevant logging.debug('WC: BuildController %r requested a cancel', event_source) - if (len(self._job.initiators) == 1): + job = self._current_job + if (len(job.initiators) == 1): logging.debug('WC: Cancelling running job %s ' 'with job id %s running on %s', - self._job.artifact.basename(), - self._job.id, + job.artifact.basename(), job.id, self.name()) - msg = distbuild.message('exec-cancel', id=self._job.id) + msg = distbuild.message('exec-cancel', id=job.id) self._jm.send(msg) self.mainloop.queue_event(self, _BuildCancelled()) else: logging.debug('WC: Not cancelling running job %s with job id %s, ' 'other initiators want it done: %s', - self._job.artifact.basename(), - self._job.id, - [i for i in self._job.initiators - if i != build_cancel.id]) + job.artifact.basename(), + job.id, + [i for i in job.initiators if i != build_cancel.id]) - self._job.initiators.remove(build_cancel.id) + job.initiators.remove(build_cancel.id) def _disconnected(self, event_source, event): distbuild.crash_point() @@ -485,23 +487,30 @@ class WorkerConnection(distbuild.StateMachine): def _start_build(self, event_source, event): distbuild.crash_point() - self._job = event.job - self._helper_id = None - self._exec_response_msg = None + job = event.job + + if job.id in self._active_jobs: + logging.warn('Duplicate job %s for worker %s', job.id, self.name()) + + if self._current_job_exec_response or self._current_job_cache_request: + logging.warn('Caching not finished for %s', self._current_job.id) + + self._active_jobs[job.id] = job + self._current_job = job logging.debug('WC: starting build: %s for %s' % - (self._job.artifact.name, self._job.initiators)) + (job.artifact.name, job.initiators)) argv = [ self._morph_instance, 'worker-build', '--build-log-on-stdout', - self._job.artifact.name, + job.artifact.name, ] msg = distbuild.message('exec-request', - id=self._job.id, + id=job.id, argv=argv, - stdin_contents=distbuild.serialise_artifact(self._job.artifact), + stdin_contents=distbuild.serialise_artifact(job.artifact), ) self._jm.send(msg) @@ -509,10 +518,10 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('WC: sent to worker %s: %r' % (self._worker_name, msg)) - started = WorkerBuildStepStarted(self._job.initiators, - self._job.artifact.source.cache_key, self.name()) + started = WorkerBuildStepStarted(job.initiators, + job.artifact.source.cache_key, self.name()) - self.mainloop.queue_event(WorkerConnection, _JobStarted(self._job)) + self.mainloop.queue_event(WorkerConnection, _JobStarted(job)) self.mainloop.queue_event(WorkerConnection, started) def _handle_json_message(self, event_source, event): @@ -527,37 +536,50 @@ class WorkerConnection(distbuild.StateMachine): 'exec-output': self._handle_exec_output, 'exec-response': self._handle_exec_response, } - + handler = handlers[event.msg['type']] - handler(event.msg) + job = self._active_jobs.get(event.msg['id']) + + if job: + handler(event.msg, job) + else: + logging.warn('Received %s for unknown job %s', + event.msg['type'], event.msg['id']) + + def _handle_exec_output(self, msg, job): + '''Handle output from a job that the worker is or was running.''' - def _handle_exec_output(self, msg): new = dict(msg) - new['ids'] = self._job.initiators + new['ids'] = job.initiators + logging.debug('WC: emitting: %s', repr(new)) self.mainloop.queue_event( WorkerConnection, - WorkerBuildOutput(new, self._job.artifact.source.cache_key)) + WorkerBuildOutput(new, job.artifact.source.cache_key)) - def _handle_exec_response(self, msg): - logging.debug('WC: finished building: %s' % self._job.artifact.name) - logging.debug('initiators that need to know: %s' - % self._job.initiators) + def _handle_exec_response(self, msg, job): + '''Handle completion of a job that the worker is or was running.''' + + logging.debug('WC: finished building: %s' % job.artifact.name) + logging.debug('initiators that need to know: %s' % job.initiators) new = dict(msg) - new['ids'] = self._job.initiators + new['ids'] = job.initiators if new['exit'] != 0: # Build failed. - new_event = WorkerBuildFailed(new, - self._job.artifact.source.cache_key) + new_event = WorkerBuildFailed(new, job.artifact.source.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) - self.mainloop.queue_event(WorkerConnection, _JobFailed(self._job)) + self.mainloop.queue_event(WorkerConnection, _JobFailed(job)) self.mainloop.queue_event(self, _BuildFailed()) else: # Build succeeded. We have more work to do: caching the result. self.mainloop.queue_event(self, _BuildFinished()) - self._exec_response_msg = new + self._current_job_exec_response = new + + # The job is no longer considered active, because the worker is + # finished with it so we won't receive any more messages about it. + del self._active_jobs[job.id] def _request_job(self, event_source, event): distbuild.crash_point() @@ -571,15 +593,16 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('Requesting shared artifact cache to get artifacts') - kind = self._job.artifact.source.morphology['kind'] + job = self._current_job + kind = job.artifact.source.morphology['kind'] if kind == 'chunk': - source_artifacts = self._job.artifact.source.artifacts + source_artifacts = job.artifact.source.artifacts suffixes = ['%s.%s' % (kind, name) for name in source_artifacts] suffixes.append('build-log') else: - filename = '%s.%s' % (kind, self._job.artifact.name) + filename = '%s.%s' % (kind, job.artifact.name) suffixes = [filename] if kind == 'stratum': @@ -595,22 +618,22 @@ class WorkerConnection(distbuild.StateMachine): '/1.0/fetch?host=%s:%d&cacheid=%s&artifacts=%s' % (urllib.quote(worker_host), self._worker_cache_server_port, - urllib.quote(self._job.artifact.source.cache_key), + urllib.quote(job.artifact.source.cache_key), suffixes)) msg = distbuild.message( 'http-request', id=self._request_ids.next(), url=url, method='GET', body=None, headers=None) - self._helper_id = msg['id'] + self._current_job_cache_request = msg['id'] req = distbuild.HelperRequest(msg) self.mainloop.queue_event(distbuild.HelperRouter, req) - progress = WorkerBuildCaching(self._job.initiators, - self._job.artifact.source.cache_key) + progress = WorkerBuildCaching(job.initiators, + job.artifact.source.cache_key) self.mainloop.queue_event(WorkerConnection, progress) def _maybe_handle_helper_result(self, event_source, event): - if event.msg['id'] == self._helper_id: + if event.msg['id'] == self._current_job_cache_request: distbuild.crash_point() logging.debug('caching: event.msg: %s' % repr(event.msg)) @@ -618,8 +641,8 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('Shared artifact cache population done') new_event = WorkerBuildFinished( - self._exec_response_msg, - self._job.artifact.source.cache_key) + self._current_job_exec_response, + self._current_job.artifact.source.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) self.mainloop.queue_event(self, _Cached()) else: @@ -634,13 +657,17 @@ class WorkerConnection(distbuild.StateMachine): # The BuildController will not try to cancel jobs that have # been marked as failed. self.mainloop.queue_event(WorkerConnection, - _JobFailed(self._job)) + _JobFailed(self._current_job)) new_event = WorkerBuildFailed( - self._exec_response_msg, - self._job.artifact.source.cache_key) + self._current_job_exec_response, + self._current_job.artifact.source.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) self.mainloop.queue_event(self, _BuildFailed()) - self.mainloop.queue_event(WorkerConnection, _JobFinished(self._job)) + self.mainloop.queue_event(WorkerConnection, + _JobFinished(self._current_job)) + + self._current_job_exec_response = None + self._current_job_cache_request = None |