diff options
Diffstat (limited to 'distbuild')
-rw-r--r-- | distbuild/build_controller.py | 129 | ||||
-rw-r--r-- | distbuild/initiator.py | 8 | ||||
-rw-r--r-- | distbuild/json_router.py | 1 | ||||
-rw-r--r-- | distbuild/protocol.py | 6 | ||||
-rw-r--r-- | distbuild/worker_build_scheduler.py | 19 |
5 files changed, 120 insertions, 43 deletions
diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py index d6f3398f..6058862c 100644 --- a/distbuild/build_controller.py +++ b/distbuild/build_controller.py @@ -116,19 +116,40 @@ def build_step_name(artifact): return artifact.source.name -def map_build_graph(artifact, callback): +def map_build_graph(artifact, callback, components=[]): + """Run callback on each artifact in the build graph and return result. + + If components is given, then only look at the components given and + their dependencies. Also, return a list of the components after they + have had callback called on them. + + """ result = [] + mapped_components = [] done = set() - queue = [artifact] + if components: + queue = list(components) + else: + queue = [artifact] while queue: a = queue.pop() if a not in done: result.append(callback(a)) queue.extend(a.source.dependencies) done.add(a) - return result + if a in components: + mapped_components.append(a) + return result, mapped_components +def find_artifacts(components, artifact): + found = [] + for a in artifact.walk(): + name = a.source.morphology['name'] + if name in components: + found.append(a) + return found + class BuildController(distbuild.StateMachine): '''Control one build-request fulfillment. @@ -165,8 +186,10 @@ class BuildController(distbuild.StateMachine): spec = [ # state, source, event_class, new_state, callback ('init', self, _Start, 'graphing', self._start_graphing), - ('init', self._initiator_connection, - distbuild.InitiatorDisconnect, None, None), + ('init', distbuild.InitiatorConnection, + distbuild.InitiatorDisconnect, 'init', + self._maybe_notify_initiator_disconnected), + ('init', self, _Abort, None, None), ('graphing', distbuild.HelperRouter, distbuild.HelperOutput, 'graphing', self._maybe_collect_graph), @@ -175,16 +198,20 @@ class BuildController(distbuild.StateMachine): ('graphing', self, _GotGraph, 'annotating', self._start_annotating), ('graphing', self, BuildFailed, None, None), - ('graphing', self._initiator_connection, - distbuild.InitiatorDisconnect, None, None), + ('graphing', distbuild.InitiatorConnection, + distbuild.InitiatorDisconnect, 'graphing', + self._maybe_notify_initiator_disconnected), + ('graphing', self, _Abort, None, None), ('annotating', distbuild.HelperRouter, distbuild.HelperResult, 'annotating', self._maybe_handle_cache_response), ('annotating', self, BuildFailed, None, None), ('annotating', self, _Annotated, 'building', self._queue_worker_builds), - ('annotating', self._initiator_connection, - distbuild.InitiatorDisconnect, None, None), + ('annotating', distbuild.InitiatorConnection, + distbuild.InitiatorDisconnect, 'annotating', + self._maybe_notify_initiator_disconnected), + ('annotating', self, _Abort, None, None), # The exact WorkerConnection that is doing our building changes # from build to build. We must listen to all messages from all @@ -314,6 +341,17 @@ class BuildController(distbuild.StateMachine): distbuild.crash_point() self._artifact = event.artifact + names = self._request['component_names'] + self._components = find_artifacts(names, self._artifact) + failed = False + for component in self._components: + if component.source.morphology['name'] not in names: + logging.debug('Failed to find %s in build graph' + % component.filename) + failed = True + if failed: + self.fail('Failed to find all components in %s' + % self._artifact.name) self._helper_id = self._idgen.next() artifact_names = [] @@ -321,7 +359,9 @@ class BuildController(distbuild.StateMachine): artifact.state = UNKNOWN artifact_names.append(artifact.basename()) - map_build_graph(self._artifact, set_state_and_append) + _, self._components = map_build_graph(self._artifact, + set_state_and_append, + self._components) url = urlparse.urljoin(self._artifact_cache_server, '/1.0/artifacts') msg = distbuild.message('http-request', @@ -355,11 +395,20 @@ class BuildController(distbuild.StateMachine): return cache_state = json.loads(event.msg['body']) - map_build_graph(self._artifact, set_status) + _, self._components = map_build_graph(self._artifact, set_status, + self._components) self.mainloop.queue_event(self, _Annotated()) - unbuilt = len([a for a in self._artifact.walk() if a.state == UNBUILT]) - total = len([a for _ in self._artifact.walk()]) + unbuilt = set() + for c in self._components: + unbuilt.update([a for a in c.walk() if a.state == UNBUILT]) + unbuilt = len(unbuilt) or len([a for a in self._artifact.walk() + if a.state == UNBUILT]) + total = set() + for c in self._components: + total.update([a for a in c.walk()]) + total = len(total) or len([a for _ in self._artifact.walk()]) + progress = BuildProgress( self._request['id'], 'Need to build %d artifacts, of %d total' % (unbuilt, total)) @@ -375,22 +424,30 @@ class BuildController(distbuild.StateMachine): all(a.state == BUILT for a in artifact.source.dependencies)) - return [a - for a in map_build_graph(self._artifact, lambda a: a) - if is_ready_to_build(a)] + artifacts, _ = map_build_graph(self._artifact, lambda a: a, + self._components) + return [a for a in artifacts if is_ready_to_build(a)] def _queue_worker_builds(self, event_source, event): distbuild.crash_point() - if self._artifact.state == BUILT: - logging.info('Requested artifact is built') - self.mainloop.queue_event(self, _Built()) - return + if not self._components: + if self._artifact.state == BUILT: + logging.info('Requested artifact is built') + self.mainloop.queue_event(self, _Built()) + return + + else: + if not any(c.state != BUILT for c in self._components): + logging.info('Requested components are built') + self.mainloop.queue_event(self, _Built()) + return logging.debug('Queuing more worker-builds to run') if self.debug_graph_state: logging.debug('Current state of build graph nodes:') - for a in map_build_graph(self._artifact, lambda a: a): + for a, _ in map_build_graph(self._artifact, + lambda a: a, self._components): logging.debug(' %s state is %s' % (a.name, a.state)) if a.state != BUILT: for dep in a.dependencies: @@ -424,7 +481,6 @@ class BuildController(distbuild.StateMachine): if a.source == artifact.source: a.state = BUILDING - def _maybe_notify_initiator_disconnected(self, event_source, event): if event.id != self._request['id']: logging.debug('Heard initiator disconnect with event id %d ' @@ -441,7 +497,7 @@ class BuildController(distbuild.StateMachine): cancel = BuildCancel(event.id) self.mainloop.queue_event(BuildController, cancel) - self.mainloop.queue_event(self, _Abort) + self.mainloop.queue_event(self, _Abort()) def _maybe_relay_build_waiting_for_worker(self, event_source, event): if event.initiator_id != self._request['id']: @@ -524,7 +580,8 @@ class BuildController(distbuild.StateMachine): self.mainloop.queue_event(BuildController, progress) def _find_artifact(self, cache_key): - artifacts = map_build_graph(self._artifact, lambda a: a) + artifacts, _ = map_build_graph(self._artifact, lambda a: a, + self._components) wanted = [a for a in artifacts if a.source.cache_key == cache_key] if wanted: return wanted[0] @@ -559,7 +616,8 @@ class BuildController(distbuild.StateMachine): # yields all chunk artifacts for the given source # so we set the state of this source's artifacts # to BUILT - map_build_graph(self._artifact, set_state) + _, self._components = map_build_graph(self._artifact, set_state, + self._components) self._queue_worker_builds(None, event) @@ -610,10 +668,19 @@ class BuildController(distbuild.StateMachine): logging.debug('Notifying initiator of successful build') baseurl = urlparse.urljoin( self._artifact_cache_server, '/1.0/artifacts') - filename = ('%s.%s.%s' % - (self._artifact.source.cache_key, - self._artifact.source.morphology['kind'], - self._artifact.name)) - url = '%s?filename=%s' % (baseurl, urllib.quote(filename)) - finished = BuildFinished(self._request['id'], [url]) + urls = [] + for c in self._components: + name = ('%s.%s.%s' % + (c.source.cache_key, + c.source.morphology['kind'], + c.name)) + urls.append('%s?filename=%s' % (baseurl, urllib.quote(name))) + if not self._components: + name = ('%s.%s.%s' % + (self._artifact.source.cache_key, + self._artifact.source.morphology['kind'], + self._artifact.name)) + urls.append('%s?filename=%s' % (baseurl, urllib.quote(name))) + + finished = BuildFinished(self._request['id'], urls) self.mainloop.queue_event(BuildController, finished) diff --git a/distbuild/initiator.py b/distbuild/initiator.py index eef4c9ec..48299a3d 100644 --- a/distbuild/initiator.py +++ b/distbuild/initiator.py @@ -54,7 +54,7 @@ def create_build_directory(prefix='build'): class Initiator(distbuild.StateMachine): def __init__(self, cm, conn, app, repo_name, ref, morphology, - original_ref): + original_ref, component_names): distbuild.StateMachine.__init__(self, 'waiting') self._cm = cm self._conn = conn @@ -63,6 +63,10 @@ class Initiator(distbuild.StateMachine): self._ref = ref self._morphology = morphology self._original_ref = original_ref + self._component_names = component_names + self._partial = False + if self._component_names: + self._partial = True self._step_outputs = {} self.debug_transitions = False @@ -101,6 +105,8 @@ class Initiator(distbuild.StateMachine): ref=self._ref, morphology=self._morphology, original_ref=self._original_ref, + component_names=self._component_names, + partial=self._partial, protocol_version=distbuild.protocol.VERSION ) self._jm.send(msg) diff --git a/distbuild/json_router.py b/distbuild/json_router.py index b8d0ca55..d9c32a9c 100644 --- a/distbuild/json_router.py +++ b/distbuild/json_router.py @@ -47,7 +47,6 @@ class JsonRouter(distbuild.StateMachine): def setup(self): jm = distbuild.JsonMachine(self.conn) - jm.debug_json = True self.mainloop.add_state_machine(jm) spec = [ diff --git a/distbuild/protocol.py b/distbuild/protocol.py index 73d72d1d..268dcbf6 100644 --- a/distbuild/protocol.py +++ b/distbuild/protocol.py @@ -22,7 +22,7 @@ # time a change is introduced that would break server/initiator compatibility -VERSION = 1 +VERSION = 2 _required_fields = { @@ -31,6 +31,7 @@ _required_fields = { 'repo', 'ref', 'morphology', + 'partial', 'protocol_version', ], 'build-progress': [ @@ -89,7 +90,8 @@ _required_fields = { _optional_fields = { 'build-request': [ - 'original_ref' + 'original_ref', + 'component_names' ] } diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index e58059b2..8b581172 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -149,8 +149,14 @@ class Jobs(object): return waiting.pop() if len(waiting) > 0 else None def __repr__(self): - return str([job.artifact.basename() - for (_, job) in self._jobs.iteritems()]) + items = [] + for job in self._jobs.itervalues(): + if job.who is None: + state = 'queued' + else: + state = 'given to %s' % job.who + items.append('%s (%s)' % (job.artifact.basename(), state)) + return str(items) class _BuildFinished(object): @@ -400,8 +406,6 @@ class WorkerConnection(distbuild.StateMachine): self._current_job_exec_response = None self._current_job_cache_request = None - self._debug_json = False - addr, port = self._conn.getpeername() name = socket.getfqdn(addr) self._worker_name = '%s:%s' % (name, port) @@ -412,6 +416,9 @@ class WorkerConnection(distbuild.StateMachine): def current_job(self): return self._current_job + def __str__(self): + return self.name() + def setup(self): distbuild.crash_point() @@ -513,10 +520,6 @@ class WorkerConnection(distbuild.StateMachine): ) self._jm.send(msg) - if self._debug_json: - logging.debug('WC: sent to worker %s: %r' - % (self._worker_name, msg)) - started = WorkerBuildStepStarted(job.initiators, job.artifact.source.cache_key, self.name()) |