summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Adam Coldrick <adam.coldrick@codethink.co.uk> 2015-05-11 15:06:58 +0000
committer: Baserock Gerrit <gerrit@baserock.org> 2015-05-12 13:01:13 +0000
commit: 43fed1ea4c6c988a08bff1a2c61087116b4a1cb1 (patch)
tree: 4244afb360038f3350b153c98ebd132a0c1e833e
parent: aa19fb9052492ade271add5b35a90eea7171dc01 (diff)
download: morph-43fed1ea4c6c988a08bff1a2c61087116b4a1cb1.tar.gz
Revert "distbuild: Track worker jobs using artifact basename only"
This reverts commit 75ef3e9585091b463b60d2981b3b7283a2ea8eab.

It turns out that the JobQueue may need to handle more than one build of the same artifact at once, as one may be in the process of being cancelled when another build of the same artifact is requested. So jobs do need an ID separate from the artifact ID.

Change-Id: Ifa0c06987795a4aebdadbd9927de27919377b0a2
-rw-r--r-- distbuild/worker_build_scheduler.py 77
1 files changed, 48 insertions, 29 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index 9397d5c9..ce2589d8 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -100,7 +100,8 @@ class _Disconnected(object):
class Job(object):
- def __init__(self, artifact, initiator_id):
+ def __init__(self, job_id, artifact, initiator_id):
+ self.id = job_id
self.artifact = artifact
self.initiators = [initiator_id]
self.who = None # we don't know who's going to do this yet
@@ -121,8 +122,8 @@ class Job(object):
def set_state(self, state):
assert state in ['queued', 'running', 'complete', 'failed']
- logging.debug('Setting job state for job %s to %s',
- self.artifact.basename(), state)
+ logging.debug('Setting job state for job %s with id %s to %s',
+ self.artifact.basename(), self.id, state)
self._state = state
@@ -133,28 +134,39 @@ class JobQueue(object):
self._owner = owner
self._jobs = {}
- def get(self, artifact_basename):
- return self._jobs.get(artifact_basename, None)
+ def get_running_job_for_artifact(self, artifact_basename):
+ jobs = [job for job in self.running_jobs()
+ if job.artifact.basename() == artifact_basename]
+ if len(jobs) > 1:
+ logging.warn('More than one running job for %s',
+ artifact_basename)
+ if not jobs:
+ return None
+ return jobs[0]
+
+ def get_job_for_id(self, id):
+ return self._jobs.get(id, None)
def add(self, job):
artifact_basename = job.artifact.basename()
- if artifact_basename in self._jobs:
- logging.error(
- "Duplicate job for %s added to %s job queue, ignoring.",
+ if self.has_job_for_artifact(artifact_basename):
+ logging.info(
+ "Duplicate job for %s added to %s job queue.",
artifact_basename, self._owner)
- else:
- self._jobs[artifact_basename] = job
+ self._jobs[job.id] = job
def remove(self, job):
- if job.artifact.basename() in self._jobs:
- del self._jobs[job.artifact.basename()]
+ if job.id in self._jobs:
+ del self._jobs[job.id]
else:
logging.warning("Tried to remove a job that doesn't exist "
"(%s)", job.artifact.basename())
- def __contains__(self, artifact_basename):
- return artifact_basename in self._jobs
+ def has_job_for_artifact(self, artifact_basename):
+ for job in self:
+ if job.artifact.basename() == artifact_basename:
+ return True
def __iter__(self):
return self._jobs.itervalues()
@@ -238,6 +250,7 @@ class WorkerBuildQueuer(distbuild.StateMachine):
logging.debug('WBQ: Setting up %s' % self)
self._available_workers = []
self._jobs = JobQueue(owner='controller')
+ self._idgen = distbuild.IdentifierGenerator('Job')
spec = [
# state, source, event_class, new_state, callback
@@ -283,8 +296,9 @@ class WorkerBuildQueuer(distbuild.StateMachine):
# If so, add our initiator id to the existing job
# If not, create a job
- if event.artifact.basename() in self._jobs:
- job = self._jobs.get(event.artifact.basename())
+ job = self._jobs.get_running_job_for_artifact(
+ event.artifact.basename())
+ if job is not None:
job.initiators.append(event.initiator_id)
# Completed jobs are not tracked, so we can't tell here if the
@@ -306,7 +320,7 @@ class WorkerBuildQueuer(distbuild.StateMachine):
self.mainloop.queue_event(WorkerConnection, progress)
else:
logging.debug('WBQ: Creating job for: %s' % event.artifact.name)
- job = Job(event.artifact, event.initiator_id)
+ job = Job(self._idgen.next(), event.artifact, event.initiator_id)
self._jobs.add(job)
if self._available_workers:
@@ -323,22 +337,27 @@ class WorkerBuildQueuer(distbuild.StateMachine):
return False # not for us
name = job.artifact.basename()
+ job_id = job.id
- logging.debug('Checking whether to remove job %s', name)
+ logging.debug('Checking whether to remove job %s with job id %s',
+ name, job_id)
if len(job.initiators) == 1:
if job.running() or job.failed():
- logging.debug('NOT removing running job %s '
- '(WorkerConnection will cancel job)', name)
+ logging.debug('NOT removing running job %s with job id %s '
+ '(WorkerConnection will cancel job)',
+ name, job_id)
else:
- logging.debug('Removing job %s with job id %s', name)
+ logging.debug('Removing job %s with job id %s',
+ name, job_id)
return True
else:
# Don't cancel, but still remove this initiator from
# the list of initiators
- logging.debug('NOT removing job %s, other initiators want it: '
- '%s', name, [i for i in job.initiators if i !=
- event.initiator_id])
+ logging.debug('NOT removing job %s with job id %s '
+ 'other initiators want it: %s', name, job_id,
+ [i for i in job.initiators
+ if i != event.initiator_id])
job.initiators.remove(event.initiator_id)
@@ -480,7 +499,7 @@ class WorkerConnection(distbuild.StateMachine):
'WC: Cancelling job %s, currently building on %s',
job.artifact.basename(), self.name())
- msg = distbuild.message('exec-cancel', id=job.artifact.basename())
+ msg = distbuild.message('exec-cancel', id=job.id)
self._jm.send(msg)
self.mainloop.queue_event(self, _BuildCancelled())
@@ -496,9 +515,9 @@ class WorkerConnection(distbuild.StateMachine):
# There's nothing that we can really do if the controller goes nuts
# (there's no 'reject job' message), but we can at least log warnings.
- if job.artifact.basename() in self._jobs:
+ if self._jobs.has_job_for_artifact(job.artifact.basename()):
logging.warn('Worker %s already has job %s', self.name(),
- job.artifact.basename())
+ job.id)
running_jobs = self._jobs.running_jobs()
if len(running_jobs) != 0:
@@ -525,7 +544,7 @@ class WorkerConnection(distbuild.StateMachine):
]
msg = distbuild.message('exec-request',
- id=job.artifact.basename(),
+ id=job.id,
argv=argv,
stdin_contents=distbuild.encode_artifact_reference(job.artifact),
)
@@ -553,7 +572,7 @@ class WorkerConnection(distbuild.StateMachine):
}
handler = handlers[event.msg['type']]
- job = self._jobs.get(event.msg['id'])
+ job = self._jobs.get_job_for_id(event.msg['id'])
if job:
handler(event.msg, job)