From ea91a14e56efe070c977e9ff2877a23f3c498e92 Mon Sep 17 00:00:00 2001 From: Pedro Alvarez Date: Tue, 29 Mar 2016 21:39:49 +0000 Subject: wip cancelation, it works, but doesn't cancel running jobs Change-Id: I713471d96c070d9f9bba7cff6f6c2ef526c140d2 --- gear/controller.py | 80 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 17 deletions(-) diff --git a/gear/controller.py b/gear/controller.py index dd3a1499..83616d15 100644 --- a/gear/controller.py +++ b/gear/controller.py @@ -38,9 +38,10 @@ class SingleBuildController(): self.build_started = False self.build_initiated = False - def start_build(self, request): + def start_build(self, request, request_id): job = gear.Job("build-graph", request) self.graph_client.submitJob(job) + self.request_id = request_id def _process_build_graph(self, build_graph): print "Decoding artifact received" @@ -72,8 +73,15 @@ class SingleBuildController(): # Check first building artifacts def mark_building_artifacts(artifact): if (artifact.state == UNBUILT and artifact.cache_key - in requests_controller.building_list): + in requests_controller.building_dict): artifact.state = BUILDING + building_dict = requests_controller.building_dict + cache_key = artifact.cache_key + if (cache_key in building_dict and + not self.request_id in building_dict[cache_key]): + requests_controller.building_dict[ + artifact.cache_key].append(self.request_id) + with requests_controller.build_status_lock: self._map_build_graph(self.artifact, mark_building_artifacts) @@ -163,9 +171,6 @@ class SingleBuildController(): print "starting" while len(artifacts) > 0: artifact = artifacts.pop() - print "going to queue %s" % artifact.name - self.requests_controller.mark_as_building(artifact.cache_key) - print "marked as queue %s" % artifact.name if artifact.kind == 'chunk': # Chunk artifacts are not built independently # so when we're building any chunk artifact @@ -176,12 +181,11 @@ class SingleBuildController(): for a in same_chunk_artifacts: artifacts.remove(a) artifact_encoded = distbuild.encode_artifact_reference(artifact) - print "encoded for job queue %s" % artifact.name # Make the build-artifact job use ARCHITECTURE to enable mutliarch # build network (TODO) job = gear.Job("build-artifact", artifact_encoded) self.builder_client.submitJob(job) - print "send job queue %s" % artifact.name + self.requests_controller.mark_as_building(artifact.cache_key, job) def _find_artifacts(self, cache_key, name=None): artifacts, _ = self._map_build_graph(self.artifact, lambda a: a) @@ -268,7 +272,8 @@ class BuilderClient(gear.Client): class RequestsController(threading.Thread): def __init__(self): super(RequestsController, self).__init__() - self.building_list = [] + self.building_dict = {} + self.jobs_dict = {} self.next_id = 1 self.new_request_lock = threading.Lock() self.lock_queue = threading.Condition() @@ -282,9 +287,10 @@ class RequestsController(threading.Thread): request_data = {} with self.new_request_lock: request_data['id'] = self.next_id - request_data['controller'] = SingleBuildController(self, self.next_id) + controller = SingleBuildController(self, self.next_id) # TODO: is this the right place to do this? - request_data['controller'].start_build(json_request) + controller.start_build(json_request, self.next_id) + request_data['controller'] = controller request_data['request'] = request request_data['job'] = job self.next_id += 1 @@ -305,16 +311,20 @@ class RequestsController(threading.Thread): def mark_as_built(self, cache_key): with self.build_status_lock: - self.building_list.remove(cache_key) - print self.building_list + print "DEBUG: marking as built %s" % cache_key + # TODO, was it cancelled meanwhile?? + del self.building_dict[cache_key] + del self.jobs_dict[cache_key] + print self.building_dict for request in self.build_requests: controller = request['controller'] if controller.artifact != None and controller.build_initiated == True: - artifacts = request['controller']._find_artifacts(cache_key) + artifacts = controller._find_artifacts(cache_key) for artifact in artifacts: artifact.state = BUILT info = "TO %s: Artifact %s built" % (request['id'], artifact.name) self.send_data_to_client(request['job'], info) + print "finished setting as built" with self.lock_queue: self.lock_queue.notify() @@ -324,18 +334,50 @@ class RequestsController(threading.Thread): job.sendWorkData(msg) print msg - def mark_as_building(self, cache_key): + def mark_as_building(self, cache_key, job): with self.build_status_lock: - self.building_list.append(cache_key) - print self.building_list + print "DEBUG: marking as building %s" % cache_key + requests_building = [] + print self.building_dict for request in self.build_requests: controller = request['controller'] + request_id = request['id'] if controller.artifact != None and controller.build_initiated == True: - artifacts = request['controller']._find_artifacts(cache_key) + artifacts = controller._find_artifacts(cache_key) for artifact in artifacts: + if not request_id in requests_building: + requests_building.append(request_id) artifact.state = BUILDING info = "TO %s: Artifact %s building" % (request['id'],artifact.name) self.send_data_to_client(request['job'], info) + self.building_dict[cache_key] = requests_building + self.jobs_dict[cache_key] = job + print "finished setting as building" + + def cancel_request(self, request_id_cancel): + with self.build_status_lock: + for request in self.build_requests: + request_id = request['id'] + if request_id == request_id_cancel: + self.build_requests.remove(request) + controller = request['controller'] + break + return + to_remove = [] + + for cache_key, requests in self.building_dict.iteritems(): + print "DEBUG: cancelling %s" % cache_key + if requests == [request_id_cancel]: + job = self.jobs_dict[cache_key] + gear.CancelJobAdminRequest(job.handle) + to_remove.append(cache_key) + + + for cache_key in to_remove: + del self.building_dict[cache_key] + del self.jobs_dict[cache_key] + print "Finished cancelling" + def shutdown(self): self.stopped = True @@ -393,6 +435,10 @@ signal.signal(signal.SIGTERM, term_handler) requests_controller.start() requests_manager.start() +time.sleep(15) +print "START CANCELATION" +requests_controller.cancel_request(1) + while not requests_controller.stopped: try: time.sleep(3) -- cgit v1.2.1