import json
import logging
import signal
import sys
import threading
import time
import urlparse

import gear
import requests

import distbuild


# TODO: values from settings
cache_server = 'http://cache.baserock.org:8080'

# Artifact build states. These are used to loosely track the state of the
# remote cache.
UNBUILT = 'not-built'
BUILDING = 'building'
BUILT = 'built'

logging.basicConfig()
gear.Server()


class SingleBuildController(object):
    """Drive a single build request through graph, cache and build jobs.

    One instance is created per request by RequestsController.add_request().
    It owns three gear clients (build graph, cache query, builder) and the
    decoded artifact graph for the request.
    """

    def __init__(self, requests_controller, request_id):
        """Create the gear clients and wait until each one is connected.

        requests_controller -- the owning RequestsController; its locks and
                               building_dict are shared by all controllers.
        request_id          -- identifier of the request being built.
        """
        self.request_id = request_id
        self.requests_controller = requests_controller

        # Root of the decoded build graph; None until the "build-graph"
        # job has completed.
        self.artifact = None
        # Set once the cache state has been merged into the graph.
        self.build_started = False
        # Set once the graph has been decoded and marked as unbuilt.
        self.build_initiated = False

        self.graph_client = BuildGraphClient(self)
        self.builder_client = BuilderClient(self)
        self.cache_client = CacheRequestClient(self)

        self.builder_client.addServer('localhost')
        self.builder_client.waitForServer()
        self.graph_client.addServer('localhost')
        self.graph_client.waitForServer()
        self.cache_client.addServer('localhost')
        self.cache_client.waitForServer()

    def start_build(self, request, request_id):
        """Submit a "build-graph" job for the JSON-encoded request."""
        # Record the id before submitting, so that a fast response handled
        # on the client thread already sees it.
        self.request_id = request_id
        job = gear.Job("build-graph", request)
        self.graph_client.submitJob(job)

    def _process_build_graph(self, build_graph):
        """Decode the received build graph and start the cache check.

        Raises ValueError (after dumping the graph) if decoding fails.
        """
        print("Decoding artifact received")
        try:
            self.artifact = distbuild.decode_artifact_reference(build_graph)
        except ValueError:
            print("ERROR: Failed to decode artifact")
            print("ERROR: === build graph start ===")
            print(build_graph)
            print("ERROR: === build graph end ===")
            # Bare raise keeps the original traceback.
            raise
        print("Decoding artifact received done")

        # Mark everything as unbuilt to begin with. We'll query the actual
        # state from the cache in self._check_cache_state().
        # TODO move to RQ
        def set_initial_state(artifact):
            artifact.state = UNBUILT

        print("Setting them as unbuilt")
        self._map_build_graph(self.artifact, set_initial_state)
        print("Setting them as unbuilt done")
        self.build_initiated = True
        self._check_cache_state()

    def _check_cache_state(self):
        """Ask the cache which of the still-unbuilt artifacts it has.

        Artifacts that another request is already building are marked
        BUILDING first (and this request is added to their waiters), so
        only the remaining UNBUILT ones are sent in the "cache-request" job.
        """
        print("DEBUG: checking cache...")
        # Use the controller we were given rather than relying on a
        # module-level global of the same name.
        controller = self.requests_controller
        building_dict = controller.building_dict
        artifact_names = []

        # Check first building artifacts
        def mark_building_artifacts(artifact):
            waiting = building_dict.get(artifact.cache_key)
            if artifact.state == UNBUILT and waiting is not None:
                artifact.state = BUILDING
                if self.request_id not in waiting:
                    waiting.append(self.request_id)

        def collect_unbuilt_artifacts(artifact):
            if artifact.state == UNBUILT:
                artifact_names.append(artifact.basename())

        with controller.build_status_lock:
            self._map_build_graph(self.artifact, mark_building_artifacts)
            self._map_build_graph(self.artifact, collect_unbuilt_artifacts)

        job = gear.Job("cache-request", json.dumps(artifact_names))
        self.cache_client.submitJob(job)

    def _process_cache_response(self, cache_response):
        """Merge the cache answer into the graph and wake the scheduler.

        cache_response -- JSON object mapping artifact basename to a truthy
                          value when the artifact is present in the cache.
        """
        response = json.loads(cache_response)

        # Mark things as built that are now built. We only check the unbuilt
        # artifacts, so 'cache_state' will have no info for things we already
        # thought were built.
        # TODO: move to RQ
        def update_state(artifact):
            if artifact.state == UNBUILT and response[artifact.basename()]:
                artifact.state = BUILT

        self._map_build_graph(self.artifact, update_state)

        # TODO: move to RQ
        self.build_started = True
        lock_queue = self.requests_controller.lock_queue
        with lock_queue:
            lock_queue.notify()
        print("DEBUG: queque release!")

    def _map_build_graph(self, artifact, callback, components=None):
        """Run callback on each artifact in the build graph and return result.

        If components is given, then only look at the components given and
        their dependencies. Also, return a list of the components after they
        have had callback called on them.

        Returns a (results, mapped_components) tuple. Each artifact is
        visited once, even if it is reachable through several dependents.
        """
        components = list(components) if components else []
        result = []
        mapped_components = []
        done = set()
        queue = list(components) if components else [artifact]
        while queue:
            a = queue.pop()
            if a in done:
                continue
            result.append(callback(a))
            queue.extend(a.dependencies)
            done.add(a)
            if a in components:
                mapped_components.append(a)
        return result, mapped_components

    def find_artifacts_that_are_ready_to_build(self, root_artifact,
                                               components=None):
        '''Return unbuilt artifacts whose dependencies are all built.

        The 'root_artifact' parameter is expected to be a tree of
        ArtifactReference objects. These must have the 'state' attribute set
        to BUILT or UNBUILT. If 'components' is passed, then only those
        artifacts and their dependencies will be built.

        '''
        def is_ready_to_build(artifact):
            # Just in case the state is not set yet. AttributeError is
            # caught as well as the original KeyError, since which one a
            # missing 'state' raises depends on the artifact type.
            try:
                return (artifact.state == UNBUILT and
                        all(a.state == BUILT
                            for a in artifact.dependencies))
            except (KeyError, AttributeError):
                return False

        artifacts, _ = self._map_build_graph(root_artifact, lambda a: a,
                                             components)
        return [a for a in artifacts if is_ready_to_build(a)]

    def _queue_worker_builds(self, artifacts):
        '''Send a set of chunks to the WorkerBuildQueuer class for building.'''
        print("starting")
        # Work on a copy so the caller's list is not consumed.
        pending = list(artifacts)
        while pending:
            artifact = pending.pop()
            if artifact.kind == 'chunk':
                # Chunk artifacts are not built independently
                # so when we're building any chunk artifact
                # we're also building all the chunk artifacts
                # in this source
                pending = [a for a in pending
                           if a.cache_key != artifact.cache_key]

            artifact_encoded = distbuild.encode_artifact_reference(artifact)
            # Make the build-artifact job use ARCHITECTURE to enable
            # multiarch build network (TODO)
            job = gear.Job("build-artifact", artifact_encoded)
            self.builder_client.submitJob(job)
            self.requests_controller.mark_as_building(artifact.cache_key, job)

    def _find_artifacts(self, cache_key, name=None):
        """Return graph artifacts with cache_key (and name, if given)."""
        artifacts, _ = self._map_build_graph(self.artifact, lambda a: a)
        matches = [a for a in artifacts if a.cache_key == cache_key]
        if name:
            matches = [a for a in matches if a.name == name]
        return matches

    def _mark_artifact_as_built(self, cache_key, name=None):
        """Report cache_key as built to the RequestsController.

        'name' is accepted for interface compatibility but is not used.
        """
        self.requests_controller.mark_as_built(cache_key)


class CancelationClient(gear.Client):
    """Gear client used to submit "cancel-<cache_key>" jobs."""

    def __init__(self):
        super(CancelationClient, self).__init__()

    def handleWorkComplete(self, packet):
        job = super(CancelationClient, self).handleWorkComplete(packet)
        print("Cancelation workcomplete")
        return job


class CacheRequestClient(gear.Client):
    """Gear client that forwards cache answers to its controller."""

    def __init__(self, controller):
        super(CacheRequestClient, self).__init__()
        self.controller = controller
        self.finished = False

    def handleWorkComplete(self, packet):
        job = super(CacheRequestClient, self).handleWorkComplete(packet)
        print("Cache workcomplete")
        # The last data item of the job is handed over as the response.
        self.controller._process_cache_response(job.data[-1])
        return job


class BuildGraphClient(gear.Client):
    """Gear client that forwards the computed build graph to its controller."""

    def __init__(self, controller):
        super(BuildGraphClient, self).__init__()
        self.controller = controller
        self.finished = False

    def handleWorkComplete(self, packet):
        job = super(BuildGraphClient, self).handleWorkComplete(packet)
        print("Graph workcomplete")
        self.controller._process_build_graph(job.data[-1])
        return job

    def handleWorkData(self, packet):
        job = super(BuildGraphClient, self).handleWorkData(packet)
        print(job.data[-1])
        # Drop the intermediate data once it has been printed.
        job.data = []
        return job

    def handleWorkFail(self, packet):
        job = super(BuildGraphClient, self).handleWorkFail(packet)
        print("workfail")
        return job

    def handleWorkException(self, packet):
        job = super(BuildGraphClient, self).handleWorkException(packet)
        print("workexception")
        return job

    def handleDisconnect(self, job):
        job = super(BuildGraphClient, self).handleDisconnect(job)
        print("disconnect")


class BuilderClient(gear.Client):
    """Gear client that reports finished artifact builds to its controller."""

    def __init__(self, controller):
        super(BuilderClient, self).__init__()
        self.controller = controller
        self.finished = False

    def handleWorkComplete(self, packet):
        job = super(BuilderClient, self).handleWorkComplete(packet)
        print("Build workcomplete")
        # The last data item is passed on as the cache key that was built.
        self.controller._mark_artifact_as_built(job.data[-1])
        return job

    # TODO: send different types of data? stdout, message...?
    # same for workFail, to identify worker disconnection and build
    # failures.
    def handleWorkData(self, packet):
        job = super(BuilderClient, self).handleWorkData(packet)
        # TODO: use cache_key to inform RQ to notify initiators later,
        # we need to note this cache_key somewhere
        # Cleanup previous data to speed up and save memory probably
        job.data = []
        return job


class RequestsController(threading.Thread):
    """Track all build requests and queue artifacts that are ready to build.

    The thread sleeps on 'lock_queue' and, each time it is notified, looks
    through every request for artifacts whose dependencies are built.

    NOTE(review): a notify() issued while queue_if_possible() is running is
    not remembered, so that wake-up appears to be lost until the next
    notification -- confirm whether this matters in practice.
    """

    def __init__(self):
        super(RequestsController, self).__init__()
        # cache_key -> list of request ids waiting for that build.
        self.building_dict = {}
        # cache_key -> gear job that is building it.
        self.jobs_dict = {}
        self.next_id = 1
        self.new_request_lock = threading.Lock()
        self.lock_queue = threading.Condition()
        self.build_requests = []
        self.build_status_lock = threading.Lock()
        self.stopped = False
        self.cancel_client = CancelationClient()
        self.cancel_client.addServer('localhost')
        self.cancel_client.waitForServer()

    def add_request(self, request, job=None):
        """Register a new build request and start computing its graph.

        request -- JSON-serialisable description of what to build.
        job     -- optional gear job of the initiator, used to send
                   progress messages back to it.
        """
        json_request = json.dumps(request)
        print("DEBUG: adding request - %s" % json_request)
        with self.new_request_lock:
            request_id = self.next_id
            controller = SingleBuildController(self, request_id)
            # TODO: is this the right place to do this?
            controller.start_build(json_request, request_id)
            self.next_id += 1
            self.build_requests.append({
                'id': request_id,
                'controller': controller,
                'request': request,
                'job': job,
            })

    def queue_if_possible(self):
        """Queue every artifact that is ready to build, for all requests."""
        print("DEBUG: Looking for jobs to queue")
        # Iterate over a snapshot: cancel_request() may remove entries
        # from another thread.
        for request in list(self.build_requests):
            print(request['id'])
            controller = request['controller']
            if controller.artifact is None or not controller.build_started:
                continue
            with self.build_status_lock:
                to_build = controller.find_artifacts_that_are_ready_to_build(
                    controller.artifact)
                print(to_build)
            # Queue outside the lock: mark_as_building() takes the same
            # non-reentrant lock for each submitted artifact.
            controller._queue_worker_builds(to_build)

    def mark_as_built(self, cache_key):
        """Mark cache_key as BUILT in every request and wake the scheduler."""
        with self.build_status_lock:
            print("DEBUG: marking as built %s" % cache_key)
            # The entries may already be gone if the build was cancelled
            # meanwhile, so do not fail when they are missing.
            self.building_dict.pop(cache_key, None)
            self.jobs_dict.pop(cache_key, None)
            print(self.building_dict)
            for request in self.build_requests:
                controller = request['controller']
                if controller.artifact is None:
                    continue
                if not controller.build_initiated:
                    continue
                for artifact in controller._find_artifacts(cache_key):
                    artifact.state = BUILT
                    info = "TO %s: Artifact %s built" % (request['id'],
                                                         artifact.name)
                    self.send_data_to_client(request['job'], info)
            print("finished setting as built")
        with self.lock_queue:
            self.lock_queue.notify()

    def send_data_to_client(self, job, msg):
        """Send msg to the initiator's job (if there is one) and print it."""
        if job:
            job.sendWorkData(msg)
        print(msg)

    def mark_as_building(self, cache_key, job):
        """Mark cache_key as BUILDING and remember which requests want it."""
        with self.build_status_lock:
            print("DEBUG: marking as building %s" % cache_key)
            requests_building = []
            print(self.building_dict)
            for request in self.build_requests:
                controller = request['controller']
                request_id = request['id']
                if controller.artifact is None:
                    continue
                if not controller.build_initiated:
                    continue
                for artifact in controller._find_artifacts(cache_key):
                    if request_id not in requests_building:
                        requests_building.append(request_id)
                    artifact.state = BUILDING
                    info = "TO %s: Artifact %s building" % (request_id,
                                                            artifact.name)
                    self.send_data_to_client(request['job'], info)
            self.building_dict[cache_key] = requests_building
            self.jobs_dict[cache_key] = job
            print("finished setting as building")

    def cancel_request(self, request_id_cancel):
        """Drop a request and cancel builds that only that request needed.

        Does nothing when no request with the given id is registered.
        """
        with self.build_status_lock:
            for request in self.build_requests:
                if request['id'] == request_id_cancel:
                    self.build_requests.remove(request)
                    break
            else:
                # Unknown (or already cancelled) request.
                return

            to_remove = []
            for cache_key, request_ids in self.building_dict.items():
                # Only cancel builds no other request is waiting for.
                if request_ids == [request_id_cancel]:
                    print("DEBUG: cancelling %s" % cache_key)
                    job = self.jobs_dict[cache_key]
                    # NOTE(review): this only constructs the admin request;
                    # nothing here sends it to a server -- confirm intent.
                    gear.CancelJobAdminRequest(job.handle)
                    to_remove.append(cache_key)

            for cache_key in to_remove:
                del self.building_dict[cache_key]
                del self.jobs_dict[cache_key]
                job = gear.Job("cancel-%s" % cache_key, cache_key)
                self.cancel_client.submitJob(job)
            print("Finished cancelling")

    def shutdown(self):
        """Ask the thread to stop and wake it so it notices."""
        self.stopped = True
        with self.lock_queue:
            self.lock_queue.notify()

    def run(self):
        """Wait for notifications and queue builds until stopped."""
        while not self.stopped:
            with self.lock_queue:
                self.lock_queue.wait()
            if not self.stopped:
                self.queue_if_possible()


class RequestsManager(threading.Thread):
    """Gear worker thread that accepts "build-request" jobs."""

    def __init__(self, requests_controller):
        super(RequestsManager, self).__init__()
        self.requests_controller = requests_controller
        self.worker = gear.Worker('controller')
        self.worker.addServer('localhost')
        self.worker.registerFunction("build-request")
        self.stopped = False

    def run(self):
        """Fetch jobs and hand them to the controller until stopped."""
        while not self.stopped:
            try:
                print("DEBUG: Waiting for job")
                job = self.worker.getJob()
                self._handle_job(job)
            except gear.InterruptedError:
                print('We were asked to stop waiting for jobs')

    def shutdown(self):
        """Stop the loop and interrupt a pending getJob() call."""
        self.stopped = True
        self.worker.stopWaitingForJobs()

    def _handle_job(self, job):
        """Decode the job's JSON arguments and register the request."""
        build_request = json.loads(job.arguments)
        self.requests_controller.add_request(build_request, job)


# Sample request; it is not submitted anywhere in this file.
request = {
    'repo': "baserock:baserock/definitions",
    'ref': "fbce45e45da79e5c35341845ec3b3d7c321e6ff2",
    'system': "systems/minimal-system-x86_64-generic.morph",
}

# Command line
requests_controller = RequestsController()
requests_manager = RequestsManager(requests_controller)


def term_handler(signum, frame):
    """SIGTERM handler: ask both threads to stop."""
    requests_manager.shutdown()
    requests_controller.shutdown()


signal.signal(signal.SIGTERM, term_handler)

requests_controller.start()
requests_manager.start()

# Wait 15 seconds, then cancel request 1.
time.sleep(15)
print("START CANCELATION")
requests_controller.cancel_request(1)

while not requests_controller.stopped:
    try:
        time.sleep(3)
    except KeyboardInterrupt:
        print("Ctrl + C: asking tasks to exit nicely...\n")
        requests_manager.shutdown()
        requests_controller.shutdown()