"""Prototype distbuild controller that schedules builds through gear (Gearman).

For every build request a SingleBuildController submits a "build-graph"
job, then a "cache-request" job listing the artifacts not yet known to be
built, and finally "build-artifact" jobs for artifacts whose dependencies
are all built. RequestsController tracks all requests and propagates
artifact state between them by cache key.
"""

import json
import logging
import sys
import threading
import urlparse

import gear
import requests

import distbuild

# TODO: values from settings
cache_server = 'http://cache.baserock.org:8080'

# Artifact build states. These are used to loosely track the state of the
# remote cache.
UNBUILT = 'not-built'
BUILDING = 'building'
BUILT = 'built'

logging.basicConfig()

# Module-level side effect kept from the original: create a gear.Server.
# The clients below connect to a server on 'localhost'.
gear.Server()


class SingleBuildController(object):
    """Drive one build request from build graph to queued artifact builds.

    Attributes:
        request_id: id assigned by the owning RequestsController.
        requests_controller: the RequestsController that owns this build.
        artifact: root artifact decoded from the build graph; None until the
            "build-graph" job completes.
        build_started: becomes True once the cache reply has been applied
            and artifacts may be scheduled.
    """

    def __init__(self, requests_controller, request_id):
        self.request_id = request_id
        self.requests_controller = requests_controller
        self.graph_client = BuildGraphClient(self)
        self.builder_client = BuilderClient(self)
        self.cache_client = CacheRequestClient(self)
        # Connect every client to the local gear server and block until
        # each one is ready.
        for client in (self.builder_client, self.graph_client,
                       self.cache_client):
            client.addServer('localhost')
            client.waitForServer()
        self.artifact = None
        self.build_started = False

    def start_build(self, request):
        """Submit a "build-graph" job for the JSON-encoded *request*."""
        job = gear.Job("build-graph", request)
        self.graph_client.submitJob(job)

    def _process_build_graph(self, build_graph):
        """Decode the received build graph and start the cache-state check.

        Called by BuildGraphClient when the "build-graph" job completes.

        Raises:
            ValueError: if distbuild cannot decode *build_graph*; the raw
                graph is printed first.
        """
        print("Decoding artifact received")
        try:
            self.artifact = distbuild.decode_artifact_reference(build_graph)
        except ValueError:
            print("ERROR: Failed to decode artifact")
            print("ERROR: === build graph start ===")
            print(build_graph)
            print("ERROR: === build graph end ===")
            # Bare raise keeps the original traceback (``raise e`` loses it
            # on Python 2).
            raise
        print("Decoding artifact received done")

        # Mark everything as unbuilt to begin with. The actual state is
        # queried from the cache by self._check_cache_state().
        # TODO: move to RQ
        def set_initial_state(artifact):
            artifact.state = UNBUILT

        print("Setting them as unbuilt")
        self._map_build_graph(self.artifact, set_initial_state)
        print("Setting them as unbuilt done")
        self._check_cache_state(self.artifact)

    def _check_cache_state(self, artifact):
        """Submit a "cache-request" job listing every UNBUILT artifact.

        The payload is a JSON list of artifact basenames; the reply is
        handled by _process_cache_response().
        """
        print("DEBUG: checking cache...")
        artifact_names = []

        def collect_unbuilt_artifacts(a):
            if a.state == UNBUILT:
                artifact_names.append(a.basename())

        self._map_build_graph(artifact, collect_unbuilt_artifacts)
        job = gear.Job("cache-request", json.dumps(artifact_names))
        self.cache_client.submitJob(job)

    def _process_cache_response(self, cache_response):
        """Apply the "cache-request" reply and wake the scheduling loop.

        *cache_response* is JSON mapping each requested artifact basename to
        a value that is truthy when that artifact is already cached.
        """
        response = json.loads(cache_response)

        # Mark things as built that are now built. We only asked about the
        # unbuilt artifacts, so the response has no info for things we
        # already thought were built.
        # TODO: move to RQ
        def update_state(artifact):
            if artifact.state == UNBUILT and response[artifact.basename()]:
                artifact.state = BUILT

        self._map_build_graph(self.artifact, update_state)
        # TODO: move to RQ
        self.build_started = True
        # Use the owning controller; the original referenced a module-level
        # global of the same name that only exists when run as a script.
        lock_queue = self.requests_controller.lock_queue
        with lock_queue:
            lock_queue.notify()
        print("DEBUG: queque release!")

    def _map_build_graph(self, artifact, callback, components=None):
        """Run *callback* on each artifact in the build graph.

        Walks *artifact* and its transitive ``dependencies``, visiting each
        artifact once. If *components* is non-empty, the walk starts from
        those artifacts instead of *artifact*.

        Returns:
            ``(results, mapped_components)``: the callback results in
            visiting order, and the members of *components* that were
            visited.
        """
        components = components or []
        result = []
        mapped_components = []
        done = set()
        queue = list(components) if components else [artifact]
        while queue:
            a = queue.pop()
            if a in done:
                continue
            result.append(callback(a))
            queue.extend(a.dependencies)
            done.add(a)
            if a in components:
                mapped_components.append(a)
        return result, mapped_components

    def find_artifacts_that_are_ready_to_build(self, root_artifact,
                                               components=None):
        '''Return unbuilt artifacts whose dependencies are all built.

        The 'root_artifact' parameter is expected to be a tree of
        ArtifactReference objects. These must have the 'state' attribute set
        to BUILT or UNBUILT. If 'components' is passed, then only those
        artifacts and their dependencies will be considered.
        '''
        def is_ready_to_build(artifact):
            # The state may not be set yet; such artifacts are not ready.
            # A missing attribute raises AttributeError, which the original
            # (catching only KeyError) let escape.
            try:
                return (artifact.state == UNBUILT and
                        all(a.state == BUILT for a in artifact.dependencies))
            except (AttributeError, KeyError):
                return False

        artifacts, _ = self._map_build_graph(root_artifact, lambda a: a,
                                             components)
        return [a for a in artifacts if is_ready_to_build(a)]

    def _queue_worker_builds(self, artifacts):
        '''Submit a "build-artifact" job for each artifact in *artifacts*.

        Each artifact is marked BUILDING (across all requests, by cache key)
        before its job is submitted. The *artifacts* list is consumed and is
        empty when this returns.
        '''
        print("starting")
        while artifacts:
            artifact = artifacts.pop()
            print("going to queue %s" % artifact.name)
            self.requests_controller.mark_as_building(artifact.cache_key)
            print("marked as queue %s" % artifact.name)
            if artifact.kind == 'chunk':
                # Chunk artifacts are not built independently, so building
                # any chunk artifact also builds every other chunk artifact
                # from the same source (same cache key); drop those.
                same_chunk_artifacts = [a for a in artifacts
                                        if a.cache_key == artifact.cache_key]
                for a in same_chunk_artifacts:
                    artifacts.remove(a)
            artifact_encoded = distbuild.encode_artifact_reference(artifact)
            print("encoded for job queue %s" % artifact.name)
            # TODO: make the build-artifact job use ARCHITECTURE to enable a
            # multiarch build network.
            job = gear.Job("build-artifact", artifact_encoded)
            self.builder_client.submitJob(job)
            print("send job queue %s" % artifact.name)

    def _find_artifacts(self, cache_key, name=None):
        """Return artifacts in this build with *cache_key* (and *name*)."""
        artifacts, _ = self._map_build_graph(self.artifact, lambda a: a)
        if not name:
            return [a for a in artifacts if a.cache_key == cache_key]
        return [a for a in artifacts
                if a.cache_key == cache_key and a.name == name]

    def _mark_artifact_as_built(self, cache_key, name=None):
        """Mark *cache_key* as built in every request (*name* is unused)."""
        self.requests_controller.mark_as_built(cache_key)


class CacheRequestClient(gear.Client):
    """gear client that delivers "cache-request" replies to a controller."""

    def __init__(self, controller):
        super(CacheRequestClient, self).__init__()
        self.controller = controller
        self.finished = False

    def handleWorkComplete(self, packet):
        job = super(CacheRequestClient, self).handleWorkComplete(packet)
        print("Cache workcomplete")
        self.controller._process_cache_response(job.data[-1])
        return job


class BuildGraphClient(gear.Client):
    """gear client that delivers "build-graph" results to a controller."""

    def __init__(self, controller):
        super(BuildGraphClient, self).__init__()
        self.controller = controller
        self.finished = False

    def handleWorkComplete(self, packet):
        job = super(BuildGraphClient, self).handleWorkComplete(packet)
        print("Graph workcomplete")
        self.controller._process_build_graph(job.data[-1])
        return job

    def handleWorkData(self, packet):
        job = super(BuildGraphClient, self).handleWorkData(packet)
        print(job.data[-1])
        # Drop the data already printed so it does not accumulate.
        job.data = []
        return job

    def handleWorkFail(self, packet):
        job = super(BuildGraphClient, self).handleWorkFail(packet)
        print("workfail")
        return job

    def handleWorkException(self, packet):
        job = super(BuildGraphClient, self).handleWorkException(packet)
        print("workexception")
        return job

    def handleDisconnect(self, job):
        job = super(BuildGraphClient, self).handleDisconnect(job)
        print("disconnect")
        # Return the job like every other handler (the original dropped it).
        return job


class BuilderClient(gear.Client):
    """gear client that reports finished "build-artifact" jobs."""

    def __init__(self, controller):
        super(BuilderClient, self).__init__()
        self.controller = controller
        self.finished = False

    def handleWorkComplete(self, packet):
        job = super(BuilderClient, self).handleWorkComplete(packet)
        print("Build workcomplete")
        # NOTE(review): the last data chunk is presumably the built
        # artifact's cache key - confirm against the worker.
        self.controller._mark_artifact_as_built(job.data[-1])
        return job

    # TODO: send different types of data? stdout, message...?
    # Same for workFail, to identify worker disconnection and build
    # failures.
    def handleWorkData(self, packet):
        job = super(BuilderClient, self).handleWorkData(packet)
        # TODO: use cache_key to inform RQ to notify initiators later;
        # we need to note this cache_key somewhere.
        # Discard accumulated data to save memory.
        job.data = []
        return job


class RequestsController(object):
    """Own all build requests and schedule their ready artifacts.

    Locks:
        new_request_lock: serialises add_request().
        build_status_lock: guards artifact state changes across requests.
        lock_queue: condition used to wake loop() when state changes.
    """

    def __init__(self):
        self.next_id = 1
        self.new_request_lock = threading.Lock()
        self.lock_queue = threading.Condition()
        self.build_requests = []
        self.build_status_lock = threading.Lock()

    def add_request(self, request):
        """Register *request* (a JSON-serialisable dict) and start it."""
        json_request = json.dumps(request)
        with self.new_request_lock:
            request_data = {
                'id': self.next_id,
                'controller': SingleBuildController(self, self.next_id),
                'request': request,
            }
            self.next_id += 1
            # Register before starting so gear callbacks for this build can
            # already see it in build_requests.
            self.build_requests.append(request_data)
            # TODO: is this the right place to do this?
            request_data['controller'].start_build(json_request)

    def queue_if_possible(self):
        """Queue ready artifacts for every request whose build has started."""
        print("DEBUG: Looking for jobs to queue")
        for request in self.build_requests:
            print(request['id'])
            controller = request['controller']
            if controller.artifact is not None and controller.build_started:
                with self.build_status_lock:
                    to_build = controller.find_artifacts_that_are_ready_to_build(
                        controller.artifact)
                print(to_build)
                # Outside build_status_lock: _queue_worker_builds() calls
                # mark_as_building(), which takes that non-reentrant lock.
                controller._queue_worker_builds(to_build)

    def _set_state(self, cache_key, state, label):
        """Set *state* on matching artifacts of every started request."""
        with self.build_status_lock:
            for request in self.build_requests:
                controller = request['controller']
                if controller.artifact is None or not controller.build_started:
                    continue
                for artifact in controller._find_artifacts(cache_key):
                    artifact.state = state
                    print("TO %s: Artifact %s %s"
                          % (request['id'], artifact.name, label))

    def mark_as_built(self, cache_key):
        """Mark artifacts with *cache_key* BUILT and wake the loop."""
        self._set_state(cache_key, BUILT, 'built')
        # Notify only after build_status_lock is released so this thread
        # never holds both locks at once.
        with self.lock_queue:
            self.lock_queue.notify()

    def mark_as_building(self, cache_key):
        """Mark artifacts with *cache_key* BUILDING in every request."""
        # TODO: consider the chunks case.
        self._set_state(cache_key, BUILDING, 'building')

    def loop(self):
        """Schedule forever: wait for a notify (at most 20 s), then queue."""
        while True:
            with self.lock_queue:
                self.lock_queue.wait(20)
            # Runs without lock_queue held, so gear callback threads that
            # notify lock_queue are not blocked behind job submission. A
            # notify missed meanwhile is covered by the 20 s timeout.
            self.queue_if_possible()


if __name__ == '__main__':
    request = {
        'repo': "baserock:baserock/definitions",
        'ref': "fbce45e45da79e5c35341845ec3b3d7c321e6ff2",
        'system': "systems/minimal-system-x86_64-generic.morph",
    }
    requests_controller = RequestsController()
    requests_controller.add_request(request)
    requests_controller.add_request(request)
    requests_controller.loop()