From ee252ff578a83a801730e6f72dbcdc274f2e26d1 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Mon, 5 May 2014 16:12:17 +0100 Subject: Add cancelling to WorkerBuildScheduler --- distbuild/worker_build_scheduler.py | 106 +++++++++++++++++++++++++++++++----- 1 file changed, 93 insertions(+), 13 deletions(-) (limited to 'distbuild') diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 85856a37..79180843 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -121,6 +121,17 @@ class Jobs(object): def remove(self, job): del self._jobs[job.artifact.basename()] + def get_jobs(self): + return self._jobs + + def remove_jobs(self, jobs): + for job in jobs: + if job.artifact.basename() in self._jobs: + self.remove(job) + else: + logging.warning("Tried to remove a job that doesn't exist " + "(%s)", job.artifact.basename()) + def exists(self, artifact_basename): return artifact_basename in self._jobs @@ -134,14 +145,22 @@ class Jobs(object): def __repr__(self): return str([job.artifact.basename() for (_, job) in self._jobs.iteritems()]) - + + class _BuildFinished(object): pass + class _BuildFailed(object): pass + + +class _BuildCancelled(object): + + pass + class _Cached(object): @@ -176,6 +195,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): self._handle_request), ('idle', WorkerBuildQueuer, WorkerCancelPending, 'idle', self._handle_cancel), + ('idle', WorkerConnection, _NeedJob, 'idle', self._handle_worker), ] self.add_transitions(spec) @@ -221,11 +241,42 @@ class WorkerBuildQueuer(distbuild.StateMachine): event.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, progress) - def _handle_cancel(self, event_source, worker_cancel_pending): - # TODO: this probably needs to check whether any initiators - # care about this thing + def _handle_cancel(self, event_source, event): + + def cancel_this(job): + if event.initiator_id not in job.initiators: + return False # not for us + + name = job.artifact.basename() + job_id = job.id - pass + logging.debug('Checking whether to remove job %s with job id %s', + name, job_id) + + if len(job.initiators) == 1: + if job.is_building: + logging.debug('NOT removing running job %s with job id %s ' + '(WorkerConnection will cancel job)', + name, job_id) + else: + logging.debug('Removing job %s with job id %s', + name, job_id) + return True + else: + # Don't cancel, but still remove this initiator from + # the list of initiators + logging.debug('NOT removing job %s with job id %s ' + 'other initiators want it: %s', name, job_id, + [i for i in job.initiators + if i != event.initiator_id]) + + job.initiators.remove(event.initiator_id) + + return False + + self._jobs.remove_jobs( + [job for (_, job) in self._jobs.get_jobs().iteritems() + if cancel_this(job)]) def _handle_worker(self, event_source, event): distbuild.crash_point() @@ -234,17 +285,20 @@ class WorkerBuildQueuer(distbuild.StateMachine): last_job = who.job() # the job this worker's just completed if last_job: - logging.debug('%s wants new job, just did %s' % - (who.name(), last_job.artifact.basename())) + logging.debug('%s wants new job, just did %s', + who.name(), last_job.artifact.basename()) + logging.debug('Removing job %s with job id %s', + last_job.artifact.basename(), last_job.id) self._jobs.remove(last_job) else: - logging.debug('%s wants its first job' % who.name()) + logging.debug('%s wants its first job', who.name()) - logging.debug('WBQ: Adding worker to queue: %s' % event.who) + logging.debug('WBQ: Adding worker to queue: %s', event.who.name()) self._available_workers.append(event) - logging.debug('Current jobs: %s' % self._jobs) - logging.debug('Workers available: %d' % len(self._available_workers)) + + logging.debug('Current jobs: %s', self._jobs) + logging.debug('Workers available: %d', len(self._available_workers)) job = self._jobs.get_next_job() @@ -308,10 +362,12 @@ class WorkerConnection(distbuild.StateMachine): ('building', distbuild.BuildController, distbuild.BuildCancel, 'building', self._maybe_cancel), + ('building', self._jm, distbuild.JsonEof, None, self._reconnect), ('building', self._jm, distbuild.JsonNewMessage, 'building', self._handle_json_message), ('building', self, _BuildFailed, 'idle', self._request_job), + ('building', self, _BuildCancelled, 'idle', self._request_job), ('building', self, _BuildFinished, 'caching', self._request_caching), @@ -325,10 +381,32 @@ class WorkerConnection(distbuild.StateMachine): self._request_job(None, None) def _maybe_cancel(self, event_source, build_cancel): - logging.debug('WC: BuildController %r requested a cancel' % + + if build_cancel.id not in self._job.initiators: + return # event not relevant + + logging.debug('WC: BuildController %r requested a cancel', event_source) - # TODO: implement cancel + if (len(self._job.initiators) == 1): + logging.debug('WC: Cancelling running job %s ' + 'with job id %s running on %s', + self._job.artifact.basename(), + self._job.id, + self.name()) + + msg = distbuild.message('exec-cancel', id=self._job.id) + self._jm.send(msg) + self.mainloop.queue_event(self, _BuildCancelled()) + else: + logging.debug('WC: Not cancelling running job %s with job id %s, ' + 'other initiators want it done: %s', + self._job.artifact.basename(), + self._job.id, + [i for i in self._job.initiators + if i != build_cancel.id]) + + self._job.initiators.remove(build_cancel.id) def _reconnect(self, event_source, event): distbuild.crash_point() @@ -411,6 +489,8 @@ class WorkerConnection(distbuild.StateMachine): self.mainloop.queue_event(self, _BuildFinished()) self._exec_response_msg = new + self._job.is_building = False + def _request_job(self, event_source, event): distbuild.crash_point() self.mainloop.queue_event(WorkerConnection, _NeedJob(self)) -- cgit v1.2.1