summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Thursfield <sam.thursfield@codethink.co.uk>2015-02-03 15:27:19 +0000
committerSam Thursfield <sam.thursfield@codethink.co.uk>2015-02-03 17:42:44 +0000
commit5868a7eb27b232ed1261c4638978309e9cfcd871 (patch)
treef066ad950b4cf9a04a33edb66185fc42d4b91e6e
parentedf5232690a648b715a2649f37393c0565bc127f (diff)
downloadmorph-5868a7eb27b232ed1261c4638978309e9cfcd871.tar.gz
distbuild: Be more robust when a worker disconnects
The logic to handle a worker disconnecting was broken. The WorkerConnection object would remove itself from the main loop as soon as the worker disconnected, but it would not get removed from the list of available workers that the WorkerBuildQueuer maintains. So the controller would continue sending messages to this dead connection, and the builds it sent would hang forever waiting for a response.
-rw-r--r--distbuild/worker_build_scheduler.py43
1 files changed, 35 insertions, 8 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index be732153..4f7ff98f 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -1,6 +1,6 @@
# distbuild/worker_build_scheduler.py -- schedule worker-builds on workers
#
-# Copyright (C) 2012, 2014 Codethink Limited
+# Copyright (C) 2012, 2014-2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -93,6 +93,12 @@ class _HaveAJob(object):
def __init__(self, job):
self.job = job
+class _Disconnected(object):
+ '''Event queued when the connection to a worker is lost.
+
+ who is the WorkerConnection instance whose worker went away.
+
+ '''
+
+ def __init__(self, who):
+ self.who = who
+
+
class Job(object):
def __init__(self, job_id, artifact, initiator_id):
@@ -220,7 +226,10 @@ class WorkerBuildQueuer(distbuild.StateMachine):
('idle', WorkerConnection, _JobFinished, 'idle',
self._set_job_finished),
('idle', WorkerConnection, _JobFailed, 'idle',
- self._set_job_failed)
+ self._set_job_failed),
+
+ ('idle', WorkerConnection, _Disconnected, 'idle',
+ self._handle_worker_disconnected),
]
self.add_transitions(spec)
@@ -355,8 +364,22 @@ class WorkerBuildQueuer(distbuild.StateMachine):
(job.artifact.name, worker.who.name()))
self.mainloop.queue_event(worker.who, _HaveAJob(job))
-
-
+
+    def _handle_worker_disconnected(self, event_source, event):
+        '''Drop a disconnected worker from the available-workers list.
+
+        Callback for the ('idle', WorkerConnection, _Disconnected)
+        transition. It takes (event_source, event) like the other state
+        machine callbacks in this module; event.who is the connection that
+        went away.
+
+        '''
+        # _remove_worker is a bound method: do not pass self explicitly.
+        self._remove_worker(event.who)
+
+    def _remove_worker(self, worker):
+        '''Forget every pending entry that came from the given worker.
+
+        worker is the connection object that disconnected. It must provide
+        name(), and it is compared against the 'who' attribute of each
+        entry in self._available_workers.
+
+        '''
+        logging.debug('WBQ: Removing worker %s from queue', worker.name())
+
+        # There should only be one entry per worker in the
+        # _available_workers list. But anything can happen in space! So we
+        # take care to remove all entries in the list that came from the
+        # disconnected worker, not just the first.
+        #
+        # A list comprehension (rather than filter()) guarantees the result
+        # is a real list on every Python version.
+        self._available_workers = [
+            worker_msg for worker_msg in self._available_workers
+            if worker_msg.who != worker]
+
+
class WorkerConnection(distbuild.StateMachine):
'''Communicate with a single worker.'''
@@ -397,14 +420,15 @@ class WorkerConnection(distbuild.StateMachine):
spec = [
# state, source, event_class, new_state, callback
- ('idle', self._jm, distbuild.JsonEof, None, self._reconnect),
+ ('idle', self._jm, distbuild.JsonEof, None, self._disconnected),
('idle', self, _HaveAJob, 'building', self._start_build),
('building', distbuild.BuildController,
distbuild.BuildCancel, 'building',
self._maybe_cancel),
- ('building', self._jm, distbuild.JsonEof, None, self._reconnect),
+ ('building', self._jm, distbuild.JsonEof, None,
+ self._disconnected),
('building', self._jm, distbuild.JsonNewMessage, 'building',
self._handle_json_message),
('building', self, _BuildFailed, 'idle', self._request_job),
@@ -412,6 +436,7 @@ class WorkerConnection(distbuild.StateMachine):
('building', self, _BuildFinished, 'caching',
self._request_caching),
+ ('caching', self._jm, distbuild.JsonEof, None, self._disconnected),
('caching', distbuild.HelperRouter, distbuild.HelperResult,
'caching', self._maybe_handle_helper_result),
('caching', self, _Cached, 'idle', self._request_job),
@@ -449,10 +474,12 @@ class WorkerConnection(distbuild.StateMachine):
self._job.initiators.remove(build_cancel.id)
-    def _reconnect(self, event_source, event):
+    def _disconnected(self, event_source, event):
+        '''Handle EOF on the JSON connection to the worker.
+
+        Queues a _Disconnected event carrying this connection, then
+        queues a distbuild.Reconnect event on self._cm.
+
+        '''
         distbuild.crash_point()
-        logging.debug('WC: Triggering reconnect')
+        logging.debug('WC: Disconnected from worker %s', self.name())
+        # The event source must be WorkerConnection: that is the source the
+        # ('idle', WorkerConnection, _Disconnected, ...) transition of
+        # WorkerBuildQueuer is registered for.
+        self.mainloop.queue_event(WorkerConnection, _Disconnected(self))
+
         self.mainloop.queue_event(self._cm, distbuild.Reconnect())
def _start_build(self, event_source, event):