From 54c06f51db3558c80706b02260c6e05b28f3ebdb Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Thu, 24 Apr 2014 18:15:17 +0100 Subject: Remove route map --- distbuild/worker_build_scheduler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 83873d74..15ae4a27 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -270,7 +270,6 @@ class WorkerConnection(distbuild.StateMachine): '''Communicate with a single worker.''' _request_ids = distbuild.IdentifierGenerator('WorkerConnection') - _route_map = distbuild.RouteMap() _initiator_request_map = collections.defaultdict(set) def __init__(self, cm, conn, writeable_cache_server, -- cgit v1.2.1 From fb5f740adeae236ba32bdac67857351054c0bae9 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Mon, 5 May 2014 16:02:24 +0100 Subject: Add cancelling to build controller --- distbuild/build_controller.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py index a3911586..c399939e 100644 --- a/distbuild/build_controller.py +++ b/distbuild/build_controller.py @@ -235,9 +235,9 @@ class BuildController(distbuild.StateMachine): self._maybe_notify_build_failed), ('building', self, _Abort, None, None), ('building', self, _Built, None, self._notify_build_done), - ('building', self._initiator_connection, - distbuild.InitiatorDisconnect, None, - self._notify_initiator_disconnected), + ('building', distbuild.InitiatorConnection, + distbuild.InitiatorDisconnect, 'building', + self._maybe_notify_initiator_disconnected), ] self.add_transitions(spec) @@ -438,12 +438,24 @@ class BuildController(distbuild.StateMachine): a.state = BUILDING - def _notify_initiator_disconnected(self, event_source, disconnect): - logging.debug("BuildController %r: initiator id %s disconnected", self, - disconnect.id) - cancel = BuildCancel(disconnect.id) + def _maybe_notify_initiator_disconnected(self, event_source, event): + if event.id != self._request['id']: + logging.debug('Heard initiator disconnect with event id %d ' + 'but our request id is %d', + event.id, self._request['id']) + return # not for us + + logging.debug("BuildController %r: initiator id %s disconnected", + self, event.id) + + cancel_pending = distbuild.WorkerCancelPending(event.id) + self.mainloop.queue_event(distbuild.WorkerBuildQueuer, cancel_pending) + + cancel = BuildCancel(event.id) self.mainloop.queue_event(BuildController, cancel) + self.mainloop.queue_event(self, _Abort) + def _maybe_relay_build_waiting_for_worker(self, event_source, event): if event.initiator_id != self._request['id']: return # not for us -- cgit v1.2.1 From d4d5866a2ca258ad492c12008b489aaa2a6b60d3 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Mon, 5 May 2014 16:03:15 +0100 Subject: Rename vars in dequeue_events --- distbuild/mainloop.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distbuild/mainloop.py b/distbuild/mainloop.py index e3f9ae2d..c5b7d284 100644 --- a/distbuild/mainloop.py +++ b/distbuild/mainloop.py @@ -112,6 +112,6 @@ class MainLoop(object): def _dequeue_events(self): while self._events: - event_queue, event = self._events.pop(0) - yield event_queue, event + event_source, event = self._events.pop(0) + yield event_source, event -- cgit v1.2.1 From e5ce0b989eb9ffdf704225051056bee20937d1ab Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Mon, 5 May 2014 16:09:54 +0100 Subject: Remove unused import and method add_initiator() isn't necessary given lists have a remove method. --- distbuild/worker_build_scheduler.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 15ae4a27..85856a37 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -17,7 +17,6 @@ import collections -import errno import httplib import logging import socket @@ -103,8 +102,6 @@ class Job(object): self.who = None # we don't know who's going to do this yet self.is_building = False - def add_initiator(self, initiator_id): - self.initiators.append(initiator_id) class Jobs(object): -- cgit v1.2.1 From ee252ff578a83a801730e6f72dbcdc274f2e26d1 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Mon, 5 May 2014 16:12:17 +0100 Subject: Add cancelling to WorkerBuildScheduler --- distbuild/worker_build_scheduler.py | 106 +++++++++++++++++++++++++++++++----- 1 file changed, 93 insertions(+), 13 deletions(-) diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 85856a37..79180843 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -121,6 +121,17 @@ class Jobs(object): def remove(self, job): del self._jobs[job.artifact.basename()] + def get_jobs(self): + return self._jobs + + def remove_jobs(self, jobs): + for job in jobs: + if job.artifact.basename() in self._jobs: + self.remove(job) + else: + logging.warning("Tried to remove a job that doesn't exist " + "(%s)", job.artifact.basename()) + def exists(self, artifact_basename): return artifact_basename in self._jobs @@ -134,14 +145,22 @@ class Jobs(object): def __repr__(self): return str([job.artifact.basename() for (_, job) in self._jobs.iteritems()]) - + + class _BuildFinished(object): pass + class _BuildFailed(object): pass + + +class _BuildCancelled(object): + + pass + class _Cached(object): @@ -176,6 +195,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): self._handle_request), ('idle', WorkerBuildQueuer, WorkerCancelPending, 'idle', self._handle_cancel), + ('idle', WorkerConnection, _NeedJob, 'idle', self._handle_worker), ] self.add_transitions(spec) @@ -221,11 +241,42 @@ class WorkerBuildQueuer(distbuild.StateMachine): event.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, progress) - def _handle_cancel(self, event_source, worker_cancel_pending): - # TODO: this probably needs to check whether any initiators - # care about this thing + def _handle_cancel(self, event_source, event): + + def cancel_this(job): + if event.initiator_id not in job.initiators: + return False # not for us + + name = job.artifact.basename() + job_id = job.id - pass + logging.debug('Checking whether to remove job %s with job id %s', + name, job_id) + + if len(job.initiators) == 1: + if job.is_building: + logging.debug('NOT removing running job %s with job id %s ' + '(WorkerConnection will cancel job)', + name, job_id) + else: + logging.debug('Removing job %s with job id %s', + name, job_id) + return True + else: + # Don't cancel, but still remove this initiator from + # the list of initiators + logging.debug('NOT removing job %s with job id %s ' + 'other initiators want it: %s', name, job_id, + [i for i in job.initiators + if i != event.initiator_id]) + + job.initiators.remove(event.initiator_id) + + return False + + self._jobs.remove_jobs( + [job for (_, job) in self._jobs.get_jobs().iteritems() + if cancel_this(job)]) def _handle_worker(self, event_source, event): distbuild.crash_point() @@ -234,17 +285,20 @@ class WorkerBuildQueuer(distbuild.StateMachine): last_job = who.job() # the job this worker's just completed if last_job: - logging.debug('%s wants new job, just did %s' % - (who.name(), last_job.artifact.basename())) + logging.debug('%s wants new job, just did %s', + who.name(), last_job.artifact.basename()) + logging.debug('Removing job %s with job id %s', + last_job.artifact.basename(), last_job.id) self._jobs.remove(last_job) else: - logging.debug('%s wants its first job' % who.name()) + logging.debug('%s wants its first job', who.name()) - logging.debug('WBQ: Adding worker to queue: %s' % event.who) + logging.debug('WBQ: Adding worker to queue: %s', event.who.name()) self._available_workers.append(event) - logging.debug('Current jobs: %s' % self._jobs) - logging.debug('Workers available: %d' % len(self._available_workers)) + + logging.debug('Current jobs: %s', self._jobs) + logging.debug('Workers available: %d', len(self._available_workers)) job = self._jobs.get_next_job() @@ -308,10 +362,12 @@ class WorkerConnection(distbuild.StateMachine): ('building', distbuild.BuildController, distbuild.BuildCancel, 'building', self._maybe_cancel), + ('building', self._jm, distbuild.JsonEof, None, self._reconnect), ('building', self._jm, distbuild.JsonNewMessage, 'building', self._handle_json_message), ('building', self, _BuildFailed, 'idle', self._request_job), + ('building', self, _BuildCancelled, 'idle', self._request_job), ('building', self, _BuildFinished, 'caching', self._request_caching), @@ -325,10 +381,32 @@ class WorkerConnection(distbuild.StateMachine): self._request_job(None, None) def _maybe_cancel(self, event_source, build_cancel): - logging.debug('WC: BuildController %r requested a cancel' % + + if build_cancel.id not in self._job.initiators: + return # event not relevant + + logging.debug('WC: BuildController %r requested a cancel', event_source) - # TODO: implement cancel + if (len(self._job.initiators) == 1): + logging.debug('WC: Cancelling running job %s ' + 'with job id %s running on %s', + self._job.artifact.basename(), + self._job.id, + self.name()) + + msg = distbuild.message('exec-cancel', id=self._job.id) + self._jm.send(msg) + self.mainloop.queue_event(self, _BuildCancelled()) + else: + logging.debug('WC: Not cancelling running job %s with job id %s, ' + 'other initiators want it done: %s', + self._job.artifact.basename(), + self._job.id, + [i for i in self._job.initiators + if i != build_cancel.id]) + + self._job.initiators.remove(build_cancel.id) def _reconnect(self, event_source, event): distbuild.crash_point() @@ -411,6 +489,8 @@ class WorkerConnection(distbuild.StateMachine): self.mainloop.queue_event(self, _BuildFinished()) self._exec_response_msg = new + self._job.is_building = False + def _request_job(self, event_source, event): distbuild.crash_point() self.mainloop.queue_event(WorkerConnection, _NeedJob(self)) -- cgit v1.2.1 From 7d14b7e578640e9b7c500cbee0dd42f7ef88e726 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Mon, 5 May 2014 19:45:52 +0100 Subject: Use messages to update job state --- distbuild/worker_build_scheduler.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 79180843..7f813251 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -165,6 +165,18 @@ class _BuildCancelled(object): class _Cached(object): pass + + +class _ExecStarted(object): + + def __init__(self, job): + self.job = job + + +class _ExecEnded(object): + + def __init__(self, job): + self.job = job class WorkerBuildQueuer(distbuild.StateMachine): @@ -197,10 +209,20 @@ class WorkerBuildQueuer(distbuild.StateMachine): self._handle_cancel), ('idle', WorkerConnection, _NeedJob, 'idle', self._handle_worker), + ('idle', WorkerConnection, _ExecStarted, 'idle', + self._set_exec_started), + ('idle', WorkerConnection, _ExecEnded, 'idle', + self._set_exec_ended), ] self.add_transitions(spec) + def _set_exec_started(self, event_source, event): + logging.debug('Setting job state: Job is building') + event.job.is_building = True + def _set_exec_ended(self, event_source, event): + logging.debug('Setting job state: Job is NOT building') + event.job.is_building = False def _handle_request(self, event_source, event): distbuild.crash_point() @@ -443,8 +465,7 @@ class WorkerConnection(distbuild.StateMachine): started = WorkerBuildStepStarted(self._job.initiators, self._job.artifact.cache_key, self.name()) - self._job.is_building = True - + self.mainloop.queue_event(WorkerConnection, _ExecStarted(self._job)) self.mainloop.queue_event(WorkerConnection, started) def _handle_json_message(self, event_source, event): @@ -489,7 +510,7 @@ class WorkerConnection(distbuild.StateMachine): self.mainloop.queue_event(self, _BuildFinished()) self._exec_response_msg = new - self._job.is_building = False + self.mainloop.queue_event(WorkerConnection, _ExecEnded(self._job)) def _request_job(self, event_source, event): distbuild.crash_point() -- cgit v1.2.1 From c8cdcbb70740f3c71da83587c1368b7c4b03b330 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Mon, 5 May 2014 21:18:42 +0100 Subject: Add _ExecFailed event To cancel jobs cleanly we need to know when a job has failed. --- distbuild/worker_build_scheduler.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 7f813251..48ef4a7f 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -101,6 +101,7 @@ class Job(object): self.initiators = [initiator_id] self.who = None # we don't know who's going to do this yet self.is_building = False + self.failed = False class Jobs(object): @@ -177,7 +178,12 @@ class _ExecEnded(object): def __init__(self, job): self.job = job - + + +class _ExecFailed(object): + + def __init__(self, job): + self.job = job class WorkerBuildQueuer(distbuild.StateMachine): @@ -213,17 +219,30 @@ class WorkerBuildQueuer(distbuild.StateMachine): self._set_exec_started), ('idle', WorkerConnection, _ExecEnded, 'idle', self._set_exec_ended), + ('idle', WorkerConnection, _ExecFailed, 'idle', + self._set_exec_failed) ] self.add_transitions(spec) def _set_exec_started(self, event_source, event): - logging.debug('Setting job state: Job is building') + logging.debug('Setting job state for job %s with id %s: ' + 'Job is building', + event.job.artifact.basename(), event.job.id) + event.job.is_building = True def _set_exec_ended(self, event_source, event): - logging.debug('Setting job state: Job is NOT building') + logging.debug('Setting job state for job %s with id %s: ' + 'Job is NOT building', + event.job.artifact.basename(), event.job.id) + event.job.is_building = False + def _set_exec_failed(self, event_source, event): + logging.debug('Job %s with id %s failed', + event.job.artifact.basename(), event.job.id) + event.job.failed = True + def _handle_request(self, event_source, event): distbuild.crash_point() @@ -276,7 +295,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): name, job_id) if len(job.initiators) == 1: - if job.is_building: + if job.is_building or job.failed: logging.debug('NOT removing running job %s with job id %s ' '(WorkerConnection will cancel job)', name, job_id) @@ -504,6 +523,7 @@ class WorkerConnection(distbuild.StateMachine): # Build failed. new_event = WorkerBuildFailed(new, self._job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) + self.mainloop.queue_event(WorkerConnection, _ExecFailed(self._job)) self.mainloop.queue_event(self, _BuildFailed()) else: # Build succeeded. We have more work to do: caching the result. -- cgit v1.2.1