summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- distbuild/worker_build_scheduler.py 224
1 files changed, 115 insertions, 109 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index bc2df4b1..71e1c3ef 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -134,11 +134,11 @@ class JobQueue(object):
self._jobs = {}
def get(self, artifact_basename):
- return (self._jobs[artifact_basename]
- if artifact_basename in self._jobs else None)
+ return self._jobs.get(artifact_basename, None)
def add(self, job):
artifact_basename = job.artifact.basename()
+
if artifact_basename in self._jobs:
logging.error(
"Duplicate job for %s added to %s job queue, ignoring.",
@@ -153,23 +153,25 @@ class JobQueue(object):
logging.warning("Tried to remove a job that doesn't exist "
"(%s)", job.artifact.basename())
- def get_jobs(self):
- return self._jobs
+ def __contains__(self, artifact_basename):
+ return artifact_basename in self._jobs
+
+ def __iter__(self):
+ return self._jobs.itervalues()
def remove_jobs(self, jobs):
for job in jobs:
self.remove(job)
- def exists(self, artifact_basename):
- return artifact_basename in self._jobs
-
def get_next_job(self):
# for now just return the first thing we find that's not being built
- waiting = [job for (_, job) in
- self._jobs.iteritems() if job.who == None]
+ waiting = [job for job in self if job.who == None]
return waiting.pop() if len(waiting) > 0 else None
+ def running_jobs(self):
+ return [job for job in self if job.running()]
+
def __repr__(self):
items = []
for job in self._jobs.itervalues():
@@ -180,7 +182,8 @@ class JobQueue(object):
class _BuildFinished(object):
- pass
+ def __init__(self, job):
+ self.job = job
class _BuildFailed(object):
@@ -260,10 +263,14 @@ class WorkerBuildQueuer(distbuild.StateMachine):
event.job.set_state('running')
def _set_job_finished(self, event_source, event):
- event.job.set_state('complete')
+ job = event.job
+ job.set_state('complete')
+ self._jobs.remove(job)
def _set_job_failed(self, event_source, event):
- event.job.set_state('failed')
+ job = event.job
+ job.set_state('failed')
+ self._jobs.remove(job)
def _handle_request(self, event_source, event):
distbuild.crash_point()
@@ -276,7 +283,7 @@ class WorkerBuildQueuer(distbuild.StateMachine):
# If so, add our initiator id to the existing job
# If not, create a job
- if self._jobs.exists(event.artifact.basename()):
+ if event.artifact.basename() in self._jobs:
job = self._jobs.get(event.artifact.basename())
job.initiators.append(event.initiator_id)
@@ -338,24 +345,11 @@ class WorkerBuildQueuer(distbuild.StateMachine):
return False
self._jobs.remove_jobs(
- [job for (_, job) in self._jobs.get_jobs().iteritems()
- if cancel_this(job)])
+ [job for job in self._jobs if cancel_this(job)])
def _handle_worker(self, event_source, event):
distbuild.crash_point()
- who = event.who
- last_job = who.current_job() # the job this worker's just completed
-
- if last_job:
- logging.debug('%s wants new job, just did %s',
- who.name(), last_job.artifact.basename())
-
- logging.debug('Removing job %s', last_job.artifact.basename())
- self._jobs.remove(last_job)
- else:
- logging.debug('%s wants its first job', who.name())
-
logging.debug('WBQ: Adding worker to queue: %s', event.who.name())
self._available_workers.append(event)
@@ -408,21 +402,15 @@ class WorkerConnection(distbuild.StateMachine):
self._worker_cache_server_port = worker_cache_server_port
self._morph_instance = morph_instance
- self._active_jobs = dict()
- self._current_job = None
- self._current_job_exec_response = None
- self._current_job_cache_request = None
-
addr, port = self._conn.getpeername()
name = socket.getfqdn(addr)
self._worker_name = '%s:%s' % (name, port)
+ self._jobs = JobQueue(owner=self.name())
+
def name(self):
return self._worker_name
- def current_job(self):
- return self._current_job
-
def __str__(self):
return self.name()
@@ -463,27 +451,38 @@ class WorkerConnection(distbuild.StateMachine):
self._request_job(None, None)
def _maybe_cancel(self, event_source, build_cancel):
-
- if build_cancel.id not in self._current_job.initiators:
- return # event not relevant
-
logging.debug('WC: BuildController %r requested a cancel',
event_source)
- job = self._current_job
- if (len(job.initiators) == 1):
- logging.debug('WC: Cancelling running job %s running on %s',
- job.artifact.basename(), self.name())
+ initiator_id = build_cancel.id
+ for job in self._jobs.running_jobs():
+ self._remove_initiator_from_job(job, initiator_id)
- msg = distbuild.message('exec-cancel', id=job.artifact.basename())
- self._jm.send(msg)
- self.mainloop.queue_event(self, _BuildCancelled())
- else:
- logging.debug('WC: Not cancelling running job %s, other initiators '
- 'want it done: %s', job.artifact.basename(),
- [i for i in job.initiators if i != build_cancel.id])
+ def _remove_initiator_from_job(self, job, initiator_id):
+ '''Remove the given initiator from 'job', and cancel it if needed.
+
+ If the given initiator is not interested in 'job', nothing happens.
- job.initiators.remove(build_cancel.id)
+ '''
+
+ if initiator_id in job.initiators:
+ if len(job.initiators) == 1:
+ self._cancel_job(job)
+ else:
+ logging.debug(
+ 'WC: Not cancelling running job %s, other initiators want '
+ 'it done: %s', job.artifact.basename(),
+ [i for i in job.initiators if i != initiator_id])
+ job.initiators.remove(initiator_id)
+
+ def _cancel_job(self, job):
+ logging.debug(
+ 'WC: Cancelling job %s, currently building on %s',
+ job.artifact.basename(), self.name())
+
+ msg = distbuild.message('exec-cancel', id=job.artifact.basename())
+ self._jm.send(msg)
+ self.mainloop.queue_event(self, _BuildCancelled())
def _disconnected(self, event_source, event):
distbuild.crash_point()
@@ -493,20 +492,27 @@ class WorkerConnection(distbuild.StateMachine):
self.mainloop.queue_event(self._cm, distbuild.Reconnect())
- def _start_build(self, event_source, event):
- distbuild.crash_point()
-
- job = event.job
+ def _sanity_check_new_job(self, job):
+ # There's nothing that we can really do if the controller goes nuts
+ # (there's no 'reject job' message), but we can at least log warnings.
- if job.artifact.basename() in self._active_jobs:
+ if job.artifact.basename() in self._jobs:
logging.warn('Worker %s already has job %s', self.name(),
job.artifact.basename())
- if self._current_job_exec_response or self._current_job_cache_request:
- logging.warn('Caching not finished for %s', self._current_job.id)
+ running_jobs = self._jobs.running_jobs()
+ if len(running_jobs) != 0:
+ logging.warn('This worker already has running jobs: %s',
+ running_jobs)
+
+ def _start_build(self, event_source, event):
+ distbuild.crash_point()
+
+ job = event.job
+ self._sanity_check_new_job(job)
- self._active_jobs[job.artifact.basename()] = job
- self._current_job = job
+ self._jobs.add(job)
+ job.set_state('running')
logging.debug('WC: starting build: %s for %s' %
(job.artifact.name, job.initiators))
@@ -527,10 +533,12 @@ class WorkerConnection(distbuild.StateMachine):
)
self._jm.send(msg)
+ # The WorkerBuildQueuer object will set the job state to 'running' when
+ # it receives the _JobStarted message.
+ self.mainloop.queue_event(WorkerConnection, _JobStarted(job))
+
started = WorkerBuildStepStarted(job.initiators,
job.artifact.cache_key, self.name())
-
- self.mainloop.queue_event(WorkerConnection, _JobStarted(job))
self.mainloop.queue_event(WorkerConnection, started)
def _handle_json_message(self, event_source, event):
@@ -547,7 +555,7 @@ class WorkerConnection(distbuild.StateMachine):
}
handler = handlers[event.msg['type']]
- job = self._active_jobs.get(event.msg['id'])
+ job = self._jobs.get(event.msg['id'])
if job:
handler(event.msg, job)
@@ -583,12 +591,8 @@ class WorkerConnection(distbuild.StateMachine):
self.mainloop.queue_event(self, _BuildFailed())
else:
# Build succeeded. We have more work to do: caching the result.
- self.mainloop.queue_event(self, _BuildFinished())
- self._current_job_exec_response = new
-
- # The job is no longer considered active, because the worker is
- # finished with it so we won't receive any more messages about it.
- del self._active_jobs[job.artifact.basename()]
+ self.mainloop.queue_event(self, _BuildFinished(job))
+ job._exec_response = new
def _request_job(self, event_source, event):
distbuild.crash_point()
@@ -602,7 +606,7 @@ class WorkerConnection(distbuild.StateMachine):
logging.debug('Requesting shared artifact cache to get artifacts')
- job = self._current_job
+ job = event.job
kind = job.artifact.kind
if kind == 'chunk':
@@ -633,7 +637,7 @@ class WorkerConnection(distbuild.StateMachine):
msg = distbuild.message(
'http-request', id=self._request_ids.next(), url=url,
method='GET', body=None, headers=None)
- self._current_job_cache_request = msg['id']
+ job._cache_request_id = msg['id']
req = distbuild.HelperRequest(msg)
self.mainloop.queue_event(distbuild.HelperRouter, req)
@@ -642,41 +646,43 @@ class WorkerConnection(distbuild.StateMachine):
self.mainloop.queue_event(WorkerConnection, progress)
def _maybe_handle_helper_result(self, event_source, event):
- if event.msg['id'] == self._current_job_cache_request:
- distbuild.crash_point()
-
- logging.debug('caching: event.msg: %s' % repr(event.msg))
- if event.msg['status'] == httplib.OK:
- logging.debug('Shared artifact cache population done')
-
- new_event = WorkerBuildFinished(
- self._current_job_exec_response,
- self._current_job.artifact.cache_key)
- self.mainloop.queue_event(WorkerConnection, new_event)
- self.mainloop.queue_event(self, _Cached())
- else:
- logging.error(
- 'Failed to populate artifact cache: %s %s' %
- (event.msg['status'], event.msg['body']))
-
- # We will attempt to remove this job twice
- # unless we mark it as failed before the BuildController
- # processes the WorkerBuildFailed event.
- #
- # The BuildController will not try to cancel jobs that have
- # been marked as failed.
- self.mainloop.queue_event(WorkerConnection,
- _JobFailed(self._current_job))
-
- new_event = WorkerBuildFailed(
- self._current_job_exec_response,
- self._current_job.artifact.cache_key)
- self.mainloop.queue_event(WorkerConnection, new_event)
-
- self.mainloop.queue_event(self, _BuildFailed())
-
- self.mainloop.queue_event(WorkerConnection,
- _JobFinished(self._current_job))
-
- self._current_job_exec_response = None
- self._current_job_cache_request = None
+ # This function is called for every HelperResult message sent by the
+ # controller's distbuild-helper process (for every completed or failed
+ # http-request).
+ for job in self._jobs:
+ if event.msg['id'] == getattr(job, '_cache_request_id', None):
+ self._handle_helper_result_for_job(job, event)
+
+ def _handle_helper_result_for_job(self, job, event):
+ distbuild.crash_point()
+
+ logging.debug('caching: event.msg: %s' % repr(event.msg))
+ if event.msg['status'] == httplib.OK:
+ logging.debug('Shared artifact cache population done')
+
+ finished_event = WorkerBuildFinished(
+ job._exec_response, job.artifact.cache_key)
+ self.mainloop.queue_event(WorkerConnection, finished_event)
+
+ self.mainloop.queue_event(self, _Cached())
+ else:
+ logging.error(
+ 'Failed to populate artifact cache: %s %s' %
+ (event.msg['status'], event.msg['body']))
+
+ # We will attempt to remove this job twice
+ # unless we mark it as failed before the BuildController
+ # processes the WorkerBuildFailed event.
+ #
+ # The BuildController will not try to cancel jobs that have
+ # been marked as failed.
+ self.mainloop.queue_event(WorkerConnection, _JobFailed(job))
+
+ failed_event = WorkerBuildFailed(
+ job._exec_response, job.artifact.cache_key)
+ self.mainloop.queue_event(WorkerConnection, failed_event)
+
+ self.mainloop.queue_event(self, _BuildFailed())
+
+ # Caching is the last step of a job, so we're now done with it.
+ self.mainloop.queue_event(WorkerConnection, _JobFinished(job))