From 222060e65e83365cd1bc64c3fd2362ed6a61d25b Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 23 Apr 2014 15:44:23 +0100 Subject: Add new build messages to worker build scheduler --- distbuild/worker_build_scheduler.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) (limited to 'distbuild') diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index fc5849b3..d855d1e0 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -33,20 +33,30 @@ class WorkerBuildRequest(object): self.artifact = artifact self.initiator_id = initiator_id - class WorkerCancelPending(object): def __init__(self, initiator_id): self.initiator_id = initiator_id - class WorkerBuildStepStarted(object): + def __init__(self, initiators, cache_key, worker_name): + self.initiators = initiators + self.artifact_cache_key = cache_key + self.worker_name = worker_name + +class WorkerBuildStepAlreadyStarted(object): + def __init__(self, initiator_id, cache_key, worker_name): self.initiator_id = initiator_id self.artifact_cache_key = cache_key self.worker_name = worker_name +class WorkerBuildWaiting(object): + + def __init__(self, initiator_id, cache_key): + self.initiator_id = initiator_id + self.artifact_cache_key = cache_key class WorkerBuildOutput(object): -- cgit v1.2.1 From aad372d64d3e4a09bfaf0c54616ce482019f9ba2 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 23 Apr 2014 15:45:37 +0100 Subject: Make WorkerBuildCaching carry a list of ids We need to be able to send this message to a number of initiators --- distbuild/worker_build_scheduler.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) (limited to 'distbuild') diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index d855d1e0..ebbefd06 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -64,21 +64,18 @@ class WorkerBuildOutput(object): self.msg = msg self.artifact_cache_key = cache_key - class WorkerBuildCaching(object): - def __init__(self, initiator_id, cache_key): - self.initiator_id = initiator_id + def __init__(self, initiators, cache_key): + self.initiators = initiators self.artifact_cache_key = cache_key - class WorkerBuildFinished(object): def __init__(self, msg, cache_key): self.msg = msg self.artifact_cache_key = cache_key - class WorkerBuildFailed(object): def __init__(self, msg, cache_key): -- cgit v1.2.1 From c00c828938918701dd7c3566c765c3a9e10b2253 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 23 Apr 2014 15:51:10 +0100 Subject: Add Jobs and Job classes --- distbuild/worker_build_scheduler.py | 59 ++++++++++++++++++++++++++++++------- 1 file changed, 49 insertions(+), 10 deletions(-) (limited to 'distbuild') diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index ebbefd06..de8c95b5 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -91,21 +91,60 @@ class _NeedJob(object): class _HaveAJob(object): - def __init__(self, artifact, initiator_id): + def __init__(self, job): + self.job = job + +class Job(object): + + def __init__(self, job_id, artifact, initiator_id): + self.id = job_id self.artifact = artifact - self.initiator_id = initiator_id - - -class _JobIsFinished(object): + self.initiators = [initiator_id] + self.who = None # we don't know who's going to do this yet + self.is_building = False - def __init__(self, msg): - self.msg = msg - + def add_initiator(self, initiator_id): + self.initiators.append(initiator_id) + +class Jobs(object): + + def __init__(self, idgen): + self._idgen = idgen + self._jobs = {} + + def get(self, artifact_basename): + return (self._jobs[artifact_basename] + if artifact_basename in self._jobs else None) + + def create(self, artifact, initiator_id): + job = Job(self._idgen.next(), artifact, initiator_id) + self._jobs[job.artifact.basename()] = job + return job + + def remove(self, job): + del self._jobs[job.artifact.basename()] + + def exists(self, artifact_basename): + return artifact_basename in self._jobs + + def get_next_job(self): + # for now just return the first thing we find that's not being built + waiting = [job for (_, job) in + self._jobs.iteritems() if job.who == None] + + return waiting.pop() if len(waiting) > 0 else None + + def __repr__(self): + return str([job.artifact.basename() + for (_, job) in self._jobs.iteritems()]) -class _JobFailed(object): +class _BuildFinished(object): + + pass + +class _BuildFailed(object): pass - class _Cached(object): -- cgit v1.2.1 From a479c52273f624afb475e59feabf7f1b977c8679 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 23 Apr 2014 15:55:14 +0100 Subject: WorkerBuildQueuer: replace request queue with jobs --- distbuild/worker_build_scheduler.py | 92 +++++++++++++++++++++++++++---------- 1 file changed, 67 insertions(+), 25 deletions(-) (limited to 'distbuild') diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index de8c95b5..5fb1166b 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -169,8 +169,9 @@ class WorkerBuildQueuer(distbuild.StateMachine): distbuild.crash_point() logging.debug('WBQ: Setting up %s' % self) - self._request_queue = [] self._available_workers = [] + self._jobs = Jobs( + distbuild.IdentifierGenerator('WorkerBuildQueuerJob')) spec = [ # state, source, event_class, new_state, callback @@ -182,45 +183,86 @@ class WorkerBuildQueuer(distbuild.StateMachine): ] self.add_transitions(spec) + + def _handle_request(self, event_source, event): distbuild.crash_point() - logging.debug('WBQ: Adding request to queue: %s' % event.artifact.name) - self._request_queue.append(event) - logging.debug( - 'WBQ: %d available workers and %d requests queued' % - (len(self._available_workers), - len(self._request_queue))) - if self._available_workers: - self._give_job() + logging.debug('Handling build request for %s' % event.initiator_id) + logging.debug('Current jobs: %s' % self._jobs) + logging.debug('Workers available: %d' % len(self._available_workers)) + + # Have we already made a job for this thing? + # If so, add our initiator id to the existing job + # If not, create a job + + if self._jobs.exists(event.artifact.basename()): + job = self._jobs.get(event.artifact.basename()) + job.initiators.append(event.initiator_id) + + if job.is_building: + logging.debug('Worker build step already started: %s' % + event.artifact.basename()) + progress = WorkerBuildStepAlreadyStarted(event.initiator_id, + event.artifact.cache_key, job.who.name()) + else: + logging.debug('Job created but not building yet ' + '(waiting for a worker to become available): %s' % + event.artifact.basename()) + progress = WorkerBuildWaiting(event.initiator_id, + event.artifact.cache_key) + + self.mainloop.queue_event(WorkerConnection, progress) + else: + logging.debug('WBQ: Creating job for: %s' % event.artifact.name) + job = self._jobs.create(event.artifact, event.initiator_id) + + if self._available_workers: + self._give_job(job) + else: + progress = WorkerBuildWaiting(event.initiator_id, + event.artifact.cache_key) + self.mainloop.queue_event(WorkerConnection, progress) def _handle_cancel(self, event_source, worker_cancel_pending): - for request in [r for r in self._request_queue if - r.initiator_id == worker_cancel_pending.initiator_id]: - logging.debug('WBQ: Removing request from queue: %s', - request.artifact.name) - self._request_queue.remove(request) + # TODO: this probably needs to check whether any initiators + # care about this thing + + pass def _handle_worker(self, event_source, event): distbuild.crash_point() + who = event.who + last_job = who.job() # the job this worker's just completed + + if last_job: + logging.debug('%s wants new job, just did %s' % + (who.name(), last_job.artifact.basename())) + + self._jobs.remove(last_job) + else: + logging.debug('%s wants its first job' % who.name()) + logging.debug('WBQ: Adding worker to queue: %s' % event.who) self._available_workers.append(event) - logging.debug( - 'WBQ: %d available workers and %d requests queued' % - (len(self._available_workers), - len(self._request_queue))) - if self._request_queue: - self._give_job() + logging.debug('Current jobs: %s' % self._jobs) + logging.debug('Workers available: %d' % len(self._available_workers)) + + job = self._jobs.get_next_job() + + if job: + self._give_job(job) - def _give_job(self): - request = self._request_queue.pop(0) + def _give_job(self, job): worker = self._available_workers.pop(0) + job.who = worker.who + logging.debug( 'WBQ: Giving %s to %s' % - (request.artifact.name, worker.who.name())) - self.mainloop.queue_event(worker.who, _HaveAJob(request.artifact, - request.initiator_id)) + (job.artifact.name, worker.who.name())) + + self.mainloop.queue_event(worker.who, _HaveAJob(job)) class WorkerConnection(distbuild.StateMachine): -- cgit v1.2.1 From 00e24f24bda5355823bcbf520616aad6482dfa2e Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 23 Apr 2014 15:59:24 +0100 Subject: WorkerConnection: misc attributes _job is the job this worker is carrying out _exec_response_msg will contain the response the worker sends back to us when it finishes the build. --- distbuild/worker_build_scheduler.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'distbuild') diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 5fb1166b..4df728bb 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -282,6 +282,9 @@ class WorkerConnection(distbuild.StateMachine): self._worker_cache_server_port = worker_cache_server_port self._morph_instance = morph_instance self._helper_id = None + self._job = None + self._exec_response_msg = None + self._debug_json = False addr, port = self._conn.getpeername() name = socket.getfqdn(addr) @@ -290,6 +293,9 @@ class WorkerConnection(distbuild.StateMachine): def name(self): return self._worker_name + def job(self): + return self._job + def setup(self): distbuild.crash_point() -- cgit v1.2.1 From 5139c0af22495f44272d20b9a78132f12d5a9a1f Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 23 Apr 2014 16:00:48 +0100 Subject: Change event names back The name change from BuildFailed -> JobFailed etc was unintentionally merged into master, undo this. --- distbuild/worker_build_scheduler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'distbuild') diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 4df728bb..041cc0e9 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -315,14 +315,14 @@ class WorkerConnection(distbuild.StateMachine): ('building', self._jm, distbuild.JsonEof, None, self._reconnect), ('building', self._jm, distbuild.JsonNewMessage, 'building', self._handle_json_message), - ('building', self, _JobFailed, 'idle', self._request_job), - ('building', self, _JobIsFinished, 'caching', + ('building', self, _BuildFailed, 'idle', self._request_job), + ('building', self, _BuildFinished, 'caching', self._request_caching), ('caching', distbuild.HelperRouter, distbuild.HelperResult, 'caching', self._maybe_handle_helper_result), ('caching', self, _Cached, 'idle', self._request_job), - ('caching', self, _JobFailed, 'idle', self._request_job), + ('caching', self, _BuildFailed, 'idle', self._request_job), ] self.add_transitions(spec) -- cgit v1.2.1 From 8e77bfa5763bd0cc8a0789cb1dd2610ab5d013fd Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 23 Apr 2014 16:03:07 +0100 Subject: Remove cancel This method no longer works, we will replace it soon. --- distbuild/worker_build_scheduler.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) (limited to 'distbuild') diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 041cc0e9..d8b1ae93 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -331,13 +331,8 @@ class WorkerConnection(distbuild.StateMachine): def _maybe_cancel(self, event_source, build_cancel): logging.debug('WC: BuildController %r requested a cancel' % event_source) - if build_cancel.id == self._initiator_id: - distbuild.crash_point() - for id in self._initiator_request_map[self._initiator_id]: - logging.debug('WC: Cancelling exec %s' % id) - msg = distbuild.message('exec-cancel', id=id) - self._jm.send(msg) + # TODO: implement cancel def _reconnect(self, event_source, event): distbuild.crash_point() -- cgit v1.2.1 From f25aa728184e0b63998f0c1d8a37df5a2e449568 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 23 Apr 2014 16:05:39 +0100 Subject: WorkerBuildQueuer: Use job's artifact and id Each job is given a unique id, so we don't need to generate an id for each exec request this means we can remove use of route map since we can use the job's id for the exec request --- distbuild/worker_build_scheduler.py | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) (limited to 'distbuild') diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index d8b1ae93..e58cfb11 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -343,31 +343,34 @@ class WorkerConnection(distbuild.StateMachine): def _start_build(self, event_source, event): distbuild.crash_point() - self._artifact = event.artifact - self._initiator_id = event.initiator_id + self._job = event.job + self._helper_id = None + self._exec_response_msg = None + logging.debug('WC: starting build: %s for %s' % - (self._artifact.name, self._initiator_id)) + (self._job.artifact.name, self._job.initiators)) argv = [ self._morph_instance, 'worker-build', - self._artifact.name, + self._job.artifact.name, ] msg = distbuild.message('exec-request', - id=self._request_ids.next(), + id=self._job.id, argv=argv, - stdin_contents=distbuild.serialise_artifact(self._artifact), + stdin_contents=distbuild.serialise_artifact(self._job.artifact), ) self._jm.send(msg) - logging.debug('WC: sent to worker %s: %r' % (self._worker_name, msg)) - self._route_map.add(self._initiator_id, msg['id']) - self._initiator_request_map[self._initiator_id].add(msg['id']) - logging.debug( - 'WC: route map from %s to %s', - self._artifact.cache_key, msg['id']) - started = WorkerBuildStepStarted( - self._initiator_id, self._artifact.cache_key, self.name()) + if self._debug_json: + logging.debug('WC: sent to worker %s: %r' + % (self._worker_name, msg)) + + started = WorkerBuildStepStarted(self._job.initiators, + self._job.artifact.cache_key, self.name()) + + self._job.is_building = True + self.mainloop.queue_event(WorkerConnection, started) def _handle_json_message(self, event_source, event): -- cgit v1.2.1 From 33e2ba3457d6626f24057084f54319cb29338771 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 23 Apr 2014 16:46:58 +0100 Subject: WorkerConnection: _handle_exec_output msg now contains a list of initiator ids rather than a single one, since BuiltOutput needs to be sent to a number of initiators --- distbuild/worker_build_scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'distbuild') diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index e58cfb11..77dfa550 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -391,11 +391,11 @@ class WorkerConnection(distbuild.StateMachine): def _handle_exec_output(self, msg): new = dict(msg) - new['id'] = self._route_map.get_incoming_id(msg['id']) + new['ids'] = self._job.initiators logging.debug('WC: emitting: %s', repr(new)) self.mainloop.queue_event( WorkerConnection, - WorkerBuildOutput(new, self._artifact.cache_key)) + WorkerBuildOutput(new, self._job.artifact.cache_key)) def _handle_exec_response(self, msg): logging.debug('WC: finished building: %s' % self._artifact.name) -- cgit v1.2.1 From d56402413409a7d8a8cf337072e039b5aa381c3a Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 23 Apr 2014 16:49:10 +0100 Subject: WorkerConection: _handle_exec_response The exec_response_msg also needs to be sent to a number of initiators, so we give it a list of ids not just one. The exec_response_msg will be sent to the controller once the artifacts have been cached successfully. There's no longer any need to use a route map to retrieve the id of the initiator, since this is stored with the job --- distbuild/worker_build_scheduler.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) (limited to 'distbuild') diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 77dfa550..dbc0dfe4 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -398,23 +398,22 @@ class WorkerConnection(distbuild.StateMachine): WorkerBuildOutput(new, self._job.artifact.cache_key)) def _handle_exec_response(self, msg): - logging.debug('WC: finished building: %s' % self._artifact.name) + logging.debug('WC: finished building: %s' % self._job.artifact.name) + logging.debug('initiators that need to know: %s' + % self._job.initiators) new = dict(msg) - new['id'] = self._route_map.get_incoming_id(msg['id']) - self._route_map.remove(msg['id']) - self._initiator_request_map[self._initiator_id].remove(msg['id']) + new['ids'] = self._job.initiators if new['exit'] != 0: # Build failed. - new_event = WorkerBuildFailed(new, self._artifact.cache_key) + new_event = WorkerBuildFailed(new, self._job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) - self.mainloop.queue_event(self, _JobFailed()) - self._artifact = None - self._initiator_id = None + self.mainloop.queue_event(self, _BuildFailed()) else: # Build succeeded. We have more work to do: caching the result. - self.mainloop.queue_event(self, _JobIsFinished(new)) + self.mainloop.queue_event(self, _BuildFinished()) + self._exec_response_msg = new def _request_job(self, event_source, event): distbuild.crash_point() -- cgit v1.2.1 From 00a571bde0a3a6005c4ffa029c5c4239a9c2381c Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 23 Apr 2014 16:54:28 +0100 Subject: WorkerConnection: _request_caching Now we just get everything from the job object --- distbuild/worker_build_scheduler.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) (limited to 'distbuild') diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index dbc0dfe4..5c07a55d 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -424,14 +424,14 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('Requesting shared artifact cache to get artifacts') - kind = self._artifact.source.morphology['kind'] + kind = self._job.artifact.source.morphology['kind'] if kind == 'chunk': - source_artifacts = self._artifact.source.artifacts + source_artifacts = self._job.artifact.source.artifacts suffixes = ['%s.%s' % (kind, name) for name in source_artifacts] else: - filename = '%s.%s' % (kind, self._artifact.name) + filename = '%s.%s' % (kind, self._job.artifact.name) suffixes = [filename] if kind == 'stratum': @@ -451,7 +451,7 @@ class WorkerConnection(distbuild.StateMachine): '/1.0/fetch?host=%s:%d&cacheid=%s&artifacts=%s' % (urllib.quote(worker_host), self._worker_cache_server_port, - urllib.quote(self._artifact.cache_key), + urllib.quote(self._job.artifact.cache_key), suffixes)) msg = distbuild.message( @@ -461,12 +461,9 @@ class WorkerConnection(distbuild.StateMachine): req = distbuild.HelperRequest(msg) self.mainloop.queue_event(distbuild.HelperRouter, req) - progress = WorkerBuildCaching( - self._initiator_id, self._artifact.cache_key) + progress = WorkerBuildCaching(self._job.initiators, + self._job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, progress) - - self._initiator_id = None - self._finished_msg = event.msg def _maybe_handle_helper_result(self, event_source, event): if event.msg['id'] == self._helper_id: -- cgit v1.2.1 From 92f83c56b18b87bdc08e8f729034a8702dbd7449 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 23 Apr 2014 16:56:06 +0100 Subject: WorkerConnection: _maybe_handle_helper_result Put our _exec_response_msg into WorkerBuildFinished event, it's essentially the same as _finished_msg, just a different name Get our artifact's cache key from the job --- distbuild/worker_build_scheduler.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) (limited to 'distbuild') diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index 5c07a55d..83873d74 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -472,21 +472,16 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('caching: event.msg: %s' % repr(event.msg)) if event.msg['status'] == httplib.OK: logging.debug('Shared artifact cache population done') + new_event = WorkerBuildFinished( - self._finished_msg, self._artifact.cache_key) + self._exec_response_msg, self._job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) - self._finished_msg = None - self._helper_id = None self.mainloop.queue_event(self, _Cached()) else: logging.error( 'Failed to populate artifact cache: %s %s' % (event.msg['status'], event.msg['body'])) new_event = WorkerBuildFailed( - self._finished_msg, self._artifact.cache_key) + self._exec_response_msg, self._job.artifact.cache_key) self.mainloop.queue_event(WorkerConnection, new_event) - self._finished_msg = None - self._helper_id = None - self.mainloop.queue_event(self, _JobFailed()) - - self._artifact = None + self.mainloop.queue_event(self, _BuildFailed()) -- cgit v1.2.1 From 2e87c9912e84409e8ac6eba43dc03593be06ca33 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Tue, 22 Apr 2014 19:23:09 +0100 Subject: Add new step-already-started message to protocol --- distbuild/protocol.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'distbuild') diff --git a/distbuild/protocol.py b/distbuild/protocol.py index 9a4c362e..561201bb 100644 --- a/distbuild/protocol.py +++ b/distbuild/protocol.py @@ -39,6 +39,11 @@ _types = { 'step_name', 'worker_name', ], + 'step-already-started': [ + 'id', + 'step_name', + 'worker_name', + ], 'step-output': [ 'id', 'step_name', -- cgit v1.2.1 From 99b9efa9dd588afdc4f1e189380a4c71bbc7114e Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 23 Apr 2014 17:02:05 +0100 Subject: Add handling for new messages to BuildController There are two new messages: WorkerBuildStepAlreadyStarted tells the initiator that the artifact they want to build is already being built, e.g. 'eglibc-misc is already building on 172.17.1.37:3434' WorkerBuildWaiting tells the initiator that the artifact they want to build can't be built yet because there aren't any workers free, e.g. 'Ready to build eglibc-misc: waiting for a worker to become available' --- distbuild/build_controller.py | 41 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) (limited to 'distbuild') diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py index da9e97c1..31828979 100644 --- a/distbuild/build_controller.py +++ b/distbuild/build_controller.py @@ -93,7 +93,12 @@ class BuildStepStarted(object): self.id = request_id self.step_name = step_name self.worker_name = worker_name - + +class BuildStepAlreadyStarted(BuildStepStarted): + + def __init__(self, request_id, step_name, worker_name): + super(BuildStepAlreadyStarted, self).__init__( + request_id, step_name, worker_name) class BuildOutput(object): @@ -206,6 +211,7 @@ class BuildController(distbuild.StateMachine): # specific to WorkerConnection instances that our currently # building for us, but the state machines are not intended to # behave that way). + ('building', distbuild.WorkerConnection, distbuild.WorkerBuildStepStarted, 'building', self._maybe_relay_build_step_started), @@ -215,6 +221,12 @@ class BuildController(distbuild.StateMachine): ('building', distbuild.WorkerConnection, distbuild.WorkerBuildCaching, 'building', self._maybe_relay_build_caching), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildStepAlreadyStarted, 'building', + self._maybe_relay_build_step_already_started), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildWaiting, 'building', + self._maybe_relay_build_waiting_for_worker), ('building', distbuild.WorkerConnection, distbuild.WorkerBuildFinished, 'building', self._maybe_check_result_and_queue_more_builds), @@ -438,6 +450,21 @@ class BuildController(distbuild.StateMachine): cancel = BuildCancel(disconnect.id) self.mainloop.queue_event(BuildController, cancel) + def _maybe_relay_build_waiting_for_worker(self, event_source, event): + if event.initiator_id != self._request['id']: + return # not for us + + artifact = self._find_artifact(event.artifact_cache_key) + if artifact is None: + # This is not the event you are looking for. + return + + progress = BuildProgress( + self._request['id'], + 'Ready to build %s: waiting for a worker to become available' + % artifact.name) + self.mainloop.queue_event(BuildController, progress) + def _maybe_relay_build_step_started(self, event_source, event): distbuild.crash_point() if event.initiator_id != self._request['id']: @@ -456,6 +483,18 @@ class BuildController(distbuild.StateMachine): self.mainloop.queue_event(BuildController, started) logging.debug('BC: emitted %s' % repr(started)) + def _maybe_relay_build_step_already_started(self, event_source, event): + if event.initiator_id != self._request['id']: + return # not for us + + artifact = self._find_artifact(event.artifact_cache_key) + + logging.debug('BC: got build step already started: %s' % artifact.name) + started = BuildStepAlreadyStarted( + self._request['id'], build_step_name(artifact), event.worker_name) + self.mainloop.queue_event(BuildController, started) + logging.debug('BC: emitted %s' % repr(started)) + def _maybe_relay_build_output(self, event_source, event): distbuild.crash_point() if event.msg['id'] != self._request['id']: -- cgit v1.2.1 From 29e199cef71eec9792a833334cd9aa3ae258dd9d Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 23 Apr 2014 17:03:04 +0100 Subject: Check whether request id is in the event's id list The contents of the message has changed for several events, event messages that need to be sent to several initiators have a list of ids instead of a single id. --- distbuild/build_controller.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) (limited to 'distbuild') diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py index 31828979..f596a295 100644 --- a/distbuild/build_controller.py +++ b/distbuild/build_controller.py @@ -467,11 +467,12 @@ class BuildController(distbuild.StateMachine): def _maybe_relay_build_step_started(self, event_source, event): distbuild.crash_point() - if event.initiator_id != self._request['id']: + if self._request['id'] not in event.initiators: return # not for us logging.debug( 'BC: _relay_build_step_started: %s' % event.artifact_cache_key) + artifact = self._find_artifact(event.artifact_cache_key) if artifact is None: # This is not the event you are looking for. @@ -497,7 +498,7 @@ class BuildController(distbuild.StateMachine): def _maybe_relay_build_output(self, event_source, event): distbuild.crash_point() - if event.msg['id'] != self._request['id']: + if self._request['id'] not in event.msg['ids']: return # not for us logging.debug('BC: got output: %s' % repr(event.msg)) @@ -515,7 +516,8 @@ class BuildController(distbuild.StateMachine): def _maybe_relay_build_caching(self, event_source, event): distbuild.crash_point() - if event.initiator_id != self._request['id']: + + if self._request['id'] not in event.initiators: return # not for us artifact = self._find_artifact(event.artifact_cache_key) @@ -538,7 +540,7 @@ class BuildController(distbuild.StateMachine): def _maybe_check_result_and_queue_more_builds(self, event_source, event): distbuild.crash_point() - if event.msg['id'] != self._request['id']: + if self._request['id'] not in event.msg['ids']: return # not for us artifact = self._find_artifact(event.artifact_cache_key) @@ -579,8 +581,8 @@ class BuildController(distbuild.StateMachine): def _maybe_notify_build_failed(self, event_source, event): distbuild.crash_point() - if event.msg['id'] != self._request['id']: - return + if self._request['id'] not in event.msg['ids']: + return # not for us artifact = self._find_artifact(event.artifact_cache_key) -- cgit v1.2.1 From 23436a1fd5151f5d643bf159187f3779f590e671 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Tue, 22 Apr 2014 19:27:13 +0100 Subject: InitiatorConnection: Handle _step_already_started --- distbuild/initiator_connection.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) (limited to 'distbuild') diff --git a/distbuild/initiator_connection.py b/distbuild/initiator_connection.py index 34f2bdaa..b3769d7c 100644 --- a/distbuild/initiator_connection.py +++ b/distbuild/initiator_connection.py @@ -81,6 +81,9 @@ class InitiatorConnection(distbuild.StateMachine): 'idle', self._send_build_steps_message), ('idle', distbuild.BuildController, distbuild.BuildStepStarted, 'idle', self._send_build_step_started_message), + ('idle', distbuild.BuildController, + distbuild.BuildStepAlreadyStarted, 'idle', + self._send_build_step_already_started_message), ('idle', distbuild.BuildController, distbuild.BuildOutput, 'idle', self._send_build_output_message), ('idle', distbuild.BuildController, distbuild.BuildStepFinished, @@ -113,7 +116,6 @@ class InitiatorConnection(distbuild.StateMachine): self.initiator_name, str(id)) self.mainloop.queue_event(InitiatorConnection, InitiatorDisconnect(id)) - # TODO should this clear our_ids? self.mainloop.queue_event(self, _Close(event_source)) def _close(self, event_source, event): @@ -183,6 +185,9 @@ class InitiatorConnection(distbuild.StateMachine): self._log_send(msg) def _send_build_step_started_message(self, event_source, event): + logging.debug('InitiatorConnection: build_step_started: ' + 'id=%s step_name=%s worker_name=%s' % + (event.id, event.step_name, event.worker_name)) if event.id in self.our_ids: msg = distbuild.message('step-started', id=self._route_map.get_incoming_id(event.id), @@ -191,6 +196,19 @@ class InitiatorConnection(distbuild.StateMachine): self.jm.send(msg) self._log_send(msg) + def _send_build_step_already_started_message(self, event_source, event): + logging.debug('InitiatorConnection: build_step_already_started: ' + 'id=%s step_name=%s worker_name=%s' % (event.id, event.step_name, + event.worker_name)) + + if event.id in self.our_ids: + msg = distbuild.message('step-already-started', + id=self._route_map.get_incoming_id(event.id), + step_name=event.step_name, + worker_name=event.worker_name) + self.jm.send(msg) + self._log_send(msg) + def _send_build_output_message(self, event_source, event): logging.debug('InitiatorConnection: build_output: ' 'id=%s stdout=%s stderr=%s' % @@ -205,6 +223,8 @@ class InitiatorConnection(distbuild.StateMachine): self._log_send(msg) def _send_build_step_finished_message(self, event_source, event): + logging.debug('heard built step finished: event.id: %s our_ids: %s' + % (str(event.id), str(self.our_ids))) if event.id in self.our_ids: msg = distbuild.message('step-finished', id=self._route_map.get_incoming_id(event.id), -- cgit v1.2.1 From ce9bd13839d4a20f31ae44b74b8f3e4a48bb75b6 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Tue, 22 Apr 2014 19:28:39 +0100 Subject: Initiator: Handle step-already-started message --- distbuild/initiator.py | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'distbuild') diff --git a/distbuild/initiator.py b/distbuild/initiator.py index 6e4ca65a..67f37764 100644 --- a/distbuild/initiator.py +++ b/distbuild/initiator.py @@ -94,6 +94,7 @@ class Initiator(distbuild.StateMachine): 'build-progress': self._handle_build_progress_message, 'build-steps': self._handle_build_steps_message, 'step-started': self._handle_step_started_message, + 'step-already-started': self._handle_step_already_started_message, 'step-output': self._handle_step_output_message, 'step-finished': self._handle_step_finished_message, 'step-failed': self._handle_step_failed_message, @@ -127,6 +128,12 @@ class Initiator(distbuild.StateMachine): self._step_outputs[msg['step_name']].close() del self._step_outputs[msg['step_name']] + def _handle_step_already_started_message(self, msg): + self._app.status( + msg='%s is already building on %s' % (msg['step_name'], + msg['worker_name'])) + self._open_output(msg) + def _handle_step_started_message(self, msg): self._app.status( msg='Started building %(step_name)s on %(worker_name)s', -- cgit v1.2.1 From 4c3c32ed36388137897bf8eade030991a18092b3 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Tue, 22 Apr 2014 19:29:20 +0100 Subject: Import all the things --- distbuild/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'distbuild') diff --git a/distbuild/__init__.py b/distbuild/__init__.py index 57aaccaf..7274f6a9 100644 --- a/distbuild/__init__.py +++ b/distbuild/__init__.py @@ -44,11 +44,14 @@ from worker_build_scheduler import (WorkerBuildQueuer, WorkerCancelPending, WorkerBuildOutput, WorkerBuildCaching, + WorkerBuildStepAlreadyStarted, + WorkerBuildWaiting, WorkerBuildFinished, WorkerBuildFailed, WorkerBuildStepStarted) from build_controller import (BuildController, BuildFailed, BuildProgress, - BuildSteps, BuildStepStarted, BuildOutput, + BuildSteps, BuildStepStarted, + BuildStepAlreadyStarted, BuildOutput, BuildStepFinished, BuildStepFailed, BuildFinished, BuildCancel, build_step_name, map_build_graph) -- cgit v1.2.1