From d7fa7e1a5e2749ad18aa9506239862a8837daee7 Mon Sep 17 00:00:00 2001 From: Pedro Alvarez Date: Tue, 22 Mar 2016 00:56:14 +0000 Subject: Build control in RequestController Change-Id: I52e887401919b4924d3524319eb5be6243d424a2 --- gear/client.py | 166 +++++++++++++++++++++++++-------------------------------- 1 file changed, 73 insertions(+), 93 deletions(-) diff --git a/gear/client.py b/gear/client.py index b4a894a5..795bacf6 100644 --- a/gear/client.py +++ b/gear/client.py @@ -21,7 +21,6 @@ gear.Server() class SingleBuildController(): def __init__(self, requests_controller, request_id): - self.lock = threading.Lock() self.request_id = request_id self.requests_controller = requests_controller self.graph_client = BuildGraphClient(self) @@ -54,6 +53,7 @@ class SingleBuildController(): print "Decoding artifact received done" # Mark everything as unbuilt to begin with. We'll query the actua # state from the cache in self._query_cache_state(). + #TODO move to RQ def set_initial_state(artifact): artifact.state = UNBUILT print "Setting them as unbuilt" @@ -77,26 +77,21 @@ class SingleBuildController(): def _process_cache_response(self, cache_response): response = json.loads( cache_response) - #for key, value in response.iteritems(): - # if value: - # cache_key, kind, name = key.split('.') - # self._mark_artifact_as_built(cache_key, name=name) - # else: - # print key - # print ".. wasnt built" - # Mark things as built that are now built. We only check the unbuilt # artifacts, so 'cache_state' will have no info for things we already # thought were built. - with self.lock: - def update_state(artifact): - if artifact.state == UNBUILT: - is_in_cache = response[artifact.basename()] - if is_in_cache: - artifact.state = BUILT - self._map_build_graph(self.artifact, update_state) - + # TODO; move to RQ, + def update_state(artifact): + if artifact.state == UNBUILT: + is_in_cache = response[artifact.basename()] + if is_in_cache: + artifact.state = BUILT + self._map_build_graph(self.artifact, update_state) + # TODO: move to RQ self.build_started = True + if requests_controller.lock_queue.locked(): + print "DEBUG: queque release!" + requests_controller.lock_queue.release() def _map_build_graph(self, artifact, callback, components=[]): """Run callback on each artifact in the build graph and return result. @@ -143,80 +138,45 @@ class SingleBuildController(): is_ready = False return is_ready - with self.lock: - artifacts, _ = self._map_build_graph(root_artifact, lambda a: a, - components) - ready = [a for a in artifacts if is_ready_to_build(a)] + artifacts, _ = self._map_build_graph(root_artifact, lambda a: a, + components) + ready = [a for a in artifacts if is_ready_to_build(a)] return ready def _queue_worker_builds(self, artifacts): '''Send a set of chunks to the WorkerBuildQueuer class for building.''' - - logging.debug('Queuing more worker-builds to run') + print "starting" while len(artifacts) > 0: artifact = artifacts.pop() - - logging.debug( - 'Requesting worker-build of %s (%s)' % - (artifact.name, artifact.cache_key)) - print "Start building %s" % artifact.name + print "going to queue %s" % artifact.name + self.requests_controller.mark_as_building(artifact.cache_key) + print "marked as queue %s" % artifact.name + if artifact.kind == 'chunk': + # Chunk artifacts are not built independently + # so when we're building any chunk artifact + # we're also building all the chunk artifacts + # in this source + same_chunk_artifacts = [a for a in artifacts + if a.cache_key == artifact.cache_key] + for a in same_chunk_artifacts: + artifacts.remove(a) artifact_encoded = distbuild.encode_artifact_reference(artifact) + print "encoded for job queue %s" % artifact.name + # Make the build-artifact job use ARCHITECTURE to enable mutliarch + # build network (TODO) job = gear.Job("build-artifact", artifact_encoded) - print "lock set as building" - with self.lock: - artifact.state = BUILDING - print "set as building %s" % artifact.name - if artifact.kind == 'chunk': - # Chunk artifacts are not built independently - # so when we're building any chunk artifact - # we're also building all the chunk artifacts - # in this source - same_chunk_artifacts = [a for a in artifacts - if a.cache_key == artifact.cache_key] - for a in same_chunk_artifacts: - a.state = BUILDING - artifacts.remove(a) self.builder_client.submitJob(job) + print "send job queue %s" % artifact.name - def _find_artifact(self, cache_key, name): + def _find_artifacts(self, cache_key, name=None): artifacts, _ = self._map_build_graph(self.artifact, lambda a: a) - wanted = [a for a in artifacts if a.cache_key == cache_key] - # TODO: code ugly as hell - if wanted: - if name: - filtered = [b for b in wanted if b.name == name] - if filtered: - return filtered[0] - else: - return None - return wanted[0] + if not name: + return [a for a in artifacts if a.cache_key == cache_key] else: - return None - - + return [a for a in artifacts if a.cache_key == cache_key + and a.name == name] def _mark_artifact_as_built(self, cache_key, name=None): - def set_state(a): - if a.cache_key == artifact.cache_key: - a.state = BUILT - self.requests_controller.mark_as_built(self.request_id, - a.cache_key, - a.kind, a.name) - - artifact = self._find_artifact(cache_key, name) - - with self.lock: - artifact.state = BUILT - self.requests_controller.mark_as_built(self.request_id, - artifact.cache_key, - artifact.kind, - artifact.name) - if not name and artifact.kind == 'chunk': - # Building a single chunk artifact - # yields all chunk artifacts for the given source - # so we set the state of this source's artifacts - # to BUILT - self._map_build_graph(self.artifact, set_state) - + self.requests_controller.mark_as_built(cache_key) class CacheRequestClient(gear.Client): @@ -283,7 +243,9 @@ class BuilderClient(gear.Client): # failures. def handleWorkData(self, packet): job = super(BuilderClient, self).handleWorkData(packet) - print job.data[-1].strip() + #TODO: use cache_key to inform RQ to notify initiators later, + # we need to note this cache_key somewhere + #print job.data[-1].strip() # Cleanup previous data to speed up and save memory probably job.data = [] return job @@ -292,6 +254,8 @@ class RequestsController(): def __init__(self): self.next_id = 1 self.new_request_lock = threading.Lock() + self.lock_queue = threading.Lock() + self.lock_queue.acquire() self.build_requests = [] self.build_status_lock = threading.Lock() @@ -309,23 +273,44 @@ class RequestsController(): def queue_if_possible(self): # TODO: check all of them in a loop? - controller = self.build_requests[0]['controller'] - with self.build_status_lock: + for request in self.build_requests: + print request['id'] + controller = request['controller'] if controller.artifact != None and controller.build_started == True: - to_build = controller.find_artifacts_that_are_ready_to_build( - controller.artifact) + with self.build_status_lock: + to_build = controller.find_artifacts_that_are_ready_to_build( + controller.artifact) + print to_build controller._queue_worker_builds(to_build) - # TODO: Block execution until we receive a mark_as_built - def mark_as_built(self, request_id, cache_key, kind, name): + def mark_as_built(self, cache_key): with self.build_status_lock: - print "TO %s: Artifact %s built" % (request_id, name) + for request in self.build_requests: + artifacts = request['controller']._find_artifacts(cache_key) + for artifact in artifacts: + artifact.state = BUILT + print "TO %s: Artifact %s built" % (request['id'],artifact.name) + if self.lock_queue.locked(): + self.lock_queue.release() - def mark_as_building(self, cache_key, kind, name) + def mark_as_building(self, cache_key): + with self.build_status_lock: + for request in self.build_requests: + artifacts = request['controller']._find_artifacts(cache_key) + for artifact in artifacts: + artifact.state = BUILDING + print "TO %s: Artifact %s building" % (request['id'],artifact.name) + #consider chunks case + def loop(self): + while True: + print "DEBUG: locking queue" + self.lock_queue.acquire() + print "DEBUG locked queue" + self.queue_if_possible() request = {} request['repo'] = "baserock:baserock/definitions" @@ -334,9 +319,4 @@ request['system'] = "systems/minimal-system-x86_64-generic.morph" requests_controller = RequestsController() requests_controller.add_request(request) - -# loop so that client doesn't die -while True: - import time - time.sleep(0.0001) - requests_controller.queue_if_possible() +requests_controller.loop() -- cgit v1.2.1