summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPedro Alvarez <pedro.alvarez@codethink.co.uk>2016-03-18 01:30:49 +0000
committerPedro Alvarez <pedro.alvarez@codethink.co.uk>2016-03-27 11:53:02 +0000
commit10c3df8270f622bde7d7cb664bf6686f43c6ba7c (patch)
tree7d35f327270afdc074ae7d8d7eb39735c3684ba3
parentf58ac21f294da35c60c2f2a5de627c258a7ee15d (diff)
downloadmorph-10c3df8270f622bde7d7cb664bf6686f43c6ba7c.tar.gz
WIP support cache request to client
Change-Id: I0e4100d948674759b088d72fef1eeb1524f59a9e
-rw-r--r--gear/client.py47
-rw-r--r--gear/worker.py14
2 files changed, 54 insertions, 7 deletions
diff --git a/gear/client.py b/gear/client.py
index afee49cd..5b2814de 100644
--- a/gear/client.py
+++ b/gear/client.py
@@ -24,10 +24,13 @@ class theController():
self.lock = threading.Lock()
self.graph_client = BuildGraphClient(self)
self.builder_client = BuilderClient(self)
+ self.cache_client = CacheRequestClient(self)
self.builder_client.addServer('localhost')
self.builder_client.waitForServer() # Wait for at least one server to be connected
self.graph_client.addServer('localhost')
self.graph_client.waitForServer() # Wait for at least one server to be connected
+ self.cache_client.addServer('localhost')
+ self.cache_client.waitForServer() # Wait for at least one server to be connected
self.artifact = None
self.build_started = False
@@ -64,11 +67,21 @@ class theController():
artifact_names.append(artifact.basename())
self._map_build_graph(artifact, collect_unbuilt_artifacts)
- url = urlparse.urljoin(cache_server, '/1.0/artifacts')
- r = requests.post(url, json=artifact_names)
- print r.json()
+ job = gear.Job("cache-request", json.dumps(artifact_names))
+ self.cache_client.submitJob(job)
+ def _process_cache_response(self, cache_response):
+ response = json.loads( cache_response)
+
+ for key, value in response.iteritems():
+ if value:
+ cache_key, kind, name = key.split('.')
+ self._mark_artifact_as_built(cache_key, name=name)
+ else:
+ print key
+ print ".. wasnt built"
+ print "fuk"
def _map_build_graph(self, artifact, callback, components=[]):
"""Run callback on each artifact in the build graph and return result.
@@ -149,29 +162,36 @@ class theController():
artifacts.remove(a)
self.builder_client.submitJob(job)
- def _find_artifact(self, cache_key):
+ def _find_artifact(self, cache_key, name):
artifacts, _ = self._map_build_graph(self.artifact, lambda a: a)
wanted = [a for a in artifacts if a.cache_key == cache_key]
+ # TODO: code ugly as hell
if wanted:
+ if name:
+ filtered = [b for b in wanted if b.name == name]
+ if filtered:
+ return filtered[0]
+ else:
+ return None
return wanted[0]
else:
return None
- def _mark_artifact_as_built(self, cache_key):
+ def _mark_artifact_as_built(self, cache_key, name=None):
print cache_key
def set_state(a):
if a.cache_key == artifact.cache_key:
a.state = BUILT
- artifact = self._find_artifact(cache_key)
+ artifact = self._find_artifact(cache_key, name)
print "lock set as built"
with self.lock:
print "set as built %s" % artifact.name
artifact.state = BUILT
- if artifact.kind == 'chunk':
+ if not name and artifact.kind == 'chunk':
# Building a single chunk artifact
# yields all chunk artifacts for the given source
# so we set the state of this source's artifacts
@@ -181,6 +201,19 @@ class theController():
print artifact
+class CacheRequestClient(gear.Client):
+ def __init__(self, controller):
+ super(CacheRequestClient, self).__init__()
+ self.controller = controller
+ self.finished = False
+
+ def handleWorkComplete(self, packet):
+ job = super(CacheRequestClient, self).handleWorkComplete(packet)
+ print "workcomplete"
+ self.controller._process_cache_response(job.data[-1])
+ return job
+
+
class BuildGraphClient(gear.Client):
def __init__(self, controller):
super(BuildGraphClient, self).__init__()
diff --git a/gear/worker.py b/gear/worker.py
index e8abacda..fa23e01c 100644
--- a/gear/worker.py
+++ b/gear/worker.py
@@ -4,9 +4,13 @@ worker.addServer('localhost')
worker.registerFunction("reverse")
worker.registerFunction("build-graph")
worker.registerFunction("build-artifact")
+worker.registerFunction("cache-request")
import time
import json
import os
+import requests
+import urlparse
+
from subprocess import Popen, PIPE, STDOUT
import distbuild
@@ -16,6 +20,9 @@ import paramiko
import logging
logging.basicConfig()
+#TODO: values from settings
+cache_server = 'http://cache.baserock.org:8080'
+
@contextmanager
def ssh_manager(host, port, username, key):
@@ -104,3 +111,10 @@ while True:
suffixes.append(filename + '.meta')
upload_files(artifact.cache_key, suffixes)
job.sendWorkComplete(artifact.cache_key)
+ elif job.name == 'cache-request':
+ artifact_names = json.loads(job.arguments)
+
+ url = urlparse.urljoin(cache_server, '/1.0/artifacts')
+ r = requests.post(url, json=artifact_names)
+ print r.json()
+ job.sendWorkComplete(json.dumps(r.json()))