# Copyright 2013 Rackspace Australia # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import gear import json import logging import os import threading import time class ZuulManager(threading.Thread): """ This thread manages all of the launched gearman workers. As required by the zuul protocol it handles stopping builds when they are cancelled through stop:turbo-hipster-manager-%hostname. To do this it implements its own gearman worker waiting for events on that manager. """ log = logging.getLogger("worker_manager.ZuulManager") def __init__(self, worker_server, tasks): super(ZuulManager, self).__init__() self._stop = threading.Event() self.stopping = False self.running = False self.worker_server = worker_server self.tasks = tasks self.gearman_worker = None self.setup_gearman() def setup_gearman(self): hostname = os.uname()[1] self.gearman_worker = gear.Worker('turbo-hipster-manager-%s' % hostname) self.gearman_worker.addServer( self.worker_server.config['zuul_server']['gearman_host'], self.worker_server.config['zuul_server']['gearman_port'] ) def register_functions(self): hostname = os.uname()[1] self.gearman_worker.registerFunction( 'stop:turbo-hipster-manager-%s' % hostname) def stop_gracefully(self): self.stopping = True self.gearman_worker.stopWaitingForJobs() while self.running: self.log.debug('waiting to finish') time.sleep(0.1) self._stop.set() self.gearman_worker.shutdown() def stop(self): self._stop.set() # Unblock gearman self.log.debug("Telling gearman to stop waiting for jobs") self.gearman_worker.stopWaitingForJobs() self.gearman_worker.shutdown() def stopped(self): return self._stop.isSet() def run(self): while not self.stopped() and not self.stopping: self.running = True try: # gearman_worker.getJob() blocks until a job is available self.log.debug("Waiting for server") self.gearman_worker.waitForServer() if (not self.stopped() and self.gearman_worker.running and self.gearman_worker.active_connections): self.register_functions() self.gearman_worker.waitForServer() logging.debug("Waiting for job") self.current_step = 0 job = self.gearman_worker.getJob() self._handle_job(job) except gear.InterruptedError: self.log.debug('We were asked to stop waiting for jobs') except: self.log.exception('Unknown exception waiting for job.') self.running = False self.log.debug("Finished manager thread") def _handle_job(self, job): """ Handle the requested job """ try: job_arguments = json.loads(job.arguments.decode('utf-8')) self.tasks[job_arguments['name']].stop_working( job_arguments['number']) job.sendWorkComplete() except Exception as e: self.log.exception('Exception waiting for management job.') job.sendWorkException(str(e).encode('utf-8')) class ZuulClient(threading.Thread): """ ...""" log = logging.getLogger("worker_manager.ZuulClient") def __init__(self, worker_server): super(ZuulClient, self).__init__() self._stop = threading.Event() self.stopping = False self.running = False self.worker_server = worker_server # Set up the runner worker self.gearman_worker = None self.functions = {} self.job = None self.setup_gearman() def setup_gearman(self): self.log.debug("Set up gearman worker") self.gearman_worker = gear.Worker(self.worker_server.worker_name) self.gearman_worker.addServer( self.worker_server.config['zuul_server']['gearman_host'], self.worker_server.config['zuul_server']['gearman_port'] ) def register_functions(self): self.log.debug("Register functions with gearman") for function_name, plugin in self.functions.items(): self.gearman_worker.registerFunction(function_name) self.log.debug(self.gearman_worker.functions) def add_function(self, function_name, plugin): self.log.debug("Add function, %s, to list" % function_name) self.functions[function_name] = plugin def stop(self): self._stop.set() for task in self.functions.values(): task.stop_working() # Unblock gearman self.log.debug("Telling gearman to stop waiting for jobs") self.gearman_worker.stopWaitingForJobs() self.gearman_worker.shutdown() def stop_gracefully(self): self.stopping = True self.gearman_worker.stopWaitingForJobs() while self.running: time.sleep(0.1) self._stop.set() self.gearman_worker.shutdown() def stopped(self): return self._stop.isSet() def run(self): while not self.stopped() and not self.stopping: self.running = True try: # gearman_worker.getJob() blocks until a job is available self.log.debug("Waiting for server") self.gearman_worker.waitForServer() if (not self.stopped() and self.gearman_worker.running and self.gearman_worker.active_connections): self.register_functions() self.gearman_worker.waitForServer() self.log.debug("Waiting for job") self.job = self.gearman_worker.getJob() self._handle_job() except gear.InterruptedError: self.log.debug('We were asked to stop waiting for jobs') except: self.log.exception('Unknown exception waiting for job.') self.running = False self.log.debug("Finished client thread") def _handle_job(self): """ We have a job, give it to the right plugin """ if self.job: self.log.debug("We have a job, we'll launch the task now.") self.functions[self.job.name].start_job(self.job)