summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Thursfield <sam.thursfield@codethink.co.uk>2015-04-09 15:06:46 +0000
committerSam Thursfield <sam.thursfield@codethink.co.uk>2015-05-05 12:58:29 +0000
commit93598c2488d27c583285251ab12428a37dbeaa16 (patch)
treed39916479083829adb3930d5a7a6111d40a7431b
parentfa44693f1c5fe9dcbfaa097e4dbe2b129e3f94fe (diff)
downloadmorph-sam/distbuild-controller-job-tracking.tar.gz
distbuild: Allow WorkerConnection to track multiple in-flight jobssam/distbuild-controller-job-tracking
Although in theory a worker should only ever have one job at once, in practice this assumption doesn't hold, and can cause serious confusion. The worker (implemented in the JsonRouter class) will actually queue up exec-request messages and run the oldest one first. I saw a case where, due to a build not being correctly cancelled, the WorkerConnection.current_job attribute got out of sync with what the worker was actually building. This lead to an error when trying to fetch the built artifacts, as the controller tried to fetch artifacts for something that wasn't actually built yet, and everything got stuck. To prevent this from happening, we either need to remove the exec-request queue in the worker-daemon process, or make the WorkerConnection class cope with multiple jobs at once. The latter seems like the more robust approach, so I have done that. Another bug this fixes is the issue where, if the 'Computing build graph' (serialise-artifact) step of a build completes on the controller while one of its WorkerConnection objects is waiting for artifacts to be fetched by the shared cache from the worker, the build hangs. This would happen because the WorkerConnection assumed that any HelperResponse message it saw was the result of its request, so would send a _JobFinished before caching had actually finished if there was an unrelated HelperResponse received in the meantime. It now checks the request ID of the HelperResponse before calling the code that is now in the new _handle_helper_result_for_job() function. Change-Id: Ia961f333f9dae77405b58c82c99a56e4c43e1628
-rw-r--r--distbuild/worker_build_scheduler.py224
1 files changed, 115 insertions, 109 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index bc2df4b1..71e1c3ef 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -134,11 +134,11 @@ class JobQueue(object):
self._jobs = {}
def get(self, artifact_basename):
- return (self._jobs[artifact_basename]
- if artifact_basename in self._jobs else None)
+ return self._jobs.get(artifact_basename, None)
def add(self, job):
artifact_basename = job.artifact.basename()
+
if artifact_basename in self._jobs:
logging.error(
"Duplicate job for %s added to %s job queue, ignoring.",
@@ -153,23 +153,25 @@ class JobQueue(object):
logging.warning("Tried to remove a job that doesn't exist "
"(%s)", job.artifact.basename())
- def get_jobs(self):
- return self._jobs
+ def __contains__(self, artifact_basename):
+ return artifact_basename in self._jobs
+
+ def __iter__(self):
+ return self._jobs.itervalues()
def remove_jobs(self, jobs):
for job in jobs:
self.remove(job)
- def exists(self, artifact_basename):
- return artifact_basename in self._jobs
-
def get_next_job(self):
# for now just return the first thing we find that's not being built
- waiting = [job for (_, job) in
- self._jobs.iteritems() if job.who == None]
+ waiting = [job for job in self if job.who == None]
return waiting.pop() if len(waiting) > 0 else None
+ def running_jobs(self):
+ return [job for job in self if job.running()]
+
def __repr__(self):
items = []
for job in self._jobs.itervalues():
@@ -180,7 +182,8 @@ class JobQueue(object):
class _BuildFinished(object):
- pass
+ def __init__(self, job):
+ self.job = job
class _BuildFailed(object):
@@ -260,10 +263,14 @@ class WorkerBuildQueuer(distbuild.StateMachine):
event.job.set_state('running')
def _set_job_finished(self, event_source, event):
- event.job.set_state('complete')
+ job = event.job
+ job.set_state('complete')
+ self._jobs.remove(job)
def _set_job_failed(self, event_source, event):
- event.job.set_state('failed')
+ job = event.job
+ job.set_state('failed')
+ self._jobs.remove(job)
def _handle_request(self, event_source, event):
distbuild.crash_point()
@@ -276,7 +283,7 @@ class WorkerBuildQueuer(distbuild.StateMachine):
# If so, add our initiator id to the existing job
# If not, create a job
- if self._jobs.exists(event.artifact.basename()):
+ if event.artifact.basename() in self._jobs:
job = self._jobs.get(event.artifact.basename())
job.initiators.append(event.initiator_id)
@@ -338,24 +345,11 @@ class WorkerBuildQueuer(distbuild.StateMachine):
return False
self._jobs.remove_jobs(
- [job for (_, job) in self._jobs.get_jobs().iteritems()
- if cancel_this(job)])
+ [job for job in self._jobs if cancel_this(job)])
def _handle_worker(self, event_source, event):
distbuild.crash_point()
- who = event.who
- last_job = who.current_job() # the job this worker's just completed
-
- if last_job:
- logging.debug('%s wants new job, just did %s',
- who.name(), last_job.artifact.basename())
-
- logging.debug('Removing job %s', last_job.artifact.basename())
- self._jobs.remove(last_job)
- else:
- logging.debug('%s wants its first job', who.name())
-
logging.debug('WBQ: Adding worker to queue: %s', event.who.name())
self._available_workers.append(event)
@@ -408,21 +402,15 @@ class WorkerConnection(distbuild.StateMachine):
self._worker_cache_server_port = worker_cache_server_port
self._morph_instance = morph_instance
- self._active_jobs = dict()
- self._current_job = None
- self._current_job_exec_response = None
- self._current_job_cache_request = None
-
addr, port = self._conn.getpeername()
name = socket.getfqdn(addr)
self._worker_name = '%s:%s' % (name, port)
+ self._jobs = JobQueue(owner=self.name())
+
def name(self):
return self._worker_name
- def current_job(self):
- return self._current_job
-
def __str__(self):
return self.name()
@@ -463,27 +451,38 @@ class WorkerConnection(distbuild.StateMachine):
self._request_job(None, None)
def _maybe_cancel(self, event_source, build_cancel):
-
- if build_cancel.id not in self._current_job.initiators:
- return # event not relevant
-
logging.debug('WC: BuildController %r requested a cancel',
event_source)
- job = self._current_job
- if (len(job.initiators) == 1):
- logging.debug('WC: Cancelling running job %s running on %s',
- job.artifact.basename(), self.name())
+ initiator_id = build_cancel.id
+ for job in self._jobs.running_jobs():
+ self._remove_initiator_from_job(job, initiator_id)
- msg = distbuild.message('exec-cancel', id=job.artifact.basename())
- self._jm.send(msg)
- self.mainloop.queue_event(self, _BuildCancelled())
- else:
- logging.debug('WC: Not cancelling running job %s, other initiators '
- 'want it done: %s', job.artifact.basename(),
- [i for i in job.initiators if i != build_cancel.id])
+ def _remove_initiator_from_job(self, job, initiator_id):
+ '''Remove the given initiator from 'job', and cancel it if needed.
+
+ If the given initiator is not interested in 'job', nothing happens.
- job.initiators.remove(build_cancel.id)
+ '''
+
+ if initiator_id in job.initiators:
+ if len(job.initiators) == 1:
+ self._cancel_job(job)
+ else:
+ logging.debug(
+ 'WC: Not cancelling running job %s, other initiators want '
+ 'it done: %s', job.artifact.basename(),
+ [i for i in job.initiators if i != initiator_id])
+ job.initiators.remove(initiator_id)
+
+ def _cancel_job(self, job):
+ logging.debug(
+ 'WC: Cancelling job %s, currently building on %s',
+ job.artifact.basename(), self.name())
+
+ msg = distbuild.message('exec-cancel', id=job.artifact.basename())
+ self._jm.send(msg)
+ self.mainloop.queue_event(self, _BuildCancelled())
def _disconnected(self, event_source, event):
distbuild.crash_point()
@@ -493,20 +492,27 @@ class WorkerConnection(distbuild.StateMachine):
self.mainloop.queue_event(self._cm, distbuild.Reconnect())
- def _start_build(self, event_source, event):
- distbuild.crash_point()
-
- job = event.job
+ def _sanity_check_new_job(self, job):
+ # There's nothing that we can really do if the controller goes nuts
+ # (there's no 'reject job' message), but we can at least log warnings.
- if job.artifact.basename() in self._active_jobs:
+ if job.artifact.basename() in self._jobs:
logging.warn('Worker %s already has job %s', self.name(),
job.artifact.basename())
- if self._current_job_exec_response or self._current_job_cache_request:
- logging.warn('Caching not finished for %s', self._current_job.id)
+ running_jobs = self._jobs.running_jobs()
+ if len(running_jobs) != 0:
+ logging.warn('This worker already has running jobs: %s',
+ running_jobs)
+
+ def _start_build(self, event_source, event):
+ distbuild.crash_point()
+
+ job = event.job
+ self._sanity_check_new_job(job)
- self._active_jobs[job.artifact.basename()] = job
- self._current_job = job
+ self._jobs.add(job)
+ job.set_state('running')
logging.debug('WC: starting build: %s for %s' %
(job.artifact.name, job.initiators))
@@ -527,10 +533,12 @@ class WorkerConnection(distbuild.StateMachine):
)
self._jm.send(msg)
+ # The WorkerBuildQueuer object will set the job state to 'running' when
+ # it receives the _JobStarted message.
+ self.mainloop.queue_event(WorkerConnection, _JobStarted(job))
+
started = WorkerBuildStepStarted(job.initiators,
job.artifact.cache_key, self.name())
-
- self.mainloop.queue_event(WorkerConnection, _JobStarted(job))
self.mainloop.queue_event(WorkerConnection, started)
def _handle_json_message(self, event_source, event):
@@ -547,7 +555,7 @@ class WorkerConnection(distbuild.StateMachine):
}
handler = handlers[event.msg['type']]
- job = self._active_jobs.get(event.msg['id'])
+ job = self._jobs.get(event.msg['id'])
if job:
handler(event.msg, job)
@@ -583,12 +591,8 @@ class WorkerConnection(distbuild.StateMachine):
self.mainloop.queue_event(self, _BuildFailed())
else:
# Build succeeded. We have more work to do: caching the result.
- self.mainloop.queue_event(self, _BuildFinished())
- self._current_job_exec_response = new
-
- # The job is no longer considered active, because the worker is
- # finished with it so we won't receive any more messages about it.
- del self._active_jobs[job.artifact.basename()]
+ self.mainloop.queue_event(self, _BuildFinished(job))
+ job._exec_response = new
def _request_job(self, event_source, event):
distbuild.crash_point()
@@ -602,7 +606,7 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('Requesting shared artifact cache to get artifacts')
- job = self._current_job
+ job = event.job
kind = job.artifact.kind
if kind == 'chunk':
@@ -633,7 +637,7 @@ class WorkerConnection(distbuild.StateMachine):
msg = distbuild.message(
'http-request', id=self._request_ids.next(), url=url,
method='GET', body=None, headers=None)
- self._current_job_cache_request = msg['id']
+ job._cache_request_id = msg['id']
req = distbuild.HelperRequest(msg)
self.mainloop.queue_event(distbuild.HelperRouter, req)
@@ -642,41 +646,43 @@ class WorkerConnection(distbuild.StateMachine):
self.mainloop.queue_event(WorkerConnection, progress)
def _maybe_handle_helper_result(self, event_source, event):
- if event.msg['id'] == self._current_job_cache_request:
- distbuild.crash_point()
-
- logging.debug('caching: event.msg: %s' % repr(event.msg))
- if event.msg['status'] == httplib.OK:
- logging.debug('Shared artifact cache population done')
-
- new_event = WorkerBuildFinished(
- self._current_job_exec_response,
- self._current_job.artifact.cache_key)
- self.mainloop.queue_event(WorkerConnection, new_event)
- self.mainloop.queue_event(self, _Cached())
- else:
- logging.error(
- 'Failed to populate artifact cache: %s %s' %
- (event.msg['status'], event.msg['body']))
-
- # We will attempt to remove this job twice
- # unless we mark it as failed before the BuildController
- # processes the WorkerBuildFailed event.
- #
- # The BuildController will not try to cancel jobs that have
- # been marked as failed.
- self.mainloop.queue_event(WorkerConnection,
- _JobFailed(self._current_job))
-
- new_event = WorkerBuildFailed(
- self._current_job_exec_response,
- self._current_job.artifact.cache_key)
- self.mainloop.queue_event(WorkerConnection, new_event)
-
- self.mainloop.queue_event(self, _BuildFailed())
-
- self.mainloop.queue_event(WorkerConnection,
- _JobFinished(self._current_job))
-
- self._current_job_exec_response = None
- self._current_job_cache_request = None
+ # This function is called for every HelperResult message sent by the
+ # controller's distbuild-helper process (for every completed or failed
+ # http-request).
+ for job in self._jobs:
+ if event.msg['id'] == getattr(job, '_cache_request_id', None):
+ self._handle_helper_result_for_job(job, event)
+
+ def _handle_helper_result_for_job(self, job, event):
+ distbuild.crash_point()
+
+ logging.debug('caching: event.msg: %s' % repr(event.msg))
+ if event.msg['status'] == httplib.OK:
+ logging.debug('Shared artifact cache population done')
+
+ finished_event = WorkerBuildFinished(
+ job._exec_response, job.artifact.cache_key)
+ self.mainloop.queue_event(WorkerConnection, finished_event)
+
+ self.mainloop.queue_event(self, _Cached())
+ else:
+ logging.error(
+ 'Failed to populate artifact cache: %s %s' %
+ (event.msg['status'], event.msg['body']))
+
+ # We will attempt to remove this job twice
+ # unless we mark it as failed before the BuildController
+ # processes the WorkerBuildFailed event.
+ #
+ # The BuildController will not try to cancel jobs that have
+ # been marked as failed.
+ self.mainloop.queue_event(WorkerConnection, _JobFailed(job))
+
+ failed_event = WorkerBuildFailed(
+ job._exec_response, job.artifact.cache_key)
+ self.mainloop.queue_event(WorkerConnection, failed_event)
+
+ self.mainloop.queue_event(self, _BuildFailed())
+
+ # Caching is the last step of a job, so we're now done with it.
+ self.mainloop.queue_event(WorkerConnection, _JobFinished(job))