From 425015014eab514b377e6454771540d284e48a58 Mon Sep 17 00:00:00 2001 From: Pedro Alvarez Date: Mon, 28 Mar 2016 16:38:42 +0000 Subject: Start creating RequestsManager to listen to requests Change-Id: I5b660939333c57a6a47f384dc4f346a48334ffe9 --- gear/client.py | 71 +++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 61 insertions(+), 10 deletions(-) (limited to 'gear') diff --git a/gear/client.py b/gear/client.py index dea67a0d..9dc1df60 100644 --- a/gear/client.py +++ b/gear/client.py @@ -3,6 +3,8 @@ import sys import json import threading import requests +import time +import signal import urlparse import distbuild #TODO: values from settings @@ -263,17 +265,20 @@ class BuilderClient(gear.Client): job.data = [] return job -class RequestsController(): +class RequestsController(threading.Thread): def __init__(self): + super(RequestsController, self).__init__() self.building_list = [] self.next_id = 1 self.new_request_lock = threading.Lock() self.lock_queue = threading.Condition() self.build_requests = [] self.build_status_lock = threading.Lock() + self.stopped = False def add_request(self, request): json_request = json.dumps(request) + print "DEBUG: adding request - %s" % json_request request_data = {} with self.new_request_lock: request_data['id'] = self.next_id @@ -325,20 +330,66 @@ class RequestsController(): artifact.state = BUILDING print "TO %s: Artifact %s building" % (request['id'],artifact.name) - #consider chunks case + def shutdown(self): + self.stopped = True + with self.lock_queue: + self.lock_queue.notify() - def loop(self): - while True: + def run(self): + while not self.stopped: with self.lock_queue: - self.lock_queue.wait(20) - self.queue_if_possible() + self.lock_queue.wait() + if not self.stopped: + self.queue_if_possible() + + +class RequestsManager(threading.Thread): + def __init__(self, requests_controller): + super(RequestsManager, self).__init__() + self.requests_controller = requests_controller + self.worker = gear.Worker('controller') + self.worker.addServer('localhost') + self.worker.registerFunction("build-request") + self.stopped = False + + def run(self): + while not self.stopped: + try: + print "DEBUG: Waiting for job" + job = self.worker.getJob() + self._handle_job(job) + except gear.InterruptedError: + print 'We were asked to stop waiting for jobs' + + def shutdown(self): + self.stopped = True + self.worker.stopWaitingForJobs() + + def _handle_job(self, job): + build_request=json.loads(job.arguments) + self.requests_controller.add_request(build_request) request = {} request['repo'] = "baserock:baserock/definitions" request['ref'] = "fbce45e45da79e5c35341845ec3b3d7c321e6ff2" request['system'] = "systems/minimal-system-x86_64-generic.morph" -requests_controller = RequestsController() -requests_controller.add_request(request) -requests_controller.add_request(request) -requests_controller.loop() +# Command line +requests_controller = RequestsController() +requests_manager = RequestsManager(requests_controller) + +def term_handler(signum, frame): + requests_manager.shutdown() + requests_controller.shutdown() +signal.signal(signal.SIGTERM, term_handler) + +requests_controller.start() +requests_manager.start() + +while not requests_controller.stopped: + try: + time.sleep(3) + except KeyboardInterrupt: + print "Ctrl + C: asking tasks to exit nicely...\n" + requests_manager.shutdown() + requests_controller.shutdown() -- cgit v1.2.1