From 10c3df8270f622bde7d7cb664bf6686f43c6ba7c Mon Sep 17 00:00:00 2001 From: Pedro Alvarez Date: Fri, 18 Mar 2016 01:30:49 +0000 Subject: WIP support cache request to client Change-Id: I0e4100d948674759b088d72fef1eeb1524f59a9e --- gear/client.py | 47 ++++++++++++++++++++++++++++++++++++++++------- gear/worker.py | 14 ++++++++++++++ 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/gear/client.py b/gear/client.py index afee49cd..5b2814de 100644 --- a/gear/client.py +++ b/gear/client.py @@ -24,10 +24,13 @@ class theController(): self.lock = threading.Lock() self.graph_client = BuildGraphClient(self) self.builder_client = BuilderClient(self) + self.cache_client = CacheRequestClient(self) self.builder_client.addServer('localhost') self.builder_client.waitForServer() # Wait for at least one server to be connected self.graph_client.addServer('localhost') self.graph_client.waitForServer() # Wait for at least one server to be connected + self.cache_client.addServer('localhost') + self.cache_client.waitForServer() # Wait for at least one server to be connected self.artifact = None self.build_started = False @@ -64,11 +67,21 @@ class theController(): artifact_names.append(artifact.basename()) self._map_build_graph(artifact, collect_unbuilt_artifacts) - url = urlparse.urljoin(cache_server, '/1.0/artifacts') - r = requests.post(url, json=artifact_names) - print r.json() + job = gear.Job("cache-request", json.dumps(artifact_names)) + self.cache_client.submitJob(job) + def _process_cache_response(self, cache_response): + response = json.loads( cache_response) + + for key, value in response.iteritems(): + if value: + cache_key, kind, name = key.split('.') + self._mark_artifact_as_built(cache_key, name=name) + else: + print key + print ".. wasnt built" + print "fuk" def _map_build_graph(self, artifact, callback, components=[]): """Run callback on each artifact in the build graph and return result. @@ -149,29 +162,36 @@ class theController(): artifacts.remove(a) self.builder_client.submitJob(job) - def _find_artifact(self, cache_key): + def _find_artifact(self, cache_key, name): artifacts, _ = self._map_build_graph(self.artifact, lambda a: a) wanted = [a for a in artifacts if a.cache_key == cache_key] + # TODO: code ugly as hell if wanted: + if name: + filtered = [b for b in wanted if b.name == name] + if filtered: + return filtered[0] + else: + return None return wanted[0] else: return None - def _mark_artifact_as_built(self, cache_key): + def _mark_artifact_as_built(self, cache_key, name=None): print cache_key def set_state(a): if a.cache_key == artifact.cache_key: a.state = BUILT - artifact = self._find_artifact(cache_key) + artifact = self._find_artifact(cache_key, name) print "lock set as built" with self.lock: print "set as built %s" % artifact.name artifact.state = BUILT - if artifact.kind == 'chunk': + if not name and artifact.kind == 'chunk': # Building a single chunk artifact # yields all chunk artifacts for the given source # so we set the state of this source's artifacts @@ -181,6 +201,19 @@ class theController(): print artifact +class CacheRequestClient(gear.Client): + def __init__(self, controller): + super(CacheRequestClient, self).__init__() + self.controller = controller + self.finished = False + + def handleWorkComplete(self, packet): + job = super(CacheRequestClient, self).handleWorkComplete(packet) + print "workcomplete" + self.controller._process_cache_response(job.data[-1]) + return job + + class BuildGraphClient(gear.Client): def __init__(self, controller): super(BuildGraphClient, self).__init__() diff --git a/gear/worker.py b/gear/worker.py index e8abacda..fa23e01c 100644 --- a/gear/worker.py +++ b/gear/worker.py @@ -4,9 +4,13 @@ worker.addServer('localhost') worker.registerFunction("reverse") worker.registerFunction("build-graph") worker.registerFunction("build-artifact") +worker.registerFunction("cache-request") import time import json import os +import requests +import urlparse + from subprocess import Popen, PIPE, STDOUT import distbuild @@ -16,6 +20,9 @@ import paramiko import logging logging.basicConfig() +#TODO: values from settings +cache_server = 'http://cache.baserock.org:8080' + @contextmanager def ssh_manager(host, port, username, key): @@ -104,3 +111,10 @@ while True: suffixes.append(filename + '.meta') upload_files(artifact.cache_key, suffixes) job.sendWorkComplete(artifact.cache_key) + elif job.name == 'cache-request': + artifact_names = json.loads(job.arguments) + + url = urlparse.urljoin(cache_server, '/1.0/artifacts') + r = requests.post(url, json=artifact_names) + print r.json() + job.sendWorkComplete(json.dumps(r.json())) -- cgit v1.2.1