diff options
author | Adam Coldrick <adam.coldrick@codethink.co.uk> | 2015-05-11 15:06:58 +0000 |
---|---|---|
committer | Baserock Gerrit <gerrit@baserock.org> | 2015-05-12 13:01:13 +0000 |
commit | 43fed1ea4c6c988a08bff1a2c61087116b4a1cb1 (patch) | |
tree | 4244afb360038f3350b153c98ebd132a0c1e833e /distbuild/worker_build_scheduler.py | |
parent | aa19fb9052492ade271add5b35a90eea7171dc01 (diff) | |
download | morph-43fed1ea4c6c988a08bff1a2c61087116b4a1cb1.tar.gz |
Revert "distbuild: Track worker jobs using artifact basename only"
This reverts commit 75ef3e9585091b463b60d2981b3b7283a2ea8eab.
It turns out that the JobQueue may need to handle more than one
build of the same artifact at once, as one may be in the process
of being cancelled when another build of the same artifact is
requested. So each job does need an ID separate from the artifact ID.
Change-Id: Ifa0c06987795a4aebdadbd9927de27919377b0a2
Diffstat (limited to 'distbuild/worker_build_scheduler.py')
-rw-r--r-- | distbuild/worker_build_scheduler.py | 77 |
1 file changed, 48 insertions, 29 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 9397d5c9..ce2589d8 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -100,7 +100,8 @@ class _Disconnected(object): class Job(object): - def __init__(self, artifact, initiator_id): + def __init__(self, job_id, artifact, initiator_id): + self.id = job_id self.artifact = artifact self.initiators = [initiator_id] self.who = None # we don't know who's going to do this yet @@ -121,8 +122,8 @@ class Job(object): def set_state(self, state): assert state in ['queued', 'running', 'complete', 'failed'] - logging.debug('Setting job state for job %s to %s', - self.artifact.basename(), state) + logging.debug('Setting job state for job %s with id %s to %s', + self.artifact.basename(), self.id, state) self._state = state @@ -133,28 +134,39 @@ class JobQueue(object): self._owner = owner self._jobs = {} - def get(self, artifact_basename): - return self._jobs.get(artifact_basename, None) + def get_running_job_for_artifact(self, artifact_basename): + jobs = [job for job in self.running_jobs() + if job.artifact.basename() == artifact_basename] + if len(jobs) > 1: + logging.warn('More than one running job for %s', + artifact_basename) + if not jobs: + return None + return jobs[0] + + def get_job_for_id(self, id): + return self._jobs.get(id, None) def add(self, job): artifact_basename = job.artifact.basename() - if artifact_basename in self._jobs: - logging.error( - "Duplicate job for %s added to %s job queue, ignoring.", + if self.has_job_for_artifact(artifact_basename): + logging.info( + "Duplicate job for %s added to %s job queue.", artifact_basename, self._owner) - else: - self._jobs[artifact_basename] = job + self._jobs[job.id] = job def remove(self, job): - if job.artifact.basename() in self._jobs: - del self._jobs[job.artifact.basename()] + if job.id in self._jobs: + del self._jobs[job.id] else: logging.warning("Tried to remove a job that doesn't 
exist " "(%s)", job.artifact.basename()) - def __contains__(self, artifact_basename): - return artifact_basename in self._jobs + def has_job_for_artifact(self, artifact_basename): + for job in self: + if job.artifact.basename() == artifact_basename: + return True def __iter__(self): return self._jobs.itervalues() @@ -238,6 +250,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): logging.debug('WBQ: Setting up %s' % self) self._available_workers = [] self._jobs = JobQueue(owner='controller') + self._idgen = distbuild.IdentifierGenerator('Job') spec = [ # state, source, event_class, new_state, callback @@ -283,8 +296,9 @@ class WorkerBuildQueuer(distbuild.StateMachine): # If so, add our initiator id to the existing job # If not, create a job - if event.artifact.basename() in self._jobs: - job = self._jobs.get(event.artifact.basename()) + job = self._jobs.get_running_job_for_artifact( + event.artifact.basename()) + if job is not None: job.initiators.append(event.initiator_id) # Completed jobs are not tracked, so we can't tell here if the @@ -306,7 +320,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): self.mainloop.queue_event(WorkerConnection, progress) else: logging.debug('WBQ: Creating job for: %s' % event.artifact.name) - job = Job(event.artifact, event.initiator_id) + job = Job(self._idgen.next(), event.artifact, event.initiator_id) self._jobs.add(job) if self._available_workers: @@ -323,22 +337,27 @@ class WorkerBuildQueuer(distbuild.StateMachine): return False # not for us name = job.artifact.basename() + job_id = job.id - logging.debug('Checking whether to remove job %s', name) + logging.debug('Checking whether to remove job %s with job id %s', + name, job_id) if len(job.initiators) == 1: if job.running() or job.failed(): - logging.debug('NOT removing running job %s ' - '(WorkerConnection will cancel job)', name) + logging.debug('NOT removing running job %s with job id %s ' + '(WorkerConnection will cancel job)', + name, job_id) else: - 
logging.debug('Removing job %s with job id %s', name) + logging.debug('Removing job %s with job id %s', + name, job_id) return True else: # Don't cancel, but still remove this initiator from # the list of initiators - logging.debug('NOT removing job %s, other initiators want it: ' - '%s', name, [i for i in job.initiators if i != - event.initiator_id]) + logging.debug('NOT removing job %s with job id %s ' + 'other initiators want it: %s', name, job_id, + [i for i in job.initiators + if i != event.initiator_id]) job.initiators.remove(event.initiator_id) @@ -480,7 +499,7 @@ class WorkerConnection(distbuild.StateMachine): 'WC: Cancelling job %s, currently building on %s', job.artifact.basename(), self.name()) - msg = distbuild.message('exec-cancel', id=job.artifact.basename()) + msg = distbuild.message('exec-cancel', id=job.id) self._jm.send(msg) self.mainloop.queue_event(self, _BuildCancelled()) @@ -496,9 +515,9 @@ class WorkerConnection(distbuild.StateMachine): # There's nothing that we can really do if the controller goes nuts # (there's no 'reject job' message), but we can at least log warnings. - if job.artifact.basename() in self._jobs: + if self._jobs.has_job_for_artifact(job.artifact.basename()): logging.warn('Worker %s already has job %s', self.name(), - job.artifact.basename()) + job.id) running_jobs = self._jobs.running_jobs() if len(running_jobs) != 0: @@ -525,7 +544,7 @@ class WorkerConnection(distbuild.StateMachine): ] msg = distbuild.message('exec-request', - id=job.artifact.basename(), + id=job.id, argv=argv, stdin_contents=distbuild.encode_artifact_reference(job.artifact), ) @@ -553,7 +572,7 @@ class WorkerConnection(distbuild.StateMachine): } handler = handlers[event.msg['type']] - job = self._jobs.get(event.msg['id']) + job = self._jobs.get_job_for_id(event.msg['id']) if job: handler(event.msg, job) |