"""Gearman-based build controller.

Requests a build graph, asks the cache which artifacts already exist and
then keeps submitting "build-artifact" jobs for every artifact whose
dependencies are all built.
"""

import gear
import sys
import json
import logging
import threading
import time
import requests
import urlparse
import distbuild

# TODO: values from settings
cache_server = 'http://cache.baserock.org:8080'

# Artifact build states. These are used to loosely track the state of the
# remote cache.
UNBUILT = 'not-built'
BUILDING = 'building'
BUILT = 'built'

logging.basicConfig()
gear.Server()


class theController():
    """Coordinate the build-graph, cache-request and builder clients.

    Attributes:
        lock: guards reads and writes of artifact ``state`` values.
        artifact: root of the decoded build graph, or None until the
            build-graph job has completed.
        build_started: set to True once a cache response has been
            processed.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.graph_client = BuildGraphClient(self)
        self.builder_client = BuilderClient(self)
        self.cache_client = CacheRequestClient(self)

        # Each client blocks until at least one server is connected.
        self.builder_client.addServer('localhost')
        self.builder_client.waitForServer()
        self.graph_client.addServer('localhost')
        self.graph_client.waitForServer()
        self.cache_client.addServer('localhost')
        self.cache_client.waitForServer()

        self.artifact = None
        self.build_started = False

    def _process_build_graph(self, build_graph):
        """Decode the build graph, reset its state and query the cache.

        Raises:
            ValueError: re-raised unchanged when the graph can't be decoded.
        """
        print("Decoding artifact received")
        try:
            self.artifact = distbuild.decode_artifact_reference(build_graph)
        except ValueError:
            print("ERROR: Failed to decode artifact")
            print("ERROR: === build graph start ===")
            print(build_graph)
            print("ERROR: === build graph end ===")
            # Bare raise keeps the original traceback on Python 2.
            raise
        print("Decoding artifact received done")

        # Mark everything as unbuilt to begin with. We'll query the actual
        # state from the cache in self._check_cache_state().
        def set_initial_state(artifact):
            artifact.state = UNBUILT

        print("Setting them as unbuilt")
        self._map_build_graph(self.artifact, set_initial_state)
        print("Setting them as unbuilt done")

        # TODO: check cache before marking as started:
        # http://stackoverflow.com/questions/9110593/asynchronous-requests-with-python-requests
        self._check_cache_state(self.artifact)

    def _check_cache_state(self, artifact):
        """Submit a "cache-request" job listing every unbuilt artifact."""
        print("DEBUG: checking cache...")
        artifact_names = []

        def collect_unbuilt_artifacts(artifact):
            if artifact.state == UNBUILT:
                artifact_names.append(artifact.basename())

        self._map_build_graph(artifact, collect_unbuilt_artifacts)

        job = gear.Job("cache-request", json.dumps(artifact_names))
        self.cache_client.submitJob(job)

    def _process_cache_response(self, cache_response):
        """Mark artifacts reported present by the cache as built.

        'cache_response' is a JSON object mapping an artifact basename of
        the form ``<cache_key>.<kind>.<name>`` to a truthy/falsy value.
        """
        response = json.loads(cache_response)
        for key, value in response.items():
            if value:
                # Split at most twice so a dot inside the artifact name
                # does not break the three-way unpacking.
                cache_key, kind, name = key.split('.', 2)
                self._mark_artifact_as_built(cache_key, name=name)
            else:
                print(key)
                print(".. wasnt built")
        self.build_started = True

    def _map_build_graph(self, artifact, callback, components=None):
        """Run callback on each artifact in the build graph and return result.

        If components is given, then only look at the components given and
        their dependencies. Also, return a list of the components after they
        have had callback called on them.

        """
        # A fresh list per call; a shared mutable default would be reused
        # across calls.
        if components is None:
            components = []

        result = []
        mapped_components = []
        done = set()
        if components:
            queue = list(components)
        else:
            queue = [artifact]
        while queue:
            a = queue.pop()
            if a not in done:
                result.append(callback(a))
                queue.extend(a.dependencies)
                done.add(a)
                if a in components:
                    mapped_components.append(a)
        return result, mapped_components

    def find_artifacts_that_are_ready_to_build(self, root_artifact,
                                               components=None):
        '''Return unbuilt artifacts whose dependencies are all built.

        The 'root_artifact' parameter is expected to be a tree of
        ArtifactReference objects. These must have the 'state' attribute set
        to BUILT or UNBUILT. If 'components' is passed, then only those
        artifacts and their dependencies will be built.

        '''

        def is_ready_to_build(artifact):
            # Just in case the state is not set yet: a missing attribute
            # raises AttributeError, so that is caught alongside KeyError.
            try:
                return (artifact.state == UNBUILT and
                        all(a.state == BUILT
                            for a in artifact.dependencies))
            except (AttributeError, KeyError):
                return False

        with self.lock:
            artifacts, _ = self._map_build_graph(
                root_artifact, lambda a: a, components)
            return [a for a in artifacts if is_ready_to_build(a)]

    def _queue_worker_builds(self, artifacts):
        '''Submit a "build-artifact" job for each artifact in 'artifacts'.

        The list is consumed in place and is empty on return.

        '''
        logging.debug('Queuing more worker-builds to run')
        while artifacts:
            artifact = artifacts.pop()

            logging.debug('Requesting worker-build of %s (%s)',
                          artifact.name, artifact.cache_key)

            print("Start building %s" % artifact.name)
            artifact_encoded = distbuild.encode_artifact_reference(artifact)
            job = gear.Job("build-artifact", artifact_encoded)
            print("lock set as building")
            with self.lock:
                artifact.state = BUILDING
            print("set as building %s" % artifact.name)
            if artifact.kind == 'chunk':
                # Chunk artifacts are not built independently
                # so when we're building any chunk artifact
                # we're also building all the chunk artifacts
                # in this source
                same_chunk_artifacts = [
                    a for a in artifacts
                    if a.cache_key == artifact.cache_key]
                for a in same_chunk_artifacts:
                    a.state = BUILDING
                    artifacts.remove(a)
            self.builder_client.submitJob(job)

    def _find_artifact(self, cache_key, name):
        """Return the first artifact matching 'cache_key', or None.

        When 'name' is truthy the artifact's name must match as well.
        """
        artifacts, _ = self._map_build_graph(self.artifact, lambda a: a)
        for a in artifacts:
            if a.cache_key == cache_key and (not name or a.name == name):
                return a
        return None

    def _mark_artifact_as_built(self, cache_key, name=None):
        """Set the matching artifact's state to BUILT.

        When no 'name' is given and the artifact is a chunk, every artifact
        sharing its cache key is marked BUILT too. An unknown artifact is
        logged and otherwise ignored.
        """
        artifact = self._find_artifact(cache_key, name)
        if artifact is None:
            logging.warning('No artifact found for cache key %s (name %s)',
                            cache_key, name)
            return

        def set_state(a):
            if a.cache_key == artifact.cache_key:
                a.state = BUILT

        with self.lock:
            artifact.state = BUILT
            if not name and artifact.kind == 'chunk':
                # Building a single chunk artifact
                # yields all chunk artifacts for the given source
                # so we set the state of this source's artifacts
                # to BUILT
                self._map_build_graph(self.artifact, set_state)


class CacheRequestClient(gear.Client):
    """Gear client that hands cache-request results to the controller."""

    def __init__(self, controller):
        super(CacheRequestClient, self).__init__()
        self.controller = controller
        self.finished = False

    def handleWorkComplete(self, packet):
        """Pass the last data item of the job to the controller."""
        job = super(CacheRequestClient, self).handleWorkComplete(packet)
        print("workcomplete")
        self.controller._process_cache_response(job.data[-1])
        return job


class BuildGraphClient(gear.Client):
    """Gear client that hands the computed build graph to the controller."""

    def __init__(self, controller):
        super(BuildGraphClient, self).__init__()
        self.controller = controller
        self.finished = False

    def handleWorkComplete(self, packet):
        """Pass the last data item of the job to the controller."""
        job = super(BuildGraphClient, self).handleWorkComplete(packet)
        print("workcomplete")
        self.controller._process_build_graph(job.data[-1])
        return job

    def handleWorkData(self, packet):
        """Print the newest data item, then drop the accumulated data."""
        job = super(BuildGraphClient, self).handleWorkData(packet)
        print(job.data[-1])
        job.data = []
        return job

    def handleWorkFail(self, packet):
        job = super(BuildGraphClient, self).handleWorkFail(packet)
        print("workfail")
        return job

    def handleWorkException(self, packet):
        job = super(BuildGraphClient, self).handleWorkException(packet)
        print("workexception")
        return job

    def handleDisconnect(self, job):
        job = super(BuildGraphClient, self).handleDisconnect(job)
        print("disconnect")
        # Return the job like every other handler in this class does.
        return job


class BuilderClient(gear.Client):
    """Gear client that reports finished builds to the controller."""

    def __init__(self, controller):
        super(BuilderClient, self).__init__()
        self.controller = controller
        self.finished = False

    def handleWorkComplete(self, packet):
        """Mark the artifact named by the job's last data item as built."""
        job = super(BuilderClient, self).handleWorkComplete(packet)
        print("workcomplete")
        self.controller._mark_artifact_as_built(job.data[-1])
        return job

    # TODO: send different types of data? stdout, message...?
    # same for workFail, to identify worker disconnection and build
    # failures.

    def handleWorkData(self, packet):
        """Print the newest data item, then drop the accumulated data."""
        job = super(BuilderClient, self).handleWorkData(packet)
        print(job.data[-1].strip())
        # Cleanup previous data to speed up and save memory probably
        job.data = []
        return job


controller = theController()
client = controller.graph_client
build_graph_request = {
    'repo': "baserock:baserock/definitions",
    'ref': "fbce45e45da79e5c35341845ec3b3d7c321e6ff2",
    'system': "systems/minimal-system-x86_64-generic.morph",
}
s = json.dumps(build_graph_request)

print("Json produced: ")
print(s)

job = gear.Job("build-graph", s)
client.submitJob(job)

# loop so that client doesn't die
while True:
    time.sleep(0.0001)
    if controller.artifact is not None and controller.build_started:
        to_build = controller.find_artifacts_that_are_ready_to_build(
            controller.artifact)
        controller._queue_worker_builds(to_build)