From c9426219a8f522d0834e9e5ca2b0385f61e0663b Mon Sep 17 00:00:00 2001 From: Pedro Alvarez Date: Mon, 28 Mar 2016 16:48:57 +0000 Subject: Rename client.py to controller.py Change-Id: Ife9fb074ff20cfdd940d93d305b6e4d2aaa1b56b --- gear/client.py | 395 ----------------------------------------------------- gear/controller.py | 395 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 395 insertions(+), 395 deletions(-) delete mode 100644 gear/client.py create mode 100644 gear/controller.py diff --git a/gear/client.py b/gear/client.py deleted file mode 100644 index 9dc1df60..00000000 --- a/gear/client.py +++ /dev/null @@ -1,395 +0,0 @@ -import gear -import sys -import json -import threading -import requests -import time -import signal -import urlparse -import distbuild -#TODO: values from settings -cache_server = 'http://cache.baserock.org:8080' - -# Artifact build states. These are used to loosely track the state of the -# remote cache. -UNBUILT = 'not-built' -BUILDING = 'building' -BUILT = 'built' - -import logging -logging.basicConfig() - -gear.Server() - -class SingleBuildController(): - def __init__(self, requests_controller, request_id): - self.request_id = request_id - self.requests_controller = requests_controller - self.graph_client = BuildGraphClient(self) - self.builder_client = BuilderClient(self) - self.cache_client = CacheRequestClient(self) - self.builder_client.addServer('localhost') - self.builder_client.waitForServer() - self.graph_client.addServer('localhost') - self.graph_client.waitForServer() - self.cache_client.addServer('localhost') - self.cache_client.waitForServer() - self.artifact = None - self.build_started = False - self.build_initiated = False - - def start_build(self, request): - job = gear.Job("build-graph", request) - self.graph_client.submitJob(job) - - def _process_build_graph(self, build_graph): - print "Decoding artifact received" - try: - self.artifact = distbuild.decode_artifact_reference(build_graph) - except ValueError as e: - print "ERROR: Failed to decode artifact" - print "ERROR: === build graph start ===" - print build_graph - print "ERROR: === build graph end ===" - raise e - - print "Decoding artifact received done" - # Mark everything as unbuilt to begin with. We'll query the actua - # state from the cache in self._query_cache_state(). - #TODO move to RQ - def set_initial_state(artifact): - artifact.state = UNBUILT - print "Setting them as unbuilt" - self._map_build_graph(self.artifact, set_initial_state) - print "Setting them as unbuilt done" - self.build_initiated = True - self._check_cache_state() - - - def _check_cache_state(self): - print "DEBUG: checking cache..." - - # Check first building artifacts - def mark_building_artifacts(artifact): - if (artifact.state == UNBUILT and artifact.cache_key - in requests_controller.building_list): - artifact.state = BUILDING - with requests_controller.build_status_lock: - self._map_build_graph(self.artifact, mark_building_artifacts) - - - artifact_names = [] - def collect_unbuilt_artifacts(artifact): - if artifact.state == UNBUILT: - artifact_names.append(artifact.basename()) - with requests_controller.build_status_lock: - self._map_build_graph(self.artifact, collect_unbuilt_artifacts) - - job = gear.Job("cache-request", json.dumps(artifact_names)) - self.cache_client.submitJob(job) - - - def _process_cache_response(self, cache_response): - response = json.loads( cache_response) - - # Mark things as built that are now built. We only check the unbuilt - # artifacts, so 'cache_state' will have no info for things we already - # thought were built. - # TODO; move to RQ, - def update_state(artifact): - if artifact.state == UNBUILT: - is_in_cache = response[artifact.basename()] - if is_in_cache: - artifact.state = BUILT - self._map_build_graph(self.artifact, update_state) - # TODO: move to RQ - self.build_started = True - with requests_controller.lock_queue: - requests_controller.lock_queue.notify() - print "DEBUG: queque release!" - - def _map_build_graph(self, artifact, callback, components=[]): - """Run callback on each artifact in the build graph and return result. - - If components is given, then only look at the components given and - their dependencies. Also, return a list of the components after they - have had callback called on them. - - """ - result = [] - mapped_components = [] - done = set() - if components: - queue = list(components) - else: - queue = [artifact] - while queue: - a = queue.pop() - if a not in done: - result.append(callback(a)) - queue.extend(a.dependencies) - done.add(a) - if a in components: - mapped_components.append(a) - return result, mapped_components - - def find_artifacts_that_are_ready_to_build(self, root_artifact, - components=[]): - '''Return unbuilt artifacts whose dependencies are all built. - - The 'root_artifact' parameter is expected to be a tree of - ArtifactReference objects. These must have the 'state' attribute set - to BUILT or UNBUILT. If 'components' is passed, then only those - artifacts and their dependencies will be built. - - ''' - def is_ready_to_build(artifact): - # Just in case the state is not set yet. - try: - is_ready = (artifact.state == UNBUILT and - all(a.state == BUILT - for a in artifact.dependencies)) - except KeyError: - is_ready = False - return is_ready - - artifacts, _ = self._map_build_graph(root_artifact, lambda a: a, - components) - ready = [a for a in artifacts if is_ready_to_build(a)] - return ready - - def _queue_worker_builds(self, artifacts): - '''Send a set of chunks to the WorkerBuildQueuer class for building.''' - print "starting" - while len(artifacts) > 0: - artifact = artifacts.pop() - print "going to queue %s" % artifact.name - self.requests_controller.mark_as_building(artifact.cache_key) - print "marked as queue %s" % artifact.name - if artifact.kind == 'chunk': - # Chunk artifacts are not built independently - # so when we're building any chunk artifact - # we're also building all the chunk artifacts - # in this source - same_chunk_artifacts = [a for a in artifacts - if a.cache_key == artifact.cache_key] - for a in same_chunk_artifacts: - artifacts.remove(a) - artifact_encoded = distbuild.encode_artifact_reference(artifact) - print "encoded for job queue %s" % artifact.name - # Make the build-artifact job use ARCHITECTURE to enable mutliarch - # build network (TODO) - job = gear.Job("build-artifact", artifact_encoded) - self.builder_client.submitJob(job) - print "send job queue %s" % artifact.name - - def _find_artifacts(self, cache_key, name=None): - artifacts, _ = self._map_build_graph(self.artifact, lambda a: a) - if not name: - return [a for a in artifacts if a.cache_key == cache_key] - else: - return [a for a in artifacts if a.cache_key == cache_key - and a.name == name] - def _mark_artifact_as_built(self, cache_key, name=None): - self.requests_controller.mark_as_built(cache_key) - - -class CacheRequestClient(gear.Client): - def __init__(self, controller): - super(CacheRequestClient, self).__init__() - self.controller = controller - self.finished = False - - def handleWorkComplete(self, packet): - job = super(CacheRequestClient, self).handleWorkComplete(packet) - print "Cache workcomplete" - self.controller._process_cache_response(job.data[-1]) - return job - - -class BuildGraphClient(gear.Client): - def __init__(self, controller): - super(BuildGraphClient, self).__init__() - self.controller = controller - self.finished = False - - def handleWorkComplete(self, packet): - job = super(BuildGraphClient, self).handleWorkComplete(packet) - print "Graph workcomplete" - self.controller._process_build_graph(job.data[-1]) - return job - - def handleWorkData(self, packet): - job = super(BuildGraphClient, self).handleWorkData(packet) - print job.data[-1] - job.data = [] - return job - - def handleWorkFail(self, packet): - job = super(BuildGraphClient, self).handleWorkFail(packet) - print "workfail" - return job - - def handleWorkException(self, packet): - job = super(BuildGraphClient, self).handleWorkException(packet) - print "workexception" - return job - - def handleDisconnect(self, job): - job = super(BuildGraphClient, self).handleDisconnect(job) - print "disconnect" - - - -class BuilderClient(gear.Client): - def __init__(self, controller): - super(BuilderClient, self).__init__() - self.controller = controller - self.finished = False - - def handleWorkComplete(self, packet): - job = super(BuilderClient, self).handleWorkComplete(packet) - print "Build workcomplete" - self.controller._mark_artifact_as_built(job.data[-1]) - return job - - # TODO: send different types of data? stdout, message...? - # same for workFail, to identify worker dissconection and build - # failures. - def handleWorkData(self, packet): - job = super(BuilderClient, self).handleWorkData(packet) - #TODO: use cache_key to inform RQ to notify initiators later, - # we need to note this cache_key somewhere - #print job.data[-1].strip() - # Cleanup previous data to speed up and save memory probably - job.data = [] - return job - -class RequestsController(threading.Thread): - def __init__(self): - super(RequestsController, self).__init__() - self.building_list = [] - self.next_id = 1 - self.new_request_lock = threading.Lock() - self.lock_queue = threading.Condition() - self.build_requests = [] - self.build_status_lock = threading.Lock() - self.stopped = False - - def add_request(self, request): - json_request = json.dumps(request) - print "DEBUG: adding request - %s" % json_request - request_data = {} - with self.new_request_lock: - request_data['id'] = self.next_id - request_data['controller'] = SingleBuildController(self, self.next_id) - # TODO: is this the right place to do this? - request_data['controller'].start_build(json_request) - request_data['request'] = request - self.next_id += 1 - self.build_requests.append(request_data) - - def queue_if_possible(self): - print "DEBUG: Looking for jobs to queue" - for request in self.build_requests: - print request['id'] - controller = request['controller'] - if controller.artifact != None and controller.build_started == True: - with self.build_status_lock: - to_build = controller.find_artifacts_that_are_ready_to_build( - controller.artifact) - print to_build - controller._queue_worker_builds(to_build) - - - def mark_as_built(self, cache_key): - with self.build_status_lock: - self.building_list.remove(cache_key) - print self.building_list - for request in self.build_requests: - controller = request['controller'] - if controller.artifact != None and controller.build_initiated == True: - artifacts = request['controller']._find_artifacts(cache_key) - for artifact in artifacts: - artifact.state = BUILT - print "TO %s: Artifact %s built" % (request['id'],artifact.name) - - with self.lock_queue: - self.lock_queue.notify() - - - def mark_as_building(self, cache_key): - with self.build_status_lock: - self.building_list.append(cache_key) - print self.building_list - for request in self.build_requests: - controller = request['controller'] - if controller.artifact != None and controller.build_initiated == True: - artifacts = request['controller']._find_artifacts(cache_key) - for artifact in artifacts: - artifact.state = BUILDING - print "TO %s: Artifact %s building" % (request['id'],artifact.name) - - def shutdown(self): - self.stopped = True - with self.lock_queue: - self.lock_queue.notify() - - def run(self): - while not self.stopped: - with self.lock_queue: - self.lock_queue.wait() - if not self.stopped: - self.queue_if_possible() - - -class RequestsManager(threading.Thread): - def __init__(self, requests_controller): - super(RequestsManager, self).__init__() - self.requests_controller = requests_controller - self.worker = gear.Worker('controller') - self.worker.addServer('localhost') - self.worker.registerFunction("build-request") - self.stopped = False - - def run(self): - while not self.stopped: - try: - print "DEBUG: Waiting for job" - job = self.worker.getJob() - self._handle_job(job) - except gear.InterruptedError: - print 'We were asked to stop waiting for jobs' - - def shutdown(self): - self.stopped = True - self.worker.stopWaitingForJobs() - - def _handle_job(self, job): - build_request=json.loads(job.arguments) - self.requests_controller.add_request(build_request) - -request = {} -request['repo'] = "baserock:baserock/definitions" -request['ref'] = "fbce45e45da79e5c35341845ec3b3d7c321e6ff2" -request['system'] = "systems/minimal-system-x86_64-generic.morph" - -# Command line -requests_controller = RequestsController() -requests_manager = RequestsManager(requests_controller) - -def term_handler(signum, frame): - requests_manager.shutdown() - requests_controller.shutdown() -signal.signal(signal.SIGTERM, term_handler) - -requests_controller.start() -requests_manager.start() - -while not requests_controller.stopped: - try: - time.sleep(3) - except KeyboardInterrupt: - print "Ctrl + C: asking tasks to exit nicely...\n" - requests_manager.shutdown() - requests_controller.shutdown() diff --git a/gear/controller.py b/gear/controller.py new file mode 100644 index 00000000..9dc1df60 --- /dev/null +++ b/gear/controller.py @@ -0,0 +1,395 @@ +import gear +import sys +import json +import threading +import requests +import time +import signal +import urlparse +import distbuild +#TODO: values from settings +cache_server = 'http://cache.baserock.org:8080' + +# Artifact build states. These are used to loosely track the state of the +# remote cache. +UNBUILT = 'not-built' +BUILDING = 'building' +BUILT = 'built' + +import logging +logging.basicConfig() + +gear.Server() + +class SingleBuildController(): + def __init__(self, requests_controller, request_id): + self.request_id = request_id + self.requests_controller = requests_controller + self.graph_client = BuildGraphClient(self) + self.builder_client = BuilderClient(self) + self.cache_client = CacheRequestClient(self) + self.builder_client.addServer('localhost') + self.builder_client.waitForServer() + self.graph_client.addServer('localhost') + self.graph_client.waitForServer() + self.cache_client.addServer('localhost') + self.cache_client.waitForServer() + self.artifact = None + self.build_started = False + self.build_initiated = False + + def start_build(self, request): + job = gear.Job("build-graph", request) + self.graph_client.submitJob(job) + + def _process_build_graph(self, build_graph): + print "Decoding artifact received" + try: + self.artifact = distbuild.decode_artifact_reference(build_graph) + except ValueError as e: + print "ERROR: Failed to decode artifact" + print "ERROR: === build graph start ===" + print build_graph + print "ERROR: === build graph end ===" + raise e + + print "Decoding artifact received done" + # Mark everything as unbuilt to begin with. We'll query the actua + # state from the cache in self._query_cache_state(). + #TODO move to RQ + def set_initial_state(artifact): + artifact.state = UNBUILT + print "Setting them as unbuilt" + self._map_build_graph(self.artifact, set_initial_state) + print "Setting them as unbuilt done" + self.build_initiated = True + self._check_cache_state() + + + def _check_cache_state(self): + print "DEBUG: checking cache..." + + # Check first building artifacts + def mark_building_artifacts(artifact): + if (artifact.state == UNBUILT and artifact.cache_key + in requests_controller.building_list): + artifact.state = BUILDING + with requests_controller.build_status_lock: + self._map_build_graph(self.artifact, mark_building_artifacts) + + + artifact_names = [] + def collect_unbuilt_artifacts(artifact): + if artifact.state == UNBUILT: + artifact_names.append(artifact.basename()) + with requests_controller.build_status_lock: + self._map_build_graph(self.artifact, collect_unbuilt_artifacts) + + job = gear.Job("cache-request", json.dumps(artifact_names)) + self.cache_client.submitJob(job) + + + def _process_cache_response(self, cache_response): + response = json.loads( cache_response) + + # Mark things as built that are now built. We only check the unbuilt + # artifacts, so 'cache_state' will have no info for things we already + # thought were built. + # TODO; move to RQ, + def update_state(artifact): + if artifact.state == UNBUILT: + is_in_cache = response[artifact.basename()] + if is_in_cache: + artifact.state = BUILT + self._map_build_graph(self.artifact, update_state) + # TODO: move to RQ + self.build_started = True + with requests_controller.lock_queue: + requests_controller.lock_queue.notify() + print "DEBUG: queque release!" + + def _map_build_graph(self, artifact, callback, components=[]): + """Run callback on each artifact in the build graph and return result. + + If components is given, then only look at the components given and + their dependencies. Also, return a list of the components after they + have had callback called on them. + + """ + result = [] + mapped_components = [] + done = set() + if components: + queue = list(components) + else: + queue = [artifact] + while queue: + a = queue.pop() + if a not in done: + result.append(callback(a)) + queue.extend(a.dependencies) + done.add(a) + if a in components: + mapped_components.append(a) + return result, mapped_components + + def find_artifacts_that_are_ready_to_build(self, root_artifact, + components=[]): + '''Return unbuilt artifacts whose dependencies are all built. + + The 'root_artifact' parameter is expected to be a tree of + ArtifactReference objects. These must have the 'state' attribute set + to BUILT or UNBUILT. If 'components' is passed, then only those + artifacts and their dependencies will be built. + + ''' + def is_ready_to_build(artifact): + # Just in case the state is not set yet. + try: + is_ready = (artifact.state == UNBUILT and + all(a.state == BUILT + for a in artifact.dependencies)) + except KeyError: + is_ready = False + return is_ready + + artifacts, _ = self._map_build_graph(root_artifact, lambda a: a, + components) + ready = [a for a in artifacts if is_ready_to_build(a)] + return ready + + def _queue_worker_builds(self, artifacts): + '''Send a set of chunks to the WorkerBuildQueuer class for building.''' + print "starting" + while len(artifacts) > 0: + artifact = artifacts.pop() + print "going to queue %s" % artifact.name + self.requests_controller.mark_as_building(artifact.cache_key) + print "marked as queue %s" % artifact.name + if artifact.kind == 'chunk': + # Chunk artifacts are not built independently + # so when we're building any chunk artifact + # we're also building all the chunk artifacts + # in this source + same_chunk_artifacts = [a for a in artifacts + if a.cache_key == artifact.cache_key] + for a in same_chunk_artifacts: + artifacts.remove(a) + artifact_encoded = distbuild.encode_artifact_reference(artifact) + print "encoded for job queue %s" % artifact.name + # Make the build-artifact job use ARCHITECTURE to enable mutliarch + # build network (TODO) + job = gear.Job("build-artifact", artifact_encoded) + self.builder_client.submitJob(job) + print "send job queue %s" % artifact.name + + def _find_artifacts(self, cache_key, name=None): + artifacts, _ = self._map_build_graph(self.artifact, lambda a: a) + if not name: + return [a for a in artifacts if a.cache_key == cache_key] + else: + return [a for a in artifacts if a.cache_key == cache_key + and a.name == name] + def _mark_artifact_as_built(self, cache_key, name=None): + self.requests_controller.mark_as_built(cache_key) + + +class CacheRequestClient(gear.Client): + def __init__(self, controller): + super(CacheRequestClient, self).__init__() + self.controller = controller + self.finished = False + + def handleWorkComplete(self, packet): + job = super(CacheRequestClient, self).handleWorkComplete(packet) + print "Cache workcomplete" + self.controller._process_cache_response(job.data[-1]) + return job + + +class BuildGraphClient(gear.Client): + def __init__(self, controller): + super(BuildGraphClient, self).__init__() + self.controller = controller + self.finished = False + + def handleWorkComplete(self, packet): + job = super(BuildGraphClient, self).handleWorkComplete(packet) + print "Graph workcomplete" + self.controller._process_build_graph(job.data[-1]) + return job + + def handleWorkData(self, packet): + job = super(BuildGraphClient, self).handleWorkData(packet) + print job.data[-1] + job.data = [] + return job + + def handleWorkFail(self, packet): + job = super(BuildGraphClient, self).handleWorkFail(packet) + print "workfail" + return job + + def handleWorkException(self, packet): + job = super(BuildGraphClient, self).handleWorkException(packet) + print "workexception" + return job + + def handleDisconnect(self, job): + job = super(BuildGraphClient, self).handleDisconnect(job) + print "disconnect" + + + +class BuilderClient(gear.Client): + def __init__(self, controller): + super(BuilderClient, self).__init__() + self.controller = controller + self.finished = False + + def handleWorkComplete(self, packet): + job = super(BuilderClient, self).handleWorkComplete(packet) + print "Build workcomplete" + self.controller._mark_artifact_as_built(job.data[-1]) + return job + + # TODO: send different types of data? stdout, message...? + # same for workFail, to identify worker dissconection and build + # failures. + def handleWorkData(self, packet): + job = super(BuilderClient, self).handleWorkData(packet) + #TODO: use cache_key to inform RQ to notify initiators later, + # we need to note this cache_key somewhere + #print job.data[-1].strip() + # Cleanup previous data to speed up and save memory probably + job.data = [] + return job + +class RequestsController(threading.Thread): + def __init__(self): + super(RequestsController, self).__init__() + self.building_list = [] + self.next_id = 1 + self.new_request_lock = threading.Lock() + self.lock_queue = threading.Condition() + self.build_requests = [] + self.build_status_lock = threading.Lock() + self.stopped = False + + def add_request(self, request): + json_request = json.dumps(request) + print "DEBUG: adding request - %s" % json_request + request_data = {} + with self.new_request_lock: + request_data['id'] = self.next_id + request_data['controller'] = SingleBuildController(self, self.next_id) + # TODO: is this the right place to do this? + request_data['controller'].start_build(json_request) + request_data['request'] = request + self.next_id += 1 + self.build_requests.append(request_data) + + def queue_if_possible(self): + print "DEBUG: Looking for jobs to queue" + for request in self.build_requests: + print request['id'] + controller = request['controller'] + if controller.artifact != None and controller.build_started == True: + with self.build_status_lock: + to_build = controller.find_artifacts_that_are_ready_to_build( + controller.artifact) + print to_build + controller._queue_worker_builds(to_build) + + + def mark_as_built(self, cache_key): + with self.build_status_lock: + self.building_list.remove(cache_key) + print self.building_list + for request in self.build_requests: + controller = request['controller'] + if controller.artifact != None and controller.build_initiated == True: + artifacts = request['controller']._find_artifacts(cache_key) + for artifact in artifacts: + artifact.state = BUILT + print "TO %s: Artifact %s built" % (request['id'],artifact.name) + + with self.lock_queue: + self.lock_queue.notify() + + + def mark_as_building(self, cache_key): + with self.build_status_lock: + self.building_list.append(cache_key) + print self.building_list + for request in self.build_requests: + controller = request['controller'] + if controller.artifact != None and controller.build_initiated == True: + artifacts = request['controller']._find_artifacts(cache_key) + for artifact in artifacts: + artifact.state = BUILDING + print "TO %s: Artifact %s building" % (request['id'],artifact.name) + + def shutdown(self): + self.stopped = True + with self.lock_queue: + self.lock_queue.notify() + + def run(self): + while not self.stopped: + with self.lock_queue: + self.lock_queue.wait() + if not self.stopped: + self.queue_if_possible() + + +class RequestsManager(threading.Thread): + def __init__(self, requests_controller): + super(RequestsManager, self).__init__() + self.requests_controller = requests_controller + self.worker = gear.Worker('controller') + self.worker.addServer('localhost') + self.worker.registerFunction("build-request") + self.stopped = False + + def run(self): + while not self.stopped: + try: + print "DEBUG: Waiting for job" + job = self.worker.getJob() + self._handle_job(job) + except gear.InterruptedError: + print 'We were asked to stop waiting for jobs' + + def shutdown(self): + self.stopped = True + self.worker.stopWaitingForJobs() + + def _handle_job(self, job): + build_request=json.loads(job.arguments) + self.requests_controller.add_request(build_request) + +request = {} +request['repo'] = "baserock:baserock/definitions" +request['ref'] = "fbce45e45da79e5c35341845ec3b3d7c321e6ff2" +request['system'] = "systems/minimal-system-x86_64-generic.morph" + +# Command line +requests_controller = RequestsController() +requests_manager = RequestsManager(requests_controller) + +def term_handler(signum, frame): + requests_manager.shutdown() + requests_controller.shutdown() +signal.signal(signal.SIGTERM, term_handler) + +requests_controller.start() +requests_manager.start() + +while not requests_controller.stopped: + try: + time.sleep(3) + except KeyboardInterrupt: + print "Ctrl + C: asking tasks to exit nicely...\n" + requests_manager.shutdown() + requests_controller.shutdown() -- cgit v1.2.1