diff options
-rw-r--r-- | distbuild/worker_build_scheduler.py | 223 |
1 file changed, 115 insertions(+), 108 deletions(-)
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index bc2df4b1..ea3e9ab5 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -134,11 +134,11 @@ class JobQueue(object): self._jobs = {} def get(self, artifact_basename): - return (self._jobs[artifact_basename] - if artifact_basename in self._jobs else None) + return self._jobs.get(artifact_basename, None) def add(self, job): artifact_basename = job.artifact.basename() + if artifact_basename in self._jobs: logging.error( "Duplicate job for %s added to %s job queue, ignoring.", @@ -153,23 +153,25 @@ class JobQueue(object): logging.warning("Tried to remove a job that doesn't exist " "(%s)", job.artifact.basename()) - def get_jobs(self): - return self._jobs + def __contains__(self, artifact_basename): + return artifact_basename in self._jobs + + def __iter__(self): + return self._jobs.itervalues() def remove_jobs(self, jobs): for job in jobs: self.remove(job) - def exists(self, artifact_basename): - return artifact_basename in self._jobs - def get_next_job(self): # for now just return the first thing we find that's not being built - waiting = [job for (_, job) in - self._jobs.iteritems() if job.who == None] + waiting = [job for job in self if job.who == None] return waiting.pop() if len(waiting) > 0 else None + def running_jobs(self): + return [job for job in self if job.running()] + def __repr__(self): items = [] for job in self._jobs.itervalues(): @@ -180,7 +182,8 @@ class JobQueue(object): class _BuildFinished(object): - pass + def __init__(self, job): + self.job = job class _BuildFailed(object): @@ -260,10 +263,14 @@ class WorkerBuildQueuer(distbuild.StateMachine): event.job.set_state('running') def _set_job_finished(self, event_source, event): - event.job.set_state('complete') + job = event.job + job.set_state('complete') + self._jobs.remove(job) def _set_job_failed(self, event_source, event): - event.job.set_state('failed') + job = 
event.job + job.set_state('failed') + self._jobs.remove(job) def _handle_request(self, event_source, event): distbuild.crash_point() @@ -276,7 +283,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): # If so, add our initiator id to the existing job # If not, create a job - if self._jobs.exists(event.artifact.basename()): + if event.artifact.basename() in self._jobs: job = self._jobs.get(event.artifact.basename()) job.initiators.append(event.initiator_id) @@ -338,24 +345,11 @@ class WorkerBuildQueuer(distbuild.StateMachine): return False self._jobs.remove_jobs( - [job for (_, job) in self._jobs.get_jobs().iteritems() - if cancel_this(job)]) + [job for job in self._jobs if cancel_this(job)]) def _handle_worker(self, event_source, event): distbuild.crash_point() - who = event.who - last_job = who.current_job() # the job this worker's just completed - - if last_job: - logging.debug('%s wants new job, just did %s', - who.name(), last_job.artifact.basename()) - - logging.debug('Removing job %s', last_job.artifact.basename()) - self._jobs.remove(last_job) - else: - logging.debug('%s wants its first job', who.name()) - logging.debug('WBQ: Adding worker to queue: %s', event.who.name()) self._available_workers.append(event) @@ -408,21 +402,15 @@ class WorkerConnection(distbuild.StateMachine): self._worker_cache_server_port = worker_cache_server_port self._morph_instance = morph_instance - self._active_jobs = dict() - self._current_job = None - self._current_job_exec_response = None - self._current_job_cache_request = None - addr, port = self._conn.getpeername() name = socket.getfqdn(addr) self._worker_name = '%s:%s' % (name, port) + self._jobs = JobQueue(owner=self.name()) + def name(self): return self._worker_name - def current_job(self): - return self._current_job - def __str__(self): return self.name() @@ -463,27 +451,38 @@ class WorkerConnection(distbuild.StateMachine): self._request_job(None, None) def _maybe_cancel(self, event_source, build_cancel): - - if 
build_cancel.id not in self._current_job.initiators: - return # event not relevant - logging.debug('WC: BuildController %r requested a cancel', event_source) - job = self._current_job - if (len(job.initiators) == 1): - logging.debug('WC: Cancelling running job %s running on %s', - job.artifact.basename(), self.name()) + initiator_id = build_cancel.id + for job in self._jobs.running_jobs(): + self._remove_initiator_from_job(job, initiator_id) - msg = distbuild.message('exec-cancel', id=job.artifact.basename()) - self._jm.send(msg) - self.mainloop.queue_event(self, _BuildCancelled()) - else: - logging.debug('WC: Not cancelling running job %s, other initiators ' - 'want it done: %s', job.artifact.basename(), - [i for i in job.initiators if i != build_cancel.id]) + def _remove_initiator_from_job(self, job, initiator_id): + '''Remove the given initiator from 'job', and cancel it if needed. + + If the given initiator is not interested in 'job', nothing happens. + + ''' + + if initiator_id in job.initiators: + if len(job.initiators) == 1: + self._cancel_job(job) + else: + logging.debug( + 'WC: Not cancelling running job %s, other initiators want ' + 'it done: %s', job.artifact.basename(), + [i for i in job.initiators if i != initiator_id]) + job.initiators.remove(initiator_id) + + def _cancel_job(self, job): + logging.debug( + 'WC: Cancelling job %s, currently building on %s', + job.artifact.basename(), self.name()) - job.initiators.remove(build_cancel.id) + msg = distbuild.message('exec-cancel', id=job.artifact.basename()) + self._jm.send(msg) + self.mainloop.queue_event(self, _BuildCancelled()) def _disconnected(self, event_source, event): distbuild.crash_point() @@ -493,20 +492,27 @@ class WorkerConnection(distbuild.StateMachine): self.mainloop.queue_event(self._cm, distbuild.Reconnect()) - def _start_build(self, event_source, event): - distbuild.crash_point() - - job = event.job + def _sanity_check_new_job(self, job): + # There's nothing that we can really do if the 
controller goes nuts + # (there's no 'reject job' message), but we can at least log warnings. - if job.artifact.basename() in self._active_jobs: + if job.artifact.basename() in self._jobs: logging.warn('Worker %s already has job %s', self.name(), job.artifact.basename()) - if self._current_job_exec_response or self._current_job_cache_request: - logging.warn('Caching not finished for %s', self._current_job.id) + running_jobs = self._jobs.running_jobs() + if len(running_jobs) != 0: + logging.warn('This worker already has running jobs: %s', + running_jobs) - self._active_jobs[job.artifact.basename()] = job - self._current_job = job + def _start_build(self, event_source, event): + distbuild.crash_point() + + job = event.job + self._sanity_check_new_job(job) + + self._jobs.add(job) + job.set_state('running') logging.debug('WC: starting build: %s for %s' % (job.artifact.name, job.initiators)) @@ -527,10 +533,12 @@ class WorkerConnection(distbuild.StateMachine): ) self._jm.send(msg) + # The WorkerBuildQueuer object will set the job state to 'running' when + # it receives the _JobStarted message. + self.mainloop.queue_event(WorkerConnection, _JobStarted(job)) + started = WorkerBuildStepStarted(job.initiators, job.artifact.cache_key, self.name()) - - self.mainloop.queue_event(WorkerConnection, _JobStarted(job)) self.mainloop.queue_event(WorkerConnection, started) def _handle_json_message(self, event_source, event): @@ -547,7 +555,7 @@ class WorkerConnection(distbuild.StateMachine): } handler = handlers[event.msg['type']] - job = self._active_jobs.get(event.msg['id']) + job = self._jobs.get(event.msg['id']) if job: handler(event.msg, job) @@ -583,12 +591,8 @@ class WorkerConnection(distbuild.StateMachine): self.mainloop.queue_event(self, _BuildFailed()) else: # Build succeeded. We have more work to do: caching the result. 
- self.mainloop.queue_event(self, _BuildFinished()) - self._current_job_exec_response = new - - # The job is no longer considered active, because the worker is - # finished with it so we won't receive any more messages about it. - del self._active_jobs[job.artifact.basename()] + self.mainloop.queue_event(self, _BuildFinished(job)) + job._exec_response = new def _request_job(self, event_source, event): distbuild.crash_point() @@ -603,6 +607,7 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('Requesting shared artifact cache to get artifacts') job = self._current_job + job = event.job kind = job.artifact.kind if kind == 'chunk': @@ -633,7 +638,7 @@ class WorkerConnection(distbuild.StateMachine): msg = distbuild.message( 'http-request', id=self._request_ids.next(), url=url, method='GET', body=None, headers=None) - self._current_job_cache_request = msg['id'] + job._cache_request_id = msg['id'] req = distbuild.HelperRequest(msg) self.mainloop.queue_event(distbuild.HelperRouter, req) @@ -642,41 +647,43 @@ class WorkerConnection(distbuild.StateMachine): self.mainloop.queue_event(WorkerConnection, progress) def _maybe_handle_helper_result(self, event_source, event): - if event.msg['id'] == self._current_job_cache_request: - distbuild.crash_point() - - logging.debug('caching: event.msg: %s' % repr(event.msg)) - if event.msg['status'] == httplib.OK: - logging.debug('Shared artifact cache population done') - - new_event = WorkerBuildFinished( - self._current_job_exec_response, - self._current_job.artifact.cache_key) - self.mainloop.queue_event(WorkerConnection, new_event) - self.mainloop.queue_event(self, _Cached()) - else: - logging.error( - 'Failed to populate artifact cache: %s %s' % - (event.msg['status'], event.msg['body'])) - - # We will attempt to remove this job twice - # unless we mark it as failed before the BuildController - # processes the WorkerBuildFailed event. 
- # - # The BuildController will not try to cancel jobs that have - # been marked as failed. - self.mainloop.queue_event(WorkerConnection, - _JobFailed(self._current_job)) - - new_event = WorkerBuildFailed( - self._current_job_exec_response, - self._current_job.artifact.cache_key) - self.mainloop.queue_event(WorkerConnection, new_event) - - self.mainloop.queue_event(self, _BuildFailed()) - - self.mainloop.queue_event(WorkerConnection, - _JobFinished(self._current_job)) - - self._current_job_exec_response = None - self._current_job_cache_request = None + # This function is called for every HelperResult message sent by the + # controller's distbuild-helper process (for every completed or failed + # http-request). + for job in self._jobs: + if event.msg['id'] == getattr(job, '_cache_request_id', None): + self._handle_helper_result_for_job(job, event) + + def _handle_helper_result_for_job(self, job, event): + distbuild.crash_point() + + logging.debug('caching: event.msg: %s' % repr(event.msg)) + if event.msg['status'] == httplib.OK: + logging.debug('Shared artifact cache population done') + + finished_event = WorkerBuildFinished( + job._exec_response, job.artifact.cache_key) + self.mainloop.queue_event(WorkerConnection, finished_event) + + self.mainloop.queue_event(self, _Cached()) + else: + logging.error( + 'Failed to populate artifact cache: %s %s' % + (event.msg['status'], event.msg['body'])) + + # We will attempt to remove this job twice + # unless we mark it as failed before the BuildController + # processes the WorkerBuildFailed event. + # + # The BuildController will not try to cancel jobs that have + # been marked as failed. + self.mainloop.queue_event(WorkerConnection, _JobFailed(job)) + + failed_event = WorkerBuildFailed( + job._exec_response, job.artifact.cache_key) + self.mainloop.queue_event(WorkerConnection, failed_event) + + self.mainloop.queue_event(self, _BuildFailed()) + + # Caching is the last step of a job, so we're now done with it. 
+ self.mainloop.queue_event(WorkerConnection, _JobFinished(job)) |