diff options
Diffstat (limited to 'distbuild')
-rw-r--r-- | distbuild/ansible/hosts | 1 | ||||
-rw-r--r-- | distbuild/initiator.py | 5 | ||||
-rw-r--r-- | distbuild/initiator_connection.py | 34 | ||||
-rw-r--r-- | distbuild/jm.py | 11 | ||||
-rw-r--r-- | distbuild/protocol.py | 8 | ||||
-rw-r--r-- | distbuild/worker_build_scheduler.py | 41 |
6 files changed, 79 insertions, 21 deletions
diff --git a/distbuild/ansible/hosts b/distbuild/ansible/hosts new file mode 100644 index 00000000..2fbb50c4 --- /dev/null +++ b/distbuild/ansible/hosts @@ -0,0 +1 @@ +localhost diff --git a/distbuild/initiator.py b/distbuild/initiator.py index 8f9e0c38..549df66b 100644 --- a/distbuild/initiator.py +++ b/distbuild/initiator.py @@ -101,11 +101,12 @@ class Initiator(distbuild.StateMachine): repo=self._repo_name, ref=self._ref, morphology=self._morphology, - original_ref=self._original_ref + original_ref=self._original_ref, + protocol_version=distbuild.protocol.VERSION ) self._jm.send(msg) logging.debug('Initiator: sent to controller: %s', repr(msg)) - + def _handle_json_message(self, event_source, event): distbuild.crash_point() diff --git a/distbuild/initiator_connection.py b/distbuild/initiator_connection.py index 4cd13db3..8b68fda3 100644 --- a/distbuild/initiator_connection.py +++ b/distbuild/initiator_connection.py @@ -98,15 +98,31 @@ class InitiatorConnection(distbuild.StateMachine): logging.debug('InitiatorConnection: from %s: %r', self.initiator_name, event.msg) - if event.msg['type'] == 'build-request': - new_id = self._idgen.next() - self.our_ids.add(new_id) - self._route_map.add(event.msg['id'], new_id) - event.msg['id'] = new_id - build_controller = distbuild.BuildController( - self, event.msg, self.artifact_cache_server, - self.morph_instance) - self.mainloop.add_state_machine(build_controller) + try: + if event.msg['type'] == 'build-request': + if (event.msg.get('protocol_version') != + distbuild.protocol.VERSION): + msg = distbuild.message('build-failed', + id=event.msg['id'], + reason=('Protocol version mismatch between server & ' + 'initiator: distbuild network uses distbuild ' + 'protocol version %i, but client uses version' + ' %i.', distbuild.protocol.VERSION, + event.msg.get('protocol_version'))) + self.jm.send(msg) + self._log_send(msg) + return + new_id = self._idgen.next() + self.our_ids.add(new_id) + self._route_map.add(event.msg['id'], 
new_id) + event.msg['id'] = new_id + build_controller = distbuild.BuildController( + self, event.msg, self.artifact_cache_server, + self.morph_instance) + self.mainloop.add_state_machine(build_controller) + except (KeyError, ValueError) as ex: + logging.error('Invalid message from initiator: %s: exception %s', + event.msg, ex) def _disconnect(self, event_source, event): for id in self.our_ids: diff --git a/distbuild/jm.py b/distbuild/jm.py index 615100e4..85510924 100644 --- a/distbuild/jm.py +++ b/distbuild/jm.py @@ -1,6 +1,6 @@ # mainloop/jm.py -- state machine for JSON communication between nodes # -# Copyright (C) 2012, 2014 Codethink Limited +# Copyright (C) 2012, 2014 - 2015 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -109,8 +109,13 @@ class JsonMachine(StateMachine): line = line.rstrip() if self.debug_json: logging.debug('JsonMachine: line: %s' % repr(line)) - msg = yaml.load(json.loads(line)) - self.mainloop.queue_event(self, JsonNewMessage(msg)) + msg = None + try: + msg = yaml.safe_load(json.loads(line)) + except Exception: + logging.error('Invalid input: %s' % line) + if msg: + self.mainloop.queue_event(self, JsonNewMessage(msg)) def _send_eof(self, event_source, event): self.mainloop.queue_event(self, JsonEof()) diff --git a/distbuild/protocol.py b/distbuild/protocol.py index dee45d17..141df742 100644 --- a/distbuild/protocol.py +++ b/distbuild/protocol.py @@ -19,12 +19,20 @@ '''Construct protocol message objects (dicts).''' +# Version refers to an integer that should be incremented by one each time a +# change is introduced that would break server/initiator compatibility + + +VERSION = 1 + + _required_fields = { 'build-request': [ 'id', 'repo', 'ref', 'morphology', + 'protocol_version', ], 'build-progress': [ 'id', diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index bf0d87b1..81c961e1
100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -93,6 +93,12 @@ class _HaveAJob(object): def __init__(self, job): self.job = job +class _Disconnected(object): + + def __init__(self, who): + self.who = who + + class Job(object): def __init__(self, job_id, artifact, initiator_id): @@ -220,7 +226,10 @@ class WorkerBuildQueuer(distbuild.StateMachine): ('idle', WorkerConnection, _JobFinished, 'idle', self._set_job_finished), ('idle', WorkerConnection, _JobFailed, 'idle', - self._set_job_failed) + self._set_job_failed), + + ('idle', WorkerConnection, _Disconnected, 'idle', + self._handle_worker_disconnected), ] self.add_transitions(spec) @@ -355,8 +364,22 @@ class WorkerBuildQueuer(distbuild.StateMachine): (job.artifact.name, worker.who.name())) self.mainloop.queue_event(worker.who, _HaveAJob(job)) - - + + def _handle_worker_disconnected(self, event): + self._remove_worker(self, event.who) + + def _remove_worker(self, worker): + logging.debug('WBQ: Removing worker %s from queue', worker.name()) + + # There should only be one InitiatorConnection instance per worker in + # the _available_workers list. But anything can happen in space! So we + # take care to remove all GiveJob messages in the list that came from + # the disconnected worker, not the first. 
+ self._available_workers = filter( + lambda worker_msg: worker_msg.who != worker, + self._available_workers) + + class WorkerConnection(distbuild.StateMachine): '''Communicate with a single worker.''' @@ -400,14 +423,15 @@ class WorkerConnection(distbuild.StateMachine): spec = [ # state, source, event_class, new_state, callback - ('idle', self._jm, distbuild.JsonEof, None, self._reconnect), + ('idle', self._jm, distbuild.JsonEof, None, self._disconnected), ('idle', self, _HaveAJob, 'building', self._start_build), ('building', distbuild.BuildController, distbuild.BuildCancel, 'building', self._maybe_cancel), - ('building', self._jm, distbuild.JsonEof, None, self._reconnect), + ('building', self._jm, distbuild.JsonEof, None, + self._disconnected), ('building', self._jm, distbuild.JsonNewMessage, 'building', self._handle_json_message), ('building', self, _BuildFailed, 'idle', self._request_job), @@ -415,6 +439,7 @@ class WorkerConnection(distbuild.StateMachine): ('building', self, _BuildFinished, 'caching', self._request_caching), + ('caching', self._jm, distbuild.JsonEof, None, self._disconnected), ('caching', distbuild.HelperRouter, distbuild.HelperResult, 'caching', self._maybe_handle_helper_result), ('caching', self, _Cached, 'idle', self._request_job), @@ -451,10 +476,12 @@ class WorkerConnection(distbuild.StateMachine): job.initiators.remove(build_cancel.id) - def _reconnect(self, event_source, event): + def _disconnected(self, event_source, event): distbuild.crash_point() - logging.debug('WC: Triggering reconnect') + logging.debug('WC: Disconnected from worker %s' % self.name()) + self.mainloop.queue_event(InitiatorConnection, _Disconnected(self)) + self.mainloop.queue_event(self._cm, distbuild.Reconnect()) def _start_build(self, event_source, event): |