diff options
author | Sam Thursfield <sam.thursfield@codethink.co.uk> | 2015-02-17 16:26:56 +0000 |
---|---|---|
committer | Sam Thursfield <sam.thursfield@codethink.co.uk> | 2015-02-17 17:07:51 +0000 |
commit | 3337211deae718413884e2eb9ae84d9eced95fa4 (patch) | |
tree | c104c53554961e3a6529d8415401836f655b92b8 | |
parent | d8fba3d2ca7714493d2bd71019026eef313a538c (diff) | |
download | morph-3337211deae718413884e2eb9ae84d9eced95fa4.tar.gz |
distbuild: Make JsonRouter code less generic
The JsonRouter object is actually only used in the `morph worker-daemon`
process. I find it hard to understand what's going on in this class
because the names are generic (e.g. 'client' could refer to anything,
but in fact the controller-daemon is the only 'client' there will ever
be).
This commit adds docstrings and cleans up some function and variable
names to hopefully make the code easier to reason about.
-rw-r--r-- | distbuild/json_router.py | 118 |
1 files changed, 81 insertions, 37 deletions
diff --git a/distbuild/json_router.py b/distbuild/json_router.py index 5bbb8fb9..150f9614 100644 --- a/distbuild/json_router.py +++ b/distbuild/json_router.py @@ -24,15 +24,21 @@ import distbuild class JsonRouter(distbuild.StateMachine): '''Route JSON messages between clients and helpers. - - This state machine receives JSON messages from clients and helpers, - and routes messages between them. - - Each incoming request is labeled with a unique identifier, then - sent to the next free helper. The helper's response will retain - the unique id, so that the response can be routed to the right - client. - + + This state machine receives JSON messages from the remote `morph + controller-daemon` process, which request either running a program + or making an HTTP request. These requests are forwarded to a local + `distbuild-helper` process, which actually does the work. + + When the work is complete the helper will send an 'exec-response' or + 'http-response' message. In the case of 'exec-request', it will send + 'exec-output' progress messages while the process runs, forwarding anything + that the subprocess writes to stderr or stdout. + + Each incoming request is labeled with a unique identifier, then sent to the + next free helper. The helper's response will retain the unique id, so that + the response can be routed to the right client. + ''' pending_requests = [] @@ -53,18 +59,38 @@ class JsonRouter(distbuild.StateMachine): spec = [ # state, source, event_class, new_state, callback - ('idle', jm, distbuild.JsonNewMessage, 'idle', self.bloop), + ('idle', jm, distbuild.JsonNewMessage, 'idle', self.route_message), ('idle', jm, distbuild.JsonEof, None, self.close), ] self.add_transitions(spec) - def _lookup_request(self, request_id): + def _lookup_request_originator(self, request_id): + '''Look up the JsonMachine which we received the request from. + + It will be connected to a controller-daemon process, by a socket. + + ''' if request_id in self.running_requests: - return self.running_requests[request_id] + client, request_msg, helper = self.running_requests[request_id] + assert request_msg['id'] != request_id + return client else: return None - - def bloop(self, event_source, event): + + def _lookup_request_helper(self, request_id): + '''Look up the JsonMachine we forwarded the request to. + + It will be connected to a distbuild-helper process, by a socket. + + ''' + if request_id in self.running_requests: + client, request_msg, helper = self.running_requests[request_id] + assert request_msg['id'] != request_id + return helper + else: + return None + + def route_message(self, event_source, event): logging.debug('JsonRouter: got msg: %s', repr(event.msg)) handlers = { 'http-request': self.do_request, @@ -78,55 +104,72 @@ class JsonRouter(distbuild.StateMachine): handler = handlers.get(event.msg['type']) handler(event_source, event) - def do_request(self, client, event): - self._enqueue_request(client, event.msg) + def do_request(self, controller_jm, event): + '''Handle a request from the controller to do something. + + The request is queued until a helper is available. + + ''' + self._enqueue_request(controller_jm, event.msg) if self.pending_helpers: self._send_request() - def do_cancel(self, client, event): + def do_cancel(self, controller_jm, event): + '''Cancel a running or queued request. + + Only exec-request can be cancelled. + + ''' + # There should only ever be one outgoing ID for the request (i.e. only + # one helper process should be doing it), but to be safe we cancel all + # of them if there are multiple. for id in self.route_map.get_outgoing_ids(event.msg['id']): logging.debug('JsonRouter: looking up request for id %s', id) - t = self._lookup_request(id) - if t: - helper = t[2] + helper_jm = self._lookup_request_helper(id) + + if helper_jm: new = dict(event.msg) new['id'] = id - helper.send(new) + helper_jm.send(new) logging.debug('JsonRouter: sent to helper: %s', repr(new)) else: logging.warn('do_cancel: Request %s is not running.' % id) - def do_response(self, helper, event): - t = self._lookup_request(event.msg['id']) - if t: - client, msg, helper = t + def do_response(self, helper_jm, event): + '''A request has completed.''' + controller_jm = self._lookup_request_originator(event.msg['id']) + if controller_jm: new = dict(event.msg) - new['id'] = self.route_map.get_incoming_id(msg['id']) - client.send(new) + new['id'] = self.route_map.get_incoming_id(event.msg['id']) + controller_jm.send(new) logging.debug('JsonRouter: sent to client: %s', repr(new)) - self.route_map.remove(msg['id']) + self.route_map.remove(event.msg['id']) else: logging.warn('do_response: Request %s is not running.' % event.msg['id']) - def do_helper_ready(self, helper, event): - self.pending_helpers.append(helper) + def do_helper_ready(self, helper_jm, event): + '''Add helper to the pool of available helpers.''' + self.pending_helpers.append(helper_jm) if self.pending_requests: self._send_request() - def do_exec_output(self, helper, event): - t = self._lookup_request(event.msg['id']) - if t: - client, msg, helper = t + def do_exec_output(self, helper_jm, event): + '''Forward stderr/stdout contents of a running exec-request.''' + + controller_jm = self._lookup_request_originator(event.msg['id']) + if controller_jm: new = dict(event.msg) - new['id'] = self.route_map.get_incoming_id(msg['id']) - client.send(new) - logging.debug('JsonRouter: sent to client: %s', repr(new)) + new['id'] = self.route_map.get_incoming_id(event.msg['id']) + controller_jm.send(new) + logging.debug('JsonRouter: sent to controller: %s', repr(new)) else: logging.warn('do_exec_output: Request %s is not running.' % event.msg['id']) def close(self, event_source, event): + '''Handle controller or helper disconnecting.''' + logging.debug('closing: %s', repr(event_source)) event_source.close() @@ -145,6 +188,7 @@ class JsonRouter(distbuild.StateMachine): elif event_source == helper: del self.running_requests[request_id] self._enqueue_request(client, msg) + self.route_map.remove(request_id) # Remove from pending requests, if the client quit. i = 0 |