summaryrefslogtreecommitdiff
path: root/gear/controller.py
diff options
context:
space:
mode:
Diffstat (limited to 'gear/controller.py')
-rw-r--r--gear/controller.py395
1 files changed, 395 insertions, 0 deletions
diff --git a/gear/controller.py b/gear/controller.py
new file mode 100644
index 00000000..9dc1df60
--- /dev/null
+++ b/gear/controller.py
@@ -0,0 +1,395 @@
import gear
import sys
import json
import threading
import requests
import time
import signal
import urlparse
import distbuild
# TODO: values from settings
# Base URL of the remote artifact cache.
# NOTE(review): 'sys', 'requests', 'urlparse' and 'cache_server' are not
# used anywhere in this module as visible here — confirm before removing.
cache_server = 'http://cache.baserock.org:8080'

# Artifact build states. These are used to loosely track the state of the
# remote cache.
UNBUILT = 'not-built'
BUILDING = 'building'
BUILT = 'built'

import logging
logging.basicConfig()

# NOTE(review): presumably starts an embedded Gearman server on the default
# port as a side effect of construction; the instance is deliberately
# discarded — confirm against the 'gear' library documentation.
gear.Server()
+
class SingleBuildController():
    """Drive a single build request through its life cycle.

    One instance per build request. It owns three Gearman client
    connections: one to compute the build graph, one to query the remote
    artifact cache, and one to submit artifact builds to workers. State
    transitions are reported back to the shared RequestsController.
    """

    def __init__(self, requests_controller, request_id):
        self.request_id = request_id
        self.requests_controller = requests_controller
        self.graph_client = BuildGraphClient(self)
        self.builder_client = BuilderClient(self)
        self.cache_client = CacheRequestClient(self)
        self.builder_client.addServer('localhost')
        self.builder_client.waitForServer()
        self.graph_client.addServer('localhost')
        self.graph_client.waitForServer()
        self.cache_client.addServer('localhost')
        self.cache_client.waitForServer()
        # Root ArtifactReference of the build graph, set once the
        # 'build-graph' job completes.
        self.artifact = None
        # True once the cache state has been queried and builds may start.
        self.build_started = False
        # True once the build graph has been decoded and initialised.
        self.build_initiated = False

    def start_build(self, request):
        """Submit the serialised build request as a 'build-graph' job."""
        job = gear.Job("build-graph", request)
        self.graph_client.submitJob(job)

    def _process_build_graph(self, build_graph):
        """Decode the build graph and kick off the cache state query.

        Called by BuildGraphClient when the 'build-graph' job completes.
        Raises ValueError (re-raised) if the graph cannot be decoded.
        """
        print("Decoding artifact received")
        try:
            self.artifact = distbuild.decode_artifact_reference(build_graph)
        except ValueError:
            print("ERROR: Failed to decode artifact")
            print("ERROR: === build graph start ===")
            print(build_graph)
            print("ERROR: === build graph end ===")
            # Bare raise preserves the original traceback ('raise e' would
            # truncate it).
            raise

        print("Decoding artifact received done")
        # Mark everything as unbuilt to begin with. We'll query the actual
        # state from the cache in self._check_cache_state().
        # TODO: move to RQ
        def set_initial_state(artifact):
            artifact.state = UNBUILT
        print("Setting them as unbuilt")
        self._map_build_graph(self.artifact, set_initial_state)
        print("Setting them as unbuilt done")
        self.build_initiated = True
        self._check_cache_state()

    def _check_cache_state(self):
        """Ask the cache server which of our unbuilt artifacts it holds.

        Artifacts already being built by another request are first marked
        BUILDING; the remaining UNBUILT ones are sent to the cache server
        as a 'cache-request' job. The response arrives asynchronously in
        _process_cache_response().
        """
        print("DEBUG: checking cache...")

        # First mark artifacts some other request is already building.
        # BUG FIX: these used to reference the module-level global
        # 'requests_controller' by accident; use our own reference.
        def mark_building_artifacts(artifact):
            if (artifact.state == UNBUILT and artifact.cache_key
                    in self.requests_controller.building_list):
                artifact.state = BUILDING
        with self.requests_controller.build_status_lock:
            self._map_build_graph(self.artifact, mark_building_artifacts)

        artifact_names = []
        def collect_unbuilt_artifacts(artifact):
            if artifact.state == UNBUILT:
                artifact_names.append(artifact.basename())
        with self.requests_controller.build_status_lock:
            self._map_build_graph(self.artifact, collect_unbuilt_artifacts)

        job = gear.Job("cache-request", json.dumps(artifact_names))
        self.cache_client.submitJob(job)

    def _process_cache_response(self, cache_response):
        """Apply the cache server's answer and wake the build queue.

        'cache_response' is a JSON object mapping artifact basenames to a
        truthy value when the artifact is already cached.
        """
        response = json.loads(cache_response)

        # Mark things as built that are now built. We only check the unbuilt
        # artifacts, so 'cache_state' will have no info for things we already
        # thought were built.
        # TODO: move to RQ
        def update_state(artifact):
            if artifact.state == UNBUILT:
                is_in_cache = response[artifact.basename()]
                if is_in_cache:
                    artifact.state = BUILT
        self._map_build_graph(self.artifact, update_state)
        # TODO: move to RQ
        self.build_started = True
        with self.requests_controller.lock_queue:
            self.requests_controller.lock_queue.notify()
        print("DEBUG: queue release!")

    def _map_build_graph(self, artifact, callback, components=None):
        """Run callback on each artifact in the build graph and return result.

        If components is given, then only look at the components given and
        their dependencies. Also, return a list of the components after they
        have had callback called on them.

        """
        # Default changed from a mutable '[]' to None to avoid the shared
        # mutable default argument pitfall; behaviour is unchanged.
        components = components or []
        result = []
        mapped_components = []
        done = set()
        if components:
            queue = list(components)
        else:
            queue = [artifact]
        while queue:
            a = queue.pop()
            if a not in done:
                result.append(callback(a))
                queue.extend(a.dependencies)
                done.add(a)
                if a in components:
                    mapped_components.append(a)
        return result, mapped_components

    def find_artifacts_that_are_ready_to_build(self, root_artifact,
                                               components=None):
        '''Return unbuilt artifacts whose dependencies are all built.

        The 'root_artifact' parameter is expected to be a tree of
        ArtifactReference objects. These must have the 'state' attribute set
        to BUILT or UNBUILT. If 'components' is passed, then only those
        artifacts and their dependencies will be built.

        '''
        def is_ready_to_build(artifact):
            # Just in case the state is not set yet.
            try:
                is_ready = (artifact.state == UNBUILT and
                            all(a.state == BUILT
                                for a in artifact.dependencies))
            except KeyError:
                is_ready = False
            return is_ready

        artifacts, _ = self._map_build_graph(root_artifact, lambda a: a,
                                             components)
        ready = [a for a in artifacts if is_ready_to_build(a)]
        return ready

    def _queue_worker_builds(self, artifacts):
        '''Send a set of chunks to the WorkerBuildQueuer class for building.'''
        print("starting")
        while len(artifacts) > 0:
            artifact = artifacts.pop()
            print("going to queue %s" % artifact.name)
            self.requests_controller.mark_as_building(artifact.cache_key)
            print("marked as queue %s" % artifact.name)
            if artifact.kind == 'chunk':
                # Chunk artifacts are not built independently
                # so when we're building any chunk artifact
                # we're also building all the chunk artifacts
                # in this source
                same_chunk_artifacts = [a for a in artifacts
                                        if a.cache_key == artifact.cache_key]
                for a in same_chunk_artifacts:
                    artifacts.remove(a)
            artifact_encoded = distbuild.encode_artifact_reference(artifact)
            print("encoded for job queue %s" % artifact.name)
            # Make the build-artifact job use ARCHITECTURE to enable mutliarch
            # build network (TODO)
            job = gear.Job("build-artifact", artifact_encoded)
            self.builder_client.submitJob(job)
            print("send job queue %s" % artifact.name)

    def _find_artifacts(self, cache_key, name=None):
        """Return all artifacts in our graph matching 'cache_key'.

        If 'name' is given, additionally filter by artifact name.
        """
        artifacts, _ = self._map_build_graph(self.artifact, lambda a: a)
        if not name:
            return [a for a in artifacts if a.cache_key == cache_key]
        else:
            return [a for a in artifacts if a.cache_key == cache_key
                    and a.name == name]

    def _mark_artifact_as_built(self, cache_key, name=None):
        """Forward a completed build to the shared RequestsController."""
        # 'name' is currently ignored; kept for interface compatibility.
        self.requests_controller.mark_as_built(cache_key)
+
+
class CacheRequestClient(gear.Client):
    """Gearman client for 'cache-request' jobs.

    When the cache server answers, the JSON response is handed straight
    to the owning SingleBuildController.
    """

    def __init__(self, controller):
        super(CacheRequestClient, self).__init__()
        self.controller = controller
        self.finished = False

    def handleWorkComplete(self, packet):
        job = super(CacheRequestClient, self).handleWorkComplete(packet)
        print("Cache workcomplete")
        # The last data packet of the job carries the cache response.
        self.controller._process_cache_response(job.data[-1])
        return job
+
+
class BuildGraphClient(gear.Client):
    """Gearman client for 'build-graph' jobs.

    Streams intermediate job data to stdout and forwards the completed
    build graph to the owning SingleBuildController.
    """

    def __init__(self, controller):
        super(BuildGraphClient, self).__init__()
        self.controller = controller
        self.finished = False

    def handleWorkComplete(self, packet):
        job = super(BuildGraphClient, self).handleWorkComplete(packet)
        print("Graph workcomplete")
        # The final data packet holds the encoded build graph.
        self.controller._process_build_graph(job.data[-1])
        return job

    def handleWorkData(self, packet):
        job = super(BuildGraphClient, self).handleWorkData(packet)
        print(job.data[-1])
        # Drop streamed data we have already shown, to save memory.
        job.data = []
        return job

    def handleWorkFail(self, packet):
        job = super(BuildGraphClient, self).handleWorkFail(packet)
        print("workfail")
        return job

    def handleWorkException(self, packet):
        job = super(BuildGraphClient, self).handleWorkException(packet)
        print("workexception")
        return job

    def handleDisconnect(self, job):
        job = super(BuildGraphClient, self).handleDisconnect(job)
        print("disconnect")
+
+
class BuilderClient(gear.Client):
    """Gearman client for 'build-artifact' jobs.

    Reports each finished artifact back to the owning
    SingleBuildController via _mark_artifact_as_built().
    """

    def __init__(self, controller):
        super(BuilderClient, self).__init__()
        self.controller = controller
        self.finished = False

    def handleWorkComplete(self, packet):
        job = super(BuilderClient, self).handleWorkComplete(packet)
        print("Build workcomplete")
        # The worker's final data packet carries the built artifact's key.
        self.controller._mark_artifact_as_built(job.data[-1])
        return job

    # TODO: send different types of data? stdout, message...?
    # same for workFail, to identify worker disconnection and build
    # failures.
    def handleWorkData(self, packet):
        job = super(BuilderClient, self).handleWorkData(packet)
        # TODO: use cache_key to inform RQ to notify initiators later,
        # we need to note this cache_key somewhere
        # print(job.data[-1].strip())
        # Discard streamed build output to save memory.
        job.data = []
        return job
+
class RequestsController(threading.Thread):
    """Background thread owning all active build requests.

    Tracks which cache keys are being built ('building_list'), creates a
    SingleBuildController per incoming request, and — whenever woken via
    'lock_queue' — queues any artifacts that have become ready to build.

    Locking: 'build_status_lock' guards artifact states and
    'building_list'; 'new_request_lock' guards request registration;
    'lock_queue' is the wake-up condition for the run() loop.
    """

    def __init__(self):
        super(RequestsController, self).__init__()
        # Cache keys currently being built, shared across all requests.
        self.building_list = []
        self.next_id = 1
        self.new_request_lock = threading.Lock()
        self.lock_queue = threading.Condition()
        self.build_requests = []
        self.build_status_lock = threading.Lock()
        self.stopped = False

    def add_request(self, request):
        """Register a build request and submit its build-graph job."""
        json_request = json.dumps(request)
        print("DEBUG: adding request - %s" % json_request)
        request_data = {}
        with self.new_request_lock:
            request_data['id'] = self.next_id
            request_data['controller'] = SingleBuildController(self, self.next_id)
            # TODO: is this the right place to do this?
            request_data['controller'].start_build(json_request)
            request_data['request'] = request
            self.next_id += 1
            self.build_requests.append(request_data)

    def queue_if_possible(self):
        """Queue worker builds for every artifact that is ready."""
        print("DEBUG: Looking for jobs to queue")
        for request in self.build_requests:
            print(request['id'])
            controller = request['controller']
            # Only requests whose graph is decoded and cache-checked.
            if controller.artifact is not None and controller.build_started:
                with self.build_status_lock:
                    to_build = controller.find_artifacts_that_are_ready_to_build(
                        controller.artifact)
                    print(to_build)
                    controller._queue_worker_builds(to_build)

    def mark_as_built(self, cache_key):
        """Record a finished build and wake the queueing loop.

        Raises ValueError if 'cache_key' was never marked as building.
        """
        with self.build_status_lock:
            self.building_list.remove(cache_key)
            print(self.building_list)
            for request in self.build_requests:
                controller = request['controller']
                if controller.artifact is not None and controller.build_initiated:
                    artifacts = controller._find_artifacts(cache_key)
                    for artifact in artifacts:
                        artifact.state = BUILT
                        print("TO %s: Artifact %s built" % (request['id'], artifact.name))

        with self.lock_queue:
            self.lock_queue.notify()

    def mark_as_building(self, cache_key):
        """Record that 'cache_key' started building, in every request."""
        with self.build_status_lock:
            self.building_list.append(cache_key)
            print(self.building_list)
            for request in self.build_requests:
                controller = request['controller']
                if controller.artifact is not None and controller.build_initiated:
                    artifacts = controller._find_artifacts(cache_key)
                    for artifact in artifacts:
                        artifact.state = BUILDING
                        print("TO %s: Artifact %s building" % (request['id'], artifact.name))

    def shutdown(self):
        """Ask run() to exit; safe to call from any thread."""
        self.stopped = True
        with self.lock_queue:
            self.lock_queue.notify()

    def run(self):
        # Sleep on the condition until a state change (or shutdown) wakes us.
        while not self.stopped:
            with self.lock_queue:
                self.lock_queue.wait()
                if not self.stopped:
                    self.queue_if_possible()
+
+
class RequestsManager(threading.Thread):
    """Thread that receives 'build-request' Gearman jobs.

    Each job's arguments are decoded from JSON and handed to the
    RequestsController as a new build request.
    """

    def __init__(self, requests_controller):
        super(RequestsManager, self).__init__()
        self.requests_controller = requests_controller
        self.worker = gear.Worker('controller')
        self.worker.addServer('localhost')
        self.worker.registerFunction("build-request")
        self.stopped = False

    def run(self):
        # Block on getJob() until shutdown() interrupts the wait.
        while not self.stopped:
            try:
                print("DEBUG: Waiting for job")
                self._handle_job(self.worker.getJob())
            except gear.InterruptedError:
                print('We were asked to stop waiting for jobs')

    def shutdown(self):
        # Unblock getJob() so run() can observe self.stopped.
        self.stopped = True
        self.worker.stopWaitingForJobs()

    def _handle_job(self, job):
        # Job arguments carry a JSON-encoded build request dict.
        self.requests_controller.add_request(json.loads(job.arguments))
+
# Example build request. NOTE(review): 'request' is never used below as
# visible here — presumably a leftover template; confirm before removing.
request = {}
request['repo'] = "baserock:baserock/definitions"
request['ref'] = "fbce45e45da79e5c35341845ec3b3d7c321e6ff2"
request['system'] = "systems/minimal-system-x86_64-generic.morph"

# Command line
# Start the two worker threads: one owning build state, one receiving
# 'build-request' jobs from Gearman.
requests_controller = RequestsController()
requests_manager = RequestsManager(requests_controller)

def term_handler(signum, frame):
    # On SIGTERM, ask both threads to stop; the main loop below exits
    # once requests_controller.stopped becomes True.
    requests_manager.shutdown()
    requests_controller.shutdown()
signal.signal(signal.SIGTERM, term_handler)

requests_controller.start()
requests_manager.start()

# Keep the main thread alive so it can receive KeyboardInterrupt and
# shut the worker threads down cleanly.
while not requests_controller.stopped:
    try:
        time.sleep(3)
    except KeyboardInterrupt:
        print "Ctrl + C: asking tasks to exit nicely...\n"
        requests_manager.shutdown()
        requests_controller.shutdown()