summaryrefslogtreecommitdiff
path: root/distbuild
diff options
context:
space:
mode:
authorRichard Ipsum <richard.ipsum@codethink.co.uk>2014-05-05 15:12:17 (GMT)
committerRichard Ipsum <richard.ipsum@codethink.co.uk>2014-05-06 17:05:33 (GMT)
commitee252ff578a83a801730e6f72dbcdc274f2e26d1 (patch)
treee947e6291a5de32a7c7affa816c1f0959307a2a2 /distbuild
parente5ce0b989eb9ffdf704225051056bee20937d1ab (diff)
downloadmorph-ee252ff578a83a801730e6f72dbcdc274f2e26d1.tar.gz
Add cancelling to WorkerBuildScheduler
Diffstat (limited to 'distbuild')
-rw-r--r--distbuild/worker_build_scheduler.py106
1 files changed, 93 insertions, 13 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index 85856a3..7918084 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -121,6 +121,17 @@ class Jobs(object):
def remove(self, job):
del self._jobs[job.artifact.basename()]
+ def get_jobs(self):
+ return self._jobs
+
+ def remove_jobs(self, jobs):
+ for job in jobs:
+ if job.artifact.basename() in self._jobs:
+ self.remove(job)
+ else:
+ logging.warning("Tried to remove a job that doesn't exist "
+ "(%s)", job.artifact.basename())
+
def exists(self, artifact_basename):
return artifact_basename in self._jobs
@@ -134,14 +145,22 @@ class Jobs(object):
def __repr__(self):
return str([job.artifact.basename()
for (_, job) in self._jobs.iteritems()])
-
+
+
class _BuildFinished(object):
pass
+
class _BuildFailed(object):
pass
+
+
+class _BuildCancelled(object):
+
+ pass
+
class _Cached(object):
@@ -176,6 +195,7 @@ class WorkerBuildQueuer(distbuild.StateMachine):
self._handle_request),
('idle', WorkerBuildQueuer, WorkerCancelPending, 'idle',
self._handle_cancel),
+
('idle', WorkerConnection, _NeedJob, 'idle', self._handle_worker),
]
self.add_transitions(spec)
@@ -221,11 +241,42 @@ class WorkerBuildQueuer(distbuild.StateMachine):
event.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, progress)
- def _handle_cancel(self, event_source, worker_cancel_pending):
- # TODO: this probably needs to check whether any initiators
- # care about this thing
+ def _handle_cancel(self, event_source, event):
+
+ def cancel_this(job):
+ if event.initiator_id not in job.initiators:
+ return False # not for us
+
+ name = job.artifact.basename()
+ job_id = job.id
- pass
+ logging.debug('Checking whether to remove job %s with job id %s',
+ name, job_id)
+
+ if len(job.initiators) == 1:
+ if job.is_building:
+ logging.debug('NOT removing running job %s with job id %s '
+ '(WorkerConnection will cancel job)',
+ name, job_id)
+ else:
+ logging.debug('Removing job %s with job id %s',
+ name, job_id)
+ return True
+ else:
+ # Don't cancel, but still remove this initiator from
+ # the list of initiators
+ logging.debug('NOT removing job %s with job id %s '
+ 'other initiators want it: %s', name, job_id,
+ [i for i in job.initiators
+ if i != event.initiator_id])
+
+ job.initiators.remove(event.initiator_id)
+
+ return False
+
+ self._jobs.remove_jobs(
+ [job for (_, job) in self._jobs.get_jobs().iteritems()
+ if cancel_this(job)])
def _handle_worker(self, event_source, event):
distbuild.crash_point()
@@ -234,17 +285,20 @@ class WorkerBuildQueuer(distbuild.StateMachine):
last_job = who.job() # the job this worker's just completed
if last_job:
- logging.debug('%s wants new job, just did %s' %
- (who.name(), last_job.artifact.basename()))
+ logging.debug('%s wants new job, just did %s',
+ who.name(), last_job.artifact.basename())
+ logging.debug('Removing job %s with job id %s',
+ last_job.artifact.basename(), last_job.id)
self._jobs.remove(last_job)
else:
- logging.debug('%s wants its first job' % who.name())
+ logging.debug('%s wants its first job', who.name())
- logging.debug('WBQ: Adding worker to queue: %s' % event.who)
+ logging.debug('WBQ: Adding worker to queue: %s', event.who.name())
self._available_workers.append(event)
- logging.debug('Current jobs: %s' % self._jobs)
- logging.debug('Workers available: %d' % len(self._available_workers))
+
+ logging.debug('Current jobs: %s', self._jobs)
+ logging.debug('Workers available: %d', len(self._available_workers))
job = self._jobs.get_next_job()
@@ -308,10 +362,12 @@ class WorkerConnection(distbuild.StateMachine):
('building', distbuild.BuildController,
distbuild.BuildCancel, 'building',
self._maybe_cancel),
+
('building', self._jm, distbuild.JsonEof, None, self._reconnect),
('building', self._jm, distbuild.JsonNewMessage, 'building',
self._handle_json_message),
('building', self, _BuildFailed, 'idle', self._request_job),
+ ('building', self, _BuildCancelled, 'idle', self._request_job),
('building', self, _BuildFinished, 'caching',
self._request_caching),
@@ -325,10 +381,32 @@ class WorkerConnection(distbuild.StateMachine):
self._request_job(None, None)
def _maybe_cancel(self, event_source, build_cancel):
- logging.debug('WC: BuildController %r requested a cancel' %
+
+ if build_cancel.id not in self._job.initiators:
+ return # event not relevant
+
+ logging.debug('WC: BuildController %r requested a cancel',
event_source)
- # TODO: implement cancel
+ if (len(self._job.initiators) == 1):
+ logging.debug('WC: Cancelling running job %s '
+ 'with job id %s running on %s',
+ self._job.artifact.basename(),
+ self._job.id,
+ self.name())
+
+ msg = distbuild.message('exec-cancel', id=self._job.id)
+ self._jm.send(msg)
+ self.mainloop.queue_event(self, _BuildCancelled())
+ else:
+ logging.debug('WC: Not cancelling running job %s with job id %s, '
+ 'other initiators want it done: %s',
+ self._job.artifact.basename(),
+ self._job.id,
+ [i for i in self._job.initiators
+ if i != build_cancel.id])
+
+ self._job.initiators.remove(build_cancel.id)
def _reconnect(self, event_source, event):
distbuild.crash_point()
@@ -411,6 +489,8 @@ class WorkerConnection(distbuild.StateMachine):
self.mainloop.queue_event(self, _BuildFinished())
self._exec_response_msg = new
+ self._job.is_building = False
+
def _request_job(self, event_source, event):
distbuild.crash_point()
self.mainloop.queue_event(WorkerConnection, _NeedJob(self))