From a479c52273f624afb475e59feabf7f1b977c8679 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 23 Apr 2014 15:55:14 +0100 Subject: WorkerBuildQueuer: replace request queue with jobs --- distbuild/worker_build_scheduler.py | 92 +++++++++++++++++++++++++++---------- 1 file changed, 67 insertions(+), 25 deletions(-) (limited to 'distbuild') diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index de8c95b5..5fb1166b 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -169,8 +169,9 @@ class WorkerBuildQueuer(distbuild.StateMachine): distbuild.crash_point() logging.debug('WBQ: Setting up %s' % self) - self._request_queue = [] self._available_workers = [] + self._jobs = Jobs( + distbuild.IdentifierGenerator('WorkerBuildQueuerJob')) spec = [ # state, source, event_class, new_state, callback @@ -182,45 +183,86 @@ class WorkerBuildQueuer(distbuild.StateMachine): ] self.add_transitions(spec) + + def _handle_request(self, event_source, event): distbuild.crash_point() - logging.debug('WBQ: Adding request to queue: %s' % event.artifact.name) - self._request_queue.append(event) - logging.debug( - 'WBQ: %d available workers and %d requests queued' % - (len(self._available_workers), - len(self._request_queue))) - if self._available_workers: - self._give_job() + logging.debug('Handling build request for %s' % event.initiator_id) + logging.debug('Current jobs: %s' % self._jobs) + logging.debug('Workers available: %d' % len(self._available_workers)) + + # Have we already made a job for this thing? + # If so, add our initiator id to the existing job + # If not, create a job + + if self._jobs.exists(event.artifact.basename()): + job = self._jobs.get(event.artifact.basename()) + job.initiators.append(event.initiator_id) + + if job.is_building: + logging.debug('Worker build step already started: %s' % + event.artifact.basename()) + progress = WorkerBuildStepAlreadyStarted(event.initiator_id, + event.artifact.cache_key, job.who.name()) + else: + logging.debug('Job created but not building yet ' + '(waiting for a worker to become available): %s' % + event.artifact.basename()) + progress = WorkerBuildWaiting(event.initiator_id, + event.artifact.cache_key) + + self.mainloop.queue_event(WorkerConnection, progress) + else: + logging.debug('WBQ: Creating job for: %s' % event.artifact.name) + job = self._jobs.create(event.artifact, event.initiator_id) + + if self._available_workers: + self._give_job(job) + else: + progress = WorkerBuildWaiting(event.initiator_id, + event.artifact.cache_key) + self.mainloop.queue_event(WorkerConnection, progress) def _handle_cancel(self, event_source, worker_cancel_pending): - for request in [r for r in self._request_queue if - r.initiator_id == worker_cancel_pending.initiator_id]: - logging.debug('WBQ: Removing request from queue: %s', - request.artifact.name) - self._request_queue.remove(request) + # TODO: this probably needs to check whether any initiators + # care about this thing + + pass def _handle_worker(self, event_source, event): distbuild.crash_point() + who = event.who + last_job = who.job() # the job this worker's just completed + + if last_job: + logging.debug('%s wants new job, just did %s' % + (who.name(), last_job.artifact.basename())) + + self._jobs.remove(last_job) + else: + logging.debug('%s wants its first job' % who.name()) + logging.debug('WBQ: Adding worker to queue: %s' % event.who) self._available_workers.append(event) - logging.debug( - 'WBQ: %d available workers and %d requests queued' % - (len(self._available_workers), - len(self._request_queue))) - if self._request_queue: - self._give_job() + logging.debug('Current jobs: %s' % self._jobs) + logging.debug('Workers available: %d' % len(self._available_workers)) + + job = self._jobs.get_next_job() + + if job: + self._give_job(job) - def _give_job(self): - request = self._request_queue.pop(0) + def _give_job(self, job): worker = self._available_workers.pop(0) + job.who = worker.who + logging.debug( 'WBQ: Giving %s to %s' % - (request.artifact.name, worker.who.name())) - self.mainloop.queue_event(worker.who, _HaveAJob(request.artifact, - request.initiator_id)) + (job.artifact.name, worker.who.name())) + + self.mainloop.queue_event(worker.who, _HaveAJob(job)) class WorkerConnection(distbuild.StateMachine): -- cgit v1.2.1