From 43fed1ea4c6c988a08bff1a2c61087116b4a1cb1 Mon Sep 17 00:00:00 2001 From: Adam Coldrick Date: Mon, 11 May 2015 15:06:58 +0000 Subject: Revert "distbuild: Track worker jobs using artifact basename only" This reverts commit 75ef3e9585091b463b60d2981b3b7283a2ea8eab. It turns out that the JobQueue may need to handle more than one build of the same artifact at once, as one may be in the process of being cancelled when another build of the same artifact is requested. So they do need an ID separate from the artifact ID. Change-Id: Ifa0c06987795a4aebdadbd9927de27919377b0a2 --- distbuild/worker_build_scheduler.py | 77 +++++++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 29 deletions(-) diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 9397d5c9..ce2589d8 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -100,7 +100,8 @@ class _Disconnected(object): class Job(object): - def __init__(self, artifact, initiator_id): + def __init__(self, job_id, artifact, initiator_id): + self.id = job_id self.artifact = artifact self.initiators = [initiator_id] self.who = None # we don't know who's going to do this yet @@ -121,8 +122,8 @@ class Job(object): def set_state(self, state): assert state in ['queued', 'running', 'complete', 'failed'] - logging.debug('Setting job state for job %s to %s', - self.artifact.basename(), state) + logging.debug('Setting job state for job %s with id %s to %s', + self.artifact.basename(), self.id, state) self._state = state @@ -133,28 +134,39 @@ class JobQueue(object): self._owner = owner self._jobs = {} - def get(self, artifact_basename): - return self._jobs.get(artifact_basename, None) + def get_running_job_for_artifact(self, artifact_basename): + jobs = [job for job in self.running_jobs() + if job.artifact.basename() == artifact_basename] + if len(jobs) > 1: + logging.warn('More than one running job for %s', + artifact_basename) + if not jobs: + return None + return jobs[0] + + def get_job_for_id(self, id): + return self._jobs.get(id, None) def add(self, job): artifact_basename = job.artifact.basename() - if artifact_basename in self._jobs: - logging.error( - "Duplicate job for %s added to %s job queue, ignoring.", + if self.has_job_for_artifact(artifact_basename): + logging.info( + "Duplicate job for %s added to %s job queue.", artifact_basename, self._owner) - else: - self._jobs[artifact_basename] = job + self._jobs[job.id] = job def remove(self, job): - if job.artifact.basename() in self._jobs: - del self._jobs[job.artifact.basename()] + if job.id in self._jobs: + del self._jobs[job.id] else: logging.warning("Tried to remove a job that doesn't exist " "(%s)", job.artifact.basename()) - def __contains__(self, artifact_basename): - return artifact_basename in self._jobs + def has_job_for_artifact(self, artifact_basename): + for job in self: + if job.artifact.basename() == artifact_basename: + return True def __iter__(self): return self._jobs.itervalues() @@ -238,6 +250,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): logging.debug('WBQ: Setting up %s' % self) self._available_workers = [] self._jobs = JobQueue(owner='controller') + self._idgen = distbuild.IdentifierGenerator('Job') spec = [ # state, source, event_class, new_state, callback @@ -283,8 +296,9 @@ class WorkerBuildQueuer(distbuild.StateMachine): # If so, add our initiator id to the existing job # If not, create a job - if event.artifact.basename() in self._jobs: - job = self._jobs.get(event.artifact.basename()) + job = self._jobs.get_running_job_for_artifact( + event.artifact.basename()) + if job is not None: job.initiators.append(event.initiator_id) # Completed jobs are not tracked, so we can't tell here if the @@ -306,7 +320,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): self.mainloop.queue_event(WorkerConnection, progress) else: logging.debug('WBQ: Creating job for: %s' % event.artifact.name) - job = Job(event.artifact, event.initiator_id) + job = Job(self._idgen.next(), event.artifact, event.initiator_id) self._jobs.add(job) if self._available_workers: @@ -323,22 +337,27 @@ class WorkerBuildQueuer(distbuild.StateMachine): return False # not for us name = job.artifact.basename() + job_id = job.id - logging.debug('Checking whether to remove job %s', name) + logging.debug('Checking whether to remove job %s with job id %s', + name, job_id) if len(job.initiators) == 1: if job.running() or job.failed(): - logging.debug('NOT removing running job %s ' - '(WorkerConnection will cancel job)', name) + logging.debug('NOT removing running job %s with job id %s ' + '(WorkerConnection will cancel job)', + name, job_id) else: - logging.debug('Removing job %s with job id %s', name) + logging.debug('Removing job %s with job id %s', + name, job_id) return True else: # Don't cancel, but still remove this initiator from # the list of initiators - logging.debug('NOT removing job %s, other initiators want it: ' - '%s', name, [i for i in job.initiators if i != - event.initiator_id]) + logging.debug('NOT removing job %s with job id %s ' + 'other initiators want it: %s', name, job_id, + [i for i in job.initiators + if i != event.initiator_id]) job.initiators.remove(event.initiator_id) @@ -480,7 +499,7 @@ class WorkerConnection(distbuild.StateMachine): 'WC: Cancelling job %s, currently building on %s', job.artifact.basename(), self.name()) - msg = distbuild.message('exec-cancel', id=job.artifact.basename()) + msg = distbuild.message('exec-cancel', id=job.id) self._jm.send(msg) self.mainloop.queue_event(self, _BuildCancelled()) @@ -496,9 +515,9 @@ class WorkerConnection(distbuild.StateMachine): # There's nothing that we can really do if the controller goes nuts # (there's no 'reject job' message), but we can at least log warnings. - if job.artifact.basename() in self._jobs: + if self._jobs.has_job_for_artifact(job.artifact.basename()): logging.warn('Worker %s already has job %s', self.name(), - job.artifact.basename()) + job.id) running_jobs = self._jobs.running_jobs() if len(running_jobs) != 0: @@ -525,7 +544,7 @@ class WorkerConnection(distbuild.StateMachine): ] msg = distbuild.message('exec-request', - id=job.artifact.basename(), + id=job.id, argv=argv, stdin_contents=distbuild.encode_artifact_reference(job.artifact), ) @@ -553,7 +572,7 @@ class WorkerConnection(distbuild.StateMachine): } handler = handlers[event.msg['type']] - job = self._jobs.get(event.msg['id']) + job = self._jobs.get_job_for_id(event.msg['id']) if job: handler(event.msg, job) -- cgit v1.2.1