summaryrefslogtreecommitdiff
path: root/distbuild
diff options
context:
space:
mode:
authorRichard Ipsum <richard.ipsum@codethink.co.uk>2014-04-23 14:55:14 (GMT)
committerRichard Ipsum <richard.ipsum@codethink.co.uk>2014-04-23 16:06:38 (GMT)
commita479c52273f624afb475e59feabf7f1b977c8679 (patch)
tree5b0aa803356a6bc6db92d1a78a5733d656e2559d /distbuild
parentc00c828938918701dd7c3566c765c3a9e10b2253 (diff)
downloadmorph-a479c52273f624afb475e59feabf7f1b977c8679.tar.gz
WorkerBuildQueuer: replace request queue with jobs
Diffstat (limited to 'distbuild')
-rw-r--r--distbuild/worker_build_scheduler.py92
1 files changed, 67 insertions, 25 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index de8c95b..5fb1166 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -169,8 +169,9 @@ class WorkerBuildQueuer(distbuild.StateMachine):
distbuild.crash_point()
logging.debug('WBQ: Setting up %s' % self)
- self._request_queue = []
self._available_workers = []
+ self._jobs = Jobs(
+ distbuild.IdentifierGenerator('WorkerBuildQueuerJob'))
spec = [
# state, source, event_class, new_state, callback
@@ -182,45 +183,86 @@ class WorkerBuildQueuer(distbuild.StateMachine):
]
self.add_transitions(spec)
+
+
def _handle_request(self, event_source, event):
distbuild.crash_point()
- logging.debug('WBQ: Adding request to queue: %s' % event.artifact.name)
- self._request_queue.append(event)
- logging.debug(
- 'WBQ: %d available workers and %d requests queued' %
- (len(self._available_workers),
- len(self._request_queue)))
- if self._available_workers:
- self._give_job()
+ logging.debug('Handling build request for %s' % event.initiator_id)
+ logging.debug('Current jobs: %s' % self._jobs)
+ logging.debug('Workers available: %d' % len(self._available_workers))
+
+ # Have we already made a job for this thing?
+ # If so, add our initiator id to the existing job
+ # If not, create a job
+
+ if self._jobs.exists(event.artifact.basename()):
+ job = self._jobs.get(event.artifact.basename())
+ job.initiators.append(event.initiator_id)
+
+ if job.is_building:
+ logging.debug('Worker build step already started: %s' %
+ event.artifact.basename())
+ progress = WorkerBuildStepAlreadyStarted(event.initiator_id,
+ event.artifact.cache_key, job.who.name())
+ else:
+ logging.debug('Job created but not building yet '
+ '(waiting for a worker to become available): %s' %
+ event.artifact.basename())
+ progress = WorkerBuildWaiting(event.initiator_id,
+ event.artifact.cache_key)
+
+ self.mainloop.queue_event(WorkerConnection, progress)
+ else:
+ logging.debug('WBQ: Creating job for: %s' % event.artifact.name)
+ job = self._jobs.create(event.artifact, event.initiator_id)
+
+ if self._available_workers:
+ self._give_job(job)
+ else:
+ progress = WorkerBuildWaiting(event.initiator_id,
+ event.artifact.cache_key)
+ self.mainloop.queue_event(WorkerConnection, progress)
def _handle_cancel(self, event_source, worker_cancel_pending):
- for request in [r for r in self._request_queue if
- r.initiator_id == worker_cancel_pending.initiator_id]:
- logging.debug('WBQ: Removing request from queue: %s',
- request.artifact.name)
- self._request_queue.remove(request)
+ # TODO: this probably needs to check whether any initiators
+ # care about this thing
+
+ pass
def _handle_worker(self, event_source, event):
distbuild.crash_point()
+ who = event.who
+ last_job = who.job() # the job this worker's just completed
+
+ if last_job:
+ logging.debug('%s wants new job, just did %s' %
+ (who.name(), last_job.artifact.basename()))
+
+ self._jobs.remove(last_job)
+ else:
+ logging.debug('%s wants its first job' % who.name())
+
logging.debug('WBQ: Adding worker to queue: %s' % event.who)
self._available_workers.append(event)
- logging.debug(
- 'WBQ: %d available workers and %d requests queued' %
- (len(self._available_workers),
- len(self._request_queue)))
- if self._request_queue:
- self._give_job()
+ logging.debug('Current jobs: %s' % self._jobs)
+ logging.debug('Workers available: %d' % len(self._available_workers))
+
+ job = self._jobs.get_next_job()
+
+ if job:
+ self._give_job(job)
- def _give_job(self):
- request = self._request_queue.pop(0)
+ def _give_job(self, job):
worker = self._available_workers.pop(0)
+ job.who = worker.who
+
logging.debug(
'WBQ: Giving %s to %s' %
- (request.artifact.name, worker.who.name()))
- self.mainloop.queue_event(worker.who, _HaveAJob(request.artifact,
- request.initiator_id))
+ (job.artifact.name, worker.who.name()))
+
+ self.mainloop.queue_event(worker.who, _HaveAJob(job))
class WorkerConnection(distbuild.StateMachine):