diff options
Diffstat (limited to 'distbuild')
-rw-r--r-- | distbuild/build_controller.py | 88 | ||||
-rw-r--r-- | distbuild/initiator.py | 55 | ||||
-rw-r--r-- | distbuild/initiator_connection.py | 36 | ||||
-rw-r--r-- | distbuild/jm.py | 11 | ||||
-rw-r--r-- | distbuild/mainloop.py | 7 | ||||
-rw-r--r-- | distbuild/protocol.py | 10 |
6 files changed, 121 insertions, 86 deletions
diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py index 387b410f..aa11ae8f 100644 --- a/distbuild/build_controller.py +++ b/distbuild/build_controller.py @@ -1,6 +1,6 @@ # distbuild/build_controller.py -- control the steps for one build # -# Copyright (C) 2012, 2014 Codethink Limited +# Copyright (C) 2012, 2014-2015 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -37,11 +37,6 @@ class _Start(object): pass class _Annotated(object): pass class _Built(object): pass -class _AnnotationFailed(object): - - def __init__(self, http_status_code, error_msg): - self.http_status_code = http_status_code - self.error_msg = error_msg class _GotGraph(object): @@ -49,11 +44,6 @@ class _GotGraph(object): self.artifact = artifact -class _GraphFailed(object): - - pass - - class BuildCancel(object): def __init__(self, id): @@ -192,14 +182,13 @@ class BuildController(distbuild.StateMachine): 'graphing', self._maybe_finish_graph), ('graphing', self, _GotGraph, 'annotating', self._start_annotating), - ('graphing', self, _GraphFailed, None, None), + ('graphing', self, BuildFailed, None, None), ('graphing', self._initiator_connection, distbuild.InitiatorDisconnect, None, None), ('annotating', distbuild.HelperRouter, distbuild.HelperResult, 'annotating', self._maybe_handle_cache_response), - ('annotating', self, _AnnotationFailed, None, - self._notify_annotation_failed), + ('annotating', self, BuildFailed, None, None), ('annotating', self, _Annotated, 'building', self._queue_worker_builds), ('annotating', self._initiator_connection, @@ -244,6 +233,29 @@ class BuildController(distbuild.StateMachine): self.mainloop.queue_event(self, _Start()) + def fail(self, reason): + logging.error(reason) + message = BuildFailed(self._request['id'], reason) + + # The message is sent twice so that it can be matched both by state + # transitions listening for this specific controller instance, and by + # state transitions listening for messages from the BuildController + # class that then filter the message based on the request ID field. + self.mainloop.queue_event(self, message) + self.mainloop.queue_event(BuildController, message) + + def _request_command_execution(self, argv, request_id): + '''Tell the controller's distbuild-helper to run a command.''' + if self.mainloop.n_state_machines_of_type(distbuild.HelperRouter) == 0: + self.fail('No distbuild-helper process running on controller!') + + msg = distbuild.message('exec-request', + id=request_id, + argv=argv, + stdin_contents='') + req = distbuild.HelperRequest(msg) + self.mainloop.queue_event(distbuild.HelperRouter, req) + def _start_graphing(self, event_source, event): distbuild.crash_point() @@ -260,14 +272,10 @@ class BuildController(distbuild.StateMachine): ] if 'original_ref' in self._request: argv.append(self._request['original_ref']) - msg = distbuild.message('exec-request', - id=self._idgen.next(), - argv=argv, - stdin_contents='') - self._helper_id = msg['id'] - req = distbuild.HelperRequest(msg) - self.mainloop.queue_event(distbuild.HelperRouter, req) - + + self._helper_id = self._idgen.next() + self._request_command_execution(argv, self._helper_id) + progress = BuildProgress(self._request['id'], 'Computing build graph') self.mainloop.queue_event(BuildController, progress) @@ -281,16 +289,6 @@ class BuildController(distbuild.StateMachine): def _maybe_finish_graph(self, event_source, event): distbuild.crash_point() - def notify_failure(msg_text): - logging.error('Graph creation failed: %s' % msg_text) - - failed = BuildFailed( - self._request['id'], - 'Failed to compute build graph: %s' % msg_text) - self.mainloop.queue_event(BuildController, failed) - - self.mainloop.queue_event(self, _GraphFailed()) - def notify_success(artifact): logging.debug('Graph is finished') @@ -308,8 +306,7 @@ class BuildController(distbuild.StateMachine): error_text = self._artifact_error.peek() if event.msg['exit'] != 0 or error_text: - notify_failure('Problem with serialise-artifact: %s' - % error_text) + self.fail(error_text) if event.msg['exit'] != 0: return @@ -319,7 +316,7 @@ class BuildController(distbuild.StateMachine): artifact = distbuild.deserialise_artifact(text) except ValueError, e: logging.error(traceback.format_exc()) - notify_failure(str(e)) + self.fail('Failed to compute build graph: %s' % e) return notify_success(artifact) @@ -362,13 +359,11 @@ class BuildController(distbuild.StateMachine): logging.debug('Got cache response: %s' % repr(event.msg)) http_status_code = event.msg['status'] - error_msg = event.msg['body'] if http_status_code != httplib.OK: - logging.debug('Cache request failed with status: %s' - % event.msg['status']) - self.mainloop.queue_event(self, - _AnnotationFailed(http_status_code, error_msg)) + self.fail('Failed to annotate build graph: HTTP request to %s got ' + '%d: %s' % (self._artifact_cache_server, + http_status_code, event.msg['body'])) return cache_state = json.loads(event.msg['body']) @@ -581,14 +576,6 @@ class BuildController(distbuild.StateMachine): self._queue_worker_builds(None, event) - def _notify_annotation_failed(self, event_source, event): - errmsg = ('Failed to annotate build graph: http request got %d: %s' - % (event.http_status_code, event.error_msg)) - - logging.error(errmsg) - failed = BuildFailed(self._request['id'], errmsg) - self.mainloop.queue_event(BuildController, failed) - def _maybe_notify_build_failed(self, event_source, event): distbuild.crash_point() @@ -613,10 +600,7 @@ class BuildController(distbuild.StateMachine): self._request['id'], build_step_name(artifact)) self.mainloop.queue_event(BuildController, step_failed) - build_failed = BuildFailed( - self._request['id'], - 'Building failed for %s' % artifact.name) - self.mainloop.queue_event(BuildController, build_failed) + self.fail('Building failed for %s' % artifact.name) # Cancel any jobs waiting to be executed, since there is no point # running them if this build has failed, it would just waste diff --git a/distbuild/initiator.py b/distbuild/initiator.py index aaae7d62..7f82827c 100644 --- a/distbuild/initiator.py +++ b/distbuild/initiator.py @@ -1,6 +1,6 @@ # distbuild/initiator.py -- state machine for the initiator # -# Copyright (C) 2012, 2014 Codethink Limited +# Copyright (C) 2012, 2014-2015 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -20,7 +20,7 @@ import cliapp import logging import os import random -import sys +import time import distbuild @@ -83,11 +83,12 @@ class Initiator(distbuild.StateMachine): repo=self._repo_name, ref=self._ref, morphology=self._morphology, - original_ref=self._original_ref + original_ref=self._original_ref, + protocol_version=distbuild.protocol.VERSION ) self._jm.send(msg) logging.debug('Initiator: sent to controller: %s', repr(msg)) - + def _handle_json_message(self, event_source, event): distbuild.crash_point() @@ -137,23 +138,35 @@ class Initiator(distbuild.StateMachine): self._step_outputs[msg['step_name']].close() del self._step_outputs[msg['step_name']] + def _get_output(self, msg): + return self._step_outputs[msg['step_name']] + def _handle_step_already_started_message(self, msg): - self._app.status( - msg='%s is already building on %s' % (msg['step_name'], - msg['worker_name'])) + status = '%s is already building on %s' % ( + msg['step_name'], msg['worker_name']) + self._app.status(msg=status) + self._open_output(msg) + f = self._get_output(msg) + f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n') + f.flush() + def _handle_step_started_message(self, msg): - self._app.status( - msg='Started building %(step_name)s on %(worker_name)s', - step_name=msg['step_name'], - worker_name=msg['worker_name']) + status = 'Started building %s on %s' % ( + msg['step_name'], msg['worker_name']) + self._app.status(msg=status) + self._open_output(msg) + f = self._get_output(msg) + f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n') + f.flush() + def _handle_step_output_message(self, msg): step_name = msg['step_name'] if step_name in self._step_outputs: - f = self._step_outputs[step_name] + f = self._get_output(msg) f.write(msg['stdout']) f.write(msg['stderr']) f.flush() @@ -164,9 +177,12 @@ class Initiator(distbuild.StateMachine): def _handle_step_finished_message(self, msg): step_name = msg['step_name'] if step_name in self._step_outputs: - self._app.status( - msg='Finished building %(step_name)s', - step_name=step_name) + status = 'Finished building %s' % step_name + self._app.status(msg=status) + + f = self._get_output(msg) + f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n') + self._close_output(msg) else: logging.warning( @@ -175,9 +191,12 @@ class Initiator(distbuild.StateMachine): def _handle_step_failed_message(self, msg): step_name = msg['step_name'] if step_name in self._step_outputs: - self._app.status( - msg='Build failed: %(step_name)s', - step_name=step_name) + status = 'Build of %s failed.' % step_name + self._app.status(msg=status) + + f = self._get_output(msg) + f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n') + self._close_output(msg) else: logging.warning( diff --git a/distbuild/initiator_connection.py b/distbuild/initiator_connection.py index db982230..86df28f1 100644 --- a/distbuild/initiator_connection.py +++ b/distbuild/initiator_connection.py @@ -1,6 +1,6 @@ # distbuild/initiator_connection.py -- communicate with initiator # -# Copyright (C) 2012, 2014 Codethink Limited +# Copyright (C) 2012, 2014 - 2015 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -100,15 +100,31 @@ class InitiatorConnection(distbuild.StateMachine): logging.debug('InitiatorConnection: from %s: %r', self.initiator_name, event.msg) - if event.msg['type'] == 'build-request': - new_id = self._idgen.next() - self.our_ids.add(new_id) - self._route_map.add(event.msg['id'], new_id) - event.msg['id'] = new_id - build_controller = distbuild.BuildController( - self, event.msg, self.artifact_cache_server, - self.morph_instance) - self.mainloop.add_state_machine(build_controller) + try: + if event.msg['type'] == 'build-request': + if (event.msg.get('protocol_version') != + distbuild.protocol.VERSION): + msg = distbuild.message('build-failed', + id=event.msg['id'], + reason=('Protocol version mismatch between server & ' + 'initiator: distbuild network uses distbuild ' + 'protocol version %i, but client uses version' + ' %i.', distbuild.protocol.VERSION, + event.msg.get('protocol_version'))) + self.jm.send(msg) + self._log_send(msg) + return + new_id = self._idgen.next() + self.our_ids.add(new_id) + self._route_map.add(event.msg['id'], new_id) + event.msg['id'] = new_id + build_controller = distbuild.BuildController( + self, event.msg, self.artifact_cache_server, + self.morph_instance) + self.mainloop.add_state_machine(build_controller) + except (KeyError, ValueError) as ex: + logging.error('Invalid message from initiator: %s: exception %s', + event.msg, ex) def _disconnect(self, event_source, event): for id in self.our_ids: diff --git a/distbuild/jm.py b/distbuild/jm.py index 615100e4..85510924 100644 --- a/distbuild/jm.py +++ b/distbuild/jm.py @@ -1,6 +1,6 @@ # mainloop/jm.py -- state machine for JSON communication between nodes # -# Copyright (C) 2012, 2014 Codethink Limited +# Copyright (C) 2012, 2014 - 2015 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -109,8 +109,13 @@ class JsonMachine(StateMachine): line = line.rstrip() if self.debug_json: logging.debug('JsonMachine: line: %s' % repr(line)) - msg = yaml.load(json.loads(line)) - self.mainloop.queue_event(self, JsonNewMessage(msg)) + msg = None + try: + msg = yaml.safe_load(json.loads(line)) + except Exception: + logging.error('Invalid input: %s' % line) + if msg: + self.mainloop.queue_event(self, JsonNewMessage(msg)) def _send_eof(self, event_source, event): self.mainloop.queue_event(self, JsonEof()) diff --git a/distbuild/mainloop.py b/distbuild/mainloop.py index f0e5eebc..97e439f3 100644 --- a/distbuild/mainloop.py +++ b/distbuild/mainloop.py @@ -1,6 +1,6 @@ # mainloop/mainloop.py -- select-based main loop # -# Copyright (C) 2012, 2014 Codethink Limited +# Copyright (C) 2012, 2014-2015 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -56,7 +56,10 @@ class MainLoop(object): def remove_state_machine(self, machine): logging.debug('MainLoop.remove_state_machine: %s' % machine) self._machines.remove(machine) - + + def n_state_machines_of_type(self, machine_type): + return len([m for m in self._machines if isinstance(m, machine_type)]) + def add_event_source(self, event_source): logging.debug('MainLoop.add_event_source: %s' % event_source) self._sources.append(event_source) diff --git a/distbuild/protocol.py b/distbuild/protocol.py index ffce1fe7..f2c74819 100644 --- a/distbuild/protocol.py +++ b/distbuild/protocol.py @@ -1,6 +1,6 @@ # distbuild/protocol.py -- abstractions for the JSON messages # -# Copyright (C) 2012, 2014 Codethink Limited +# Copyright (C) 2012, 2014 - 2015 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -19,12 +19,20 @@ '''Construct protocol message objects (dicts).''' +# Version refers to an integer that should be incremented by one each time a +# time a change is introduced that would break server/initiator compatibility + + +VERSION = 1 + + _required_fields = { 'build-request': [ 'id', 'repo', 'ref', 'morphology', + 'protocol_version', ], 'build-progress': [ 'id', |