From 210625c3986a659f9d6053af0b8655279c6417a1 Mon Sep 17 00:00:00 2001 From: Pedro Alvarez Date: Mon, 7 Mar 2016 23:38:19 +0000 Subject: Build logic done Change-Id: I0524be09b808b8b2282aaa1e61993d80d169e64a --- gear/client.py | 94 +++++++++++++++++++++++++++++++++++++++------------------- gear/worker.py | 3 +- 2 files changed, 66 insertions(+), 31 deletions(-) diff --git a/gear/client.py b/gear/client.py index 34cfacf9..e9d818a6 100644 --- a/gear/client.py +++ b/gear/client.py @@ -1,6 +1,7 @@ import gear import sys import json +import threading import distbuild @@ -17,8 +18,15 @@ gear.Server() class theController(): def __init__(self): + self.lock = threading.Lock() self.graph_client = BuildGraphClient(self) + self.builder_client = BuilderClient(self) + self.builder_client.addServer('localhost') + self.builder_client.waitForServer() # Wait for at least one server to be connected + self.graph_client.addServer('localhost') + self.graph_client.waitForServer() # Wait for at least one server to be connected self.artifact = None + self.build_started = False def _process_build_graph(self, packet): print "Decoding artifact received" @@ -31,6 +39,7 @@ class theController(): print "Setting them as unbuilt" self._map_build_graph(self.artifact, set_initial_state) print "Setting them as unbuilt done" + self.build_started = True def _map_build_graph(self, artifact, callback, components=[]): """Run callback on each artifact in the build graph and return result. @@ -67,17 +76,22 @@ class theController(): ''' def is_ready_to_build(artifact): - return (artifact.state == UNBUILT and - all(a.state == BUILT - for a in artifact.dependencies)) - - artifacts, _ = self._map_build_graph(root_artifact, lambda a: a, + try: + is_ready = (artifact.state == UNBUILT and + all(a.state == BUILT + for a in artifact.dependencies)) + except KeyError: + is_ready = False + return is_ready + + with self.lock: + artifacts, _ = self._map_build_graph(root_artifact, lambda a: a, components) - return [a for a in artifacts if is_ready_to_build(a)] + ready = [a for a in artifacts if is_ready_to_build(a)] + return ready def _queue_worker_builds(self, artifacts): '''Send a set of chunks to the WorkerBuildQueuer class for building.''' - distbuild.crash_point() logging.debug('Queuing more worker-builds to run') while len(artifacts) > 0: @@ -87,21 +101,26 @@ class theController(): 'Requesting worker-build of %s (%s)' % (artifact.name, artifact.cache_key)) # TODO: launch build client - artifact.state = BUILDING - if artifact.kind == 'chunk': - # Chunk artifacts are not built independently - # so when we're building any chunk artifact - # we're also building all the chunk artifacts - # in this source - same_chunk_artifacts = [a for a in artifacts - if a.cache_key == artifact.cache_key] - for a in same_chunk_artifacts: - a.state = BUILDING - artifacts.remove(a) + print "Start building %s" % artifact.name + job = gear.Job("reverse", artifact.cache_key) + print "lock set as building" + with self.lock: + artifact.state = BUILDING + print "set as building %s" % artifact.name + if artifact.kind == 'chunk': + # Chunk artifacts are not built independently + # so when we're building any chunk artifact + # we're also building all the chunk artifacts + # in this source + same_chunk_artifacts = [a for a in artifacts + if a.cache_key == artifact.cache_key] + for a in same_chunk_artifacts: + a.state = BUILDING + artifacts.remove(a) + self.builder_client.submitJob(job) def _find_artifact(self, cache_key): - artifacts, _ = map_build_graph(self._artifact, lambda a: a, - self._components) + artifacts, _ = self._map_build_graph(self.artifact, lambda a: a) wanted = [a for a in artifacts if a.cache_key == cache_key] if wanted: return wanted[0] @@ -109,9 +128,27 @@ class theController(): return None - def controller._mark_artifact_as_built(self, packet) - artifacts = _find_artifacts(packet[-1]) - # Do something like _maybe_check_result_and_queue_more_builds + def _mark_artifact_as_built(self, cache_key): + print cache_key + + def set_state(a): + if a.cache_key == artifact.cache_key: + a.state = BUILT + + artifact = self._find_artifact(cache_key) + + print "lock set as built" + with self.lock: + print "set as built %s" % artifact.name + artifact.state = BUILT + if artifact.kind == 'chunk': + # Building a single chunk artifact + # yields all chunk artifacts for the given source + # so we set the state of this source's artifacts + # to BUILT + self._map_build_graph(self.artifact, set_state) + + print artifact class BuildGraphClient(gear.Client): @@ -157,15 +194,12 @@ class BuilderClient(gear.Client): def handleWorkComplete(self, packet): job = super(BuilderClient, self).handleWorkComplete(packet) print "workcomplete" - self.controller._mark_artifact_as_built(packet) + self.controller._mark_artifact_as_built(job.arguments) return job controller = theController() client = controller.graph_client -client.addServer('localhost') -client.waitForServer() # Wait for at least one server to be connected -print "server connected" #job = gear.Job("reverse", "test string") build_graph_request = {} build_graph_request['repo'] = "baserock:baserock/definitions" @@ -183,8 +217,8 @@ client.submitJob(job) # loop so that client doesn't die while True: import time - time.sleep(2) - if controller.artifact != None: + time.sleep(0.0001) + if controller.artifact != None and controller.build_started == True: to_build = controller.find_artifacts_that_are_ready_to_build(controller.artifact) - print to_build + #print to_build controller._queue_worker_builds(to_build) diff --git a/gear/worker.py b/gear/worker.py index 90f0e415..0b46a221 100644 --- a/gear/worker.py +++ b/gear/worker.py @@ -13,7 +13,8 @@ while True: job = worker.getJob() print "DEBUG: Received job '%s'" % job.name if job.name == "reverse": - for x in range(0, 100000): + print "DEBUG: Starting job reverse with '%s'" % job.arguments + for x in range(0, 100): job.sendWorkData("This is: %s" % x) job.sendWorkComplete("answer") elif job.name == "build-graph": -- cgit v1.2.1