diff options
Diffstat (limited to 'distbuild/worker_build_scheduler.py')
-rw-r--r-- | distbuild/worker_build_scheduler.py | 271 |
1 files changed, 177 insertions, 94 deletions
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index fc5849b3..83873d74 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -33,20 +33,30 @@ class WorkerBuildRequest(object): self.artifact = artifact self.initiator_id = initiator_id - class WorkerCancelPending(object): def __init__(self, initiator_id): self.initiator_id = initiator_id - class WorkerBuildStepStarted(object): + def __init__(self, initiators, cache_key, worker_name): + self.initiators = initiators + self.artifact_cache_key = cache_key + self.worker_name = worker_name + +class WorkerBuildStepAlreadyStarted(object): + def __init__(self, initiator_id, cache_key, worker_name): self.initiator_id = initiator_id self.artifact_cache_key = cache_key self.worker_name = worker_name +class WorkerBuildWaiting(object): + + def __init__(self, initiator_id, cache_key): + self.initiator_id = initiator_id + self.artifact_cache_key = cache_key class WorkerBuildOutput(object): @@ -54,21 +64,18 @@ class WorkerBuildOutput(object): self.msg = msg self.artifact_cache_key = cache_key - class WorkerBuildCaching(object): - def __init__(self, initiator_id, cache_key): - self.initiator_id = initiator_id + def __init__(self, initiators, cache_key): + self.initiators = initiators self.artifact_cache_key = cache_key - class WorkerBuildFinished(object): def __init__(self, msg, cache_key): self.msg = msg self.artifact_cache_key = cache_key - class WorkerBuildFailed(object): def __init__(self, msg, cache_key): @@ -84,21 +91,60 @@ class _NeedJob(object): class _HaveAJob(object): - def __init__(self, artifact, initiator_id): + def __init__(self, job): + self.job = job + +class Job(object): + + def __init__(self, job_id, artifact, initiator_id): + self.id = job_id self.artifact = artifact - self.initiator_id = initiator_id - - -class _JobIsFinished(object): + self.initiators = [initiator_id] + self.who = None # we don't know who's going to do this yet + self.is_building = False - def __init__(self, msg): - self.msg = msg - + def add_initiator(self, initiator_id): + self.initiators.append(initiator_id) + +class Jobs(object): + + def __init__(self, idgen): + self._idgen = idgen + self._jobs = {} + + def get(self, artifact_basename): + return (self._jobs[artifact_basename] + if artifact_basename in self._jobs else None) + + def create(self, artifact, initiator_id): + job = Job(self._idgen.next(), artifact, initiator_id) + self._jobs[job.artifact.basename()] = job + return job + + def remove(self, job): + del self._jobs[job.artifact.basename()] + + def exists(self, artifact_basename): + return artifact_basename in self._jobs + + def get_next_job(self): + # for now just return the first thing we find that's not being built + waiting = [job for (_, job) in + self._jobs.iteritems() if job.who == None] + + return waiting.pop() if len(waiting) > 0 else None + + def __repr__(self): + return str([job.artifact.basename() + for (_, job) in self._jobs.iteritems()]) -class _JobFailed(object): +class _BuildFinished(object): + + pass + +class _BuildFailed(object): pass - class _Cached(object): @@ -123,8 +169,9 @@ class WorkerBuildQueuer(distbuild.StateMachine): distbuild.crash_point() logging.debug('WBQ: Setting up %s' % self) - self._request_queue = [] self._available_workers = [] + self._jobs = Jobs( + distbuild.IdentifierGenerator('WorkerBuildQueuerJob')) spec = [ # state, source, event_class, new_state, callback @@ -136,45 +183,86 @@ class WorkerBuildQueuer(distbuild.StateMachine): ] self.add_transitions(spec) + + def _handle_request(self, event_source, event): distbuild.crash_point() - logging.debug('WBQ: Adding request to queue: %s' % event.artifact.name) - self._request_queue.append(event) - logging.debug( - 'WBQ: %d available workers and %d requests queued' % - (len(self._available_workers), - len(self._request_queue))) - if self._available_workers: - self._give_job() + logging.debug('Handling build request for %s' % event.initiator_id) + logging.debug('Current jobs: %s' % self._jobs) + logging.debug('Workers available: %d' % len(self._available_workers)) + + # Have we already made a job for this thing? + # If so, add our initiator id to the existing job + # If not, create a job + + if self._jobs.exists(event.artifact.basename()): + job = self._jobs.get(event.artifact.basename()) + job.initiators.append(event.initiator_id) + + if job.is_building: + logging.debug('Worker build step already started: %s' % + event.artifact.basename()) + progress = WorkerBuildStepAlreadyStarted(event.initiator_id, + event.artifact.cache_key, job.who.name()) + else: + logging.debug('Job created but not building yet ' + '(waiting for a worker to become available): %s' % + event.artifact.basename()) + progress = WorkerBuildWaiting(event.initiator_id, + event.artifact.cache_key) + + self.mainloop.queue_event(WorkerConnection, progress) + else: + logging.debug('WBQ: Creating job for: %s' % event.artifact.name) + job = self._jobs.create(event.artifact, event.initiator_id) + + if self._available_workers: + self._give_job(job) + else: + progress = WorkerBuildWaiting(event.initiator_id, + event.artifact.cache_key) + self.mainloop.queue_event(WorkerConnection, progress) def _handle_cancel(self, event_source, worker_cancel_pending): - for request in [r for r in self._request_queue if - r.initiator_id == worker_cancel_pending.initiator_id]: - logging.debug('WBQ: Removing request from queue: %s', - request.artifact.name) - self._request_queue.remove(request) + # TODO: this probably needs to check whether any initiators + # care about this thing + + pass def _handle_worker(self, event_source, event): distbuild.crash_point() + who = event.who + last_job = who.job() # the job this worker's just completed + + if last_job: + logging.debug('%s wants new job, just did %s' % + (who.name(), last_job.artifact.basename())) + + self._jobs.remove(last_job) + else: + logging.debug('%s wants its first job' % who.name()) + logging.debug('WBQ: Adding worker to queue: %s' % event.who) self._available_workers.append(event) - logging.debug( - 'WBQ: %d available workers and %d requests queued' % - (len(self._available_workers), - len(self._request_queue))) - if self._request_queue: - self._give_job() + logging.debug('Current jobs: %s' % self._jobs) + logging.debug('Workers available: %d' % len(self._available_workers)) + + job = self._jobs.get_next_job() + + if job: + self._give_job(job) - def _give_job(self): - request = self._request_queue.pop(0) + def _give_job(self, job): worker = self._available_workers.pop(0) + job.who = worker.who + logging.debug( 'WBQ: Giving %s to %s' % - (request.artifact.name, worker.who.name())) - self.mainloop.queue_event(worker.who, _HaveAJob(request.artifact, - request.initiator_id)) + (job.artifact.name, worker.who.name())) + + self.mainloop.queue_event(worker.who, _HaveAJob(job)) class WorkerConnection(distbuild.StateMachine): @@ -194,6 +282,9 @@ class WorkerConnection(distbuild.StateMachine): self._worker_cache_server_port = worker_cache_server_port self._morph_instance = morph_instance self._helper_id = None + self._job = None + self._exec_response_msg = None + self._debug_json = False addr, port = self._conn.getpeername() name = socket.getfqdn(addr) @@ -202,6 +293,9 @@ class WorkerConnection(distbuild.StateMachine): def name(self): return self._worker_name + def job(self): + return self._job + def setup(self): distbuild.crash_point() @@ -221,14 +315,14 @@ class WorkerConnection(distbuild.StateMachine): ('building', self._jm, distbuild.JsonEof, None, self._reconnect), ('building', self._jm, distbuild.JsonNewMessage, 'building', self._handle_json_message), - ('building', self, _JobFailed, 'idle', self._request_job), - ('building', self, _JobIsFinished, 'caching', + ('building', self, _BuildFailed, 'idle', self._request_job), + ('building', self, _BuildFinished, 'caching', self._request_caching), ('caching', distbuild.HelperRouter, distbuild.HelperResult, 'caching', self._maybe_handle_helper_result), ('caching', self, _Cached, 'idle', self._request_job), - ('caching', self, _JobFailed, 'idle', self._request_job), + ('caching', self, _BuildFailed, 'idle', self._request_job), ] self.add_transitions(spec) @@ -237,13 +331,8 @@ class WorkerConnection(distbuild.StateMachine): def _maybe_cancel(self, event_source, build_cancel): logging.debug('WC: BuildController %r requested a cancel' % event_source) - if build_cancel.id == self._initiator_id: - distbuild.crash_point() - for id in self._initiator_request_map[self._initiator_id]: - logging.debug('WC: Cancelling exec %s' % id) - msg = distbuild.message('exec-cancel', id=id) - self._jm.send(msg) + # TODO: implement cancel def _reconnect(self, event_source, event): distbuild.crash_point() @@ -254,31 +343,34 @@ class WorkerConnection(distbuild.StateMachine): def _start_build(self, event_source, event): distbuild.crash_point() - self._artifact = event.artifact - self._initiator_id = event.initiator_id + self._job = event.job + self._helper_id = None + self._exec_response_msg = None + logging.debug('WC: starting build: %s for %s' % - (self._artifact.name, self._initiator_id)) + (self._job.artifact.name, self._job.initiators)) argv = [ self._morph_instance, 'worker-build', - self._artifact.name, + self._job.artifact.name, ] msg = distbuild.message('exec-request', - id=self._request_ids.next(), + id=self._job.id, argv=argv, - stdin_contents=distbuild.serialise_artifact(self._artifact), + stdin_contents=distbuild.serialise_artifact(self._job.artifact), ) self._jm.send(msg) - logging.debug('WC: sent to worker %s: %r' % (self._worker_name, msg)) - self._route_map.add(self._initiator_id, msg['id']) - self._initiator_request_map[self._initiator_id].add(msg['id']) - logging.debug( - 'WC: route map from %s to %s', - self._artifact.cache_key, msg['id']) - started = WorkerBuildStepStarted( - self._initiator_id, self._artifact.cache_key, self.name()) + if self._debug_json: + logging.debug('WC: sent to worker %s: %r' + % (self._worker_name, msg)) + + started = WorkerBuildStepStarted(self._job.initiators, + self._job.artifact.cache_key, self.name()) + + self._job.is_building = True + self.mainloop.queue_event(WorkerConnection, started) def _handle_json_message(self, event_source, event): @@ -299,30 +391,29 @@ class WorkerConnection(distbuild.StateMachine): def _handle_exec_output(self, msg): new = dict(msg) - new['id'] = self._route_map.get_incoming_id(msg['id']) + new['ids'] = self._job.initiators logging.debug('WC: emitting: %s', repr(new)) self.mainloop.queue_event( WorkerConnection, - WorkerBuildOutput(new, self._artifact.cache_key)) + WorkerBuildOutput(new, self._job.artifact.cache_key)) def _handle_exec_response(self, msg): - logging.debug('WC: finished building: %s' % self._artifact.name) + logging.debug('WC: finished building: %s' % self._job.artifact.name) + logging.debug('initiators that need to know: %s' + % self._job.initiators) new = dict(msg) - new['id'] = self._route_map.get_incoming_id(msg['id']) - self._route_map.remove(msg['id']) - self._initiator_request_map[self._initiator_id].remove(msg['id']) + new['ids'] = self._job.initiators if new['exit'] != 0: # Build failed. - new_event = WorkerBuildFailed(new, self._artifact.cache_key) + new_event = WorkerBuildFailed(new, self._job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) - self.mainloop.queue_event(self, _JobFailed()) - self._artifact = None - self._initiator_id = None + self.mainloop.queue_event(self, _BuildFailed()) else: # Build succeeded. We have more work to do: caching the result. - self.mainloop.queue_event(self, _JobIsFinished(new)) + self.mainloop.queue_event(self, _BuildFinished()) + self._exec_response_msg = new def _request_job(self, event_source, event): distbuild.crash_point() @@ -333,14 +424,14 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('Requesting shared artifact cache to get artifacts') - kind = self._artifact.source.morphology['kind'] + kind = self._job.artifact.source.morphology['kind'] if kind == 'chunk': - source_artifacts = self._artifact.source.artifacts + source_artifacts = self._job.artifact.source.artifacts suffixes = ['%s.%s' % (kind, name) for name in source_artifacts] else: - filename = '%s.%s' % (kind, self._artifact.name) + filename = '%s.%s' % (kind, self._job.artifact.name) suffixes = [filename] if kind == 'stratum': @@ -360,7 +451,7 @@ class WorkerConnection(distbuild.StateMachine): '/1.0/fetch?host=%s:%d&cacheid=%s&artifacts=%s' % (urllib.quote(worker_host), self._worker_cache_server_port, - urllib.quote(self._artifact.cache_key), + urllib.quote(self._job.artifact.cache_key), suffixes)) msg = distbuild.message( @@ -370,12 +461,9 @@ class WorkerConnection(distbuild.StateMachine): req = distbuild.HelperRequest(msg) self.mainloop.queue_event(distbuild.HelperRouter, req) - progress = WorkerBuildCaching( - self._initiator_id, self._artifact.cache_key) + progress = WorkerBuildCaching(self._job.initiators, + self._job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, progress) - - self._initiator_id = None - self._finished_msg = event.msg def _maybe_handle_helper_result(self, event_source, event): if event.msg['id'] == self._helper_id: @@ -384,21 +472,16 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('caching: event.msg: %s' % repr(event.msg)) if event.msg['status'] == httplib.OK: logging.debug('Shared artifact cache population done') + new_event = WorkerBuildFinished( - self._finished_msg, self._artifact.cache_key) + self._exec_response_msg, self._job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) - self._finished_msg = None - self._helper_id = None self.mainloop.queue_event(self, _Cached()) else: logging.error( 'Failed to populate artifact cache: %s %s' % (event.msg['status'], event.msg['body'])) new_event = WorkerBuildFailed( - self._finished_msg, self._artifact.cache_key) + self._exec_response_msg, self._job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) - self._finished_msg = None - self._helper_id = None - self.mainloop.queue_event(self, _JobFailed()) - - self._artifact = None + self.mainloop.queue_event(self, _BuildFailed()) |