summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Coldrick <adam.coldrick@codethink.co.uk>2015-05-11 15:06:58 +0000
committerAdam Coldrick <adam.coldrick@codethink.co.uk>2015-05-11 15:23:16 +0000
commit504861cb65f2d9442e3a73f83dc8ae4b9631f050 (patch)
treeb22505617ac8bd7f59a6133ce18bbbf928b8a57b
parent5aa34de203939b69d9a2f4c0af0f65a751ff66f3 (diff)
downloadmorph-504861cb65f2d9442e3a73f83dc8ae4b9631f050.tar.gz
Revert "distbuild: Track worker jobs using artifact basename only"
This reverts commit 75ef3e9585091b463b60d2981b3b7283a2ea8eab. Change-Id: Ifa0c06987795a4aebdadbd9927de27919377b0a2
-rw-r--r--distbuild/worker_build_scheduler.py77
1 files changed, 48 insertions, 29 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index 71e1c3ef..24027b30 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -100,7 +100,8 @@ class _Disconnected(object):
class Job(object):
- def __init__(self, artifact, initiator_id):
+ def __init__(self, job_id, artifact, initiator_id):
+ self.id = job_id
self.artifact = artifact
self.initiators = [initiator_id]
self.who = None # we don't know who's going to do this yet
@@ -121,8 +122,8 @@ class Job(object):
def set_state(self, state):
assert state in ['queued', 'running', 'complete', 'failed']
- logging.debug('Setting job state for job %s to %s',
- self.artifact.basename(), state)
+ logging.debug('Setting job state for job %s with id %s to %s',
+ self.artifact.basename(), self.id, state)
self._state = state
@@ -133,28 +134,39 @@ class JobQueue(object):
self._owner = owner
self._jobs = {}
- def get(self, artifact_basename):
- return self._jobs.get(artifact_basename, None)
+ def get_running_job_for_artifact(self, artifact_basename):
+ jobs = [job for job in self.running_jobs()
+ if job.artifact.basename() == artifact_basename]
+ if len(jobs) > 1:
+ logging.warn('More than one running job for %s',
+ artifact_basename)
+ if not jobs:
+ return None
+ return jobs[0]
+
+ def get_job_for_id(self, id):
+ return self._jobs.get(id, None)
def add(self, job):
artifact_basename = job.artifact.basename()
- if artifact_basename in self._jobs:
- logging.error(
- "Duplicate job for %s added to %s job queue, ignoring.",
+ if self.has_job_for_artifact(artifact_basename):
+ logging.info(
+ "Duplicate job for %s added to %s job queue.",
artifact_basename, self._owner)
- else:
- self._jobs[artifact_basename] = job
+ self._jobs[job.id] = job
def remove(self, job):
- if job.artifact.basename() in self._jobs:
- del self._jobs[job.artifact.basename()]
+ if job.id in self._jobs:
+ del self._jobs[job.id]
else:
logging.warning("Tried to remove a job that doesn't exist "
"(%s)", job.artifact.basename())
- def __contains__(self, artifact_basename):
- return artifact_basename in self._jobs
+ def has_job_for_artifact(self, artifact_basename):
+ for job in self:
+ if job.artifact.basename() == artifact_basename:
+ return True
def __iter__(self):
return self._jobs.itervalues()
@@ -238,6 +250,7 @@ class WorkerBuildQueuer(distbuild.StateMachine):
logging.debug('WBQ: Setting up %s' % self)
self._available_workers = []
self._jobs = JobQueue(owner='controller')
+ self._idgen = distbuild.IdentifierGenerator('Job')
spec = [
# state, source, event_class, new_state, callback
@@ -283,8 +296,9 @@ class WorkerBuildQueuer(distbuild.StateMachine):
# If so, add our initiator id to the existing job
# If not, create a job
- if event.artifact.basename() in self._jobs:
- job = self._jobs.get(event.artifact.basename())
+ job = self._jobs.get_running_job_for_artifact(
+ event.artifact.basename())
+ if job is not None:
job.initiators.append(event.initiator_id)
# Completed jobs are not tracked, so we can't tell here if the
@@ -306,7 +320,7 @@ class WorkerBuildQueuer(distbuild.StateMachine):
self.mainloop.queue_event(WorkerConnection, progress)
else:
logging.debug('WBQ: Creating job for: %s' % event.artifact.name)
- job = Job(event.artifact, event.initiator_id)
+ job = Job(self._idgen.next(), event.artifact, event.initiator_id)
self._jobs.add(job)
if self._available_workers:
@@ -323,22 +337,27 @@ class WorkerBuildQueuer(distbuild.StateMachine):
return False # not for us
name = job.artifact.basename()
+ job_id = job.id
- logging.debug('Checking whether to remove job %s', name)
+ logging.debug('Checking whether to remove job %s with job id %s',
+ name, job_id)
if len(job.initiators) == 1:
if job.running() or job.failed():
- logging.debug('NOT removing running job %s '
- '(WorkerConnection will cancel job)', name)
+ logging.debug('NOT removing running job %s with job id %s '
+ '(WorkerConnection will cancel job)',
+ name, job_id)
else:
- logging.debug('Removing job %s with job id %s', name)
+ logging.debug('Removing job %s with job id %s',
+ name, job_id)
return True
else:
# Don't cancel, but still remove this initiator from
# the list of initiators
- logging.debug('NOT removing job %s, other initiators want it: '
- '%s', name, [i for i in job.initiators if i !=
- event.initiator_id])
+ logging.debug('NOT removing job %s with job id %s '
+ 'other initiators want it: %s', name, job_id,
+ [i for i in job.initiators
+ if i != event.initiator_id])
job.initiators.remove(event.initiator_id)
@@ -480,7 +499,7 @@ class WorkerConnection(distbuild.StateMachine):
'WC: Cancelling job %s, currently building on %s',
job.artifact.basename(), self.name())
- msg = distbuild.message('exec-cancel', id=job.artifact.basename())
+ msg = distbuild.message('exec-cancel', id=job.id)
self._jm.send(msg)
self.mainloop.queue_event(self, _BuildCancelled())
@@ -496,9 +515,9 @@ class WorkerConnection(distbuild.StateMachine):
# There's nothing that we can really do if the controller goes nuts
# (there's no 'reject job' message), but we can at least log warnings.
- if job.artifact.basename() in self._jobs:
+ if self._jobs.has_job_for_artifact(job.artifact.basename()):
logging.warn('Worker %s already has job %s', self.name(),
- job.artifact.basename())
+ job.id)
running_jobs = self._jobs.running_jobs()
if len(running_jobs) != 0:
@@ -525,7 +544,7 @@ class WorkerConnection(distbuild.StateMachine):
]
msg = distbuild.message('exec-request',
- id=job.artifact.basename(),
+ id=job.id,
argv=argv,
stdin_contents=distbuild.serialise_artifact(job.artifact,
job.artifact.repo,
@@ -555,7 +574,7 @@ class WorkerConnection(distbuild.StateMachine):
}
handler = handlers[event.msg['type']]
- job = self._jobs.get(event.msg['id'])
+ job = self._jobs.get_job_for_id(event.msg['id'])
if job:
handler(event.msg, job)