diff options
author | Sam Thursfield <sam.thursfield@codethink.co.uk> | 2015-04-09 14:50:11 +0000 |
---|---|---|
committer | Sam Thursfield <sam.thursfield@codethink.co.uk> | 2015-05-07 14:36:11 +0000 |
commit | 75ef3e9585091b463b60d2981b3b7283a2ea8eab (patch) | |
tree | 0a6eb257b136947eafc49dd14d459b2b4d3eca7d /distbuild | |
parent | be3383e67b115a4980eb1ef47a84bdcf8c5fa028 (diff) | |
download | morph-75ef3e9585091b463b60d2981b3b7283a2ea8eab.tar.gz |
distbuild: Track worker jobs using artifact basename only
Rather than generating IDs for each job, identify them by what artifact
is going to be built. Artifact cache IDs need to be unique in any case.
Change-Id: I37a0277931c45a8fb6e37ae7c2a6a942ae732fdd
Diffstat (limited to 'distbuild')
-rw-r--r-- | distbuild/worker_build_scheduler.py | 57 |
1 files changed, 23 insertions, 34 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 843b9eb9..bc2df4b1 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -100,8 +100,7 @@ class _Disconnected(object): class Job(object): - def __init__(self, job_id, artifact, initiator_id): - self.id = job_id + def __init__(self, artifact, initiator_id): self.artifact = artifact self.initiators = [initiator_id] self.who = None # we don't know who's going to do this yet @@ -122,8 +121,8 @@ class Job(object): def set_state(self, state): assert state in ['queued', 'running', 'complete', 'failed'] - logging.debug('Setting job state for job %s with id %s to %s', - self.artifact.basename(), self.id, state) + logging.debug('Setting job state for job %s to %s', + self.artifact.basename(), state) self._state = state @@ -236,7 +235,6 @@ class WorkerBuildQueuer(distbuild.StateMachine): logging.debug('WBQ: Setting up %s' % self) self._available_workers = [] self._jobs = JobQueue(owner='controller') - self._idgen = distbuild.IdentifierGenerator('WorkerBuildQueuerJob') spec = [ # state, source, event_class, new_state, callback @@ -301,7 +299,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): self.mainloop.queue_event(WorkerConnection, progress) else: logging.debug('WBQ: Creating job for: %s' % event.artifact.name) - job = Job(self._idgen.next(), event.artifact, event.initiator_id) + job = Job(event.artifact, event.initiator_id) self._jobs.add(job) if self._available_workers: @@ -318,27 +316,22 @@ class WorkerBuildQueuer(distbuild.StateMachine): return False # not for us name = job.artifact.basename() - job_id = job.id - logging.debug('Checking whether to remove job %s with job id %s', - name, job_id) + logging.debug('Checking whether to remove job %s', name) if len(job.initiators) == 1: if job.running() or job.failed(): - logging.debug('NOT removing running job %s with job id %s ' - '(WorkerConnection will cancel job)', - name, job_id) + logging.debug('NOT removing running job %s ' + '(WorkerConnection will cancel job)', name) else: - logging.debug('Removing job %s with job id %s', - name, job_id) + logging.debug('Removing job %s with job id %s', name) return True else: # Don't cancel, but still remove this initiator from # the list of initiators - logging.debug('NOT removing job %s with job id %s ' - 'other initiators want it: %s', name, job_id, - [i for i in job.initiators - if i != event.initiator_id]) + logging.debug('NOT removing job %s, other initiators want it: ' + '%s', name, [i for i in job.initiators if i != + event.initiator_id]) job.initiators.remove(event.initiator_id) @@ -358,8 +351,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): logging.debug('%s wants new job, just did %s', who.name(), last_job.artifact.basename()) - logging.debug('Removing job %s with job id %s', - last_job.artifact.basename(), last_job.id) + logging.debug('Removing job %s', last_job.artifact.basename()) self._jobs.remove(last_job) else: logging.debug('%s wants its first job', who.name()) @@ -480,19 +472,15 @@ class WorkerConnection(distbuild.StateMachine): job = self._current_job if (len(job.initiators) == 1): - logging.debug('WC: Cancelling running job %s ' - 'with job id %s running on %s', - job.artifact.basename(), job.id, - self.name()) + logging.debug('WC: Cancelling running job %s running on %s', + job.artifact.basename(), self.name()) - msg = distbuild.message('exec-cancel', id=job.id) + msg = distbuild.message('exec-cancel', id=job.artifact.basename()) self._jm.send(msg) self.mainloop.queue_event(self, _BuildCancelled()) else: - logging.debug('WC: Not cancelling running job %s with job id %s, ' - 'other initiators want it done: %s', - job.artifact.basename(), - job.id, + logging.debug('WC: Not cancelling running job %s, other initiators ' + 'want it done: %s', job.artifact.basename(), [i for i in job.initiators if i != build_cancel.id]) job.initiators.remove(build_cancel.id) @@ -510,13 +498,14 @@ class WorkerConnection(distbuild.StateMachine): job = event.job - if job.id in self._active_jobs: - logging.warn('Duplicate job %s for worker %s', job.id, self.name()) + if job.artifact.basename() in self._active_jobs: + logging.warn('Worker %s already has job %s', self.name(), + job.artifact.basename()) if self._current_job_exec_response or self._current_job_cache_request: logging.warn('Caching not finished for %s', self._current_job.id) - self._active_jobs[job.id] = job + self._active_jobs[job.artifact.basename()] = job self._current_job = job logging.debug('WC: starting build: %s for %s' % @@ -530,7 +519,7 @@ class WorkerConnection(distbuild.StateMachine): ] msg = distbuild.message('exec-request', - id=job.id, + id=job.artifact.basename(), argv=argv, stdin_contents=distbuild.serialise_artifact(job.artifact, job.artifact.repo, @@ -599,7 +588,7 @@ class WorkerConnection(distbuild.StateMachine): # The job is no longer considered active, because the worker is # finished with it so we won't receive any more messages about it. - del self._active_jobs[job.id] + del self._active_jobs[job.artifact.basename()] def _request_job(self, event_source, event): distbuild.crash_point() |