diff options
Diffstat (limited to 'distbuild')
-rw-r--r-- | distbuild/worker_build_scheduler.py | 77 |
1 files changed, 48 insertions, 29 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 9397d5c9..ce2589d8 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -100,7 +100,8 @@ class _Disconnected(object): class Job(object): - def __init__(self, artifact, initiator_id): + def __init__(self, job_id, artifact, initiator_id): + self.id = job_id self.artifact = artifact self.initiators = [initiator_id] self.who = None # we don't know who's going to do this yet @@ -121,8 +122,8 @@ class Job(object): def set_state(self, state): assert state in ['queued', 'running', 'complete', 'failed'] - logging.debug('Setting job state for job %s to %s', - self.artifact.basename(), state) + logging.debug('Setting job state for job %s with id %s to %s', + self.artifact.basename(), self.id, state) self._state = state @@ -133,28 +134,39 @@ class JobQueue(object): self._owner = owner self._jobs = {} - def get(self, artifact_basename): - return self._jobs.get(artifact_basename, None) + def get_running_job_for_artifact(self, artifact_basename): + jobs = [job for job in self.running_jobs() + if job.artifact.basename() == artifact_basename] + if len(jobs) > 1: + logging.warn('More than one running job for %s', + artifact_basename) + if not jobs: + return None + return jobs[0] + + def get_job_for_id(self, id): + return self._jobs.get(id, None) def add(self, job): artifact_basename = job.artifact.basename() - if artifact_basename in self._jobs: - logging.error( - "Duplicate job for %s added to %s job queue, ignoring.", + if self.has_job_for_artifact(artifact_basename): + logging.info( + "Duplicate job for %s added to %s job queue.", artifact_basename, self._owner) - else: - self._jobs[artifact_basename] = job + self._jobs[job.id] = job def remove(self, job): - if job.artifact.basename() in self._jobs: - del self._jobs[job.artifact.basename()] + if job.id in self._jobs: + del self._jobs[job.id] else: logging.warning("Tried to remove a job that doesn't exist " "(%s)", job.artifact.basename()) - def __contains__(self, artifact_basename): - return artifact_basename in self._jobs + def has_job_for_artifact(self, artifact_basename): + for job in self: + if job.artifact.basename() == artifact_basename: + return True def __iter__(self): return self._jobs.itervalues() @@ -238,6 +250,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): logging.debug('WBQ: Setting up %s' % self) self._available_workers = [] self._jobs = JobQueue(owner='controller') + self._idgen = distbuild.IdentifierGenerator('Job') spec = [ # state, source, event_class, new_state, callback @@ -283,8 +296,9 @@ class WorkerBuildQueuer(distbuild.StateMachine): # If so, add our initiator id to the existing job # If not, create a job - if event.artifact.basename() in self._jobs: - job = self._jobs.get(event.artifact.basename()) + job = self._jobs.get_running_job_for_artifact( + event.artifact.basename()) + if job is not None: job.initiators.append(event.initiator_id) # Completed jobs are not tracked, so we can't tell here if the @@ -306,7 +320,7 @@ class WorkerBuildQueuer(distbuild.StateMachine): self.mainloop.queue_event(WorkerConnection, progress) else: logging.debug('WBQ: Creating job for: %s' % event.artifact.name) - job = Job(event.artifact, event.initiator_id) + job = Job(self._idgen.next(), event.artifact, event.initiator_id) self._jobs.add(job) if self._available_workers: @@ -323,22 +337,27 @@ class WorkerBuildQueuer(distbuild.StateMachine): return False # not for us name = job.artifact.basename() + job_id = job.id - logging.debug('Checking whether to remove job %s', name) + logging.debug('Checking whether to remove job %s with job id %s', + name, job_id) if len(job.initiators) == 1: if job.running() or job.failed(): - logging.debug('NOT removing running job %s ' - '(WorkerConnection will cancel job)', name) + logging.debug('NOT removing running job %s with job id %s ' + '(WorkerConnection will cancel job)', + name, job_id) else: - logging.debug('Removing job %s with job id %s', name) + logging.debug('Removing job %s with job id %s', + name, job_id) return True else: # Don't cancel, but still remove this initiator from # the list of initiators - logging.debug('NOT removing job %s, other initiators want it: ' - '%s', name, [i for i in job.initiators if i != - event.initiator_id]) + logging.debug('NOT removing job %s with job id %s ' + 'other initiators want it: %s', name, job_id, + [i for i in job.initiators + if i != event.initiator_id]) job.initiators.remove(event.initiator_id) @@ -480,7 +499,7 @@ class WorkerConnection(distbuild.StateMachine): 'WC: Cancelling job %s, currently building on %s', job.artifact.basename(), self.name()) - msg = distbuild.message('exec-cancel', id=job.artifact.basename()) + msg = distbuild.message('exec-cancel', id=job.id) self._jm.send(msg) self.mainloop.queue_event(self, _BuildCancelled()) @@ -496,9 +515,9 @@ class WorkerConnection(distbuild.StateMachine): # There's nothing that we can really do if the controller goes nuts # (there's no 'reject job' message), but we can at least log warnings. - if job.artifact.basename() in self._jobs: + if self._jobs.has_job_for_artifact(job.artifact.basename()): logging.warn('Worker %s already has job %s', self.name(), - job.artifact.basename()) + job.id) running_jobs = self._jobs.running_jobs() if len(running_jobs) != 0: @@ -525,7 +544,7 @@ class WorkerConnection(distbuild.StateMachine): ] msg = distbuild.message('exec-request', - id=job.artifact.basename(), + id=job.id, argv=argv, stdin_contents=distbuild.encode_artifact_reference(job.artifact), ) @@ -553,7 +572,7 @@ class WorkerConnection(distbuild.StateMachine): } handler = handlers[event.msg['type']] - job = self._jobs.get(event.msg['id']) + job = self._jobs.get_job_for_id(event.msg['id']) if job: handler(event.msg, job) |