summaryrefslogtreecommitdiff
path: root/distbuild
diff options
context:
space:
mode:
Diffstat (limited to 'distbuild')
-rw-r--r--distbuild/ansible/hosts1
-rw-r--r--distbuild/initiator.py5
-rw-r--r--distbuild/initiator_connection.py34
-rw-r--r--distbuild/jm.py11
-rw-r--r--distbuild/protocol.py8
-rw-r--r--distbuild/worker_build_scheduler.py41
6 files changed, 79 insertions, 21 deletions
diff --git a/distbuild/ansible/hosts b/distbuild/ansible/hosts
new file mode 100644
index 00000000..2fbb50c4
--- /dev/null
+++ b/distbuild/ansible/hosts
@@ -0,0 +1 @@
+localhost
diff --git a/distbuild/initiator.py b/distbuild/initiator.py
index 8f9e0c38..549df66b 100644
--- a/distbuild/initiator.py
+++ b/distbuild/initiator.py
@@ -101,11 +101,12 @@ class Initiator(distbuild.StateMachine):
repo=self._repo_name,
ref=self._ref,
morphology=self._morphology,
- original_ref=self._original_ref
+ original_ref=self._original_ref,
+ protocol_version=distbuild.protocol.VERSION
)
self._jm.send(msg)
logging.debug('Initiator: sent to controller: %s', repr(msg))
-
+
def _handle_json_message(self, event_source, event):
distbuild.crash_point()
diff --git a/distbuild/initiator_connection.py b/distbuild/initiator_connection.py
index 4cd13db3..8b68fda3 100644
--- a/distbuild/initiator_connection.py
+++ b/distbuild/initiator_connection.py
@@ -98,15 +98,31 @@ class InitiatorConnection(distbuild.StateMachine):
logging.debug('InitiatorConnection: from %s: %r', self.initiator_name,
event.msg)
- if event.msg['type'] == 'build-request':
- new_id = self._idgen.next()
- self.our_ids.add(new_id)
- self._route_map.add(event.msg['id'], new_id)
- event.msg['id'] = new_id
- build_controller = distbuild.BuildController(
- self, event.msg, self.artifact_cache_server,
- self.morph_instance)
- self.mainloop.add_state_machine(build_controller)
+ try:
+ if event.msg['type'] == 'build-request':
+ if (event.msg.get('protocol_version') !=
+ distbuild.protocol.VERSION):
+ msg = distbuild.message('build-failed',
+ id=event.msg['id'],
+ reason=('Protocol version mismatch between server & '
+ 'initiator: distbuild network uses distbuild '
+ 'protocol version %i, but client uses version'
+ ' %i.', distbuild.protocol.VERSION,
+ event.msg.get('protocol_version')))
+ self.jm.send(msg)
+ self._log_send(msg)
+ return
+ new_id = self._idgen.next()
+ self.our_ids.add(new_id)
+ self._route_map.add(event.msg['id'], new_id)
+ event.msg['id'] = new_id
+ build_controller = distbuild.BuildController(
+ self, event.msg, self.artifact_cache_server,
+ self.morph_instance)
+ self.mainloop.add_state_machine(build_controller)
+ except (KeyError, ValueError) as ex:
+ logging.error('Invalid message from initiator: %s: exception %s',
+ event.msg, ex)
def _disconnect(self, event_source, event):
for id in self.our_ids:
diff --git a/distbuild/jm.py b/distbuild/jm.py
index 615100e4..85510924 100644
--- a/distbuild/jm.py
+++ b/distbuild/jm.py
@@ -1,6 +1,6 @@
# mainloop/jm.py -- state machine for JSON communication between nodes
#
-# Copyright (C) 2012, 2014 Codethink Limited
+# Copyright (C) 2012, 2014 - 2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -109,8 +109,13 @@ class JsonMachine(StateMachine):
line = line.rstrip()
if self.debug_json:
logging.debug('JsonMachine: line: %s' % repr(line))
- msg = yaml.load(json.loads(line))
- self.mainloop.queue_event(self, JsonNewMessage(msg))
+ msg = None
+ try:
+ msg = yaml.safe_load(json.loads(line))
+ except Exception:
+ logging.error('Invalid input: %s' % line)
+ if msg:
+ self.mainloop.queue_event(self, JsonNewMessage(msg))
def _send_eof(self, event_source, event):
self.mainloop.queue_event(self, JsonEof())
diff --git a/distbuild/protocol.py b/distbuild/protocol.py
index dee45d17..141df742 100644
--- a/distbuild/protocol.py
+++ b/distbuild/protocol.py
@@ -19,12 +19,20 @@
'''Construct protocol message objects (dicts).'''
+# Version refers to an integer that should be incremented by one each time a
+# time a change is introduced that would break server/initiator compatibility
+
+
+VERSION = 1
+
+
_required_fields = {
'build-request': [
'id',
'repo',
'ref',
'morphology',
+ 'protocol_version',
],
'build-progress': [
'id',
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index bf0d87b1..81c961e1 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -93,6 +93,12 @@ class _HaveAJob(object):
def __init__(self, job):
self.job = job
+class _Disconnected(object):
+
+ def __init__(self, who):
+ self.who = who
+
+
class Job(object):
def __init__(self, job_id, artifact, initiator_id):
@@ -220,7 +226,10 @@ class WorkerBuildQueuer(distbuild.StateMachine):
('idle', WorkerConnection, _JobFinished, 'idle',
self._set_job_finished),
('idle', WorkerConnection, _JobFailed, 'idle',
- self._set_job_failed)
+ self._set_job_failed),
+
+ ('idle', WorkerConnection, _Disconnected, 'idle',
+ self._handle_worker_disconnected),
]
self.add_transitions(spec)
@@ -355,8 +364,22 @@ class WorkerBuildQueuer(distbuild.StateMachine):
(job.artifact.name, worker.who.name()))
self.mainloop.queue_event(worker.who, _HaveAJob(job))
-
-
+
+ def _handle_worker_disconnected(self, event):
+ self._remove_worker(self, event.who)
+
+ def _remove_worker(self, worker):
+ logging.debug('WBQ: Removing worker %s from queue', worker.name())
+
+ # There should only be one InitiatorConnection instance per worker in
+ # the _available_workers list. But anything can happen in space! So we
+ # take care to remove all GiveJob messages in the list that came from
+ # the disconnected worker, not the first.
+ self._available_workers = filter(
+ lambda worker_msg: worker_msg.who != worker,
+ self._available_workers)
+
+
class WorkerConnection(distbuild.StateMachine):
'''Communicate with a single worker.'''
@@ -400,14 +423,15 @@ class WorkerConnection(distbuild.StateMachine):
spec = [
# state, source, event_class, new_state, callback
- ('idle', self._jm, distbuild.JsonEof, None, self._reconnect),
+ ('idle', self._jm, distbuild.JsonEof, None, self._disconnected),
('idle', self, _HaveAJob, 'building', self._start_build),
('building', distbuild.BuildController,
distbuild.BuildCancel, 'building',
self._maybe_cancel),
- ('building', self._jm, distbuild.JsonEof, None, self._reconnect),
+ ('building', self._jm, distbuild.JsonEof, None,
+ self._disconnected),
('building', self._jm, distbuild.JsonNewMessage, 'building',
self._handle_json_message),
('building', self, _BuildFailed, 'idle', self._request_job),
@@ -415,6 +439,7 @@ class WorkerConnection(distbuild.StateMachine):
('building', self, _BuildFinished, 'caching',
self._request_caching),
+ ('caching', self._jm, distbuild.JsonEof, None, self._disconnected),
('caching', distbuild.HelperRouter, distbuild.HelperResult,
'caching', self._maybe_handle_helper_result),
('caching', self, _Cached, 'idle', self._request_job),
@@ -451,10 +476,12 @@ class WorkerConnection(distbuild.StateMachine):
job.initiators.remove(build_cancel.id)
- def _reconnect(self, event_source, event):
+ def _disconnected(self, event_source, event):
distbuild.crash_point()
- logging.debug('WC: Triggering reconnect')
+ logging.debug('WC: Disconnected from worker %s' % self.name())
+ self.mainloop.queue_event(InitiatorConnection, _Disconnected(self))
+
self.mainloop.queue_event(self._cm, distbuild.Reconnect())
def _start_build(self, event_source, event):