import gear
import sys
import json
import threading

import distbuild

# Artifact build states. These are used to loosely track the state of the
# remote cache.
UNBUILT = 'not-built'
BUILDING = 'building'
BUILT = 'built'

import logging
logging.basicConfig()

# Start an embedded Gearman server for the clients below to connect to.
gear.Server()


class theController():

    def __init__(self):
        self.lock = threading.Lock()
        self.graph_client = BuildGraphClient(self)
        self.builder_client = BuilderClient(self)
        self.builder_client.addServer('localhost')
        self.builder_client.waitForServer()  # Wait for at least one server to be connected
        self.graph_client.addServer('localhost')
        self.graph_client.waitForServer()  # Wait for at least one server to be connected
        self.artifact = None
        self.build_started = False

    def _process_build_graph(self, job):
        print "Decoding artifact received"
        self.artifact = distbuild.decode_artifact_reference(job.data[-1])
        print "Decoding artifact received done"

        # Mark everything as unbuilt to begin with. We'll query the actual
        # state from the cache in self._query_cache_state().
        def set_initial_state(artifact):
            artifact.state = UNBUILT

        print "Setting them as unbuilt"
        self._map_build_graph(self.artifact, set_initial_state)
        print "Setting them as unbuilt done"
        self.build_started = True

    def _map_build_graph(self, artifact, callback, components=[]):
        """Run callback on each artifact in the build graph and return result.

        If components is given, then only look at the components given and
        their dependencies. Also, return a list of the components after they
        have had callback called on them.

        """
        result = []
        mapped_components = []
        done = set()
        if components:
            queue = list(components)
        else:
            queue = [artifact]
        while queue:
            a = queue.pop()
            if a not in done:
                result.append(callback(a))
                queue.extend(a.dependencies)
                done.add(a)
                if a in components:
                    mapped_components.append(a)
        return result, mapped_components

    def find_artifacts_that_are_ready_to_build(self, root_artifact,
                                               components=[]):
        '''Return unbuilt artifacts whose dependencies are all built.

        The 'root_artifact' parameter is expected to be a tree of
        ArtifactReference objects. These must have the 'state' attribute
        set to BUILT or UNBUILT. If 'components' is passed, then only
        those artifacts and their dependencies will be built.
        '''
        def is_ready_to_build(artifact):
            try:
                is_ready = (artifact.state == UNBUILT
                            and all(a.state == BUILT
                                    for a in artifact.dependencies))
            except KeyError:
                is_ready = False
            return is_ready

        with self.lock:
            artifacts, _ = self._map_build_graph(root_artifact,
                                                 lambda a: a, components)
            ready = [a for a in artifacts if is_ready_to_build(a)]
        return ready

    def _queue_worker_builds(self, artifacts):
        '''Send a set of chunks to the WorkerBuildQueuer class for building.'''
        logging.debug('Queuing more worker-builds to run')
        while len(artifacts) > 0:
            artifact = artifacts.pop()
            logging.debug(
                'Requesting worker-build of %s (%s)' %
                (artifact.name, artifact.cache_key))

            # TODO: launch build client
            print "Start building %s" % artifact.name
            job = gear.Job("reverse", artifact.cache_key)

            print "lock set as building"
            with self.lock:
                artifact.state = BUILDING
                print "set as building %s" % artifact.name
                if artifact.kind == 'chunk':
                    # Chunk artifacts are not built independently
                    # so when we're building any chunk artifact
                    # we're also building all the chunk artifacts
                    # in this source
                    same_chunk_artifacts = [
                        a for a in artifacts
                        if a.cache_key == artifact.cache_key]
                    for a in same_chunk_artifacts:
                        a.state = BUILDING
                        artifacts.remove(a)
            self.builder_client.submitJob(job)

    def _find_artifact(self, cache_key):
        artifacts, _ = self._map_build_graph(self.artifact, lambda a: a)
        wanted = [a for a in artifacts if a.cache_key == cache_key]
        if wanted:
            return wanted[0]
        else:
            return None

    def _mark_artifact_as_built(self, cache_key):
        print cache_key

        def set_state(a):
            if a.cache_key == artifact.cache_key:
                a.state = BUILT

        artifact = self._find_artifact(cache_key)

        print "lock set as built"
        with self.lock:
            print "set as built %s" % artifact.name
            artifact.state = BUILT
            if artifact.kind == 'chunk':
                # Building a single chunk artifact
                # yields all chunk artifacts for the given source
                # so we set the state of this source's artifacts
                # to BUILT
                self._map_build_graph(self.artifact, set_state)
        print artifact


class BuildGraphClient(gear.Client):

    def __init__(self, controller):
        super(BuildGraphClient, self).__init__()
        self.controller = controller
        self.finished = False

    def handleWorkComplete(self, packet):
        job = super(BuildGraphClient, self).handleWorkComplete(packet)
        print "workcomplete"
        self.controller._process_build_graph(job)
        return job

    def handleWorkData(self, packet):
        job = super(BuildGraphClient, self).handleWorkData(packet)
        print job.data[-1]
        job.data = []
        return job

    def handleWorkFail(self, packet):
        job = super(BuildGraphClient, self).handleWorkFail(packet)
        print "workfail"
        return job

    def handleWorkException(self, packet):
        job = super(BuildGraphClient, self).handleWorkException(packet)
        print "workexception"
        return job

    def handleDisconnect(self, job):
        job = super(BuildGraphClient, self).handleDisconnect(job)
        print "disconnect"


class BuilderClient(gear.Client):

    def __init__(self, controller):
        super(BuilderClient, self).__init__()
        self.controller = controller
        self.finished = False

    def handleWorkComplete(self, packet):
        job = super(BuilderClient, self).handleWorkComplete(packet)
        print "workcomplete"
        self.controller._mark_artifact_as_built(job.arguments)
        return job


controller = theController()
client = controller.graph_client

#job = gear.Job("reverse", "test string")
build_graph_request = {}
build_graph_request['repo'] = "baserock:baserock/definitions"
build_graph_request['ref'] = "master"
build_graph_request['system'] = "systems/minimal-system-x86_64-generic.morph"

s = json.dumps(build_graph_request)
print "JSON produced:"
print s
job = gear.Job("build-graph", s)
client.submitJob(job)

# loop so that client doesn't die
import time
while True:
    time.sleep(0.0001)
    if controller.artifact is not None and controller.build_started:
        to_build = controller.find_artifacts_that_are_ready_to_build(
            controller.artifact)
        #print to_build
        controller._queue_worker_builds(to_build)
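
# Hypothetical worker sketch (illustration only, not part of this prototype).
# The controller above assumes a Gearman worker is registered for the
# "build-graph" and "reverse" function names it submits jobs to. The worker
# id and echo behaviour below are assumptions for illustration; a real worker
# would compute the build graph (sending back a serialised ArtifactReference
# that distbuild.decode_artifact_reference() can read) or perform the actual
# build. Something like this could be run in a separate process:
#
#   import gear
#
#   worker = gear.Worker('example-worker')  # hypothetical worker id
#   worker.addServer('localhost')
#   worker.waitForServer()
#   worker.registerFunction('build-graph')
#   worker.registerFunction('reverse')
#   while True:
#       job = worker.getJob()
#       # Echo the arguments back; a real worker would do its work here.
#       job.sendWorkComplete(job.arguments)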