summaryrefslogtreecommitdiff
path: root/distbuild
diff options
context:
space:
mode:
Diffstat (limited to 'distbuild')
-rw-r--r--distbuild/build_controller.py88
-rw-r--r--distbuild/initiator.py55
-rw-r--r--distbuild/initiator_connection.py36
-rw-r--r--distbuild/jm.py11
-rw-r--r--distbuild/mainloop.py7
-rw-r--r--distbuild/protocol.py10
6 files changed, 121 insertions, 86 deletions
diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py
index 387b410f..aa11ae8f 100644
--- a/distbuild/build_controller.py
+++ b/distbuild/build_controller.py
@@ -1,6 +1,6 @@
# distbuild/build_controller.py -- control the steps for one build
#
-# Copyright (C) 2012, 2014 Codethink Limited
+# Copyright (C) 2012, 2014-2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -37,11 +37,6 @@ class _Start(object): pass
class _Annotated(object): pass
class _Built(object): pass
-class _AnnotationFailed(object):
-
- def __init__(self, http_status_code, error_msg):
- self.http_status_code = http_status_code
- self.error_msg = error_msg
class _GotGraph(object):
@@ -49,11 +44,6 @@ class _GotGraph(object):
self.artifact = artifact
-class _GraphFailed(object):
-
- pass
-
-
class BuildCancel(object):
def __init__(self, id):
@@ -192,14 +182,13 @@ class BuildController(distbuild.StateMachine):
'graphing', self._maybe_finish_graph),
('graphing', self, _GotGraph,
'annotating', self._start_annotating),
- ('graphing', self, _GraphFailed, None, None),
+ ('graphing', self, BuildFailed, None, None),
('graphing', self._initiator_connection,
distbuild.InitiatorDisconnect, None, None),
('annotating', distbuild.HelperRouter, distbuild.HelperResult,
'annotating', self._maybe_handle_cache_response),
- ('annotating', self, _AnnotationFailed, None,
- self._notify_annotation_failed),
+ ('annotating', self, BuildFailed, None, None),
('annotating', self, _Annotated, 'building',
self._queue_worker_builds),
('annotating', self._initiator_connection,
@@ -244,6 +233,29 @@ class BuildController(distbuild.StateMachine):
self.mainloop.queue_event(self, _Start())
+ def fail(self, reason):
+ logging.error(reason)
+ message = BuildFailed(self._request['id'], reason)
+
+ # The message is sent twice so that it can be matched both by state
+ # transitions listening for this specific controller instance, and by
+ # state transitions listening for messages from the BuildController
+ # class that then filter the message based on the request ID field.
+ self.mainloop.queue_event(self, message)
+ self.mainloop.queue_event(BuildController, message)
+
+ def _request_command_execution(self, argv, request_id):
+ '''Tell the controller's distbuild-helper to run a command.'''
+ if self.mainloop.n_state_machines_of_type(distbuild.HelperRouter) == 0:
+ self.fail('No distbuild-helper process running on controller!')
+
+ msg = distbuild.message('exec-request',
+ id=request_id,
+ argv=argv,
+ stdin_contents='')
+ req = distbuild.HelperRequest(msg)
+ self.mainloop.queue_event(distbuild.HelperRouter, req)
+
def _start_graphing(self, event_source, event):
distbuild.crash_point()
@@ -260,14 +272,10 @@ class BuildController(distbuild.StateMachine):
]
if 'original_ref' in self._request:
argv.append(self._request['original_ref'])
- msg = distbuild.message('exec-request',
- id=self._idgen.next(),
- argv=argv,
- stdin_contents='')
- self._helper_id = msg['id']
- req = distbuild.HelperRequest(msg)
- self.mainloop.queue_event(distbuild.HelperRouter, req)
-
+
+ self._helper_id = self._idgen.next()
+ self._request_command_execution(argv, self._helper_id)
+
progress = BuildProgress(self._request['id'], 'Computing build graph')
self.mainloop.queue_event(BuildController, progress)
@@ -281,16 +289,6 @@ class BuildController(distbuild.StateMachine):
def _maybe_finish_graph(self, event_source, event):
distbuild.crash_point()
- def notify_failure(msg_text):
- logging.error('Graph creation failed: %s' % msg_text)
-
- failed = BuildFailed(
- self._request['id'],
- 'Failed to compute build graph: %s' % msg_text)
- self.mainloop.queue_event(BuildController, failed)
-
- self.mainloop.queue_event(self, _GraphFailed())
-
def notify_success(artifact):
logging.debug('Graph is finished')
@@ -308,8 +306,7 @@ class BuildController(distbuild.StateMachine):
error_text = self._artifact_error.peek()
if event.msg['exit'] != 0 or error_text:
- notify_failure('Problem with serialise-artifact: %s'
- % error_text)
+ self.fail(error_text)
if event.msg['exit'] != 0:
return
@@ -319,7 +316,7 @@ class BuildController(distbuild.StateMachine):
artifact = distbuild.deserialise_artifact(text)
except ValueError, e:
logging.error(traceback.format_exc())
- notify_failure(str(e))
+ self.fail('Failed to compute build graph: %s' % e)
return
notify_success(artifact)
@@ -362,13 +359,11 @@ class BuildController(distbuild.StateMachine):
logging.debug('Got cache response: %s' % repr(event.msg))
http_status_code = event.msg['status']
- error_msg = event.msg['body']
if http_status_code != httplib.OK:
- logging.debug('Cache request failed with status: %s'
- % event.msg['status'])
- self.mainloop.queue_event(self,
- _AnnotationFailed(http_status_code, error_msg))
+ self.fail('Failed to annotate build graph: HTTP request to %s got '
+ '%d: %s' % (self._artifact_cache_server,
+ http_status_code, event.msg['body']))
return
cache_state = json.loads(event.msg['body'])
@@ -581,14 +576,6 @@ class BuildController(distbuild.StateMachine):
self._queue_worker_builds(None, event)
- def _notify_annotation_failed(self, event_source, event):
- errmsg = ('Failed to annotate build graph: http request got %d: %s'
- % (event.http_status_code, event.error_msg))
-
- logging.error(errmsg)
- failed = BuildFailed(self._request['id'], errmsg)
- self.mainloop.queue_event(BuildController, failed)
-
def _maybe_notify_build_failed(self, event_source, event):
distbuild.crash_point()
@@ -613,10 +600,7 @@ class BuildController(distbuild.StateMachine):
self._request['id'], build_step_name(artifact))
self.mainloop.queue_event(BuildController, step_failed)
- build_failed = BuildFailed(
- self._request['id'],
- 'Building failed for %s' % artifact.name)
- self.mainloop.queue_event(BuildController, build_failed)
+ self.fail('Building failed for %s' % artifact.name)
# Cancel any jobs waiting to be executed, since there is no point
# running them if this build has failed, it would just waste
diff --git a/distbuild/initiator.py b/distbuild/initiator.py
index aaae7d62..7f82827c 100644
--- a/distbuild/initiator.py
+++ b/distbuild/initiator.py
@@ -1,6 +1,6 @@
# distbuild/initiator.py -- state machine for the initiator
#
-# Copyright (C) 2012, 2014 Codethink Limited
+# Copyright (C) 2012, 2014-2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -20,7 +20,7 @@ import cliapp
import logging
import os
import random
-import sys
+import time
import distbuild
@@ -83,11 +83,12 @@ class Initiator(distbuild.StateMachine):
repo=self._repo_name,
ref=self._ref,
morphology=self._morphology,
- original_ref=self._original_ref
+ original_ref=self._original_ref,
+ protocol_version=distbuild.protocol.VERSION
)
self._jm.send(msg)
logging.debug('Initiator: sent to controller: %s', repr(msg))
-
+
def _handle_json_message(self, event_source, event):
distbuild.crash_point()
@@ -137,23 +138,35 @@ class Initiator(distbuild.StateMachine):
self._step_outputs[msg['step_name']].close()
del self._step_outputs[msg['step_name']]
+ def _get_output(self, msg):
+ return self._step_outputs[msg['step_name']]
+
def _handle_step_already_started_message(self, msg):
- self._app.status(
- msg='%s is already building on %s' % (msg['step_name'],
- msg['worker_name']))
+ status = '%s is already building on %s' % (
+ msg['step_name'], msg['worker_name'])
+ self._app.status(msg=status)
+
self._open_output(msg)
+ f = self._get_output(msg)
+ f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n')
+ f.flush()
+
def _handle_step_started_message(self, msg):
- self._app.status(
- msg='Started building %(step_name)s on %(worker_name)s',
- step_name=msg['step_name'],
- worker_name=msg['worker_name'])
+ status = 'Started building %s on %s' % (
+ msg['step_name'], msg['worker_name'])
+ self._app.status(msg=status)
+
self._open_output(msg)
+ f = self._get_output(msg)
+ f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n')
+ f.flush()
+
def _handle_step_output_message(self, msg):
step_name = msg['step_name']
if step_name in self._step_outputs:
- f = self._step_outputs[step_name]
+ f = self._get_output(msg)
f.write(msg['stdout'])
f.write(msg['stderr'])
f.flush()
@@ -164,9 +177,12 @@ class Initiator(distbuild.StateMachine):
def _handle_step_finished_message(self, msg):
step_name = msg['step_name']
if step_name in self._step_outputs:
- self._app.status(
- msg='Finished building %(step_name)s',
- step_name=step_name)
+ status = 'Finished building %s' % step_name
+ self._app.status(msg=status)
+
+ f = self._get_output(msg)
+ f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n')
+
self._close_output(msg)
else:
logging.warning(
@@ -175,9 +191,12 @@ class Initiator(distbuild.StateMachine):
def _handle_step_failed_message(self, msg):
step_name = msg['step_name']
if step_name in self._step_outputs:
- self._app.status(
- msg='Build failed: %(step_name)s',
- step_name=step_name)
+ status = 'Build of %s failed.' % step_name
+ self._app.status(msg=status)
+
+ f = self._get_output(msg)
+ f.write(time.strftime('%Y-%m-%d %H:%M:%S ') + status + '\n')
+
self._close_output(msg)
else:
logging.warning(
diff --git a/distbuild/initiator_connection.py b/distbuild/initiator_connection.py
index db982230..86df28f1 100644
--- a/distbuild/initiator_connection.py
+++ b/distbuild/initiator_connection.py
@@ -1,6 +1,6 @@
# distbuild/initiator_connection.py -- communicate with initiator
#
-# Copyright (C) 2012, 2014 Codethink Limited
+# Copyright (C) 2012, 2014 - 2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -100,15 +100,31 @@ class InitiatorConnection(distbuild.StateMachine):
logging.debug('InitiatorConnection: from %s: %r', self.initiator_name,
event.msg)
- if event.msg['type'] == 'build-request':
- new_id = self._idgen.next()
- self.our_ids.add(new_id)
- self._route_map.add(event.msg['id'], new_id)
- event.msg['id'] = new_id
- build_controller = distbuild.BuildController(
- self, event.msg, self.artifact_cache_server,
- self.morph_instance)
- self.mainloop.add_state_machine(build_controller)
+ try:
+ if event.msg['type'] == 'build-request':
+ if (event.msg.get('protocol_version') !=
+ distbuild.protocol.VERSION):
+ msg = distbuild.message('build-failed',
+ id=event.msg['id'],
+ reason=('Protocol version mismatch between server & '
+ 'initiator: distbuild network uses distbuild '
+ 'protocol version %i, but client uses version'
+ ' %i.', distbuild.protocol.VERSION,
+ event.msg.get('protocol_version')))
+ self.jm.send(msg)
+ self._log_send(msg)
+ return
+ new_id = self._idgen.next()
+ self.our_ids.add(new_id)
+ self._route_map.add(event.msg['id'], new_id)
+ event.msg['id'] = new_id
+ build_controller = distbuild.BuildController(
+ self, event.msg, self.artifact_cache_server,
+ self.morph_instance)
+ self.mainloop.add_state_machine(build_controller)
+ except (KeyError, ValueError) as ex:
+ logging.error('Invalid message from initiator: %s: exception %s',
+ event.msg, ex)
def _disconnect(self, event_source, event):
for id in self.our_ids:
diff --git a/distbuild/jm.py b/distbuild/jm.py
index 615100e4..85510924 100644
--- a/distbuild/jm.py
+++ b/distbuild/jm.py
@@ -1,6 +1,6 @@
# mainloop/jm.py -- state machine for JSON communication between nodes
#
-# Copyright (C) 2012, 2014 Codethink Limited
+# Copyright (C) 2012, 2014 - 2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -109,8 +109,13 @@ class JsonMachine(StateMachine):
line = line.rstrip()
if self.debug_json:
logging.debug('JsonMachine: line: %s' % repr(line))
- msg = yaml.load(json.loads(line))
- self.mainloop.queue_event(self, JsonNewMessage(msg))
+ msg = None
+ try:
+ msg = yaml.safe_load(json.loads(line))
+ except Exception:
+ logging.error('Invalid input: %s' % line)
+ if msg:
+ self.mainloop.queue_event(self, JsonNewMessage(msg))
def _send_eof(self, event_source, event):
self.mainloop.queue_event(self, JsonEof())
diff --git a/distbuild/mainloop.py b/distbuild/mainloop.py
index f0e5eebc..97e439f3 100644
--- a/distbuild/mainloop.py
+++ b/distbuild/mainloop.py
@@ -1,6 +1,6 @@
# mainloop/mainloop.py -- select-based main loop
#
-# Copyright (C) 2012, 2014 Codethink Limited
+# Copyright (C) 2012, 2014-2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -56,7 +56,10 @@ class MainLoop(object):
def remove_state_machine(self, machine):
logging.debug('MainLoop.remove_state_machine: %s' % machine)
self._machines.remove(machine)
-
+
+ def n_state_machines_of_type(self, machine_type):
+ return len([m for m in self._machines if isinstance(m, machine_type)])
+
def add_event_source(self, event_source):
logging.debug('MainLoop.add_event_source: %s' % event_source)
self._sources.append(event_source)
diff --git a/distbuild/protocol.py b/distbuild/protocol.py
index ffce1fe7..f2c74819 100644
--- a/distbuild/protocol.py
+++ b/distbuild/protocol.py
@@ -1,6 +1,6 @@
# distbuild/protocol.py -- abstractions for the JSON messages
#
-# Copyright (C) 2012, 2014 Codethink Limited
+# Copyright (C) 2012, 2014 - 2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -19,12 +19,20 @@
'''Construct protocol message objects (dicts).'''
+# Version refers to an integer that should be incremented by one each time a
+# time a change is introduced that would break server/initiator compatibility
+
+
+VERSION = 1
+
+
_required_fields = {
'build-request': [
'id',
'repo',
'ref',
'morphology',
+ 'protocol_version',
],
'build-progress': [
'id',