author     Pedro Alvarez <pedro.alvarez@codethink.co.uk>  2016-03-29 21:39:49 +0000
committer  Pedro Alvarez <pedro.alvarez@codethink.co.uk>  2016-03-29 21:39:49 +0000
commit     ea91a14e56efe070c977e9ff2877a23f3c498e92 (patch)
tree       aa63702ff2a06ce83273888735fa712bddb71f52
parent     71f7328aa45339b0a15a7f4076bb7bcfe1a6d98b (diff)
download   morph-ea91a14e56efe070c977e9ff2877a23f3c498e92.tar.gz

wip cancelation, it works, but doesn't cancel running jobs

Change-Id: I713471d96c070d9f9bba7cff6f6c2ef526c140d2
-rw-r--r--  gear/controller.py  80

1 file changed, 63 insertions, 17 deletions
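What the change does, in short: the flat building_list is replaced by two dictionaries keyed by artifact cache key. building_dict records which build requests currently want an artifact that is being built, and jobs_dict keeps the gear job that was submitted for it so it can be cancelled later; cancel_request() is only supposed to cancel a job when the cancelled request is its sole remaining consumer. A minimal sketch of that state, with made-up cache keys, request ids, and placeholder job values (none of these values come from the commit):

    # Illustrative shapes only; in gear/controller.py both dicts live on
    # RequestsController and are guarded by build_status_lock.
    building_dict = {
        'a1b2c3': [1, 2],   # requests 1 and 2 both need this artifact
        'd4e5f6': [1],      # only request 1 needs this one
    }
    jobs_dict = {
        'a1b2c3': 'job_a',  # placeholders for the gear.Job objects kept here
        'd4e5f6': 'job_b',
    }
    # Cancelling request 1 may cancel the job for 'd4e5f6', but must leave
    # 'a1b2c3' alone because request 2 still depends on it.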
diff --git a/gear/controller.py b/gear/controller.py
index dd3a1499..83616d15 100644
--- a/gear/controller.py
+++ b/gear/controller.py
@@ -38,9 +38,10 @@ class SingleBuildController():
         self.build_started = False
         self.build_initiated = False
 
-    def start_build(self, request):
+    def start_build(self, request, request_id):
         job = gear.Job("build-graph", request)
         self.graph_client.submitJob(job)
+        self.request_id = request_id
 
     def _process_build_graph(self, build_graph):
         print "Decoding artifact received"
@@ -72,8 +73,15 @@ class SingleBuildController():
         # Check first building artifacts
         def mark_building_artifacts(artifact):
             if (artifact.state == UNBUILT and artifact.cache_key
-                    in requests_controller.building_list):
+                    in requests_controller.building_dict):
                 artifact.state = BUILDING
+            building_dict = requests_controller.building_dict
+            cache_key = artifact.cache_key
+            if (cache_key in building_dict and
+                    not self.request_id in building_dict[cache_key]):
+                requests_controller.building_dict[
+                    artifact.cache_key].append(self.request_id)
+
 
         with requests_controller.build_status_lock:
             self._map_build_graph(self.artifact, mark_building_artifacts)
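The mark_building_artifacts closure above now does two things: it still flips artifacts that are already in flight to BUILDING, and it also registers the current request as a consumer of that artifact by appending self.request_id to building_dict[cache_key]. A condensed, standalone sketch of just that bookkeeping (the names and values here are hypothetical, not the commit's code):

    # Sketch: attach a request to an artifact that is already being built.
    # 'building' plays the role of requests_controller.building_dict.
    def attach_request(building, cache_key, request_id):
        consumers = building.get(cache_key)
        if consumers is not None and request_id not in consumers:
            consumers.append(request_id)

    building = {'cafe01': [1]}             # hypothetical cache key, request 1
    attach_request(building, 'cafe01', 2)  # request 2 joins the same artifact
    print building                         # {'cafe01': [1, 2]}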
@@ -163,9 +171,6 @@ class SingleBuildController():
         print "starting"
         while len(artifacts) > 0:
             artifact = artifacts.pop()
-            print "going to queue %s" % artifact.name
-            self.requests_controller.mark_as_building(artifact.cache_key)
-            print "marked as queue %s" % artifact.name
             if artifact.kind == 'chunk':
                 # Chunk artifacts are not built independently
                 # so when we're building any chunk artifact
@@ -176,12 +181,11 @@ class SingleBuildController():
                 for a in same_chunk_artifacts:
                     artifacts.remove(a)
             artifact_encoded = distbuild.encode_artifact_reference(artifact)
-            print "encoded for job queue %s" % artifact.name
             # Make the build-artifact job use ARCHITECTURE to enable mutliarch
             # build network (TODO)
             job = gear.Job("build-artifact", artifact_encoded)
             self.builder_client.submitJob(job)
-            print "send job queue %s" % artifact.name
+            self.requests_controller.mark_as_building(artifact.cache_key, job)
 
     def _find_artifacts(self, cache_key, name=None):
         artifacts, _ = self._map_build_graph(self.artifact, lambda a: a)
@@ -268,7 +272,8 @@ class BuilderClient(gear.Client):
 class RequestsController(threading.Thread):
     def __init__(self):
         super(RequestsController, self).__init__()
-        self.building_list = []
+        self.building_dict = {}
+        self.jobs_dict = {}
         self.next_id = 1
         self.new_request_lock = threading.Lock()
         self.lock_queue = threading.Condition()
@@ -282,9 +287,10 @@ class RequestsController(threading.Thread):
        request_data = {}
        with self.new_request_lock:
            request_data['id'] = self.next_id
-           request_data['controller'] = SingleBuildController(self, self.next_id)
+           controller = SingleBuildController(self, self.next_id)
            # TODO: is this the right place to do this?
-           request_data['controller'].start_build(json_request)
+           controller.start_build(json_request, self.next_id)
+           request_data['controller'] = controller
            request_data['request'] = request
            request_data['job'] = job
            self.next_id += 1
@@ -305,16 +311,20 @@ class RequestsController(threading.Thread):
 
     def mark_as_built(self, cache_key):
         with self.build_status_lock:
-            self.building_list.remove(cache_key)
-            print self.building_list
+            print "DEBUG: marking as built %s" % cache_key
+            # TODO, was it cancelled meanwhile??
+            del self.building_dict[cache_key]
+            del self.jobs_dict[cache_key]
+            print self.building_dict
             for request in self.build_requests:
                 controller = request['controller']
                 if controller.artifact != None and controller.build_initiated == True:
-                    artifacts = request['controller']._find_artifacts(cache_key)
+                    artifacts = controller._find_artifacts(cache_key)
                     for artifact in artifacts:
                         artifact.state = BUILT
                         info = "TO %s: Artifact %s built" % (request['id'], artifact.name)
                         self.send_data_to_client(request['job'], info)
+            print "finished setting as built"
 
         with self.lock_queue:
             self.lock_queue.notify()
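mark_as_built() now tears the bookkeeping down with two unconditional del statements, and the new TODO asks what happens if the request was cancelled in the meantime: if cancel_request() has already dropped the entry, the del would raise KeyError. One way the TODO could be resolved (an assumption, not something this commit does) is a tolerant teardown:

    # Sketch only: tolerate entries that an earlier cancel_request already
    # removed, instead of raising KeyError. Like the current code, this must
    # run while build_status_lock is held.
    self.building_dict.pop(cache_key, None)
    self.jobs_dict.pop(cache_key, None)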
@@ -324,18 +334,50 @@ class RequestsController(threading.Thread):
         job.sendWorkData(msg)
         print msg
 
-    def mark_as_building(self, cache_key):
+    def mark_as_building(self, cache_key, job):
         with self.build_status_lock:
-            self.building_list.append(cache_key)
-            print self.building_list
+            print "DEBUG: marking as building %s" % cache_key
+            requests_building = []
+            print self.building_dict
             for request in self.build_requests:
                 controller = request['controller']
+                request_id = request['id']
                 if controller.artifact != None and controller.build_initiated == True:
-                    artifacts = request['controller']._find_artifacts(cache_key)
+                    artifacts = controller._find_artifacts(cache_key)
                     for artifact in artifacts:
+                        if not request_id in requests_building:
+                            requests_building.append(request_id)
                         artifact.state = BUILDING
                         info = "TO %s: Artifact %s building" % (request['id'],artifact.name)
                         self.send_data_to_client(request['job'], info)
+            self.building_dict[cache_key] = requests_building
+            self.jobs_dict[cache_key] = job
+            print "finished setting as building"
+
+    def cancel_request(self, request_id_cancel):
+        with self.build_status_lock:
+            for request in self.build_requests:
+                request_id = request['id']
+                if request_id == request_id_cancel:
+                    self.build_requests.remove(request)
+                    controller = request['controller']
+                    break
+            return
+            to_remove = []
+
+            for cache_key, requests in self.building_dict.iteritems():
+                print "DEBUG: cancelling %s" % cache_key
+                if requests == [request_id_cancel]:
+                    job = self.jobs_dict[cache_key]
+                    gear.CancelJobAdminRequest(job.handle)
+                    to_remove.append(cache_key)
+
+
+            for cache_key in to_remove:
+                del self.building_dict[cache_key]
+                del self.jobs_dict[cache_key]
+            print "Finished cancelling"
+
     def shutdown(self):
         self.stopped = True
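cancel_request() builds a gear.CancelJobAdminRequest for every job that only the cancelled request still needs, but as the diff stands the request object is only constructed; in the gear library an admin request normally has to be sent over a connection before the server acts on it, and the "cancel job" admin command is generally only able to remove jobs still sitting in the queue, which matches the commit message's note that running jobs are not cancelled. A plausible completion, assuming job.connection was populated when the job was submitted (an assumption, unverified against the gear version used here):

    # Sketch, not the commit's code: actually deliver the cancel request.
    # Assumes the job keeps a reference to the connection it was submitted on.
    req = gear.CancelJobAdminRequest(job.handle)
    job.connection.sendAdminRequest(req)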
@@ -393,6 +435,10 @@ signal.signal(signal.SIGTERM, term_handler)
 requests_controller.start()
 requests_manager.start()
 
+time.sleep(15)
+print "START CANCELATION"
+requests_controller.cancel_request(1)
+
 while not requests_controller.stopped:
     try:
         time.sleep(3)
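The last hunk adds a temporary smoke test: fifteen seconds after the controllers start, request 1 is cancelled unconditionally on every run. If the check is worth keeping while the feature is still work in progress, one option (purely illustrative; the environment variable name is made up and not part of the commit) is to gate it:

    # Hypothetical guard around the temporary cancellation smoke test.
    import os

    if os.environ.get('MORPH_GEAR_TEST_CANCEL'):
        time.sleep(15)
        print "START CANCELATION"
        requests_controller.cancel_request(1)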