summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Sam Thursfield <sam.thursfield@codethink.co.uk> 2014-04-14 11:20:02 +0300
committer Sam Thursfield <sam.thursfield@codethink.co.uk> 2014-04-14 11:20:02 +0300
commit 0baae51e217f4999c1f068c458527d598284ddec (patch)
tree 6b13cf44af03a1870fb730a3fa4825fc552df397
parent 63f7346990802f01ef76cba75495f6f7d1409028 (diff)
parent f05022d39b530069a75fbecfd744bca66cc0ce66 (diff)
download morph-0baae51e217f4999c1f068c458527d598284ddec.tar.gz
Merge branch 'sam/distbuild-concurrency-fixes'
Reviewed-By: Lars Wirzenius <lars.wirzenius@codethink.co.uk>
-rw-r--r-- distbuild/build_controller.py 121
-rw-r--r-- distbuild/connection_machine.py 1
-rw-r--r-- distbuild/helper_router.py 1
-rw-r--r-- distbuild/initiator.py 1
-rw-r--r-- distbuild/initiator_connection.py 12
-rw-r--r-- distbuild/jm.py 1
-rw-r--r-- distbuild/json_router.py 1
-rw-r--r-- distbuild/sockbuf.py 1
-rw-r--r-- distbuild/sockserv.py 1
-rw-r--r-- distbuild/worker_build_scheduler.py 6
-rw-r--r-- morphlib/plugins/distbuild_plugin.py 1
11 files changed, 87 insertions, 60 deletions
diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py
index ce8fced5..404bcf9f 100644
--- a/distbuild/build_controller.py
+++ b/distbuild/build_controller.py
@@ -150,10 +150,11 @@ class BuildController(distbuild.StateMachine):
_idgen = distbuild.IdentifierGenerator('BuildController')
- def __init__(self, build_request_message, artifact_cache_server,
- morph_instance):
+ def __init__(self, initiator_connection, build_request_message,
+ artifact_cache_server, morph_instance):
distbuild.crash_point()
distbuild.StateMachine.__init__(self, 'init')
+ self._initiator_connection = initiator_connection
self._request = build_request_message
self._artifact_cache_server = artifact_cache_server
self._morph_instance = morph_instance
@@ -168,58 +169,60 @@ class BuildController(distbuild.StateMachine):
distbuild.crash_point()
spec = [
+ # state, source, event_class, new_state, callback
('init', self, _Start, 'graphing', self._start_graphing),
- ('init', distbuild.InitiatorConnection,
- distbuild.InitiatorDisconnect, 'init', self._maybe_abort),
- ('init', self, _Abort, None, None),
-
+ ('init', self._initiator_connection,
+ distbuild.InitiatorDisconnect, None, None),
+
('graphing', distbuild.HelperRouter, distbuild.HelperOutput,
- 'graphing', self._collect_graph),
+ 'graphing', self._maybe_collect_graph),
('graphing', distbuild.HelperRouter, distbuild.HelperResult,
- 'graphing', self._finish_graph),
+ 'graphing', self._maybe_finish_graph),
('graphing', self, _GotGraph,
'annotating', self._start_annotating),
('graphing', self, _GraphFailed, None, None),
- ('graphing', distbuild.InitiatorConnection,
- distbuild.InitiatorDisconnect, None,
- self._maybe_abort),
-
+ ('graphing', self._initiator_connection,
+ distbuild.InitiatorDisconnect, None, None),
+
('annotating', distbuild.HelperRouter, distbuild.HelperResult,
- 'annotating', self._handle_cache_response),
- ('annotating', self, _Annotated, 'building',
+ 'annotating', self._maybe_handle_cache_response),
+ ('annotating', self, _Annotated, 'building',
self._queue_worker_builds),
- ('annotating', distbuild.InitiatorConnection,
- distbuild.InitiatorDisconnect, None,
- self._maybe_abort),
-
- ('building', distbuild.WorkerConnection,
- distbuild.WorkerBuildStepStarted, 'building',
- self._relay_build_step_started),
- ('building', distbuild.WorkerConnection,
- distbuild.WorkerBuildOutput, 'building',
- self._relay_build_output),
- ('building', distbuild.WorkerConnection,
- distbuild.WorkerBuildCaching, 'building',
- self._relay_build_caching),
- ('building', distbuild.WorkerConnection,
- distbuild.WorkerBuildFinished, 'building',
- self._check_result_and_queue_more_builds),
- ('building', distbuild.WorkerConnection,
- distbuild.WorkerBuildFailed, None,
- self._notify_build_failed),
+ ('annotating', self._initiator_connection,
+ distbuild.InitiatorDisconnect, None, None),
+
+ # The exact WorkerConnection that is doing our building changes
+ # from build to build. We must listen to all messages from all
+ # workers, and choose whether to change state inside the callback.
+ # (An alternative would be to manage a set of temporary transitions
+ # specific to WorkerConnection instances that are currently
+ # building for us, but the state machines are not intended to
+ # behave that way).
+ ('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildStepStarted, 'building',
+ self._maybe_relay_build_step_started),
+ ('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildOutput, 'building',
+ self._maybe_relay_build_output),
+ ('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildCaching, 'building',
+ self._maybe_relay_build_caching),
+ ('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildFinished, 'building',
+ self._maybe_check_result_and_queue_more_builds),
+ ('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildFailed, 'building',
+ self._maybe_notify_build_failed),
+ ('building', self, _Abort, None, None),
('building', self, _Built, None, self._notify_build_done),
- ('building', distbuild.InitiatorConnection,
- distbuild.InitiatorDisconnect, 'building',
+ ('building', self._initiator_connection,
+ distbuild.InitiatorDisconnect, None,
self._notify_initiator_disconnected),
]
self.add_transitions(spec)
self.mainloop.queue_event(self, _Start())
- def _maybe_abort(self, event_source, event):
- if event.id == self._request['id']:
- self.mainloop.queue_event(self, _Abort())
-
def _start_graphing(self, event_source, event):
distbuild.crash_point()
@@ -245,14 +248,14 @@ class BuildController(distbuild.StateMachine):
progress = BuildProgress(self._request['id'], 'Computing build graph')
self.mainloop.queue_event(BuildController, progress)
- def _collect_graph(self, event_source, event):
+ def _maybe_collect_graph(self, event_source, event):
distbuild.crash_point()
if event.msg['id'] == self._helper_id:
self._artifact_data.add(event.msg['stdout'])
self._artifact_error.add(event.msg['stderr'])
- def _finish_graph(self, event_source, event):
+ def _maybe_finish_graph(self, event_source, event):
distbuild.crash_point()
def notify_failure(msg_text):
@@ -325,7 +328,7 @@ class BuildController(distbuild.StateMachine):
'Queued as %s query whether %s is in cache' %
(msg['id'], filename))
- def _handle_cache_response(self, event_source, event):
+ def _maybe_handle_cache_response(self, event_source, event):
distbuild.crash_point()
logging.debug('Got cache query response: %s' % repr(event.msg))
@@ -418,11 +421,12 @@ class BuildController(distbuild.StateMachine):
def _notify_initiator_disconnected(self, event_source, disconnect):
- if disconnect.id == self._request['id']:
- cancel = BuildCancel(disconnect.id)
- self.mainloop.queue_event(BuildController, cancel)
+ logging.debug("BuildController %r: initiator id %s disconnected", self,
+ disconnect.id)
+ cancel = BuildCancel(disconnect.id)
+ self.mainloop.queue_event(BuildController, cancel)
- def _relay_build_step_started(self, event_source, event):
+ def _maybe_relay_build_step_started(self, event_source, event):
distbuild.crash_point()
if event.initiator_id != self._request['id']:
return # not for us
@@ -440,7 +444,7 @@ class BuildController(distbuild.StateMachine):
self.mainloop.queue_event(BuildController, started)
logging.debug('BC: emitted %s' % repr(started))
- def _relay_build_output(self, event_source, event):
+ def _maybe_relay_build_output(self, event_source, event):
distbuild.crash_point()
if event.msg['id'] != self._request['id']:
return # not for us
@@ -458,7 +462,7 @@ class BuildController(distbuild.StateMachine):
self.mainloop.queue_event(BuildController, output)
logging.debug('BC: queued %s' % repr(output))
- def _relay_build_caching(self, event_source, event):
+ def _maybe_relay_build_caching(self, event_source, event):
distbuild.crash_point()
if event.initiator_id != self._request['id']:
return # not for us
@@ -481,7 +485,7 @@ class BuildController(distbuild.StateMachine):
else:
return None
- def _check_result_and_queue_more_builds(self, event_source, event):
+ def _maybe_check_result_and_queue_more_builds(self, event_source, event):
distbuild.crash_point()
if event.msg['id'] != self._request['id']:
return # not for us
@@ -513,17 +517,24 @@ class BuildController(distbuild.StateMachine):
self._queue_worker_builds(None, event)
- def _notify_build_failed(self, event_source, event):
+ def _maybe_notify_build_failed(self, event_source, event):
distbuild.crash_point()
+
if event.msg['id'] != self._request['id']:
- return # not for us
+ return
artifact = self._find_artifact(event.artifact_cache_key)
+
if artifact is None:
- # This is not the event you are looking for.
- return
+ logging.error(
+ 'BuildController %r: artifact %s is not in our build graph!',
+ self, artifact)
+ # We abort the build in this case on the grounds that something is
+ # very wrong internally, and it's better for the initiator to receive
+ # an error than to be left hanging.
+ self.mainloop.queue_event(self, _Abort())
- logging.error(
+ logging.info(
'Build step failed for %s: %s', artifact.name, repr(event.msg))
step_failed = BuildStepFailed(
@@ -548,6 +559,8 @@ class BuildController(distbuild.StateMachine):
cancel = BuildCancel(self._request['id'])
self.mainloop.queue_event(BuildController, cancel)
+ self.mainloop.queue_event(self, _Abort())
+
def _notify_build_done(self, event_source, event):
distbuild.crash_point()
diff --git a/distbuild/connection_machine.py b/distbuild/connection_machine.py
index 2f768f0b..648ce35a 100644
--- a/distbuild/connection_machine.py
+++ b/distbuild/connection_machine.py
@@ -81,6 +81,7 @@ class ConnectionMachine(distbuild.StateMachine):
self.mainloop.add_event_source(self._timer)
spec = [
+ # state, source, event_class, new_state, callback
('connecting', self._sock_proxy, distbuild.SocketWriteable,
'connected', self._connect),
('connecting', self, StopConnecting, None, self._stop),
diff --git a/distbuild/helper_router.py b/distbuild/helper_router.py
index 752a5fdb..1f0ce45b 100644
--- a/distbuild/helper_router.py
+++ b/distbuild/helper_router.py
@@ -82,6 +82,7 @@ class HelperRouter(distbuild.StateMachine):
self.mainloop.add_state_machine(jm)
spec = [
+ # state, source, event_class, new_state, callback
('idle', HelperRouter, HelperRequest, 'idle',
self._handle_request),
('idle', jm, distbuild.JsonNewMessage, 'idle', self._helper_msg),
diff --git a/distbuild/initiator.py b/distbuild/initiator.py
index 069578d2..6e4ca65a 100644
--- a/distbuild/initiator.py
+++ b/distbuild/initiator.py
@@ -58,6 +58,7 @@ class Initiator(distbuild.StateMachine):
logging.debug('initiator: _jm=%s' % repr(self._jm))
spec = [
+ # state, source, event_class, new_state, callback
('waiting', self._jm, distbuild.JsonEof, None, self._terminate),
('waiting', self._jm, distbuild.JsonNewMessage, 'waiting',
self._handle_json_message),
diff --git a/distbuild/initiator_connection.py b/distbuild/initiator_connection.py
index d48d4698..e20e6d98 100644
--- a/distbuild/initiator_connection.py
+++ b/distbuild/initiator_connection.py
@@ -35,9 +35,12 @@ class _Close(object):
class InitiatorConnection(distbuild.StateMachine):
- '''Communicate with the initiator.
-
- This state machine communicates with the initiator, relaying and
+ '''Communicate with a single initiator.
+
+ When a developer runs 'morph distbuild' and connects to the controller,
+ the ListenServer object on the controller creates an InitiatorConnection.
+
+ This state machine communicates with the build initiator, relaying and
translating messages from the initiator to the rest of the controller's
state machines, and vice versa.
@@ -64,6 +67,7 @@ class InitiatorConnection(distbuild.StateMachine):
self.our_ids = set()
spec = [
+ # state, source, event_class, new_state, callback
('idle', self.jm, distbuild.JsonNewMessage, 'idle',
self._handle_msg),
('idle', self.jm, distbuild.JsonEof, 'closing', self._disconnect),
@@ -99,7 +103,7 @@ class InitiatorConnection(distbuild.StateMachine):
self._route_map.add(event.msg['id'], new_id)
event.msg['id'] = new_id
build_controller = distbuild.BuildController(
- event.msg, self.artifact_cache_server, self.morph_instance)
+ self, event.msg, self.artifact_cache_server, self.morph_instance)
self.mainloop.add_state_machine(build_controller)
def _disconnect(self, event_source, event):
diff --git a/distbuild/jm.py b/distbuild/jm.py
index bb86adc4..a4e366a7 100644
--- a/distbuild/jm.py
+++ b/distbuild/jm.py
@@ -56,6 +56,7 @@ class JsonMachine(StateMachine):
self.receive_buf = StringBuffer()
spec = [
+ # state, source, event_class, new_state, callback
('rw', sockbuf, SocketBufferNewData, 'rw', self._parse),
('rw', sockbuf, SocketBufferEof, 'w', self._send_eof),
('rw', self, _Close2, None, self._really_close),
diff --git a/distbuild/json_router.py b/distbuild/json_router.py
index bf272174..93533b2e 100644
--- a/distbuild/json_router.py
+++ b/distbuild/json_router.py
@@ -52,6 +52,7 @@ class JsonRouter(distbuild.StateMachine):
self.mainloop.add_state_machine(jm)
spec = [
+ # state, source, event_class, new_state, callback
('idle', jm, distbuild.JsonNewMessage, 'idle', self.bloop),
('idle', jm, distbuild.JsonEof, None, self.close),
]
diff --git a/distbuild/sockbuf.py b/distbuild/sockbuf.py
index 6803bfb5..346706db 100644
--- a/distbuild/sockbuf.py
+++ b/distbuild/sockbuf.py
@@ -89,6 +89,7 @@ class SocketBuffer(StateMachine):
self._wbuf = StringBuffer()
spec = [
+ # state, source, event_class, new_state, callback
('reading', src, SocketReadable, 'reading', self._fill),
('reading', self, _WriteBufferNotEmpty, 'rw',
self._start_writing),
diff --git a/distbuild/sockserv.py b/distbuild/sockserv.py
index dc313d06..124d29b9 100644
--- a/distbuild/sockserv.py
+++ b/distbuild/sockserv.py
@@ -26,6 +26,7 @@ class ListenServer(StateMachine):
self.mainloop.add_event_source(src)
spec = [
+ # state, source, event_class, new_state, callback
('listening', src, NewConnection, 'listening', self.new_conn),
('listening', src, SocketError, None, self.report_error),
]
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index 5ec7c9aa..f17a4cf4 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -127,6 +127,7 @@ class WorkerBuildQueuer(distbuild.StateMachine):
self._available_workers = []
spec = [
+ # state, source, event_class, new_state, callback
('idle', WorkerBuildQueuer, WorkerBuildRequest, 'idle',
self._handle_request),
('idle', WorkerBuildQueuer, WorkerCancelPending, 'idle',
@@ -210,6 +211,7 @@ class WorkerConnection(distbuild.StateMachine):
self.mainloop.add_state_machine(self._jm)
spec = [
+ # state, source, event_class, new_state, callback
('idle', self._jm, distbuild.JsonEof, None, self._reconnect),
('idle', self, _HaveAJob, 'building', self._start_build),
@@ -224,7 +226,7 @@ class WorkerConnection(distbuild.StateMachine):
self._request_caching),
('caching', distbuild.HelperRouter, distbuild.HelperResult,
- 'caching', self._handle_helper_result),
+ 'caching', self._maybe_handle_helper_result),
('caching', self, _Cached, 'idle', self._request_job),
('caching', self, _JobFailed, 'idle', self._request_job),
]
@@ -372,7 +374,7 @@ class WorkerConnection(distbuild.StateMachine):
self._initiator_id = None
self._finished_msg = event.msg
- def _handle_helper_result(self, event_source, event):
+ def _maybe_handle_helper_result(self, event_source, event):
if event.msg['id'] == self._helper_id:
distbuild.crash_point()
diff --git a/morphlib/plugins/distbuild_plugin.py b/morphlib/plugins/distbuild_plugin.py
index 375e70f1..c60dee6e 100644
--- a/morphlib/plugins/distbuild_plugin.py
+++ b/morphlib/plugins/distbuild_plugin.py
@@ -216,6 +216,7 @@ class ControllerDaemon(cliapp.Plugin):
morph_instance = self.app.settings['morph-instance']
listener_specs = [
+ # address, port, class to initiate on connection, class init args
('controller-helper-address', 'controller-helper-port',
distbuild.HelperRouter, []),
('controller-initiator-address', 'controller-initiator-port',