From 1a4a60600040d9f61e5bc617f380ebf3b4a36acf Mon Sep 17 00:00:00 2001 From: ianb Date: Fri, 27 Apr 2007 21:38:56 +0000 Subject: Cleaned up the worker lifecycle logic, fixing some bugs in how workers are tracked. Track why worker threads are started, and include this information in the logs. --- paste/httpserver.py | 82 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 48 insertions(+), 34 deletions(-) (limited to 'paste/httpserver.py') diff --git a/paste/httpserver.py b/paste/httpserver.py index 319bc07..1df18cb 100755 --- a/paste/httpserver.py +++ b/paste/httpserver.py @@ -597,18 +597,17 @@ class ThreadPool(object): if not daemon: atexit.register(self.shutdown) for i in range(self.nworkers): - self.add_worker_thread() + self.add_worker_thread(message='Initial worker pool') def add_task(self, task): """ Add a task to the queue """ - self.logger.debug('Added task %s', task) + self.logger.debug('Added task (%i tasks queued)', self.queue.qsize()) if self.hung_check_period: self.requests_since_last_hung_check += 1 if self.requests_since_last_hung_check > self.hung_check_period: self.requests_since_last_hung_check = 0 - self.logger.info('Calling periodic kill_hung_threads()') self.kill_hung_threads() if not self.idle_workers and self.spawn_if_under: # spawn_if_under can come into effect... @@ -629,7 +628,7 @@ class ThreadPool(object): 'workers', busy, self.spawn_if_under-busy) self._last_added_new_idle_workers = time.time() for i in range(self.spawn_if_under - busy): - self.add_worker_thread() + self.add_worker_thread(message='Response to lack of idle workers') else: self.logger.debug( 'No extra workers needed (%s busy workers)', @@ -642,6 +641,8 @@ class ThreadPool(object): self.logger.info( 'Culling %s extra workers (%s idle workers present)', len(self.workers)-self.nworkers, len(self.idle_workers)) + self.logger.debug( + 'Idle workers: %s', self.idle_workers) for i in range(len(self.workers) - self.nworkers): self.queue.put(self.SHUTDOWN) self.queue.put(task) @@ -702,7 +703,7 @@ class ThreadPool(object): if thread_obj in self.workers: self.workers.remove(thread_obj) self.dying_threads[thread_id] = (time.time(), thread_obj) - self.add_worker_thread() + self.add_worker_thread(message='Replacement for killed thread %s' % thread_id) def thread_exists(self, thread_id): """ @@ -710,14 +711,13 @@ class ThreadPool(object): """ return thread_id in threading._active - def add_worker_thread(self): + def add_worker_thread(self, *args, **kwargs): index = self._worker_count.next() worker = threading.Thread(target=self.worker_thread_callback, + args=args, kwargs=kwargs, name=("worker %d" % index)) worker.setDaemon(self.daemon) worker.start() - self.workers.append(worker) - self.logger.debug('Added worker %s', worker) def kill_hung_threads(self): """ @@ -773,7 +773,7 @@ class ThreadPool(object): ave_time = '%.2fsec' % ave_time else: ave_time = 'N/A' - self.logger.debug( + self.logger.info( "kill_hung_threads status: %s threads (%s working, %s idle, %s starting) " "ave time %s, max time %.2fsec, killed %s workers" % (idle_workers + starting_workers + working_workers, @@ -819,35 +819,30 @@ class ThreadPool(object): print 'Shutting down', threading.currentThread() raise ServerExit(3) - def worker_thread_callback(self): + def worker_thread_callback(self, message=None): """ Worker thread should call this method to get and process queued callables. """ thread_obj = threading.currentThread() thread_id = thread_obj.thread_id = thread.get_ident() + self.workers.append(thread_obj) self.idle_workers.append(thread_id) requests_processed = 0 - while True: - if self.max_requests and self.max_requests < requests_processed: - # Replace this thread then die - self.logger.debug('Thread %s processed %i requests (limit %s); stopping thread' - % (thread_id, requests_processed, self.max_requests)) - try: - self.idle_workers.remove(thread_id) - except ValueError: - pass - self.add_worker_thread() - return - runnable = self.queue.get() - if runnable is ThreadPool.SHUTDOWN: - self.logger.debug('Worker %s asked to SHUTDOWN', thread_id) - try: - self.idle_workers.remove(thread_id) - except ValueError: - pass - return - else: + add_replacement_worker = False + self.logger.debug('Started new worker %s: %s', thread_id, message) + try: + while True: + if self.max_requests and self.max_requests < requests_processed: + # Replace this thread then die + self.logger.debug('Thread %s processed %i requests (limit %s); stopping thread' + % (thread_id, requests_processed, self.max_requests)) + add_replacement_worker = True + break + runnable = self.queue.get() + if runnable is ThreadPool.SHUTDOWN: + self.logger.debug('Worker %s asked to SHUTDOWN', thread_id) + break try: self.idle_workers.remove(thread_id) except ValueError: @@ -865,16 +860,35 @@ class ThreadPool(object): print >> sys.stderr, ( 'Unexpected exception in worker %r' % runnable) traceback.print_exc() + if thread_id in self.dying_threads: + # That last exception was intended to kill me + break finally: try: del self.worker_tracker[thread_id] except KeyError: pass sys.exc_clear() - if thread_id in self.dying_threads: - # This thread has been killed; so die! - return - self.idle_workers.append(thread_id) + self.idle_workers.append(thread_id) + finally: + try: + del self.worker_tracker[thread_id] + except KeyError: + pass + try: + self.idle_workers.remove(thread_id) + except ValueError: + pass + try: + self.workers.remove(thread_obj) + except ValueError: + pass + try: + del self.dying_threads[thread_id] + except KeyError: + pass + if add_replacement_worker: + self.add_worker_thread(message='Voluntary replacement for thread %s' % thread_id) def shutdown(self, force_quit_timeout=0): """ -- cgit v1.2.1