author     Pedro Alvarez <pedro.alvarez@codethink.co.uk>  2016-03-22 00:56:14 +0000
committer  Pedro Alvarez <pedro.alvarez@codethink.co.uk>  2016-03-27 11:53:02 +0000
commit     d7fa7e1a5e2749ad18aa9506239862a8837daee7 (patch)
tree       22df22430e03b06db9e343e43b6750547a3ff69b
parent     443122fe377952d71b9df17c9e1fda2d15004b03 (diff)
download   morph-d7fa7e1a5e2749ad18aa9506239862a8837daee7.tar.gz
Build control in RequestsController
Change-Id: I52e887401919b4924d3524319eb5be6243d424a2
-rw-r--r--  gear/client.py  166
1 file changed, 73 insertions, 93 deletions
diff --git a/gear/client.py b/gear/client.py
index b4a894a5..795bacf6 100644
--- a/gear/client.py
+++ b/gear/client.py
@@ -21,7 +21,6 @@ gear.Server()
class SingleBuildController():
def __init__(self, requests_controller, request_id):
- self.lock = threading.Lock()
self.request_id = request_id
self.requests_controller = requests_controller
self.graph_client = BuildGraphClient(self)
@@ -54,6 +53,7 @@ class SingleBuildController():
print "Decoding artifact received done"
# Mark everything as unbuilt to begin with. We'll query the actual
# state from the cache in self._query_cache_state().
+ # TODO: move to RQ
def set_initial_state(artifact):
artifact.state = UNBUILT
print "Setting them as unbuilt"
@@ -77,26 +77,21 @@ class SingleBuildController():
def _process_cache_response(self, cache_response):
response = json.loads( cache_response)
- #for key, value in response.iteritems():
- # if value:
- # cache_key, kind, name = key.split('.')
- # self._mark_artifact_as_built(cache_key, name=name)
- # else:
- # print key
- # print ".. wasnt built"
-
# Mark things as built that are now built. We only check the unbuilt
# artifacts, so 'cache_state' will have no info for things we already
# thought were built.
- with self.lock:
- def update_state(artifact):
- if artifact.state == UNBUILT:
- is_in_cache = response[artifact.basename()]
- if is_in_cache:
- artifact.state = BUILT
- self._map_build_graph(self.artifact, update_state)
-
+ # TODO: move to RQ
+ def update_state(artifact):
+ if artifact.state == UNBUILT:
+ is_in_cache = response[artifact.basename()]
+ if is_in_cache:
+ artifact.state = BUILT
+ self._map_build_graph(self.artifact, update_state)
+ # TODO: move to RQ
self.build_started = True
+ if self.requests_controller.lock_queue.locked():
+ print "DEBUG: queue release!"
+ self.requests_controller.lock_queue.release()
def _map_build_graph(self, artifact, callback, components=[]):
"""Run callback on each artifact in the build graph and return result.
@@ -143,80 +138,45 @@ class SingleBuildController():
is_ready = False
return is_ready
- with self.lock:
- artifacts, _ = self._map_build_graph(root_artifact, lambda a: a,
- components)
- ready = [a for a in artifacts if is_ready_to_build(a)]
+ artifacts, _ = self._map_build_graph(root_artifact, lambda a: a,
+ components)
+ ready = [a for a in artifacts if is_ready_to_build(a)]
return ready
def _queue_worker_builds(self, artifacts):
'''Send a set of chunks to the WorkerBuildQueuer class for building.'''
-
- logging.debug('Queuing more worker-builds to run')
+ print "starting"
while len(artifacts) > 0:
artifact = artifacts.pop()
-
- logging.debug(
- 'Requesting worker-build of %s (%s)' %
- (artifact.name, artifact.cache_key))
- print "Start building %s" % artifact.name
+ print "going to queue %s" % artifact.name
+ self.requests_controller.mark_as_building(artifact.cache_key)
+ print "marked as queue %s" % artifact.name
+ if artifact.kind == 'chunk':
+ # Chunk artifacts are not built independently
+ # so when we're building any chunk artifact
+ # we're also building all the chunk artifacts
+ # in this source
+ same_chunk_artifacts = [a for a in artifacts
+ if a.cache_key == artifact.cache_key]
+ for a in same_chunk_artifacts:
+ artifacts.remove(a)
artifact_encoded = distbuild.encode_artifact_reference(artifact)
+ print "encoded for job queue %s" % artifact.name
+ # Make the build-artifact job use ARCHITECTURE to enable multiarch
+ # build network (TODO)
job = gear.Job("build-artifact", artifact_encoded)
- print "lock set as building"
- with self.lock:
- artifact.state = BUILDING
- print "set as building %s" % artifact.name
- if artifact.kind == 'chunk':
- # Chunk artifacts are not built independently
- # so when we're building any chunk artifact
- # we're also building all the chunk artifacts
- # in this source
- same_chunk_artifacts = [a for a in artifacts
- if a.cache_key == artifact.cache_key]
- for a in same_chunk_artifacts:
- a.state = BUILDING
- artifacts.remove(a)
self.builder_client.submitJob(job)
+ print "send job queue %s" % artifact.name
- def _find_artifact(self, cache_key, name):
+ def _find_artifacts(self, cache_key, name=None):
artifacts, _ = self._map_build_graph(self.artifact, lambda a: a)
- wanted = [a for a in artifacts if a.cache_key == cache_key]
- # TODO: code ugly as hell
- if wanted:
- if name:
- filtered = [b for b in wanted if b.name == name]
- if filtered:
- return filtered[0]
- else:
- return None
- return wanted[0]
+ if not name:
+ return [a for a in artifacts if a.cache_key == cache_key]
else:
- return None
-
-
+ return [a for a in artifacts if a.cache_key == cache_key
+ and a.name == name]
def _mark_artifact_as_built(self, cache_key, name=None):
- def set_state(a):
- if a.cache_key == artifact.cache_key:
- a.state = BUILT
- self.requests_controller.mark_as_built(self.request_id,
- a.cache_key,
- a.kind, a.name)
-
- artifact = self._find_artifact(cache_key, name)
-
- with self.lock:
- artifact.state = BUILT
- self.requests_controller.mark_as_built(self.request_id,
- artifact.cache_key,
- artifact.kind,
- artifact.name)
- if not name and artifact.kind == 'chunk':
- # Building a single chunk artifact
- # yields all chunk artifacts for the given source
- # so we set the state of this source's artifacts
- # to BUILT
- self._map_build_graph(self.artifact, set_state)
-
+ self.requests_controller.mark_as_built(cache_key)
class CacheRequestClient(gear.Client):
@@ -283,7 +243,9 @@ class BuilderClient(gear.Client):
# failures.
def handleWorkData(self, packet):
job = super(BuilderClient, self).handleWorkData(packet)
- print job.data[-1].strip()
+ # TODO: use cache_key to inform RQ to notify initiators later;
+ # we need to note this cache_key somewhere
+ #print job.data[-1].strip()
# Cleanup previous data to speed up and save memory probably
job.data = []
return job
@@ -292,6 +254,8 @@ class RequestsController():
def __init__(self):
self.next_id = 1
self.new_request_lock = threading.Lock()
+ self.lock_queue = threading.Lock()
+ self.lock_queue.acquire()
self.build_requests = []
self.build_status_lock = threading.Lock()
@@ -309,23 +273,44 @@ class RequestsController():
def queue_if_possible(self):
# TODO: check all of them in a loop?
- controller = self.build_requests[0]['controller']
- with self.build_status_lock:
+ for request in self.build_requests:
+ print request['id']
+ controller = request['controller']
if controller.artifact != None and controller.build_started == True:
- to_build = controller.find_artifacts_that_are_ready_to_build(
- controller.artifact)
+ with self.build_status_lock:
+ to_build = controller.find_artifacts_that_are_ready_to_build(
+ controller.artifact)
+ print to_build
controller._queue_worker_builds(to_build)
- # TODO: Block execution until we receive a mark_as_built
- def mark_as_built(self, request_id, cache_key, kind, name):
+ def mark_as_built(self, cache_key):
with self.build_status_lock:
- print "TO %s: Artifact %s built" % (request_id, name)
+ for request in self.build_requests:
+ artifacts = request['controller']._find_artifacts(cache_key)
+ for artifact in artifacts:
+ artifact.state = BUILT
+ print "TO %s: Artifact %s built" % (request['id'],artifact.name)
+ if self.lock_queue.locked():
+ self.lock_queue.release()
- def mark_as_building(self, cache_key, kind, name)
+ def mark_as_building(self, cache_key):
+ with self.build_status_lock:
+ for request in self.build_requests:
+ artifacts = request['controller']._find_artifacts(cache_key)
+ for artifact in artifacts:
+ artifact.state = BUILDING
+ print "TO %s: Artifact %s building" % (request['id'],artifact.name)
+ # TODO: consider the chunks case
+ def loop(self):
+ while True:
+ print "DEBUG: locking queue"
+ self.lock_queue.acquire()
+ print "DEBUG locked queue"
+ self.queue_if_possible()
request = {}
request['repo'] = "baserock:baserock/definitions"
@@ -334,9 +319,4 @@ request['system'] = "systems/minimal-system-x86_64-generic.morph"
requests_controller = RequestsController()
requests_controller.add_request(request)
-
-# loop so that client doesn't die
-while True:
- import time
- time.sleep(0.0001)
- requests_controller.queue_if_possible()
+requests_controller.loop()
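
For context, the change replaces the busy-wait at the bottom of gear/client.py (the removed time.sleep(0.0001) polling loop) with a lock-gated loop: RequestsController.loop() blocks on lock_queue until another thread releases it, either when an artifact is marked as built or when a build graph finishes decoding, and then re-checks which artifacts are ready to queue. Below is a rough standalone sketch of that wake-up pattern only; the class name and the notify() helper are illustrative simplifications, not part of the patch.

import threading

class RequestsControllerSketch(object):
    """Simplified sketch of the lock-gated wake-up loop (illustrative only)."""

    def __init__(self):
        self.lock_queue = threading.Lock()
        # Start locked so that loop() blocks until the first notification.
        self.lock_queue.acquire()

    def notify(self):
        # Called from other threads, e.g. when an artifact is marked as
        # built or a build graph becomes ready to process.
        if self.lock_queue.locked():
            self.lock_queue.release()

    def queue_if_possible(self):
        print "checking which artifacts are ready to build"

    def loop(self):
        while True:
            # Sleeps here until notify() releases the lock, instead of
            # polling with time.sleep() as the removed code did.
            self.lock_queue.acquire()
            self.queue_if_possible()

A plain threading.Lock, unlike threading.RLock, may be released by a thread other than the one that acquired it, which is what makes this cross-thread signalling work; checking locked() before release() avoids raising an error when no wake-up is pending.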