summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Thursfield <sam.thursfield@codethink.co.uk>2015-02-18 10:50:49 +0000
committerSam Thursfield <sam.thursfield@codethink.co.uk>2015-02-18 16:32:50 +0000
commit441241801c152984fa985a61348aff2c988acbfa (patch)
tree16cbd009cb530733977c8224e0e1c036d57e0b85
parentf8dad6717ceeafbbbd5e98c9b60626d646e27b90 (diff)
downloadmorph-441241801c152984fa985a61348aff2c988acbfa.tar.gz
distbuild: Fix build logs being sent to the wrong log files
For a while we have seen an issue where output from build A would end up in the log file of some other random chunk. The problem turns out to be that the WorkerConnection class in the controller-daemon assumes cancellation is instantaneous. If a build was cancelled, the WorkerConnection would send a cancel message for the job it was running, and then start a new job. However, the worker-daemon process would have a backlog of exec-output messages and a delayed exec-response message from the old job. The controller would receive these and would assume that they were for the new job, without checking the job ID in the messages. Thus they would be sent to the wrong log file. To fix this, the WorkerConnection class now tracks jobs by job ID, and the code should be generally more robust when unexpected messages are received.
-rw-r--r--distbuild/worker_build_scheduler.py135
1 files changed, 81 insertions, 54 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index 25d76a4f..bf0d87b1 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -323,7 +323,7 @@ class WorkerBuildQueuer(distbuild.StateMachine):
distbuild.crash_point()
who = event.who
- last_job = who.job() # the job this worker's just completed
+ last_job = who.current_job() # the job this worker's just completed
if last_job:
logging.debug('%s wants new job, just did %s',
@@ -372,9 +372,12 @@ class WorkerConnection(distbuild.StateMachine):
self._writeable_cache_server = writeable_cache_server
self._worker_cache_server_port = worker_cache_server_port
self._morph_instance = morph_instance
- self._helper_id = None
- self._job = None
- self._exec_response_msg = None
+
+ self._active_jobs = dict()
+ self._current_job = None
+ self._current_job_exec_response = None
+ self._current_job_cache_request = None
+
self._debug_json = False
addr, port = self._conn.getpeername()
@@ -384,8 +387,8 @@ class WorkerConnection(distbuild.StateMachine):
def name(self):
return self._worker_name
- def job(self):
- return self._job
+ def current_job(self):
+ return self._current_job
def setup(self):
distbuild.crash_point()
@@ -423,31 +426,30 @@ class WorkerConnection(distbuild.StateMachine):
def _maybe_cancel(self, event_source, build_cancel):
- if build_cancel.id not in self._job.initiators:
+ if build_cancel.id not in self._current_job.initiators:
return # event not relevant
logging.debug('WC: BuildController %r requested a cancel',
event_source)
- if (len(self._job.initiators) == 1):
+ job = self._current_job
+ if (len(job.initiators) == 1):
logging.debug('WC: Cancelling running job %s '
'with job id %s running on %s',
- self._job.artifact.basename(),
- self._job.id,
+ job.artifact.basename(), job.id,
self.name())
- msg = distbuild.message('exec-cancel', id=self._job.id)
+ msg = distbuild.message('exec-cancel', id=job.id)
self._jm.send(msg)
self.mainloop.queue_event(self, _BuildCancelled())
else:
logging.debug('WC: Not cancelling running job %s with job id %s, '
'other initiators want it done: %s',
- self._job.artifact.basename(),
- self._job.id,
- [i for i in self._job.initiators
- if i != build_cancel.id])
+ job.artifact.basename(),
+ job.id,
+ [i for i in job.initiators if i != build_cancel.id])
- self._job.initiators.remove(build_cancel.id)
+ job.initiators.remove(build_cancel.id)
def _reconnect(self, event_source, event):
distbuild.crash_point()
@@ -458,23 +460,30 @@ class WorkerConnection(distbuild.StateMachine):
def _start_build(self, event_source, event):
distbuild.crash_point()
- self._job = event.job
- self._helper_id = None
- self._exec_response_msg = None
+ job = event.job
+
+ if job.id in self._active_jobs:
+ logging.warn('Duplicate job %s for worker %s', job.id, self.name())
+
+ if self._current_job_exec_response or self._current_job_cache_request:
+ logging.warn('Caching not finished for %s', self._current_job.id)
+
+ self._active_jobs[job.id] = job
+ self._current_job = job
logging.debug('WC: starting build: %s for %s' %
- (self._job.artifact.name, self._job.initiators))
+ (job.artifact.name, job.initiators))
argv = [
self._morph_instance,
'worker-build',
'--build-log-on-stdout',
- self._job.artifact.name,
+ job.artifact.name,
]
msg = distbuild.message('exec-request',
- id=self._job.id,
+ id=job.id,
argv=argv,
- stdin_contents=distbuild.serialise_artifact(self._job.artifact),
+ stdin_contents=distbuild.serialise_artifact(job.artifact),
)
self._jm.send(msg)
@@ -482,10 +491,10 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('WC: sent to worker %s: %r'
% (self._worker_name, msg))
- started = WorkerBuildStepStarted(self._job.initiators,
- self._job.artifact.source.cache_key, self.name())
+ started = WorkerBuildStepStarted(job.initiators,
+ job.artifact.source.cache_key, self.name())
- self.mainloop.queue_event(WorkerConnection, _JobStarted(self._job))
+ self.mainloop.queue_event(WorkerConnection, _JobStarted(job))
self.mainloop.queue_event(WorkerConnection, started)
def _handle_json_message(self, event_source, event):
@@ -500,37 +509,50 @@ class WorkerConnection(distbuild.StateMachine):
'exec-output': self._handle_exec_output,
'exec-response': self._handle_exec_response,
}
-
+
handler = handlers[event.msg['type']]
- handler(event.msg)
+ job = self._active_jobs.get(event.msg['id'])
+
+ if job:
+ handler(event.msg, job)
+ else:
+ logging.warn('Received %s for unknown job %s',
+ event.msg['type'], event.msg['id'])
+
+ def _handle_exec_output(self, msg, job):
+ '''Handle output from a job that the worker is or was running.'''
- def _handle_exec_output(self, msg):
new = dict(msg)
- new['ids'] = self._job.initiators
+ new['ids'] = job.initiators
+
logging.debug('WC: emitting: %s', repr(new))
self.mainloop.queue_event(
WorkerConnection,
- WorkerBuildOutput(new, self._job.artifact.source.cache_key))
+ WorkerBuildOutput(new, job.artifact.source.cache_key))
- def _handle_exec_response(self, msg):
- logging.debug('WC: finished building: %s' % self._job.artifact.name)
- logging.debug('initiators that need to know: %s'
- % self._job.initiators)
+ def _handle_exec_response(self, msg, job):
+ '''Handle completion of a job that the worker is or was running.'''
+
+ logging.debug('WC: finished building: %s' % job.artifact.name)
+ logging.debug('initiators that need to know: %s' % job.initiators)
new = dict(msg)
- new['ids'] = self._job.initiators
+ new['ids'] = job.initiators
if new['exit'] != 0:
# Build failed.
- new_event = WorkerBuildFailed(new,
- self._job.artifact.source.cache_key)
+ new_event = WorkerBuildFailed(new, job.artifact.source.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
- self.mainloop.queue_event(WorkerConnection, _JobFailed(self._job))
+ self.mainloop.queue_event(WorkerConnection, _JobFailed(job))
self.mainloop.queue_event(self, _BuildFailed())
else:
# Build succeeded. We have more work to do: caching the result.
self.mainloop.queue_event(self, _BuildFinished())
- self._exec_response_msg = new
+ self._current_job_exec_response = new
+
+ # The job is no longer considered active, because the worker is
+ # finished with it so we won't receive any more messages about it.
+ del self._active_jobs[job.id]
def _request_job(self, event_source, event):
distbuild.crash_point()
@@ -544,15 +566,16 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('Requesting shared artifact cache to get artifacts')
- kind = self._job.artifact.source.morphology['kind']
+ job = self._current_job
+ kind = job.artifact.source.morphology['kind']
if kind == 'chunk':
- source_artifacts = self._job.artifact.source.artifacts
+ source_artifacts = job.artifact.source.artifacts
suffixes = ['%s.%s' % (kind, name) for name in source_artifacts]
suffixes.append('build-log')
else:
- filename = '%s.%s' % (kind, self._job.artifact.name)
+ filename = '%s.%s' % (kind, job.artifact.name)
suffixes = [filename]
if kind == 'stratum':
@@ -568,22 +591,22 @@ class WorkerConnection(distbuild.StateMachine):
'/1.0/fetch?host=%s:%d&cacheid=%s&artifacts=%s' %
(urllib.quote(worker_host),
self._worker_cache_server_port,
- urllib.quote(self._job.artifact.source.cache_key),
+ urllib.quote(job.artifact.source.cache_key),
suffixes))
msg = distbuild.message(
'http-request', id=self._request_ids.next(), url=url,
method='GET', body=None, headers=None)
- self._helper_id = msg['id']
+ self._current_job_cache_request = msg['id']
req = distbuild.HelperRequest(msg)
self.mainloop.queue_event(distbuild.HelperRouter, req)
- progress = WorkerBuildCaching(self._job.initiators,
- self._job.artifact.source.cache_key)
+ progress = WorkerBuildCaching(job.initiators,
+ job.artifact.source.cache_key)
self.mainloop.queue_event(WorkerConnection, progress)
def _maybe_handle_helper_result(self, event_source, event):
- if event.msg['id'] == self._helper_id:
+ if event.msg['id'] == self._current_job_cache_request:
distbuild.crash_point()
logging.debug('caching: event.msg: %s' % repr(event.msg))
@@ -591,8 +614,8 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('Shared artifact cache population done')
new_event = WorkerBuildFinished(
- self._exec_response_msg,
- self._job.artifact.source.cache_key)
+ self._current_job_exec_response,
+ self._current_job.artifact.source.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
self.mainloop.queue_event(self, _Cached())
else:
@@ -607,13 +630,17 @@ class WorkerConnection(distbuild.StateMachine):
# The BuildController will not try to cancel jobs that have
# been marked as failed.
self.mainloop.queue_event(WorkerConnection,
- _JobFailed(self._job))
+ _JobFailed(self._current_job))
new_event = WorkerBuildFailed(
- self._exec_response_msg,
- self._job.artifact.source.cache_key)
+ self._current_job_exec_response,
+ self._current_job.artifact.source.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
self.mainloop.queue_event(self, _BuildFailed())
- self.mainloop.queue_event(WorkerConnection, _JobFinished(self._job))
+ self.mainloop.queue_event(WorkerConnection,
+ _JobFinished(self._current_job))
+
+ self._current_job_exec_response = None
+ self._current_job_cache_request = None