diff options
Diffstat (limited to 'distbuild/worker_build_scheduler.py')
-rw-r--r-- | distbuild/worker_build_scheduler.py | 135 |
1 files changed, 81 insertions, 54 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 25d76a4f..bf0d87b1 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -323,7 +323,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): distbuild.crash_point() who = event.who - last_job = who.job() # the job this worker's just completed + last_job = who.current_job() # the job this worker's just completed if last_job: logging.debug('%s wants new job, just did %s', @@ -372,9 +372,12 @@ class WorkerConnection(distbuild.StateMachine): self._writeable_cache_server = writeable_cache_server self._worker_cache_server_port = worker_cache_server_port self._morph_instance = morph_instance - self._helper_id = None - self._job = None - self._exec_response_msg = None + + self._active_jobs = dict() + self._current_job = None + self._current_job_exec_response = None + self._current_job_cache_request = None + self._debug_json = False addr, port = self._conn.getpeername() @@ -384,8 +387,8 @@ class WorkerConnection(distbuild.StateMachine): def name(self): return self._worker_name - def job(self): - return self._job + def current_job(self): + return self._current_job def setup(self): distbuild.crash_point() @@ -423,31 +426,30 @@ class WorkerConnection(distbuild.StateMachine): def _maybe_cancel(self, event_source, build_cancel): - if build_cancel.id not in self._job.initiators: + if build_cancel.id not in self._current_job.initiators: return # event not relevant logging.debug('WC: BuildController %r requested a cancel', event_source) - if (len(self._job.initiators) == 1): + job = self._current_job + if (len(job.initiators) == 1): logging.debug('WC: Cancelling running job %s ' 'with job id %s running on %s', - self._job.artifact.basename(), - self._job.id, + job.artifact.basename(), job.id, self.name()) - msg = distbuild.message('exec-cancel', id=self._job.id) + msg = distbuild.message('exec-cancel', id=job.id) self._jm.send(msg) self.mainloop.queue_event(self, _BuildCancelled()) else: logging.debug('WC: Not cancelling running job %s with job id %s, ' 'other initiators want it done: %s', - self._job.artifact.basename(), - self._job.id, - [i for i in self._job.initiators - if i != build_cancel.id]) + job.artifact.basename(), + job.id, + [i for i in job.initiators if i != build_cancel.id]) - self._job.initiators.remove(build_cancel.id) + job.initiators.remove(build_cancel.id) def _reconnect(self, event_source, event): distbuild.crash_point() @@ -458,23 +460,30 @@ class WorkerConnection(distbuild.StateMachine): def _start_build(self, event_source, event): distbuild.crash_point() - self._job = event.job - self._helper_id = None - self._exec_response_msg = None + job = event.job + + if job.id in self._active_jobs: + logging.warn('Duplicate job %s for worker %s', job.id, self.name()) + + if self._current_job_exec_response or self._current_job_cache_request: + logging.warn('Caching not finished for %s', self._current_job.id) + + self._active_jobs[job.id] = job + self._current_job = job logging.debug('WC: starting build: %s for %s' % - (self._job.artifact.name, self._job.initiators)) + (job.artifact.name, job.initiators)) argv = [ self._morph_instance, 'worker-build', '--build-log-on-stdout', - self._job.artifact.name, + job.artifact.name, ] msg = distbuild.message('exec-request', - id=self._job.id, + id=job.id, argv=argv, - stdin_contents=distbuild.serialise_artifact(self._job.artifact), + stdin_contents=distbuild.serialise_artifact(job.artifact), ) self._jm.send(msg) @@ -482,10 +491,10 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('WC: sent to worker %s: %r' % (self._worker_name, msg)) - started = WorkerBuildStepStarted(self._job.initiators, - self._job.artifact.source.cache_key, self.name()) + started = WorkerBuildStepStarted(job.initiators, + job.artifact.source.cache_key, self.name()) - self.mainloop.queue_event(WorkerConnection, _JobStarted(self._job)) + self.mainloop.queue_event(WorkerConnection, _JobStarted(job)) self.mainloop.queue_event(WorkerConnection, started) def _handle_json_message(self, event_source, event): @@ -500,37 +509,50 @@ class WorkerConnection(distbuild.StateMachine): 'exec-output': self._handle_exec_output, 'exec-response': self._handle_exec_response, } - + handler = handlers[event.msg['type']] - handler(event.msg) + job = self._active_jobs.get(event.msg['id']) + + if job: + handler(event.msg, job) + else: + logging.warn('Received %s for unknown job %s', + event.msg['type'], event.msg['id']) + + def _handle_exec_output(self, msg, job): + '''Handle output from a job that the worker is or was running.''' - def _handle_exec_output(self, msg): new = dict(msg) - new['ids'] = self._job.initiators + new['ids'] = job.initiators + logging.debug('WC: emitting: %s', repr(new)) self.mainloop.queue_event( WorkerConnection, - WorkerBuildOutput(new, self._job.artifact.source.cache_key)) + WorkerBuildOutput(new, job.artifact.source.cache_key)) - def _handle_exec_response(self, msg): - logging.debug('WC: finished building: %s' % self._job.artifact.name) - logging.debug('initiators that need to know: %s' - % self._job.initiators) + def _handle_exec_response(self, msg, job): + '''Handle completion of a job that the worker is or was running.''' + + logging.debug('WC: finished building: %s' % job.artifact.name) + logging.debug('initiators that need to know: %s' % job.initiators) new = dict(msg) - new['ids'] = self._job.initiators + new['ids'] = job.initiators if new['exit'] != 0: # Build failed. - new_event = WorkerBuildFailed(new, - self._job.artifact.source.cache_key) + new_event = WorkerBuildFailed(new, job.artifact.source.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) - self.mainloop.queue_event(WorkerConnection, _JobFailed(self._job)) + self.mainloop.queue_event(WorkerConnection, _JobFailed(job)) self.mainloop.queue_event(self, _BuildFailed()) else: # Build succeeded. We have more work to do: caching the result. self.mainloop.queue_event(self, _BuildFinished()) - self._exec_response_msg = new + self._current_job_exec_response = new + + # The job is no longer considered active, because the worker is + # finished with it so we won't receive any more messages about it. + del self._active_jobs[job.id] def _request_job(self, event_source, event): distbuild.crash_point() @@ -544,15 +566,16 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('Requesting shared artifact cache to get artifacts') - kind = self._job.artifact.source.morphology['kind'] + job = self._current_job + kind = job.artifact.source.morphology['kind'] if kind == 'chunk': - source_artifacts = self._job.artifact.source.artifacts + source_artifacts = job.artifact.source.artifacts suffixes = ['%s.%s' % (kind, name) for name in source_artifacts] suffixes.append('build-log') else: - filename = '%s.%s' % (kind, self._job.artifact.name) + filename = '%s.%s' % (kind, job.artifact.name) suffixes = [filename] if kind == 'stratum': @@ -568,22 +591,22 @@ class WorkerConnection(distbuild.StateMachine): '/1.0/fetch?host=%s:%d&cacheid=%s&artifacts=%s' % (urllib.quote(worker_host), self._worker_cache_server_port, - urllib.quote(self._job.artifact.source.cache_key), + urllib.quote(job.artifact.source.cache_key), suffixes)) msg = distbuild.message( 'http-request', id=self._request_ids.next(), url=url, method='GET', body=None, headers=None) - self._helper_id = msg['id'] + self._current_job_cache_request = msg['id'] req = distbuild.HelperRequest(msg) self.mainloop.queue_event(distbuild.HelperRouter, req) - progress = WorkerBuildCaching(self._job.initiators, - self._job.artifact.source.cache_key) + progress = WorkerBuildCaching(job.initiators, + job.artifact.source.cache_key) self.mainloop.queue_event(WorkerConnection, progress) def _maybe_handle_helper_result(self, event_source, event): - if event.msg['id'] == self._helper_id: + if event.msg['id'] == self._current_job_cache_request: distbuild.crash_point() logging.debug('caching: event.msg: %s' % repr(event.msg)) @@ -591,8 +614,8 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('Shared artifact cache population done') new_event = WorkerBuildFinished( - self._exec_response_msg, - self._job.artifact.source.cache_key) + self._current_job_exec_response, + self._current_job.artifact.source.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) self.mainloop.queue_event(self, _Cached()) else: @@ -607,13 +630,17 @@ class WorkerConnection(distbuild.StateMachine): # The BuildController will not try to cancel jobs that have # been marked as failed. self.mainloop.queue_event(WorkerConnection, - _JobFailed(self._job)) + _JobFailed(self._current_job)) new_event = WorkerBuildFailed( - self._exec_response_msg, - self._job.artifact.source.cache_key) + self._current_job_exec_response, + self._current_job.artifact.source.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) self.mainloop.queue_event(self, _BuildFailed()) - self.mainloop.queue_event(WorkerConnection, _JobFinished(self._job)) + self.mainloop.queue_event(WorkerConnection, + _JobFinished(self._current_job)) + + self._current_job_exec_response = None + self._current_job_cache_request = None |