diff options
author | ianb <devnull@localhost> | 2007-04-27 21:38:56 +0000 |
---|---|---|
committer | ianb <devnull@localhost> | 2007-04-27 21:38:56 +0000 |
commit | 1a4a60600040d9f61e5bc617f380ebf3b4a36acf (patch) | |
tree | 36d74b57264748275a13dd79a094ad38d862ff28 /paste/httpserver.py | |
parent | 260f3173670f03cc2102240e7a37ee28883e7fbb (diff) | |
download | paste-1a4a60600040d9f61e5bc617f380ebf3b4a36acf.tar.gz |
Cleaned up the worker lifecycle logic, fixing some bugs in how workers are tracked. Track why worker threads are started, and include this information in the logs.
Diffstat (limited to 'paste/httpserver.py')
-rwxr-xr-x | paste/httpserver.py | 82 |
1 files changed, 48 insertions, 34 deletions
diff --git a/paste/httpserver.py b/paste/httpserver.py index 319bc07..1df18cb 100755 --- a/paste/httpserver.py +++ b/paste/httpserver.py @@ -597,18 +597,17 @@ class ThreadPool(object): if not daemon: atexit.register(self.shutdown) for i in range(self.nworkers): - self.add_worker_thread() + self.add_worker_thread(message='Initial worker pool') def add_task(self, task): """ Add a task to the queue """ - self.logger.debug('Added task %s', task) + self.logger.debug('Added task (%i tasks queued)', self.queue.qsize()) if self.hung_check_period: self.requests_since_last_hung_check += 1 if self.requests_since_last_hung_check > self.hung_check_period: self.requests_since_last_hung_check = 0 - self.logger.info('Calling periodic kill_hung_threads()') self.kill_hung_threads() if not self.idle_workers and self.spawn_if_under: # spawn_if_under can come into effect... @@ -629,7 +628,7 @@ class ThreadPool(object): 'workers', busy, self.spawn_if_under-busy) self._last_added_new_idle_workers = time.time() for i in range(self.spawn_if_under - busy): - self.add_worker_thread() + self.add_worker_thread(message='Response to lack of idle workers') else: self.logger.debug( 'No extra workers needed (%s busy workers)', @@ -642,6 +641,8 @@ class ThreadPool(object): self.logger.info( 'Culling %s extra workers (%s idle workers present)', len(self.workers)-self.nworkers, len(self.idle_workers)) + self.logger.debug( + 'Idle workers: %s', self.idle_workers) for i in range(len(self.workers) - self.nworkers): self.queue.put(self.SHUTDOWN) self.queue.put(task) @@ -702,7 +703,7 @@ class ThreadPool(object): if thread_obj in self.workers: self.workers.remove(thread_obj) self.dying_threads[thread_id] = (time.time(), thread_obj) - self.add_worker_thread() + self.add_worker_thread(message='Replacement for killed thread %s' % thread_id) def thread_exists(self, thread_id): """ @@ -710,14 +711,13 @@ class ThreadPool(object): """ return thread_id in threading._active - def add_worker_thread(self): + def add_worker_thread(self, *args, **kwargs): index = self._worker_count.next() worker = threading.Thread(target=self.worker_thread_callback, + args=args, kwargs=kwargs, name=("worker %d" % index)) worker.setDaemon(self.daemon) worker.start() - self.workers.append(worker) - self.logger.debug('Added worker %s', worker) def kill_hung_threads(self): """ @@ -773,7 +773,7 @@ class ThreadPool(object): ave_time = '%.2fsec' % ave_time else: ave_time = 'N/A' - self.logger.debug( + self.logger.info( "kill_hung_threads status: %s threads (%s working, %s idle, %s starting) " "ave time %s, max time %.2fsec, killed %s workers" % (idle_workers + starting_workers + working_workers, @@ -819,35 +819,30 @@ class ThreadPool(object): print 'Shutting down', threading.currentThread() raise ServerExit(3) - def worker_thread_callback(self): + def worker_thread_callback(self, message=None): """ Worker thread should call this method to get and process queued callables. """ thread_obj = threading.currentThread() thread_id = thread_obj.thread_id = thread.get_ident() + self.workers.append(thread_obj) self.idle_workers.append(thread_id) requests_processed = 0 - while True: - if self.max_requests and self.max_requests < requests_processed: - # Replace this thread then die - self.logger.debug('Thread %s processed %i requests (limit %s); stopping thread' - % (thread_id, requests_processed, self.max_requests)) - try: - self.idle_workers.remove(thread_id) - except ValueError: - pass - self.add_worker_thread() - return - runnable = self.queue.get() - if runnable is ThreadPool.SHUTDOWN: - self.logger.debug('Worker %s asked to SHUTDOWN', thread_id) - try: - self.idle_workers.remove(thread_id) - except ValueError: - pass - return - else: + add_replacement_worker = False + self.logger.debug('Started new worker %s: %s', thread_id, message) + try: + while True: + if self.max_requests and self.max_requests < requests_processed: + # Replace this thread then die + self.logger.debug('Thread %s processed %i requests (limit %s); stopping thread' + % (thread_id, requests_processed, self.max_requests)) + add_replacement_worker = True + break + runnable = self.queue.get() + if runnable is ThreadPool.SHUTDOWN: + self.logger.debug('Worker %s asked to SHUTDOWN', thread_id) + break try: self.idle_workers.remove(thread_id) except ValueError: @@ -865,16 +860,35 @@ class ThreadPool(object): print >> sys.stderr, ( 'Unexpected exception in worker %r' % runnable) traceback.print_exc() + if thread_id in self.dying_threads: + # That last exception was intended to kill me + break finally: try: del self.worker_tracker[thread_id] except KeyError: pass sys.exc_clear() - if thread_id in self.dying_threads: - # This thread has been killed; so die! - return - self.idle_workers.append(thread_id) + self.idle_workers.append(thread_id) + finally: + try: + del self.worker_tracker[thread_id] + except KeyError: + pass + try: + self.idle_workers.remove(thread_id) + except ValueError: + pass + try: + self.workers.remove(thread_obj) + except ValueError: + pass + try: + del self.dying_threads[thread_id] + except KeyError: + pass + if add_replacement_worker: + self.add_worker_thread(message='Voluntary replacement for thread %s' % thread_id) def shutdown(self, force_quit_timeout=0): """ |