diff options
Diffstat (limited to 'distbuild')
-rw-r--r-- | distbuild/__init__.py | 2 | ||||
-rw-r--r-- | distbuild/build_controller.py | 125 | ||||
-rw-r--r-- | distbuild/connection_machine.py | 3 | ||||
-rw-r--r-- | distbuild/distbuild_socket.py | 63 | ||||
-rw-r--r-- | distbuild/helper_router.py | 1 | ||||
-rw-r--r-- | distbuild/initiator.py | 3 | ||||
-rw-r--r-- | distbuild/initiator_connection.py | 62 | ||||
-rw-r--r-- | distbuild/jm.py | 7 | ||||
-rw-r--r-- | distbuild/json_router.py | 1 | ||||
-rw-r--r-- | distbuild/serialise.py | 7 | ||||
-rw-r--r-- | distbuild/sockbuf.py | 5 | ||||
-rw-r--r-- | distbuild/socketsrc.py | 13 | ||||
-rw-r--r-- | distbuild/sockserv.py | 3 | ||||
-rw-r--r-- | distbuild/worker_build_scheduler.py | 22 |
14 files changed, 212 insertions, 105 deletions
diff --git a/distbuild/__init__.py b/distbuild/__init__.py index 3e60d5ee..57aaccaf 100644 --- a/distbuild/__init__.py +++ b/distbuild/__init__.py @@ -58,4 +58,6 @@ from protocol import message from crashpoint import (crash_point, add_crash_condition, add_crash_conditions, clear_crash_conditions) +from distbuild_socket import create_socket + __all__ = locals() diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py index 8b9f34e7..ed6c424e 100644 --- a/distbuild/build_controller.py +++ b/distbuild/build_controller.py @@ -155,74 +155,81 @@ class BuildController(distbuild.StateMachine): _idgen = distbuild.IdentifierGenerator('BuildController') - def __init__(self, build_request_message, artifact_cache_server, - morph_instance): + def __init__(self, initiator_connection, build_request_message, + artifact_cache_server, morph_instance): distbuild.crash_point() distbuild.StateMachine.__init__(self, 'init') + self._initiator_connection = initiator_connection self._request = build_request_message self._artifact_cache_server = artifact_cache_server self._morph_instance = morph_instance self._helper_id = None - self.debug_transitions = True + self.debug_transitions = False + + def __repr__(self): + return '<BuildController at 0x%x, request-id %s>' % (id(self), + self._request['id']) def setup(self): distbuild.crash_point() spec = [ + # state, source, event_class, new_state, callback ('init', self, _Start, 'graphing', self._start_graphing), - ('init', distbuild.InitiatorConnection, - distbuild.InitiatorDisconnect, 'init', self._maybe_abort), - ('init', self, _Abort, None, None), - + ('init', self._initiator_connection, + distbuild.InitiatorDisconnect, None, None), + ('graphing', distbuild.HelperRouter, distbuild.HelperOutput, - 'graphing', self._collect_graph), + 'graphing', self._maybe_collect_graph), ('graphing', distbuild.HelperRouter, distbuild.HelperResult, - 'graphing', self._finish_graph), + 'graphing', self._maybe_finish_graph), ('graphing', self, _GotGraph, 'annotating', self._start_annotating), ('graphing', self, _GraphFailed, None, None), - ('graphing', distbuild.InitiatorConnection, - distbuild.InitiatorDisconnect, None, - self._maybe_abort), - + ('graphing', self._initiator_connection, + distbuild.InitiatorDisconnect, None, None), + ('annotating', distbuild.HelperRouter, distbuild.HelperResult, - 'annotating', self._handle_cache_response), + 'annotating', self._maybe_handle_cache_response), ('annotating', self, _AnnotationFailed, None, self._notify_annotation_failed), ('annotating', self, _Annotated, 'building', self._queue_worker_builds), - ('annotating', distbuild.InitiatorConnection, - distbuild.InitiatorDisconnect, None, - self._maybe_abort), - - ('building', distbuild.WorkerConnection, - distbuild.WorkerBuildStepStarted, 'building', - self._relay_build_step_started), - ('building', distbuild.WorkerConnection, - distbuild.WorkerBuildOutput, 'building', - self._relay_build_output), - ('building', distbuild.WorkerConnection, - distbuild.WorkerBuildCaching, 'building', - self._relay_build_caching), - ('building', distbuild.WorkerConnection, - distbuild.WorkerBuildFinished, 'building', - self._check_result_and_queue_more_builds), - ('building', distbuild.WorkerConnection, - distbuild.WorkerBuildFailed, None, - self._notify_build_failed), + ('annotating', self._initiator_connection, + distbuild.InitiatorDisconnect, None, None), + + # The exact WorkerConnection that is doing our building changes + # from build to build. We must listen to all messages from all + # workers, and choose whether to change state inside the callback. + # (An alternative would be to manage a set of temporary transitions + # specific to WorkerConnection instances that our currently + # building for us, but the state machines are not intended to + # behave that way). + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildStepStarted, 'building', + self._maybe_relay_build_step_started), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildOutput, 'building', + self._maybe_relay_build_output), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildCaching, 'building', + self._maybe_relay_build_caching), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildFinished, 'building', + self._maybe_check_result_and_queue_more_builds), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildFailed, 'building', + self._maybe_notify_build_failed), + ('building', self, _Abort, None, None), ('building', self, _Built, None, self._notify_build_done), - ('building', distbuild.InitiatorConnection, - distbuild.InitiatorDisconnect, 'building', + ('building', self._initiator_connection, + distbuild.InitiatorDisconnect, None, self._notify_initiator_disconnected), ] self.add_transitions(spec) self.mainloop.queue_event(self, _Start()) - def _maybe_abort(self, event_source, event): - if event.id == self._request['id']: - self.mainloop.queue_event(self, _Abort()) - def _start_graphing(self, event_source, event): distbuild.crash_point() @@ -248,14 +255,14 @@ class BuildController(distbuild.StateMachine): progress = BuildProgress(self._request['id'], 'Computing build graph') self.mainloop.queue_event(BuildController, progress) - def _collect_graph(self, event_source, event): + def _maybe_collect_graph(self, event_source, event): distbuild.crash_point() if event.msg['id'] == self._helper_id: self._artifact_data.add(event.msg['stdout']) self._artifact_error.add(event.msg['stderr']) - def _finish_graph(self, event_source, event): + def _maybe_finish_graph(self, event_source, event): distbuild.crash_point() def notify_failure(msg_text): @@ -333,7 +340,7 @@ class BuildController(distbuild.StateMachine): logging.debug('Made cache request for state of artifacts ' '(helper id: %s)' % self._helper_id) - def _handle_cache_response(self, event_source, event): + def _maybe_handle_cache_response(self, event_source, event): logging.debug('Got cache response: %s' % repr(event.msg)) def set_status(artifact): @@ -423,11 +430,12 @@ class BuildController(distbuild.StateMachine): def _notify_initiator_disconnected(self, event_source, disconnect): - if disconnect.id == self._request['id']: - cancel = BuildCancel(disconnect.id) - self.mainloop.queue_event(BuildController, cancel) + logging.debug("BuildController %r: initiator id %s disconnected", self, + disconnect.id) + cancel = BuildCancel(disconnect.id) + self.mainloop.queue_event(BuildController, cancel) - def _relay_build_step_started(self, event_source, event): + def _maybe_relay_build_step_started(self, event_source, event): distbuild.crash_point() if event.initiator_id != self._request['id']: return # not for us @@ -445,7 +453,7 @@ class BuildController(distbuild.StateMachine): self.mainloop.queue_event(BuildController, started) logging.debug('BC: emitted %s' % repr(started)) - def _relay_build_output(self, event_source, event): + def _maybe_relay_build_output(self, event_source, event): distbuild.crash_point() if event.msg['id'] != self._request['id']: return # not for us @@ -463,7 +471,7 @@ class BuildController(distbuild.StateMachine): self.mainloop.queue_event(BuildController, output) logging.debug('BC: queued %s' % repr(output)) - def _relay_build_caching(self, event_source, event): + def _maybe_relay_build_caching(self, event_source, event): distbuild.crash_point() if event.initiator_id != self._request['id']: return # not for us @@ -486,7 +494,7 @@ class BuildController(distbuild.StateMachine): else: return None - def _check_result_and_queue_more_builds(self, event_source, event): + def _maybe_check_result_and_queue_more_builds(self, event_source, event): distbuild.crash_point() if event.msg['id'] != self._request['id']: return # not for us @@ -526,17 +534,24 @@ class BuildController(distbuild.StateMachine): failed = BuildFailed(self._request['id'], errmsg) self.mainloop.queue_event(BuildController, failed) - def _notify_build_failed(self, event_source, event): + def _maybe_notify_build_failed(self, event_source, event): distbuild.crash_point() + if event.msg['id'] != self._request['id']: - return # not for us + return artifact = self._find_artifact(event.artifact_cache_key) + if artifact is None: - # This is not the event you are looking for. - return + logging.error( + 'BuildController %r: artifact %s is not in our build graph!', + self, artifact) + # We abort the build in this case on the grounds that something is + # very wrong internally, and it's best for the initiator to receive + # an error than to be left hanging. + self.mainloop.queue_event(self, _Abort()) - logging.error( + logging.info( 'Build step failed for %s: %s', artifact.name, repr(event.msg)) step_failed = BuildStepFailed( @@ -561,6 +576,8 @@ class BuildController(distbuild.StateMachine): cancel = BuildCancel(self._request['id']) self.mainloop.queue_event(BuildController, cancel) + self.mainloop.queue_event(self, _Abort()) + def _notify_build_done(self, event_source, event): distbuild.crash_point() diff --git a/distbuild/connection_machine.py b/distbuild/connection_machine.py index 3d4e8d04..648ce35a 100644 --- a/distbuild/connection_machine.py +++ b/distbuild/connection_machine.py @@ -81,6 +81,7 @@ class ConnectionMachine(distbuild.StateMachine): self.mainloop.add_event_source(self._timer) spec = [ + # state, source, event_class, new_state, callback ('connecting', self._sock_proxy, distbuild.SocketWriteable, 'connected', self._connect), ('connecting', self, StopConnecting, None, self._stop), @@ -97,7 +98,7 @@ class ConnectionMachine(distbuild.StateMachine): logging.debug( 'ConnectionMachine: connecting to %s:%s' % (self._addr, self._port)) - self._socket = socket.socket() + self._socket = distbuild.create_socket() distbuild.set_nonblocking(self._socket) try: self._socket.connect((self._addr, self._port)) diff --git a/distbuild/distbuild_socket.py b/distbuild/distbuild_socket.py new file mode 100644 index 00000000..0408a2c1 --- /dev/null +++ b/distbuild/distbuild_socket.py @@ -0,0 +1,63 @@ +# distbuild/distbuild_socket.py -- wrapper around Python 'socket' module. +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + + +import socket + + +class DistbuildSocket(object): + '''Wraps socket.SocketType with a few helper functions.''' + + def __init__(self, real_socket): + self.real_socket = real_socket + + def __getattr__(self, name): + return getattr(self.real_socket, name) + + def __repr__(self): + return '<DistbuildSocket at 0x%x: %s>' % (id(self), str(self)) + + def __str__(self): + localname = self.localname() or '(closed)' + remotename = self.remotename() + if remotename is None: + return '%s' % self.localname() + else: + return '%s -> %s' % (self.localname(), remotename) + + def accept(self, *args): + result = self.real_socket.accept(*args) + return DistbuildSocket(result[0]), result[1:] + + def localname(self): + '''Get local end of socket connection as a string.''' + try: + return '%s:%s' % self.getsockname() + except socket.error: + # If the socket is in destruction we may get EBADF here. + return None + + def remotename(self): + '''Get remote end of socket connection as a string.''' + try: + return '%s:%s' % self.getpeername() + except socket.error: + return None + + +def create_socket(*args): + return DistbuildSocket(socket.socket(*args)) diff --git a/distbuild/helper_router.py b/distbuild/helper_router.py index 752a5fdb..1f0ce45b 100644 --- a/distbuild/helper_router.py +++ b/distbuild/helper_router.py @@ -82,6 +82,7 @@ class HelperRouter(distbuild.StateMachine): self.mainloop.add_state_machine(jm) spec = [ + # state, source, event_class, new_state, callback ('idle', HelperRouter, HelperRequest, 'idle', self._handle_request), ('idle', jm, distbuild.JsonNewMessage, 'idle', self._helper_msg), diff --git a/distbuild/initiator.py b/distbuild/initiator.py index e4d4975f..6e4ca65a 100644 --- a/distbuild/initiator.py +++ b/distbuild/initiator.py @@ -48,7 +48,7 @@ class Initiator(distbuild.StateMachine): self._morphology = morphology self._steps = None self._step_outputs = {} - self.debug_transitions = True + self.debug_transitions = False def setup(self): distbuild.crash_point() @@ -58,6 +58,7 @@ class Initiator(distbuild.StateMachine): logging.debug('initiator: _jm=%s' % repr(self._jm)) spec = [ + # state, source, event_class, new_state, callback ('waiting', self._jm, distbuild.JsonEof, None, self._terminate), ('waiting', self._jm, distbuild.JsonNewMessage, 'waiting', self._handle_json_message), diff --git a/distbuild/initiator_connection.py b/distbuild/initiator_connection.py index 48d083e4..34f2bdaa 100644 --- a/distbuild/initiator_connection.py +++ b/distbuild/initiator_connection.py @@ -35,9 +35,12 @@ class _Close(object): class InitiatorConnection(distbuild.StateMachine): - '''Communicate with the initiator. - - This state machine communicates with the initiator, relaying and + '''Communicate with a single initiator. + + When a developer runs 'morph distbuild' and connects to the controller, + the ListenServer object on the controller creates an InitiatorConnection. + + This state machine communicates with the build initiator, relaying and translating messages from the initiator to the rest of the controller's state machines, and vice versa. @@ -51,6 +54,11 @@ class InitiatorConnection(distbuild.StateMachine): self.conn = conn self.artifact_cache_server = artifact_cache_server self.morph_instance = morph_instance + self.initiator_name = conn.remotename() + + def __repr__(self): + return '<InitiatorConnection at 0x%x: remote %s>' % (id(self), + self.initiator_name) def setup(self): self.jm = distbuild.JsonMachine(self.conn) @@ -59,6 +67,7 @@ class InitiatorConnection(distbuild.StateMachine): self.our_ids = set() spec = [ + # state, source, event_class, new_state, callback ('idle', self.jm, distbuild.JsonNewMessage, 'idle', self._handle_msg), ('idle', self.jm, distbuild.JsonEof, 'closing', self._disconnect), @@ -84,31 +93,32 @@ class InitiatorConnection(distbuild.StateMachine): def _handle_msg(self, event_source, event): '''Handle message from initiator.''' - - logging.debug( - 'InitiatorConnection: from initiator: %s', repr(event.msg)) - + + logging.debug('InitiatorConnection: from %s: %r', self.initiator_name, + event.msg) + if event.msg['type'] == 'build-request': new_id = self._idgen.next() self.our_ids.add(new_id) self._route_map.add(event.msg['id'], new_id) event.msg['id'] = new_id build_controller = distbuild.BuildController( - event.msg, self.artifact_cache_server, self.morph_instance) + self, event.msg, self.artifact_cache_server, + self.morph_instance) self.mainloop.add_state_machine(build_controller) def _disconnect(self, event_source, event): for id in self.our_ids: - logging.debug('InitiatorConnection: InitiatorDisconnect(%s)' - % str(id)) + logging.debug('InitiatorConnection: %s: InitiatorDisconnect(%s)', + self.initiator_name, str(id)) self.mainloop.queue_event(InitiatorConnection, InitiatorDisconnect(id)) # TODO should this clear our_ids? self.mainloop.queue_event(self, _Close(event_source)) def _close(self, event_source, event): - logging.debug('InitiatorConnection: closing: %s', - repr(event.event_source)) + logging.debug('InitiatorConnection: %s: closing: %s', + self.initiator_name, repr(event.event_source)) event.event_source.close() @@ -120,6 +130,10 @@ class InitiatorConnection(distbuild.StateMachine): 'InitiatorConnection: received result: %s', repr(event.msg)) self.jm.send(event.msg) + def _log_send(self, msg): + logging.debug( + 'InitiatorConnection: sent to %s: %r', self.initiator_name, msg) + def _send_build_finished_message(self, event_source, event): if event.id in self.our_ids: msg = distbuild.message('build-finished', @@ -128,8 +142,7 @@ class InitiatorConnection(distbuild.StateMachine): self._route_map.remove(event.id) self.our_ids.remove(event.id) self.jm.send(msg) - logging.debug( - 'InitiatorConnection: sent to initiator: %s', repr(msg)) + self._log_send(msg) def _send_build_failed_message(self, event_source, event): if event.id in self.our_ids: @@ -139,8 +152,7 @@ class InitiatorConnection(distbuild.StateMachine): self._route_map.remove(event.id) self.our_ids.remove(event.id) self.jm.send(msg) - logging.debug( - 'InitiatorConnection: sent to initiator: %s', repr(msg)) + self._log_send(msg) def _send_build_progress_message(self, event_source, event): if event.id in self.our_ids: @@ -148,8 +160,7 @@ class InitiatorConnection(distbuild.StateMachine): id=self._route_map.get_incoming_id(event.id), message=event.message_text) self.jm.send(msg) - logging.debug( - 'InitiatorConnection: sent to initiator: %s', repr(msg)) + self._log_send(msg) def _send_build_steps_message(self, event_source, event): @@ -169,8 +180,7 @@ class InitiatorConnection(distbuild.StateMachine): id=self._route_map.get_incoming_id(event.id), steps=step_names) self.jm.send(msg) - logging.debug( - 'InitiatorConnection: sent to initiator: %s', repr(msg)) + self._log_send(msg) def _send_build_step_started_message(self, event_source, event): if event.id in self.our_ids: @@ -179,8 +189,7 @@ class InitiatorConnection(distbuild.StateMachine): step_name=event.step_name, worker_name=event.worker_name) self.jm.send(msg) - logging.debug( - 'InitiatorConnection: sent to initiator: %s', repr(msg)) + self._log_send(msg) def _send_build_output_message(self, event_source, event): logging.debug('InitiatorConnection: build_output: ' @@ -193,8 +202,7 @@ class InitiatorConnection(distbuild.StateMachine): stdout=event.stdout, stderr=event.stderr) self.jm.send(msg) - logging.debug( - 'InitiatorConnection: sent to initiator: %s', repr(msg)) + self._log_send(msg) def _send_build_step_finished_message(self, event_source, event): if event.id in self.our_ids: @@ -202,8 +210,7 @@ class InitiatorConnection(distbuild.StateMachine): id=self._route_map.get_incoming_id(event.id), step_name=event.step_name) self.jm.send(msg) - logging.debug( - 'InitiatorConnection: sent to initiator: %s', repr(msg)) + self._log_send(msg) def _send_build_step_failed_message(self, event_source, event): if event.id in self.our_ids: @@ -211,6 +218,5 @@ class InitiatorConnection(distbuild.StateMachine): id=self._route_map.get_incoming_id(event.id), step_name=event.step_name) self.jm.send(msg) - logging.debug( - 'InitiatorConnection: sent to initiator: %s', repr(msg)) + self._log_send(msg) diff --git a/distbuild/jm.py b/distbuild/jm.py index ae222c00..69fa5bd1 100644 --- a/distbuild/jm.py +++ b/distbuild/jm.py @@ -43,7 +43,11 @@ class JsonMachine(StateMachine): StateMachine.__init__(self, 'rw') self.conn = conn self.debug_json = False - + + def __repr__(self): + return '<JsonMachine at 0x%x: socket %s, max_buffer %s>' % \ + (id(self), self.conn, self.max_buffer) + def setup(self): sockbuf = self.sockbuf = SocketBuffer(self.conn, self.max_buffer) self.mainloop.add_state_machine(sockbuf) @@ -52,6 +56,7 @@ class JsonMachine(StateMachine): self.receive_buf = StringBuffer() spec = [ + # state, source, event_class, new_state, callback ('rw', sockbuf, SocketBufferNewData, 'rw', self._parse), ('rw', sockbuf, SocketBufferEof, 'w', self._send_eof), ('rw', self, _Close2, None, self._really_close), diff --git a/distbuild/json_router.py b/distbuild/json_router.py index bf272174..93533b2e 100644 --- a/distbuild/json_router.py +++ b/distbuild/json_router.py @@ -52,6 +52,7 @@ class JsonRouter(distbuild.StateMachine): self.mainloop.add_state_machine(jm) spec = [ + # state, source, event_class, new_state, callback ('idle', jm, distbuild.JsonNewMessage, 'idle', self.bloop), ('idle', jm, distbuild.JsonEof, None, self.close), ] diff --git a/distbuild/serialise.py b/distbuild/serialise.py index cd871042..44d96eee 100644 --- a/distbuild/serialise.py +++ b/distbuild/serialise.py @@ -75,9 +75,6 @@ def serialise_artifact(artifact): else: arch = artifact.arch - logging.debug('encode_single_artifact dependencies: %s' - % str([('id: %s' % str(id(d)), d.name) for d in a.dependencies])) - return { 'source_id': source_id, 'name': a.name, @@ -104,13 +101,10 @@ def serialise_artifact(artifact): encoded_sources = {} for a in traverse(artifact): - logging.debug('traversing artifacts at %s' % a.name) - if id(a.source) not in encoded_sources: if a.source.morphology['kind'] == 'chunk': for (_, sa) in a.source.artifacts.iteritems(): if id(sa) not in artifacts: - logging.debug('encoding source artifact %s' % sa.name) artifacts[id(sa)] = sa encoded_artifacts[id(sa)] = encode_single_artifact(sa, artifacts, id(a.source)) @@ -130,7 +124,6 @@ def serialise_artifact(artifact): if id(a) not in artifacts: artifacts[id(a)] = a - logging.debug('encoding artifact %s' % a.name) encoded_artifacts[id(a)] = encode_single_artifact(a, artifacts, id(a.source)) diff --git a/distbuild/sockbuf.py b/distbuild/sockbuf.py index a7fe339a..346706db 100644 --- a/distbuild/sockbuf.py +++ b/distbuild/sockbuf.py @@ -77,6 +77,10 @@ class SocketBuffer(StateMachine): self._sock = sock self._max_buffer = max_buffer + def __repr__(self): + return '<SocketBuffer at 0x%x: socket %s max_buffer %i>' % ( + id(self), self._sock, self._max_buffer) + def setup(self): src = self._src = SocketEventSource(self._sock) src.stop_writing() # We'll start writing when we need to. @@ -85,6 +89,7 @@ class SocketBuffer(StateMachine): self._wbuf = StringBuffer() spec = [ + # state, source, event_class, new_state, callback ('reading', src, SocketReadable, 'reading', self._fill), ('reading', self, _WriteBufferNotEmpty, 'rw', self._start_writing), diff --git a/distbuild/socketsrc.py b/distbuild/socketsrc.py index 78486f0e..14adc74d 100644 --- a/distbuild/socketsrc.py +++ b/distbuild/socketsrc.py @@ -9,6 +9,8 @@ import logging import os import socket +import distbuild + from eventsrc import EventSource @@ -48,13 +50,13 @@ class ListeningSocketEventSource(EventSource): '''An event source for a socket that listens for connections.''' def __init__(self, addr, port): - self.sock = socket.socket() + self.sock = distbuild.create_socket() self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sock.bind((addr, port)) self.sock.listen(5) self._accepting = True - logging.info('Listening at %s' % repr(self.sock.getsockname())) - + logging.info('Listening at %s' % self.sock.remotename()) + def get_select_params(self): r = [self.sock.fileno()] if self._accepting else [] return r, [], [], None @@ -117,7 +119,10 @@ class SocketEventSource(EventSource): self._writing = True set_nonblocking(sock) - + + def __repr__(self): + return '<SocketEventSource at %x: socket %s>' % (id(self), self.sock) + def get_select_params(self): r = [self.sock.fileno()] if self._reading else [] w = [self.sock.fileno()] if self._writing else [] diff --git a/distbuild/sockserv.py b/distbuild/sockserv.py index 6d8f216e..124d29b9 100644 --- a/distbuild/sockserv.py +++ b/distbuild/sockserv.py @@ -26,6 +26,7 @@ class ListenServer(StateMachine): self.mainloop.add_event_source(src) spec = [ + # state, source, event_class, new_state, callback ('listening', src, NewConnection, 'listening', self.new_conn), ('listening', src, SocketError, None, self.report_error), ] @@ -34,7 +35,7 @@ class ListenServer(StateMachine): def new_conn(self, event_source, event): logging.debug( 'ListenServer: Creating new %s using %s and %s' % - (repr(self._machine), + (self._machine, repr(event.connection), repr(self._extra_args))) m = self._machine(event.connection, *self._extra_args) diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 7953447f..fc5849b3 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -127,6 +127,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): self._available_workers = [] spec = [ + # state, source, event_class, new_state, callback ('idle', WorkerBuildQueuer, WorkerBuildRequest, 'idle', self._handle_request), ('idle', WorkerBuildQueuer, WorkerCancelPending, 'idle', @@ -193,11 +194,13 @@ class WorkerConnection(distbuild.StateMachine): self._worker_cache_server_port = worker_cache_server_port self._morph_instance = morph_instance self._helper_id = None - - def name(self): + addr, port = self._conn.getpeername() name = socket.getfqdn(addr) - return '%s:%s' % (name, port) + self._worker_name = '%s:%s' % (name, port) + + def name(self): + return self._worker_name def setup(self): distbuild.crash_point() @@ -208,6 +211,7 @@ class WorkerConnection(distbuild.StateMachine): self.mainloop.add_state_machine(self._jm) spec = [ + # state, source, event_class, new_state, callback ('idle', self._jm, distbuild.JsonEof, None, self._reconnect), ('idle', self, _HaveAJob, 'building', self._start_build), @@ -222,7 +226,7 @@ class WorkerConnection(distbuild.StateMachine): self._request_caching), ('caching', distbuild.HelperRouter, distbuild.HelperResult, - 'caching', self._handle_helper_result), + 'caching', self._maybe_handle_helper_result), ('caching', self, _Cached, 'idle', self._request_job), ('caching', self, _JobFailed, 'idle', self._request_job), ] @@ -231,7 +235,8 @@ class WorkerConnection(distbuild.StateMachine): self._request_job(None, None) def _maybe_cancel(self, event_source, build_cancel): - logging.debug('WC: BuildController requested a cancel') + logging.debug('WC: BuildController %r requested a cancel' % + event_source) if build_cancel.id == self._initiator_id: distbuild.crash_point() @@ -265,7 +270,7 @@ class WorkerConnection(distbuild.StateMachine): stdin_contents=distbuild.serialise_artifact(self._artifact), ) self._jm.send(msg) - logging.debug('WC: sent to worker: %s' % repr(msg)) + logging.debug('WC: sent to worker %s: %r' % (self._worker_name, msg)) self._route_map.add(self._initiator_id, msg['id']) self._initiator_request_map[self._initiator_id].add(msg['id']) logging.debug( @@ -281,7 +286,8 @@ class WorkerConnection(distbuild.StateMachine): distbuild.crash_point() - logging.debug('WC: from worker: %s' % repr(event.msg)) + logging.debug( + 'WC: from worker %s: %r' % (self._worker_name, event.msg)) handlers = { 'exec-output': self._handle_exec_output, @@ -371,7 +377,7 @@ class WorkerConnection(distbuild.StateMachine): self._initiator_id = None self._finished_msg = event.msg - def _handle_helper_result(self, event_source, event): + def _maybe_handle_helper_result(self, event_source, event): if event.msg['id'] == self._helper_id: distbuild.crash_point() |