diff options
-rw-r--r-- | gear/client.py | 71 |
1 files changed, 61 insertions, 10 deletions
diff --git a/gear/client.py b/gear/client.py index dea67a0d..9dc1df60 100644 --- a/gear/client.py +++ b/gear/client.py @@ -3,6 +3,8 @@ import sys import json import threading import requests +import time +import signal import urlparse import distbuild #TODO: values from settings @@ -263,17 +265,20 @@ class BuilderClient(gear.Client): job.data = [] return job -class RequestsController(): +class RequestsController(threading.Thread): def __init__(self): + super(RequestsController, self).__init__() self.building_list = [] self.next_id = 1 self.new_request_lock = threading.Lock() self.lock_queue = threading.Condition() self.build_requests = [] self.build_status_lock = threading.Lock() + self.stopped = False def add_request(self, request): json_request = json.dumps(request) + print "DEBUG: adding request - %s" % json_request request_data = {} with self.new_request_lock: request_data['id'] = self.next_id @@ -325,20 +330,66 @@ class RequestsController(): artifact.state = BUILDING print "TO %s: Artifact %s building" % (request['id'],artifact.name) - #consider chunks case + def shutdown(self): + self.stopped = True + with self.lock_queue: + self.lock_queue.notify() - def loop(self): - while True: + def run(self): + while not self.stopped: with self.lock_queue: - self.lock_queue.wait(20) - self.queue_if_possible() + self.lock_queue.wait() + if not self.stopped: + self.queue_if_possible() + + +class RequestsManager(threading.Thread): + def __init__(self, requests_controller): + super(RequestsManager, self).__init__() + self.requests_controller = requests_controller + self.worker = gear.Worker('controller') + self.worker.addServer('localhost') + self.worker.registerFunction("build-request") + self.stopped = False + + def run(self): + while not self.stopped: + try: + print "DEBUG: Waiting for job" + job = self.worker.getJob() + self._handle_job(job) + except gear.InterruptedError: + print 'We were asked to stop waiting for jobs' + + def shutdown(self): + self.stopped = True + self.worker.stopWaitingForJobs() + + def _handle_job(self, job): + build_request=json.loads(job.arguments) + self.requests_controller.add_request(build_request) request = {} request['repo'] = "baserock:baserock/definitions" request['ref'] = "fbce45e45da79e5c35341845ec3b3d7c321e6ff2" request['system'] = "systems/minimal-system-x86_64-generic.morph" -requests_controller = RequestsController() -requests_controller.add_request(request) -requests_controller.add_request(request) -requests_controller.loop() +# Command line +requests_controller = RequestsController() +requests_manager = RequestsManager(requests_controller) + +def term_handler(signum, frame): + requests_manager.shutdown() + requests_controller.shutdown() +signal.signal(signal.SIGTERM, term_handler) + +requests_controller.start() +requests_manager.start() + +while not requests_controller.stopped: + try: + time.sleep(3) + except KeyboardInterrupt: + print "Ctrl + C: asking tasks to exit nicely...\n" + requests_manager.shutdown() + requests_controller.shutdown() |