diff options
-rw-r--r-- | distbuild/build_controller.py | 305 |
1 files changed, 174 insertions, 131 deletions
diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py index 901cb58e..4a59ed65 100644 --- a/distbuild/build_controller.py +++ b/distbuild/build_controller.py @@ -25,15 +25,14 @@ import json import distbuild -# Artifact build states -UNKNOWN = 'unknown' +# Artifact build states. These are used to loosely track the state of the +# remote cache. UNBUILT = 'not-built' BUILDING = 'building' BUILT = 'built' class _Start(object): pass -class _Annotated(object): pass class _Built(object): pass @@ -177,6 +176,33 @@ def find_artifacts(components, artifact): found.append(a) return found + +def find_artifacts_that_are_ready_to_build(root_artifact, components=[]): + '''Return unbuilt artifacts whose dependencies are all built. + + The 'root_artifact' parameter is expected to be a tree of ArtifactReference + objects. These must have the 'state' attribute set to BUILT or UNBUILT. If + 'components' is passed, then only those artifacts and their dependencies + will be built. + + ''' + def is_ready_to_build(artifact): + return (artifact.state == UNBUILT and + all(a.state == BUILT + for a in artifact.dependencies)) + + artifacts, _ = map_build_graph(root_artifact, lambda a: a, + components) + return [a for a in artifacts if is_ready_to_build(a)] + + +def find_artifacts_that_are_building(root_artifact, components=[]): + '''Return artifacts in BUILDING state.''' + artifacts, _ = map_build_graph(root_artifact, lambda a: a, + components) + return [a for a in artifacts if a.state == BUILDING] + + class BuildController(distbuild.StateMachine): '''Control one build-request fulfillment. @@ -210,6 +236,8 @@ class BuildController(distbuild.StateMachine): 'status': 'Computing build graph' } + self.sent_cache_status = False + def __repr__(self): return '<BuildController at 0x%x, request-id %s>' % (id(self), self._request['id']) @@ -239,7 +267,7 @@ class BuildController(distbuild.StateMachine): ('graphing', distbuild.HelperRouter, distbuild.HelperResult, 'graphing', self._maybe_finish_graph), ('graphing', self, _GotGraph, - 'annotating', self._start_annotating), + 'building', self._start_building), ('graphing', self, BuildFailed, None, None), ('graphing', distbuild.InitiatorConnection, distbuild.CancelRequest, 'graphing', @@ -249,19 +277,6 @@ class BuildController(distbuild.StateMachine): self._maybe_notify_initiator_disconnected), ('graphing', self, _Abort, None, None), - ('annotating', distbuild.HelperRouter, distbuild.HelperResult, - 'annotating', self._maybe_handle_cache_response), - ('annotating', self, BuildFailed, None, None), - ('annotating', self, _Annotated, 'building', - self._start_building), - ('annotating', distbuild.InitiatorConnection, - distbuild.CancelRequest, 'annotating', - self._maybe_notify_build_cancelled), - ('annotating', distbuild.InitiatorConnection, - distbuild.InitiatorDisconnect, 'annotating', - self._maybe_notify_initiator_disconnected), - ('annotating', self, _Abort, None, None), - # The exact WorkerConnection that is doing our building changes # from build to build. We must listen to all messages from all # workers, and choose whether to change state inside the callback. @@ -270,6 +285,8 @@ class BuildController(distbuild.StateMachine): # building for us, but the state machines are not intended to # behave that way). + ('building', distbuild.HelperRouter, distbuild.HelperResult, + 'building', self._maybe_handle_cache_response), ('building', distbuild.WorkerConnection, distbuild.WorkerBuildStepStarted, 'building', self._maybe_relay_build_step_started), @@ -412,17 +429,41 @@ class BuildController(distbuild.StateMachine): notify_success(artifact) - def _start_annotating(self, event_source, event): - '''Ask the shared artifact cache which artifacts are already built. + def _build_complete(self): + '''Return True if everything is built. - The request sent by this function is identified by self._helper_id. + In the case of a partial distbuild, self._components tracks which + components of self._artifact the user wants. If that isn't set, we are + doing a full build of self._artifact. ''' - distbuild.crash_point() + if not self._components: + if self._artifact.state == BUILT: + logging.info('Requested artifact is built') + self.mainloop.queue_event(self, _Built()) + return True + else: + if not any(c.state != BUILT for c in self._components): + logging.info('Requested components are built') + self.mainloop.queue_event(self, _Built()) + return True + return False + + def _start_building(self, event_source, got_graph_event): + '''Initialise the build process, after 'graphing' has completed. + + This function broadcasts the BuildStarted status message. + + ''' + self._artifact = got_graph_event.artifact + + # If a partial distbuild was requested, find the components the user + # wanted. Raise an error if some of them aren't actually components of + # the root artifact. - self._artifact = event.artifact names = self._request['component_names'] self._components = find_artifacts(names, self._artifact) + not_found = [] for component in names: found_names = [c.source_name for c in self._components] @@ -433,16 +474,44 @@ class BuildController(distbuild.StateMachine): if not_found: self.fail('Some of the requested components are not in %s: %s' % (self._artifact.name, ', '.join(not_found))) - self._helper_id = self._idgen.next() - artifact_names = [] - def set_state_and_append(artifact): - artifact.state = UNKNOWN - artifact_names.append(artifact.basename()) + # Mark everything as unbuilt to begin with. We'll query the actual + # state from the cache in self._query_cache_state(). + def set_initial_state(artifact): + artifact.state = UNBUILT + map_build_graph(self._artifact, set_initial_state) - _, self._components = map_build_graph(self._artifact, - set_state_and_append, - self._components) + self.mainloop.queue_event(BuildController, + BuildStarted(self._request['id'])) + + self._query_cache_state() + + def _query_cache_state(self): + '''Ask the shared artifact cache which artifacts are already built. + + We query the state of all artifacts that we think still need building. + Some may have been built in the meantime, either by other + BuildController instances within this process, or by other distbuild + networks that share the same artifact cache as us. + + Note that this doesn't attempt to deal with artifacts that were deleted + while we were building. To do that in a race-free way requires handling + the 'some dependencies were missing' error after it occurs. Currently + you will break things if you delete artifacts during a build. + + The request sent by this function is identified by self._helper_id. + + ''' + distbuild.crash_point() + + self._helper_id = self._idgen.next() + + artifact_names = [] + def collect_unbuilt_artifacts(artifact): + if artifact.state == UNBUILT: + artifact_names.append(artifact.basename()) + map_build_graph(self._artifact, collect_unbuilt_artifacts, + self._components) url = urlparse.urljoin(self._artifact_cache_server, '/1.0/artifacts') msg = distbuild.message('http-request', @@ -462,25 +531,19 @@ class BuildController(distbuild.StateMachine): The result tells us which artifacts are built and which are not. This information is saved in 'state' attribute of the ArtifactReference - objects, in the tree linked from self._artifact. This function sends - an _Annotated message to indicate completion, and also broadcasts the - CacheState status message. + objects, in the tree linked from self._artifact. This function also + broadcasts the CacheState status message the first time it runs. This is called for all distbuild.HelperResult messages so we need to filter by self._helper_id. ''' - def set_status(artifact): - is_in_cache = cache_state[artifact.basename()] - artifact.state = BUILT if is_in_cache else UNBUILT - if self._helper_id != event.msg['id']: return # this event is not for us logging.debug('Got cache response: %s' % repr(event.msg)) http_status_code = event.msg['status'] - if http_status_code != httplib.OK: self.fail('Failed to annotate build graph: HTTP request to %s got ' '%d: %s' % (self._artifact_cache_server, @@ -488,90 +551,25 @@ class BuildController(distbuild.StateMachine): return cache_state = json.loads(event.msg['body']) - _, self._components = map_build_graph(self._artifact, set_status, - self._components) - self.mainloop.queue_event(self, _Annotated()) - unbuilt = set() - for c in self._components: - unbuilt.update([a for a in c.walk() if a.state == UNBUILT]) - unbuilt = len(unbuilt) or len([a for a in self._artifact.walk() - if a.state == UNBUILT]) - total = set() - for c in self._components: - total.update([a for a in c.walk()]) - total = len(total) or len([a for _ in self._artifact.walk()]) - - cache_state_msg = CacheState(self._request['id'], unbuilt, total) - self.mainloop.queue_event(BuildController, cache_state_msg) - - if total == 0: - logging.info('There seems to be nothing to build') - self.mainloop.queue_event(self, _Built()) - - def _find_artifacts_that_are_ready_to_build(self): - '''Return unbuilt artifacts whose dependencies are all built. - - This uses the information kept in self._artifacts. Note that this - is NOT checked against the actual state of the cache at any time - after the build starts. So it's entirely possible that this function - will tell the BuildController to build things that appeared in the - cache since the 'annotating' stage completed, or believe that things - which have been deleted from the cache are still there. - - ''' - def is_ready_to_build(artifact): - return (artifact.state == UNBUILT and - all(a.state == BUILT - for a in artifact.dependencies)) - - artifacts, _ = map_build_graph(self._artifact, lambda a: a, - self._components) - return [a for a in artifacts if is_ready_to_build(a)] - - def _build_complete(self): - '''Return True if everything is built. - - In the case of a partial distbuild, self._components tracks which - components of self._artifact the user wants. If that isn't set, we are - doing a full build of self._artifact. - - ''' - if not self._components: - if self._artifact.state == BUILT: - logging.info('Requested artifact is built') - self.mainloop.queue_event(self, _Built()) - return True - else: - if not any(c.state != BUILT for c in self._components): - logging.info('Requested components are built') - self.mainloop.queue_event(self, _Built()) - return True - return False - - def _start_building(self, event_source, event): - '''Send an initial set of chunks to the WorkerBuildQueuer class. - - This function is called once, after the 'annotating' stage completes. - It also broadcasts the BuildStarted status message. - - ''' - if self._build_complete(): - return - - self.mainloop.queue_event(BuildController, - BuildStarted(self._request['id'])) - self._queue_worker_builds(event_source, event) - - def _queue_worker_builds(self, event_source, event): - '''Send a set of chunks to the WorkerBuildQueuer class.''' + # Mark things as built that are now built. We only check the unbuilt + # artifacts, so 'cache_state' will have no info for things we already + # thought were built. + def update_state(artifact): + if artifact.state == UNBUILT: + is_in_cache = cache_state[artifact.basename()] + if is_in_cache: + logging.debug('Found a build of %s in the cache', artifact) + artifact.state = BUILT + _, self._components = map_build_graph(self._artifact, update_state, + self._components) - distbuild.crash_point() + # Send 'Need to build xx/yy artifacts' message, the first time round. + if self.sent_cache_status == False: + self._send_cache_status_message() + self.sent_cache_status = True - if self._build_complete(): - return - - logging.debug('Queuing more worker-builds to run') + # Dump state (for debugging). if self.debug_graph_state: logging.debug('Current state of build graph nodes:') for a, _ in map_build_graph(self._artifact, @@ -583,14 +581,57 @@ class BuildController(distbuild.StateMachine): ' depends on %s which is %s' % (dep.name, dep.state)) - while True: - ready = self._find_artifacts_that_are_ready_to_build() + # Check whether we're actually done already. + if self._build_complete(): + return + + # Enqueue anything which it is now possible for us to build. + ready_to_build = find_artifacts_that_are_ready_to_build( + self._artifact, self._components) + + if len(ready_to_build) == 0: + building = find_artifacts_that_are_building( + self._artifact, self._components) + if len(building) == 0: + self.fail( + "Not possible to build anything else. This may be due to " + "an internal error, or due to artifacts being deleted " + "from the shared cache during the build.") + else: + self._queue_worker_builds(ready_to_build) + + def _send_cache_status_message(self): + '''Send 'Need to build xx/yy artifacts' message.''' - if len(ready) == 0: - logging.debug('No new artifacts queued for building') - break + if len(self._components) == 0: + unbuilt = {a.cache_key for a in self._artifact.walk()} + else: + # Partial distbuild + unbuilt = set() + for c in self._components: + unbuilt.update( + {a.cache_key for a in c.walk() if a.state == UNBUILT}) + + if len(self._components) == 0: + total = {a.cache_key for a in self._artifact.walk()} + else: + # Partial distbuild + total = set() + for c in self._components: + total.update( + {a.cache_key for a in c.walk() if a.state == UNBUILT}) + + cache_state_msg = CacheState( + self._request['id'], len(unbuilt), len(total)) + self.mainloop.queue_event(BuildController, cache_state_msg) - artifact = ready[0] + def _queue_worker_builds(self, artifacts): + '''Send a set of chunks to the WorkerBuildQueuer class for building.''' + distbuild.crash_point() + + logging.debug('Queuing more worker-builds to run') + while len(artifacts) > 0: + artifact = artifacts.pop() logging.debug( 'Requesting worker-build of %s (%s)' % @@ -605,9 +646,11 @@ class BuildController(distbuild.StateMachine): # so when we're building any chunk artifact # we're also building all the chunk artifacts # in this source - for a in ready: - if a.cache_key == artifact.cache_key: - a.state = BUILDING + same_chunk_artifacts = [a for a in artifacts + if a.cache_key == artifact.cache_key] + for a in same_chunk_artifacts: + a.state = BUILDING + artifacts.remove(a) def _maybe_notify_initiator_disconnected(self, event_source, event): if event.id != self._request['id']: @@ -746,8 +789,8 @@ class BuildController(distbuild.StateMachine): This function is called for all WorkerBuildFinished messages, so it must check that the artifact is one that it cares about. - It updates the state of the artifact in self._components and sends - more builds to the WorkerBuildQueuer, if there are any more. + It updates ArtifactReference.state for the given artifact, and queries + the cache to see what there is left to build. ''' distbuild.crash_point() @@ -780,7 +823,7 @@ class BuildController(distbuild.StateMachine): _, self._components = map_build_graph(self._artifact, set_state, self._components) - self._queue_worker_builds(None, event) + self._query_cache_state() def _maybe_notify_build_failed(self, event_source, event): '''Handle failure of a build, from the WorkerBuildQueuer. |