diff options
author | Richard Ipsum <richard.ipsum@codethink.co.uk> | 2014-05-07 14:16:21 +0000 |
---|---|---|
committer | Richard Ipsum <richard.ipsum@codethink.co.uk> | 2014-05-07 14:16:21 +0000 |
commit | 9ea146fdcfbe9b93b297165b1b5222e1eadb9d23 (patch) | |
tree | ca17eabee3e7fe929d397df158778caf358b5661 /distbuild | |
parent | 201b043e4d7ab03d11ad1e71bac7916b19aed294 (diff) | |
parent | c8cdcbb70740f3c71da83587c1368b7c4b03b330 (diff) | |
download | morph-9ea146fdcfbe9b93b297165b1b5222e1eadb9d23.tar.gz |
Merge branch 'baserock/richardipsum/distbuild_cancel2'
Reviewed by:
Richard Maw
Lars Wirzenius
Daniel Silverstone
Diffstat (limited to 'distbuild')
-rw-r--r-- | distbuild/build_controller.py | 26 | ||||
-rw-r--r-- | distbuild/mainloop.py | 4 | ||||
-rw-r--r-- | distbuild/worker_build_scheduler.py | 157 |
3 files changed, 158 insertions, 29 deletions
diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py index a3911586..c399939e 100644 --- a/distbuild/build_controller.py +++ b/distbuild/build_controller.py @@ -235,9 +235,9 @@ class BuildController(distbuild.StateMachine): self._maybe_notify_build_failed), ('building', self, _Abort, None, None), ('building', self, _Built, None, self._notify_build_done), - ('building', self._initiator_connection, - distbuild.InitiatorDisconnect, None, - self._notify_initiator_disconnected), + ('building', distbuild.InitiatorConnection, + distbuild.InitiatorDisconnect, 'building', + self._maybe_notify_initiator_disconnected), ] self.add_transitions(spec) @@ -438,12 +438,24 @@ class BuildController(distbuild.StateMachine): a.state = BUILDING - def _notify_initiator_disconnected(self, event_source, disconnect): - logging.debug("BuildController %r: initiator id %s disconnected", self, - disconnect.id) - cancel = BuildCancel(disconnect.id) + def _maybe_notify_initiator_disconnected(self, event_source, event): + if event.id != self._request['id']: + logging.debug('Heard initiator disconnect with event id %d ' + 'but our request id is %d', + event.id, self._request['id']) + return # not for us + + logging.debug("BuildController %r: initiator id %s disconnected", + self, event.id) + + cancel_pending = distbuild.WorkerCancelPending(event.id) + self.mainloop.queue_event(distbuild.WorkerBuildQueuer, cancel_pending) + + cancel = BuildCancel(event.id) self.mainloop.queue_event(BuildController, cancel) + self.mainloop.queue_event(self, _Abort) + def _maybe_relay_build_waiting_for_worker(self, event_source, event): if event.initiator_id != self._request['id']: return # not for us diff --git a/distbuild/mainloop.py b/distbuild/mainloop.py index e3f9ae2d..c5b7d284 100644 --- a/distbuild/mainloop.py +++ b/distbuild/mainloop.py @@ -112,6 +112,6 @@ class MainLoop(object): def _dequeue_events(self): while self._events: - event_queue, event = self._events.pop(0) - yield event_queue, event + event_source, event = self._events.pop(0) + yield event_source, event diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 83873d74..48ef4a7f 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -17,7 +17,6 @@ import collections -import errno import httplib import logging import socket @@ -102,9 +101,8 @@ class Job(object): self.initiators = [initiator_id] self.who = None # we don't know who's going to do this yet self.is_building = False + self.failed = False - def add_initiator(self, initiator_id): - self.initiators.append(initiator_id) class Jobs(object): @@ -124,6 +122,17 @@ class Jobs(object): def remove(self, job): del self._jobs[job.artifact.basename()] + def get_jobs(self): + return self._jobs + + def remove_jobs(self, jobs): + for job in jobs: + if job.artifact.basename() in self._jobs: + self.remove(job) + else: + logging.warning("Tried to remove a job that doesn't exist " + "(%s)", job.artifact.basename()) + def exists(self, artifact_basename): return artifact_basename in self._jobs @@ -137,19 +146,44 @@ class Jobs(object): def __repr__(self): return str([job.artifact.basename() for (_, job) in self._jobs.iteritems()]) - + + class _BuildFinished(object): pass + class _BuildFailed(object): pass + + +class _BuildCancelled(object): + + pass + class _Cached(object): pass - + + +class _ExecStarted(object): + + def __init__(self, job): + self.job = job + + +class _ExecEnded(object): + + def __init__(self, job): + self.job = job + + +class _ExecFailed(object): + + def __init__(self, job): + self.job = job class WorkerBuildQueuer(distbuild.StateMachine): @@ -179,11 +213,35 @@ class WorkerBuildQueuer(distbuild.StateMachine): self._handle_request), ('idle', WorkerBuildQueuer, WorkerCancelPending, 'idle', self._handle_cancel), + ('idle', WorkerConnection, _NeedJob, 'idle', self._handle_worker), + ('idle', WorkerConnection, _ExecStarted, 'idle', + self._set_exec_started), + ('idle', WorkerConnection, _ExecEnded, 'idle', + self._set_exec_ended), + ('idle', WorkerConnection, _ExecFailed, 'idle', + self._set_exec_failed) ] self.add_transitions(spec) + def _set_exec_started(self, event_source, event): + logging.debug('Setting job state for job %s with id %s: ' + 'Job is building', + event.job.artifact.basename(), event.job.id) + + event.job.is_building = True + + def _set_exec_ended(self, event_source, event): + logging.debug('Setting job state for job %s with id %s: ' + 'Job is NOT building', + event.job.artifact.basename(), event.job.id) + + event.job.is_building = False + def _set_exec_failed(self, event_source, event): + logging.debug('Job %s with id %s failed', + event.job.artifact.basename(), event.job.id) + event.job.failed = True def _handle_request(self, event_source, event): distbuild.crash_point() @@ -224,11 +282,42 @@ class WorkerBuildQueuer(distbuild.StateMachine): event.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, progress) - def _handle_cancel(self, event_source, worker_cancel_pending): - # TODO: this probably needs to check whether any initiators - # care about this thing + def _handle_cancel(self, event_source, event): + + def cancel_this(job): + if event.initiator_id not in job.initiators: + return False # not for us + + name = job.artifact.basename() + job_id = job.id + + logging.debug('Checking whether to remove job %s with job id %s', + name, job_id) + + if len(job.initiators) == 1: + if job.is_building or job.failed: + logging.debug('NOT removing running job %s with job id %s ' + '(WorkerConnection will cancel job)', + name, job_id) + else: + logging.debug('Removing job %s with job id %s', + name, job_id) + return True + else: + # Don't cancel, but still remove this initiator from + # the list of initiators + logging.debug('NOT removing job %s with job id %s ' + 'other initiators want it: %s', name, job_id, + [i for i in job.initiators + if i != event.initiator_id]) + + job.initiators.remove(event.initiator_id) - pass + return False + + self._jobs.remove_jobs( + [job for (_, job) in self._jobs.get_jobs().iteritems() + if cancel_this(job)]) def _handle_worker(self, event_source, event): distbuild.crash_point() @@ -237,17 +326,20 @@ class WorkerBuildQueuer(distbuild.StateMachine): last_job = who.job() # the job this worker's just completed if last_job: - logging.debug('%s wants new job, just did %s' % - (who.name(), last_job.artifact.basename())) + logging.debug('%s wants new job, just did %s', + who.name(), last_job.artifact.basename()) + logging.debug('Removing job %s with job id %s', + last_job.artifact.basename(), last_job.id) self._jobs.remove(last_job) else: - logging.debug('%s wants its first job' % who.name()) + logging.debug('%s wants its first job', who.name()) - logging.debug('WBQ: Adding worker to queue: %s' % event.who) + logging.debug('WBQ: Adding worker to queue: %s', event.who.name()) self._available_workers.append(event) - logging.debug('Current jobs: %s' % self._jobs) - logging.debug('Workers available: %d' % len(self._available_workers)) + + logging.debug('Current jobs: %s', self._jobs) + logging.debug('Workers available: %d', len(self._available_workers)) job = self._jobs.get_next_job() @@ -270,7 +362,6 @@ class WorkerConnection(distbuild.StateMachine): '''Communicate with a single worker.''' _request_ids = distbuild.IdentifierGenerator('WorkerConnection') - _route_map = distbuild.RouteMap() _initiator_request_map = collections.defaultdict(set) def __init__(self, cm, conn, writeable_cache_server, @@ -312,10 +403,12 @@ class WorkerConnection(distbuild.StateMachine): ('building', distbuild.BuildController, distbuild.BuildCancel, 'building', self._maybe_cancel), + ('building', self._jm, distbuild.JsonEof, None, self._reconnect), ('building', self._jm, distbuild.JsonNewMessage, 'building', self._handle_json_message), ('building', self, _BuildFailed, 'idle', self._request_job), + ('building', self, _BuildCancelled, 'idle', self._request_job), ('building', self, _BuildFinished, 'caching', self._request_caching), @@ -329,10 +422,32 @@ class WorkerConnection(distbuild.StateMachine): self._request_job(None, None) def _maybe_cancel(self, event_source, build_cancel): - logging.debug('WC: BuildController %r requested a cancel' % + + if build_cancel.id not in self._job.initiators: + return # event not relevant + + logging.debug('WC: BuildController %r requested a cancel', event_source) - # TODO: implement cancel + if (len(self._job.initiators) == 1): + logging.debug('WC: Cancelling running job %s ' + 'with job id %s running on %s', + self._job.artifact.basename(), + self._job.id, + self.name()) + + msg = distbuild.message('exec-cancel', id=self._job.id) + self._jm.send(msg) + self.mainloop.queue_event(self, _BuildCancelled()) + else: + logging.debug('WC: Not cancelling running job %s with job id %s, ' + 'other initiators want it done: %s', + self._job.artifact.basename(), + self._job.id, + [i for i in self._job.initiators + if i != build_cancel.id]) + + self._job.initiators.remove(build_cancel.id) def _reconnect(self, event_source, event): distbuild.crash_point() @@ -369,8 +484,7 @@ class WorkerConnection(distbuild.StateMachine): started = WorkerBuildStepStarted(self._job.initiators, self._job.artifact.cache_key, self.name()) - self._job.is_building = True - + self.mainloop.queue_event(WorkerConnection, _ExecStarted(self._job)) self.mainloop.queue_event(WorkerConnection, started) def _handle_json_message(self, event_source, event): @@ -409,12 +523,15 @@ class WorkerConnection(distbuild.StateMachine): # Build failed. new_event = WorkerBuildFailed(new, self._job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) + self.mainloop.queue_event(WorkerConnection, _ExecFailed(self._job)) self.mainloop.queue_event(self, _BuildFailed()) else: # Build succeeded. We have more work to do: caching the result. self.mainloop.queue_event(self, _BuildFinished()) self._exec_response_msg = new + self.mainloop.queue_event(WorkerConnection, _ExecEnded(self._job)) + def _request_job(self, event_source, event): distbuild.crash_point() self.mainloop.queue_event(WorkerConnection, _NeedJob(self)) |