summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Thursfield <sam.thursfield@codethink.co.uk>2015-02-17 16:26:56 +0000
committerSam Thursfield <sam.thursfield@codethink.co.uk>2015-02-17 17:07:51 +0000
commit3337211deae718413884e2eb9ae84d9eced95fa4 (patch)
treec104c53554961e3a6529d8415401836f655b92b8
parentd8fba3d2ca7714493d2bd71019026eef313a538c (diff)
downloadmorph-3337211deae718413884e2eb9ae84d9eced95fa4.tar.gz
distbuild: Make JsonRouter code less generic
The JsonRouter object is actually only used in the `morph worker-daemon` process. I find it hard to understand what's going on in this class because the names are generic (e.g. 'client' could refer to anything, but in fact the controller-daemon is the only 'client' there will ever be). This commit adds docstrings and cleans up some function and variable names to hopefully make the code easier to reason about.
-rw-r--r--distbuild/json_router.py118
1 files changed, 81 insertions, 37 deletions
diff --git a/distbuild/json_router.py b/distbuild/json_router.py
index 5bbb8fb9..150f9614 100644
--- a/distbuild/json_router.py
+++ b/distbuild/json_router.py
@@ -24,15 +24,21 @@ import distbuild
class JsonRouter(distbuild.StateMachine):
'''Route JSON messages between clients and helpers.
-
- This state machine receives JSON messages from clients and helpers,
- and routes messages between them.
-
- Each incoming request is labeled with a unique identifier, then
- sent to the next free helper. The helper's response will retain
- the unique id, so that the response can be routed to the right
- client.
-
+
+ This state machine receives JSON messages from the remote `morph
+ controller-daemon` process, which request either running a program
+ or making an HTTP request. These requests are forwarded to a local
+ `distbuild-helper` process, which actually does the work.
+
+ When the work is complete the helper will send an 'exec-response' or
+ 'http-response' message. In the case of 'exec-request', it will send
+ 'exec-output' progress messages while the process runs, forwarding anything
+ that the subprocess writes to stderr or stdout.
+
+ Each incoming request is labeled with a unique identifier, then sent to the
+ next free helper. The helper's response will retain the unique id, so that
+ the response can be routed to the right client.
+
'''
pending_requests = []
@@ -53,18 +59,38 @@ class JsonRouter(distbuild.StateMachine):
spec = [
# state, source, event_class, new_state, callback
- ('idle', jm, distbuild.JsonNewMessage, 'idle', self.bloop),
+ ('idle', jm, distbuild.JsonNewMessage, 'idle', self.route_message),
('idle', jm, distbuild.JsonEof, None, self.close),
]
self.add_transitions(spec)
- def _lookup_request(self, request_id):
+ def _lookup_request_originator(self, request_id):
+ '''Look up the JsonMachine which we received the request from.
+
+ It will be connected to a controller-daemon process, by a socket.
+
+ '''
if request_id in self.running_requests:
- return self.running_requests[request_id]
+ client, request_msg, helper = self.running_requests[request_id]
+ assert request_msg['id'] != request_id
+ return client
else:
return None
-
- def bloop(self, event_source, event):
+
+ def _lookup_request_helper(self, request_id):
+ '''Look up the JsonMachine we forwarded the request to.
+
+ It will be connected to a distbuild-helper process, by a socket.
+
+ '''
+ if request_id in self.running_requests:
+ client, request_msg, helper = self.running_requests[request_id]
+ assert request_msg['id'] != request_id
+ return helper
+ else:
+ return None
+
+ def route_message(self, event_source, event):
logging.debug('JsonRouter: got msg: %s', repr(event.msg))
handlers = {
'http-request': self.do_request,
@@ -78,55 +104,72 @@ class JsonRouter(distbuild.StateMachine):
handler = handlers.get(event.msg['type'])
handler(event_source, event)
- def do_request(self, client, event):
- self._enqueue_request(client, event.msg)
+ def do_request(self, controller_jm, event):
+ '''Handle a request from the controller to do something.
+
+ The request is queued until a helper is available.
+
+ '''
+ self._enqueue_request(controller_jm, event.msg)
if self.pending_helpers:
self._send_request()
- def do_cancel(self, client, event):
+ def do_cancel(self, controller_jm, event):
+ '''Cancel a running or queued request.
+
+ Only exec-request can be cancelled.
+
+ '''
+ # There should only ever be one outgoing ID for the request (i.e. only
+ # one helper process should be doing it), but to be safe we cancel all
+ # of them if there are multiple.
for id in self.route_map.get_outgoing_ids(event.msg['id']):
logging.debug('JsonRouter: looking up request for id %s', id)
- t = self._lookup_request(id)
- if t:
- helper = t[2]
+ helper_jm = self._lookup_request_helper(id)
+
+ if helper_jm:
new = dict(event.msg)
new['id'] = id
- helper.send(new)
+ helper_jm.send(new)
logging.debug('JsonRouter: sent to helper: %s', repr(new))
else:
logging.warn('do_cancel: Request %s is not running.' % id)
- def do_response(self, helper, event):
- t = self._lookup_request(event.msg['id'])
- if t:
- client, msg, helper = t
+ def do_response(self, helper_jm, event):
+ '''A request has completed.'''
+ controller_jm = self._lookup_request_originator(event.msg['id'])
+ if controller_jm:
new = dict(event.msg)
- new['id'] = self.route_map.get_incoming_id(msg['id'])
- client.send(new)
+ new['id'] = self.route_map.get_incoming_id(event.msg['id'])
+ controller_jm.send(new)
logging.debug('JsonRouter: sent to client: %s', repr(new))
- self.route_map.remove(msg['id'])
+ self.route_map.remove(event.msg['id'])
else:
logging.warn('do_response: Request %s is not running.' %
event.msg['id'])
- def do_helper_ready(self, helper, event):
- self.pending_helpers.append(helper)
+ def do_helper_ready(self, helper_jm, event):
+ '''Add helper to the pool of available helpers.'''
+ self.pending_helpers.append(helper_jm)
if self.pending_requests:
self._send_request()
- def do_exec_output(self, helper, event):
- t = self._lookup_request(event.msg['id'])
- if t:
- client, msg, helper = t
+ def do_exec_output(self, helper_jm, event):
+ '''Forward stderr/stdout contents of a running exec-request.'''
+
+ controller_jm = self._lookup_request_originator(event.msg['id'])
+ if controller_jm:
new = dict(event.msg)
- new['id'] = self.route_map.get_incoming_id(msg['id'])
- client.send(new)
- logging.debug('JsonRouter: sent to client: %s', repr(new))
+ new['id'] = self.route_map.get_incoming_id(event.msg['id'])
+ controller_jm.send(new)
+ logging.debug('JsonRouter: sent to controller: %s', repr(new))
else:
logging.warn('do_exec_output: Request %s is not running.' %
event.msg['id'])
def close(self, event_source, event):
+ '''Handle controller or helper disconnecting.'''
+
logging.debug('closing: %s', repr(event_source))
event_source.close()
@@ -145,6 +188,7 @@ class JsonRouter(distbuild.StateMachine):
elif event_source == helper:
del self.running_requests[request_id]
self._enqueue_request(client, msg)
+ self.route_map.remove(request_id)
# Remove from pending requests, if the client quit.
i = 0