diff options
author | Sam Thursfield <sam.thursfield@codethink.co.uk> | 2014-04-14 11:20:02 +0300 |
---|---|---|
committer | Sam Thursfield <sam.thursfield@codethink.co.uk> | 2014-04-14 11:20:02 +0300 |
commit | 0baae51e217f4999c1f068c458527d598284ddec (patch) | |
tree | 6b13cf44af03a1870fb730a3fa4825fc552df397 /distbuild | |
parent | 63f7346990802f01ef76cba75495f6f7d1409028 (diff) | |
parent | f05022d39b530069a75fbecfd744bca66cc0ce66 (diff) | |
download | morph-0baae51e217f4999c1f068c458527d598284ddec.tar.gz |
Merge branch 'sam/distbuild-concurrency-fixes'
Reviewed-By: Lars Wirzenius <lars.wirzenius@codethink.co.uk>
Diffstat (limited to 'distbuild')
-rw-r--r-- | distbuild/build_controller.py | 121 | ||||
-rw-r--r-- | distbuild/connection_machine.py | 1 | ||||
-rw-r--r-- | distbuild/helper_router.py | 1 | ||||
-rw-r--r-- | distbuild/initiator.py | 1 | ||||
-rw-r--r-- | distbuild/initiator_connection.py | 12 | ||||
-rw-r--r-- | distbuild/jm.py | 1 | ||||
-rw-r--r-- | distbuild/json_router.py | 1 | ||||
-rw-r--r-- | distbuild/sockbuf.py | 1 | ||||
-rw-r--r-- | distbuild/sockserv.py | 1 | ||||
-rw-r--r-- | distbuild/worker_build_scheduler.py | 6 |
10 files changed, 86 insertions, 60 deletions
diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py index ce8fced5..404bcf9f 100644 --- a/distbuild/build_controller.py +++ b/distbuild/build_controller.py @@ -150,10 +150,11 @@ class BuildController(distbuild.StateMachine): _idgen = distbuild.IdentifierGenerator('BuildController') - def __init__(self, build_request_message, artifact_cache_server, - morph_instance): + def __init__(self, initiator_connection, build_request_message, + artifact_cache_server, morph_instance): distbuild.crash_point() distbuild.StateMachine.__init__(self, 'init') + self._initiator_connection = initiator_connection self._request = build_request_message self._artifact_cache_server = artifact_cache_server self._morph_instance = morph_instance @@ -168,58 +169,60 @@ class BuildController(distbuild.StateMachine): distbuild.crash_point() spec = [ + # state, source, event_class, new_state, callback ('init', self, _Start, 'graphing', self._start_graphing), - ('init', distbuild.InitiatorConnection, - distbuild.InitiatorDisconnect, 'init', self._maybe_abort), - ('init', self, _Abort, None, None), - + ('init', self._initiator_connection, + distbuild.InitiatorDisconnect, None, None), + ('graphing', distbuild.HelperRouter, distbuild.HelperOutput, - 'graphing', self._collect_graph), + 'graphing', self._maybe_collect_graph), ('graphing', distbuild.HelperRouter, distbuild.HelperResult, - 'graphing', self._finish_graph), + 'graphing', self._maybe_finish_graph), ('graphing', self, _GotGraph, 'annotating', self._start_annotating), ('graphing', self, _GraphFailed, None, None), - ('graphing', distbuild.InitiatorConnection, - distbuild.InitiatorDisconnect, None, - self._maybe_abort), - + ('graphing', self._initiator_connection, + distbuild.InitiatorDisconnect, None, None), + ('annotating', distbuild.HelperRouter, distbuild.HelperResult, - 'annotating', self._handle_cache_response), - ('annotating', self, _Annotated, 'building', + 'annotating', self._maybe_handle_cache_response), + ('annotating', self, _Annotated, 'building', self._queue_worker_builds), - ('annotating', distbuild.InitiatorConnection, - distbuild.InitiatorDisconnect, None, - self._maybe_abort), - - ('building', distbuild.WorkerConnection, - distbuild.WorkerBuildStepStarted, 'building', - self._relay_build_step_started), - ('building', distbuild.WorkerConnection, - distbuild.WorkerBuildOutput, 'building', - self._relay_build_output), - ('building', distbuild.WorkerConnection, - distbuild.WorkerBuildCaching, 'building', - self._relay_build_caching), - ('building', distbuild.WorkerConnection, - distbuild.WorkerBuildFinished, 'building', - self._check_result_and_queue_more_builds), - ('building', distbuild.WorkerConnection, - distbuild.WorkerBuildFailed, None, - self._notify_build_failed), + ('annotating', self._initiator_connection, + distbuild.InitiatorDisconnect, None, None), + + # The exact WorkerConnection that is doing our building changes + # from build to build. We must listen to all messages from all + # workers, and choose whether to change state inside the callback. + # (An alternative would be to manage a set of temporary transitions + # specific to WorkerConnection instances that our currently + # building for us, but the state machines are not intended to + # behave that way). + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildStepStarted, 'building', + self._maybe_relay_build_step_started), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildOutput, 'building', + self._maybe_relay_build_output), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildCaching, 'building', + self._maybe_relay_build_caching), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildFinished, 'building', + self._maybe_check_result_and_queue_more_builds), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildFailed, 'building', + self._maybe_notify_build_failed), + ('building', self, _Abort, None, None), ('building', self, _Built, None, self._notify_build_done), - ('building', distbuild.InitiatorConnection, - distbuild.InitiatorDisconnect, 'building', + ('building', self._initiator_connection, + distbuild.InitiatorDisconnect, None, self._notify_initiator_disconnected), ] self.add_transitions(spec) self.mainloop.queue_event(self, _Start()) - def _maybe_abort(self, event_source, event): - if event.id == self._request['id']: - self.mainloop.queue_event(self, _Abort()) - def _start_graphing(self, event_source, event): distbuild.crash_point() @@ -245,14 +248,14 @@ class BuildController(distbuild.StateMachine): progress = BuildProgress(self._request['id'], 'Computing build graph') self.mainloop.queue_event(BuildController, progress) - def _collect_graph(self, event_source, event): + def _maybe_collect_graph(self, event_source, event): distbuild.crash_point() if event.msg['id'] == self._helper_id: self._artifact_data.add(event.msg['stdout']) self._artifact_error.add(event.msg['stderr']) - def _finish_graph(self, event_source, event): + def _maybe_finish_graph(self, event_source, event): distbuild.crash_point() def notify_failure(msg_text): @@ -325,7 +328,7 @@ class BuildController(distbuild.StateMachine): 'Queued as %s query whether %s is in cache' % (msg['id'], filename)) - def _handle_cache_response(self, event_source, event): + def _maybe_handle_cache_response(self, event_source, event): distbuild.crash_point() logging.debug('Got cache query response: %s' % repr(event.msg)) @@ -418,11 +421,12 @@ class BuildController(distbuild.StateMachine): def _notify_initiator_disconnected(self, event_source, disconnect): - if disconnect.id == self._request['id']: - cancel = BuildCancel(disconnect.id) - self.mainloop.queue_event(BuildController, cancel) + logging.debug("BuildController %r: initiator id %s disconnected", self, + disconnect.id) + cancel = BuildCancel(disconnect.id) + self.mainloop.queue_event(BuildController, cancel) - def _relay_build_step_started(self, event_source, event): + def _maybe_relay_build_step_started(self, event_source, event): distbuild.crash_point() if event.initiator_id != self._request['id']: return # not for us @@ -440,7 +444,7 @@ class BuildController(distbuild.StateMachine): self.mainloop.queue_event(BuildController, started) logging.debug('BC: emitted %s' % repr(started)) - def _relay_build_output(self, event_source, event): + def _maybe_relay_build_output(self, event_source, event): distbuild.crash_point() if event.msg['id'] != self._request['id']: return # not for us @@ -458,7 +462,7 @@ class BuildController(distbuild.StateMachine): self.mainloop.queue_event(BuildController, output) logging.debug('BC: queued %s' % repr(output)) - def _relay_build_caching(self, event_source, event): + def _maybe_relay_build_caching(self, event_source, event): distbuild.crash_point() if event.initiator_id != self._request['id']: return # not for us @@ -481,7 +485,7 @@ class BuildController(distbuild.StateMachine): else: return None - def _check_result_and_queue_more_builds(self, event_source, event): + def _maybe_check_result_and_queue_more_builds(self, event_source, event): distbuild.crash_point() if event.msg['id'] != self._request['id']: return # not for us @@ -513,17 +517,24 @@ class BuildController(distbuild.StateMachine): self._queue_worker_builds(None, event) - def _notify_build_failed(self, event_source, event): + def _maybe_notify_build_failed(self, event_source, event): distbuild.crash_point() + if event.msg['id'] != self._request['id']: - return # not for us + return artifact = self._find_artifact(event.artifact_cache_key) + if artifact is None: - # This is not the event you are looking for. - return + logging.error( + 'BuildController %r: artifact %s is not in our build graph!', + self, artifact) + # We abort the build in this case on the grounds that something is + # very wrong internally, and it's best for the initiator to receive + # an error than to be left hanging. + self.mainloop.queue_event(self, _Abort()) - logging.error( + logging.info( 'Build step failed for %s: %s', artifact.name, repr(event.msg)) step_failed = BuildStepFailed( @@ -548,6 +559,8 @@ class BuildController(distbuild.StateMachine): cancel = BuildCancel(self._request['id']) self.mainloop.queue_event(BuildController, cancel) + self.mainloop.queue_event(self, _Abort()) + def _notify_build_done(self, event_source, event): distbuild.crash_point() diff --git a/distbuild/connection_machine.py b/distbuild/connection_machine.py index 2f768f0b..648ce35a 100644 --- a/distbuild/connection_machine.py +++ b/distbuild/connection_machine.py @@ -81,6 +81,7 @@ class ConnectionMachine(distbuild.StateMachine): self.mainloop.add_event_source(self._timer) spec = [ + # state, source, event_class, new_state, callback ('connecting', self._sock_proxy, distbuild.SocketWriteable, 'connected', self._connect), ('connecting', self, StopConnecting, None, self._stop), diff --git a/distbuild/helper_router.py b/distbuild/helper_router.py index 752a5fdb..1f0ce45b 100644 --- a/distbuild/helper_router.py +++ b/distbuild/helper_router.py @@ -82,6 +82,7 @@ class HelperRouter(distbuild.StateMachine): self.mainloop.add_state_machine(jm) spec = [ + # state, source, event_class, new_state, callback ('idle', HelperRouter, HelperRequest, 'idle', self._handle_request), ('idle', jm, distbuild.JsonNewMessage, 'idle', self._helper_msg), diff --git a/distbuild/initiator.py b/distbuild/initiator.py index 069578d2..6e4ca65a 100644 --- a/distbuild/initiator.py +++ b/distbuild/initiator.py @@ -58,6 +58,7 @@ class Initiator(distbuild.StateMachine): logging.debug('initiator: _jm=%s' % repr(self._jm)) spec = [ + # state, source, event_class, new_state, callback ('waiting', self._jm, distbuild.JsonEof, None, self._terminate), ('waiting', self._jm, distbuild.JsonNewMessage, 'waiting', self._handle_json_message), diff --git a/distbuild/initiator_connection.py b/distbuild/initiator_connection.py index d48d4698..e20e6d98 100644 --- a/distbuild/initiator_connection.py +++ b/distbuild/initiator_connection.py @@ -35,9 +35,12 @@ class _Close(object): class InitiatorConnection(distbuild.StateMachine): - '''Communicate with the initiator. - - This state machine communicates with the initiator, relaying and + '''Communicate with a single initiator. + + When a developer runs 'morph distbuild' and connects to the controller, + the ListenServer object on the controller creates an InitiatorConnection. + + This state machine communicates with the build initiator, relaying and translating messages from the initiator to the rest of the controller's state machines, and vice versa. @@ -64,6 +67,7 @@ class InitiatorConnection(distbuild.StateMachine): self.our_ids = set() spec = [ + # state, source, event_class, new_state, callback ('idle', self.jm, distbuild.JsonNewMessage, 'idle', self._handle_msg), ('idle', self.jm, distbuild.JsonEof, 'closing', self._disconnect), @@ -99,7 +103,7 @@ class InitiatorConnection(distbuild.StateMachine): self._route_map.add(event.msg['id'], new_id) event.msg['id'] = new_id build_controller = distbuild.BuildController( - event.msg, self.artifact_cache_server, self.morph_instance) + self, event.msg, self.artifact_cache_server, self.morph_instance) self.mainloop.add_state_machine(build_controller) def _disconnect(self, event_source, event): diff --git a/distbuild/jm.py b/distbuild/jm.py index bb86adc4..a4e366a7 100644 --- a/distbuild/jm.py +++ b/distbuild/jm.py @@ -56,6 +56,7 @@ class JsonMachine(StateMachine): self.receive_buf = StringBuffer() spec = [ + # state, source, event_class, new_state, callback ('rw', sockbuf, SocketBufferNewData, 'rw', self._parse), ('rw', sockbuf, SocketBufferEof, 'w', self._send_eof), ('rw', self, _Close2, None, self._really_close), diff --git a/distbuild/json_router.py b/distbuild/json_router.py index bf272174..93533b2e 100644 --- a/distbuild/json_router.py +++ b/distbuild/json_router.py @@ -52,6 +52,7 @@ class JsonRouter(distbuild.StateMachine): self.mainloop.add_state_machine(jm) spec = [ + # state, source, event_class, new_state, callback ('idle', jm, distbuild.JsonNewMessage, 'idle', self.bloop), ('idle', jm, distbuild.JsonEof, None, self.close), ] diff --git a/distbuild/sockbuf.py b/distbuild/sockbuf.py index 6803bfb5..346706db 100644 --- a/distbuild/sockbuf.py +++ b/distbuild/sockbuf.py @@ -89,6 +89,7 @@ class SocketBuffer(StateMachine): self._wbuf = StringBuffer() spec = [ + # state, source, event_class, new_state, callback ('reading', src, SocketReadable, 'reading', self._fill), ('reading', self, _WriteBufferNotEmpty, 'rw', self._start_writing), diff --git a/distbuild/sockserv.py b/distbuild/sockserv.py index dc313d06..124d29b9 100644 --- a/distbuild/sockserv.py +++ b/distbuild/sockserv.py @@ -26,6 +26,7 @@ class ListenServer(StateMachine): self.mainloop.add_event_source(src) spec = [ + # state, source, event_class, new_state, callback ('listening', src, NewConnection, 'listening', self.new_conn), ('listening', src, SocketError, None, self.report_error), ] diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 5ec7c9aa..f17a4cf4 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -127,6 +127,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): self._available_workers = [] spec = [ + # state, source, event_class, new_state, callback ('idle', WorkerBuildQueuer, WorkerBuildRequest, 'idle', self._handle_request), ('idle', WorkerBuildQueuer, WorkerCancelPending, 'idle', @@ -210,6 +211,7 @@ class WorkerConnection(distbuild.StateMachine): self.mainloop.add_state_machine(self._jm) spec = [ + # state, source, event_class, new_state, callback ('idle', self._jm, distbuild.JsonEof, None, self._reconnect), ('idle', self, _HaveAJob, 'building', self._start_build), @@ -224,7 +226,7 @@ class WorkerConnection(distbuild.StateMachine): self._request_caching), ('caching', distbuild.HelperRouter, distbuild.HelperResult, - 'caching', self._handle_helper_result), + 'caching', self._maybe_handle_helper_result), ('caching', self, _Cached, 'idle', self._request_job), ('caching', self, _JobFailed, 'idle', self._request_job), ] @@ -372,7 +374,7 @@ class WorkerConnection(distbuild.StateMachine): self._initiator_id = None self._finished_msg = event.msg - def _handle_helper_result(self, event_source, event): + def _maybe_handle_helper_result(self, event_source, event): if event.msg['id'] == self._helper_id: distbuild.crash_point() |