summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRichard Ipsum <richard.ipsum@codethink.co.uk>2014-05-07 14:16:21 +0000
committerRichard Ipsum <richard.ipsum@codethink.co.uk>2014-05-07 14:16:21 +0000
commit9ea146fdcfbe9b93b297165b1b5222e1eadb9d23 (patch)
treeca17eabee3e7fe929d397df158778caf358b5661
parent201b043e4d7ab03d11ad1e71bac7916b19aed294 (diff)
parentc8cdcbb70740f3c71da83587c1368b7c4b03b330 (diff)
downloadmorph-9ea146fdcfbe9b93b297165b1b5222e1eadb9d23.tar.gz
Merge branch 'baserock/richardipsum/distbuild_cancel2'
Reviewed by: Richard Maw Lars Wirzenius Daniel Silverstone
-rw-r--r--distbuild/build_controller.py26
-rw-r--r--distbuild/mainloop.py4
-rw-r--r--distbuild/worker_build_scheduler.py157
3 files changed, 158 insertions, 29 deletions
diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py
index a3911586..c399939e 100644
--- a/distbuild/build_controller.py
+++ b/distbuild/build_controller.py
@@ -235,9 +235,9 @@ class BuildController(distbuild.StateMachine):
self._maybe_notify_build_failed),
('building', self, _Abort, None, None),
('building', self, _Built, None, self._notify_build_done),
- ('building', self._initiator_connection,
- distbuild.InitiatorDisconnect, None,
- self._notify_initiator_disconnected),
+ ('building', distbuild.InitiatorConnection,
+ distbuild.InitiatorDisconnect, 'building',
+ self._maybe_notify_initiator_disconnected),
]
self.add_transitions(spec)
@@ -438,12 +438,24 @@ class BuildController(distbuild.StateMachine):
a.state = BUILDING
- def _notify_initiator_disconnected(self, event_source, disconnect):
- logging.debug("BuildController %r: initiator id %s disconnected", self,
- disconnect.id)
- cancel = BuildCancel(disconnect.id)
+ def _maybe_notify_initiator_disconnected(self, event_source, event):
+ if event.id != self._request['id']:
+ logging.debug('Heard initiator disconnect with event id %d '
+ 'but our request id is %d',
+ event.id, self._request['id'])
+ return # not for us
+
+ logging.debug("BuildController %r: initiator id %s disconnected",
+ self, event.id)
+
+ cancel_pending = distbuild.WorkerCancelPending(event.id)
+ self.mainloop.queue_event(distbuild.WorkerBuildQueuer, cancel_pending)
+
+ cancel = BuildCancel(event.id)
self.mainloop.queue_event(BuildController, cancel)
+ self.mainloop.queue_event(self, _Abort)
+
def _maybe_relay_build_waiting_for_worker(self, event_source, event):
if event.initiator_id != self._request['id']:
return # not for us
diff --git a/distbuild/mainloop.py b/distbuild/mainloop.py
index e3f9ae2d..c5b7d284 100644
--- a/distbuild/mainloop.py
+++ b/distbuild/mainloop.py
@@ -112,6 +112,6 @@ class MainLoop(object):
def _dequeue_events(self):
while self._events:
- event_queue, event = self._events.pop(0)
- yield event_queue, event
+ event_source, event = self._events.pop(0)
+ yield event_source, event
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index 83873d74..48ef4a7f 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -17,7 +17,6 @@
import collections
-import errno
import httplib
import logging
import socket
@@ -102,9 +101,8 @@ class Job(object):
self.initiators = [initiator_id]
self.who = None # we don't know who's going to do this yet
self.is_building = False
+ self.failed = False
- def add_initiator(self, initiator_id):
- self.initiators.append(initiator_id)
class Jobs(object):
@@ -124,6 +122,17 @@ class Jobs(object):
def remove(self, job):
del self._jobs[job.artifact.basename()]
+ def get_jobs(self):
+ return self._jobs
+
+ def remove_jobs(self, jobs):
+ for job in jobs:
+ if job.artifact.basename() in self._jobs:
+ self.remove(job)
+ else:
+ logging.warning("Tried to remove a job that doesn't exist "
+ "(%s)", job.artifact.basename())
+
def exists(self, artifact_basename):
return artifact_basename in self._jobs
@@ -137,19 +146,44 @@ class Jobs(object):
def __repr__(self):
return str([job.artifact.basename()
for (_, job) in self._jobs.iteritems()])
-
+
+
class _BuildFinished(object):
pass
+
class _BuildFailed(object):
pass
+
+
+class _BuildCancelled(object):
+
+ pass
+
class _Cached(object):
pass
-
+
+
+class _ExecStarted(object):
+
+ def __init__(self, job):
+ self.job = job
+
+
+class _ExecEnded(object):
+
+ def __init__(self, job):
+ self.job = job
+
+
+class _ExecFailed(object):
+
+ def __init__(self, job):
+ self.job = job
class WorkerBuildQueuer(distbuild.StateMachine):
@@ -179,11 +213,35 @@ class WorkerBuildQueuer(distbuild.StateMachine):
self._handle_request),
('idle', WorkerBuildQueuer, WorkerCancelPending, 'idle',
self._handle_cancel),
+
('idle', WorkerConnection, _NeedJob, 'idle', self._handle_worker),
+ ('idle', WorkerConnection, _ExecStarted, 'idle',
+ self._set_exec_started),
+ ('idle', WorkerConnection, _ExecEnded, 'idle',
+ self._set_exec_ended),
+ ('idle', WorkerConnection, _ExecFailed, 'idle',
+ self._set_exec_failed)
]
self.add_transitions(spec)
+ def _set_exec_started(self, event_source, event):
+ logging.debug('Setting job state for job %s with id %s: '
+ 'Job is building',
+ event.job.artifact.basename(), event.job.id)
+
+ event.job.is_building = True
+
+ def _set_exec_ended(self, event_source, event):
+ logging.debug('Setting job state for job %s with id %s: '
+ 'Job is NOT building',
+ event.job.artifact.basename(), event.job.id)
+
+ event.job.is_building = False
+ def _set_exec_failed(self, event_source, event):
+ logging.debug('Job %s with id %s failed',
+ event.job.artifact.basename(), event.job.id)
+ event.job.failed = True
def _handle_request(self, event_source, event):
distbuild.crash_point()
@@ -224,11 +282,42 @@ class WorkerBuildQueuer(distbuild.StateMachine):
event.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, progress)
- def _handle_cancel(self, event_source, worker_cancel_pending):
- # TODO: this probably needs to check whether any initiators
- # care about this thing
+ def _handle_cancel(self, event_source, event):
+
+ def cancel_this(job):
+ if event.initiator_id not in job.initiators:
+ return False # not for us
+
+ name = job.artifact.basename()
+ job_id = job.id
+
+ logging.debug('Checking whether to remove job %s with job id %s',
+ name, job_id)
+
+ if len(job.initiators) == 1:
+ if job.is_building or job.failed:
+ logging.debug('NOT removing running job %s with job id %s '
+ '(WorkerConnection will cancel job)',
+ name, job_id)
+ else:
+ logging.debug('Removing job %s with job id %s',
+ name, job_id)
+ return True
+ else:
+ # Don't cancel, but still remove this initiator from
+ # the list of initiators
+ logging.debug('NOT removing job %s with job id %s '
+ 'other initiators want it: %s', name, job_id,
+ [i for i in job.initiators
+ if i != event.initiator_id])
+
+ job.initiators.remove(event.initiator_id)
- pass
+ return False
+
+ self._jobs.remove_jobs(
+ [job for (_, job) in self._jobs.get_jobs().iteritems()
+ if cancel_this(job)])
def _handle_worker(self, event_source, event):
distbuild.crash_point()
@@ -237,17 +326,20 @@ class WorkerBuildQueuer(distbuild.StateMachine):
last_job = who.job() # the job this worker's just completed
if last_job:
- logging.debug('%s wants new job, just did %s' %
- (who.name(), last_job.artifact.basename()))
+ logging.debug('%s wants new job, just did %s',
+ who.name(), last_job.artifact.basename())
+ logging.debug('Removing job %s with job id %s',
+ last_job.artifact.basename(), last_job.id)
self._jobs.remove(last_job)
else:
- logging.debug('%s wants its first job' % who.name())
+ logging.debug('%s wants its first job', who.name())
- logging.debug('WBQ: Adding worker to queue: %s' % event.who)
+ logging.debug('WBQ: Adding worker to queue: %s', event.who.name())
self._available_workers.append(event)
- logging.debug('Current jobs: %s' % self._jobs)
- logging.debug('Workers available: %d' % len(self._available_workers))
+
+ logging.debug('Current jobs: %s', self._jobs)
+ logging.debug('Workers available: %d', len(self._available_workers))
job = self._jobs.get_next_job()
@@ -270,7 +362,6 @@ class WorkerConnection(distbuild.StateMachine):
'''Communicate with a single worker.'''
_request_ids = distbuild.IdentifierGenerator('WorkerConnection')
- _route_map = distbuild.RouteMap()
_initiator_request_map = collections.defaultdict(set)
def __init__(self, cm, conn, writeable_cache_server,
@@ -312,10 +403,12 @@ class WorkerConnection(distbuild.StateMachine):
('building', distbuild.BuildController,
distbuild.BuildCancel, 'building',
self._maybe_cancel),
+
('building', self._jm, distbuild.JsonEof, None, self._reconnect),
('building', self._jm, distbuild.JsonNewMessage, 'building',
self._handle_json_message),
('building', self, _BuildFailed, 'idle', self._request_job),
+ ('building', self, _BuildCancelled, 'idle', self._request_job),
('building', self, _BuildFinished, 'caching',
self._request_caching),
@@ -329,10 +422,32 @@ class WorkerConnection(distbuild.StateMachine):
self._request_job(None, None)
def _maybe_cancel(self, event_source, build_cancel):
- logging.debug('WC: BuildController %r requested a cancel' %
+
+ if build_cancel.id not in self._job.initiators:
+ return # event not relevant
+
+ logging.debug('WC: BuildController %r requested a cancel',
event_source)
- # TODO: implement cancel
+ if (len(self._job.initiators) == 1):
+ logging.debug('WC: Cancelling running job %s '
+ 'with job id %s running on %s',
+ self._job.artifact.basename(),
+ self._job.id,
+ self.name())
+
+ msg = distbuild.message('exec-cancel', id=self._job.id)
+ self._jm.send(msg)
+ self.mainloop.queue_event(self, _BuildCancelled())
+ else:
+ logging.debug('WC: Not cancelling running job %s with job id %s, '
+ 'other initiators want it done: %s',
+ self._job.artifact.basename(),
+ self._job.id,
+ [i for i in self._job.initiators
+ if i != build_cancel.id])
+
+ self._job.initiators.remove(build_cancel.id)
def _reconnect(self, event_source, event):
distbuild.crash_point()
@@ -369,8 +484,7 @@ class WorkerConnection(distbuild.StateMachine):
started = WorkerBuildStepStarted(self._job.initiators,
self._job.artifact.cache_key, self.name())
- self._job.is_building = True
-
+ self.mainloop.queue_event(WorkerConnection, _ExecStarted(self._job))
self.mainloop.queue_event(WorkerConnection, started)
def _handle_json_message(self, event_source, event):
@@ -409,12 +523,15 @@ class WorkerConnection(distbuild.StateMachine):
# Build failed.
new_event = WorkerBuildFailed(new, self._job.artifact.cache_key)
self.mainloop.queue_event(WorkerConnection, new_event)
+ self.mainloop.queue_event(WorkerConnection, _ExecFailed(self._job))
self.mainloop.queue_event(self, _BuildFailed())
else:
# Build succeeded. We have more work to do: caching the result.
self.mainloop.queue_event(self, _BuildFinished())
self._exec_response_msg = new
+ self.mainloop.queue_event(WorkerConnection, _ExecEnded(self._job))
+
def _request_job(self, event_source, event):
distbuild.crash_point()
self.mainloop.queue_event(WorkerConnection, _NeedJob(self))