From a36799cb8543a687217121e5651b95b388652dc4 Mon Sep 17 00:00:00 2001
From: ianb
Date: Sat, 10 Mar 2007 02:35:35 +0000
Subject: Major threadpool features to add threads temporarily, kill threads,
 monitor the killed threads, and notify the administrator about problems.
 Also updates to watchthreads to see this new information.

---
 paste/httpserver.py | 519 +++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 494 insertions(+), 25 deletions(-)

diff --git a/paste/httpserver.py b/paste/httpserver.py
index db6e25a..1a39c06 100755
--- a/paste/httpserver.py
+++ b/paste/httpserver.py
@@ -23,10 +23,17 @@ import socket, sys, threading, urlparse, Queue, urllib
 import posixpath
 import time
 import thread
+import os
 from itertools import count
 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
 from SocketServer import ThreadingMixIn
 from paste.util import converters
+import logging
+try:
+    from paste.util import killthread
+except ImportError:
+    # Not available, probably no ctypes
+    killthread = None
 
 __all__ = ['WSGIHandlerMixin', 'WSGIServer', 'WSGIHandler', 'serve']
 __version__ = "0.5"
@@ -466,10 +473,60 @@ class LimitedLengthFile(object):
 class ThreadPool(object):
     """
     Generic thread pool with a queue of callables to consume.
+
+    Keeps a notion of the status of its worker threads:
+
+    idle: worker thread with nothing to do
+
+    busy: worker thread doing its job
+
+    hung: worker thread that's been doing a job for too long
+
+    dying: a hung thread that has been killed within the last
+    dying_limit seconds, but hasn't actually died yet.
+
+    zombie: a thread that was killed more than dying_limit seconds
+    ago and still isn't dead.
+
+    At any time you can call track_threads to get a dictionary with
+    these keys and lists of the worker threads in each state.  All
+    keys will be present, even if they point to empty lists.
+
+    Hung threads are threads that have been busy for more than
+    hung_thread_limit seconds.  Hung threads are killed when they live
+    longer than kill_thread_limit seconds.  A thread is then
+    considered dying for dying_limit seconds; if it is still alive
+    after that, it is considered a zombie.
+
+    When there are no idle workers and a request comes in, another
+    worker *may* be spawned.  If there are fewer than spawn_if_under
+    threads in the busy state, another thread will be spawned.  So if
+    the limit is 5, and there are 4 hung threads and 6 busy threads,
+    no thread will be spawned.
+
+    When there are more than max_zombie_threads_before_die zombie
+    threads, a SystemExit exception will be raised, stopping the
+    server.  Use 0 or None to never raise this exception.  Zombie
+    threads *should* get cleaned up, but killing threads is not
+    necessarily reliable.  This is turned off by default, since it is
+    only a good idea if you've deployed the server with some process
+    watching from above (something similar to daemontools or zdaemon).
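+
+    As a usage sketch (the worker count, limits, and address here are
+    purely illustrative, not recommendations), a pool that reports
+    problems by email might be created and inspected like this::
+
+        pool = ThreadPool(10, hung_thread_limit=30,
+                          kill_thread_limit=600,
+                          error_email='admin@example.com')
+        pool.add_task(some_callable)  # some_callable: any zero-argument callable
+        print pool.track_threads()    # dict of idle/busy/hung/dying/zombie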
+    """
+
+    SHUTDOWN = object()
 
-    def __init__(self, nworkers, name="ThreadPool", daemon=False):
+    def __init__(self, nworkers, name="ThreadPool", daemon=False,
+                 hung_thread_limit=30, # when a thread is marked "hung"
+                 kill_thread_limit=600, # when you kill that hung thread
+                 dying_limit=300, # seconds that a kill should take to go into effect (longer than this and the thread is a "zombie")
+                 spawn_if_under=5, # spawn if there are too many hung threads
+                 max_zombie_threads_before_die=0, # when to give up on the process
+                 hung_check_period=100, # every hung_check_period requests, check for hung workers
+                 logger=None, # place to log messages to
+                 error_email=None, # person(s) to notify if a serious problem occurs
+                 ):
         """
         Create thread pool with `nworkers` worker threads.
         """
@@ -478,6 +535,12 @@ class ThreadPool(object):
         self.queue = Queue.Queue()
         self.workers = []
         self.daemon = daemon
+        if logger is None:
+            logger = logging.getLogger('paste.httpserver.ThreadPool')
+        if isinstance(logger, basestring):
+            logger = logging.getLogger(logger)
+        self.logger = logger
+        self.error_email = error_email
         self._worker_count = count()
         for i in range(self.nworkers):
             self.add_worker_thread()
@@ -485,7 +548,111 @@ class ThreadPool(object):
         if not daemon:
             atexit.register(self.shutdown)
+        assert (not kill_thread_limit
+                or kill_thread_limit >= hung_thread_limit), (
+            "kill_thread_limit (%s) should be at least hung_thread_limit (%s)"
+            % (kill_thread_limit, hung_thread_limit))
+        if not killthread:
+            kill_thread_limit = 0
+            self.logger.info(
+                "Cannot use kill_thread_limit as ctypes/killthread is not available")
+        self.kill_thread_limit = kill_thread_limit
+        self.dying_limit = dying_limit
+        self.hung_thread_limit = hung_thread_limit
+        assert spawn_if_under <= nworkers, (
+            "spawn_if_under (%s) should be no more than nworkers (%s)"
+            % (spawn_if_under, nworkers))
+        self.spawn_if_under = spawn_if_under
+        self.max_zombie_threads_before_die = max_zombie_threads_before_die
+        self.hung_check_period = hung_check_period
+        self.requests_since_last_hung_check = 0
+        # Used to keep track of what worker is doing what:
         self.worker_tracker = {}
+        # Used to keep track of the workers not doing anything:
+        self.idle_workers = []
+        # Used to keep track of threads that have been killed, but maybe aren't dead yet:
+        self.dying_threads = {}
+
+    def add_task(self, task):
+        """
+        Add a task to the queue.
+        """
+        self.logger.debug('Added task %s', task)
+        if self.hung_check_period:
+            self.requests_since_last_hung_check += 1
+            if self.requests_since_last_hung_check > self.hung_check_period:
+                self.requests_since_last_hung_check = 0
+                self.logger.info('Calling periodic kill_hung_threads()')
+                self.kill_hung_threads()
+        if not self.idle_workers and self.spawn_if_under:
+            # spawn_if_under can come into effect...
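+            # (Only workers that have been busy for less than
+            # hung_thread_limit seconds count as "busy" here; hung
+            # workers don't, so a pool whose workers are all hung can
+            # still gain fresh threads.)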
+            busy = 0
+            now = time.time()
+            self.logger.debug('No idle workers for task; checking if we need to make more workers')
+            for worker in self.workers:
+                if not hasattr(worker, 'thread_id'):
+                    # Not initialized
+                    continue
+                if worker.thread_id in self.worker_tracker:
+                    time_started, info = self.worker_tracker[worker.thread_id]
+                    if now - time_started < self.hung_thread_limit:
+                        busy += 1
+            if busy < self.spawn_if_under:
+                self.logger.info(
+                    'No idle workers, and only %s busy workers; adding %s more '
+                    'workers', busy, self.spawn_if_under-busy)
+                for i in range(self.spawn_if_under - busy):
+                    self.add_worker_thread()
+            else:
+                self.logger.debug(
+                    'No extra workers needed (%s busy workers)',
+                    busy)
+        if (len(self.workers) > self.nworkers
+            and len(self.idle_workers) > 3):
+            # We've spawned workers in the past, but they aren't needed
+            # anymore; kill off some
+            self.logger.info(
+                'Culling %s extra workers (%s idle workers present)',
+                len(self.workers)-self.nworkers, len(self.idle_workers))
+            for i in range(len(self.workers) - self.nworkers):
+                self.queue.put(self.SHUTDOWN)
+        self.queue.put(task)
+
+    def track_threads(self):
+        """
+        Return a dict summarizing the threads in the pool (as
+        described in the ThreadPool docstring).
+        """
+        result = dict(idle=[], busy=[], hung=[], dying=[], zombie=[])
+        now = time.time()
+        for worker in self.workers:
+            if not hasattr(worker, 'thread_id'):
+                # The worker hasn't fully started up; we should just
+                # ignore it
+                continue
+            if worker.thread_id in self.worker_tracker:
+                time_started, info = self.worker_tracker[worker.thread_id]
+                if now - time_started > self.hung_thread_limit:
+                    result['hung'].append(worker)
+                else:
+                    result['busy'].append(worker)
+            else:
+                result['idle'].append(worker)
+        for thread_id, (time_killed, worker) in self.dying_threads.items():
+            if not self.thread_exists(thread_id):
+                # Cull dying threads that are actually dead and gone
+                self.logger.info('Killed thread %s no longer around',
+                                 thread_id)
+                try:
+                    del self.dying_threads[thread_id]
+                except KeyError:
+                    pass
+                continue
+            if now - time_killed > self.dying_limit:
+                result['zombie'].append(worker)
+            else:
+                result['dying'].append(worker)
+        return result
 
     def kill_worker(self, thread_id):
         """
@@ -494,21 +661,27 @@ class ThreadPool(object):
 
         This should only be done for mis-behaving workers.
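+
+        A sketch of manual use (normally kill_hung_threads calls this
+        for you; ``pool`` here is an illustrative ThreadPool instance)::
+
+            for worker in pool.track_threads()['hung']:
+                pool.kill_worker(worker.thread_id)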
""" - from paste.util import killthread - for thread_obj_id, thread_obj in threading._active.items(): - if thread_id == thread_obj_id: - break - else: - thread_obj = None + if killthread is None: + raise RuntimeError( + "Cannot kill worker; killthread/ctypes not available") + thread_obj = threading._active.get(thread_id) killthread.async_raise(thread_id, SystemExit) try: del self.worker_tracker[thread_id] except KeyError: pass + self.logger.info('Killing thread %s', thread_id) if thread_obj in self.workers: self.workers.remove(thread_obj) + self.dying_threads[thread_id] = (time.time(), thread_obj) self.add_worker_thread() + def thread_exists(self, thread_id): + """ + Returns true if a thread with this id is still running + """ + return thread_id in threading._active + def add_worker_thread(self): index = self._worker_count.next() worker = threading.Thread(target=self.worker_thread_callback, @@ -516,18 +689,108 @@ class ThreadPool(object): worker.setDaemon(self.daemon) worker.start() self.workers.append(worker) + self.logger.debug('Added worker %s', worker) + + def kill_hung_threads(self): + """ + Tries to kill any hung threads + """ + if not self.kill_thread_limit: + # No killing should occur + return + now = time.time() + for worker in self.workers: + if not hasattr(worker, 'thread_id'): + # Not setup yet + continue + if worker.thread_id not in self.worker_tracker: + # Must be idle + continue + time_started, info = self.worker_tracker[worker.thread_id] + if now - time_started > self.kill_thread_limit: + self.logger.warning( + 'Thread %s hung (working on task for %i seconds)', + worker.thread_id, now - time_started) + try: + import pprint + info_desc = pprint.pformat(info) + except: + out = StringIO() + traceback.print_exc(file=out) + info_desc = 'Error:\n%s' % out.getvalue() + self.notify_problem( + "Killing worker thread (id=%(thread_id)s) because it has been \n" + "working on task for %(time)s seconds (limit is %(limit)s)\n" + "Info on task:\n" + "%(info)s" + % dict(thread_id=worker.thread_id, + time=now - time_started, + limit=self.kill_thread_limit, + info=info_desc)) + self.kill_worker(worker.thread_id) + self.check_max_zombies() + + def check_max_zombies(self): + """ + Check if we've reached max_zombie_threads_before_die; if so + then kill the entire process. + """ + if not self.max_zombie_threads_before_die: + return + found = [] + now = time.time() + for thread_id, (time_killed, worker) in self.dying_threads.items(): + if not self.thread_exists(thread_id): + # Cull dying threads that are actually dead and gone + try: + del self.dying_threads[thread_id] + except KeyError: + pass + continue + if now - time_killed > self.dying_limit: + found.append(thread_id) + if found: + self.logger.info('Found %s zombie threads', found) + if len(found) > self.max_zombie_threads_before_die: + self.logger.fatal( + 'Exiting process because %s zombie threads is more than %s limit', + len(found), self.max_zombie_threads_before_die) + self.notify_problem( + "Exiting process because %(found)s zombie threads " + "(more than limit of %(limit)s)\n" + "Bad threads (ids):\n" + " %(ids)s\n" + % dict(found=len(found), + limit=self.max_zombie_threads_before_die, + ids="\n ".join(map(str, found))), + subject="Process restart (too many zombie threads)") + self.shutdown(10) + print 'Shutting down', threading.currentThread() + raise ServerExit(3) def worker_thread_callback(self): """ Worker thread should call this method to get and process queued callables. 
""" + thread_obj = threading.currentThread() + thread_id = thread_obj.thread_id = thread.get_ident() + self.idle_workers.append(thread_id) while True: runnable = self.queue.get() if runnable is ThreadPool.SHUTDOWN: + self.logger.debug('Worker %s asked to SHUTDOWN', thread_id) + try: + self.idle_workers.remove(thread_id) + except ValueError: + pass return else: - self.worker_tracker[thread.get_ident()] = [time.time(), None] + try: + self.idle_workers.remove(thread_id) + except ValueError: + pass + self.worker_tracker[thread_id] = [time.time(), None] try: try: runnable() @@ -541,33 +804,138 @@ class ThreadPool(object): traceback.print_exc() finally: try: - del self.worker_tracker[thread.get_ident()] + del self.worker_tracker[thread_id] except KeyError: pass sys.exc_clear() + if thread_id in self.dying_threads: + # This thread has been killed; so die! + return + self.idle_workers.append(thread_id) - def shutdown(self): + def shutdown(self, force_quit_timeout=0): """ Shutdown the queue (after finishing any pending requests). """ + self.logger.info('Shutting down threadpool') # Add a shutdown request for every worker - for i in range(self.nworkers): + for i in range(len(self.workers)): self.queue.put(ThreadPool.SHUTDOWN) # Wait for each thread to terminate + hung_workers = [] for worker in self.workers: - worker.join() + worker.join(0.5) + if worker.isAlive(): + hung_workers.append(worker) + zombies = [] + for thread_id in self.dying_threads: + if self.thread_exists(thread_id): + zombies.append(thread_id) + if hung_workers or zombies: + self.logger.info("%s workers didn't stop properly, and %s zombies", + len(hung_workers), len(zombies)) + if hung_workers: + for worker in hung_workers: + self.kill_worker(worker.thread_id) + self.logger.info('Workers killed forcefully') + if force_quit_timeout: + hung = [] + timed_out = False + need_force_quit = bool(zombies) + for workers in self.workers: + if not timed_out and worker.isAlive(): + timed_out = True + worker.join(force_quit_timeout) + if worker.isAlive(): + print "Worker %s won't die" % worker + need_force_quit = True + if need_force_quit: + import atexit + # Remove the threading atexit callback + for callback in list(atexit._exithandlers): + func = getattr(callback[0], 'im_func', None) + if not func: + continue + globs = getattr(func, 'func_globals', {}) + mod = globs.get('__name__') + if mod == 'threading': + atexit._exithandlers.remove(callback) + atexit._run_exitfuncs() + print 'Forcefully exiting process' + os._exit(3) + else: + self.logger.info('All workers eventually killed') + else: + self.logger.info('All workers stopped') + + def notify_problem(self, msg, subject=None, spawn_thread=True): + """ + Called when there's a substantial problem. msg contains the + body of the notification, subject the summary. + If spawn_thread is true, then the email will be send in + another thread (so this doesn't block). 
+ """ + if not self.error_email: + return + if spawn_thread: + t = threading.Thread( + target=self.notify_problem, + args=(msg, subject, False)) + t.start() + return + from_address = 'errors@localhost' + if not subject: + subject = msg.strip().splitlines()[0] + subject = subject[:50] + subject = '[http threadpool] %s' % subject + headers = [ + "To: %s" % self.error_email, + "From: %s" % from_address, + "Subject: %s" % subject, + ] + try: + system = ' '.join(os.uname()) + except: + system = '(unknown)' + body = ( + "An error has occurred in the paste.httpserver.ThreadPool\n" + "Error:\n" + " %(msg)s\n" + "Occurred at: %(time)s\n" + "PID: %(pid)s\n" + "System: %(system)s\n" + "Server .py file: %(file)s\n" + % dict(msg=msg, + time=time.strftime("%c"), + pid=os.getpid(), + system=system, + file=os.path.abspath(__file__), + )) + message = '\n'.join(headers) + "\n\n" + body + import smtplib + server = smtplib.SMTP('localhost') + error_emails = [ + e.strip() for e in self.error_email.split(",") + if e.strip()] + server.sendmail(from_address, error_emails, message) + server.quit() + print 'email sent to', error_emails, message + class ThreadPoolMixIn(object): """ Mix-in class to process requests from a thread pool """ - def __init__(self, nworkers, daemon=False): + def __init__(self, nworkers, daemon=False, **threadpool_options): # Create and start the workers self.running = True assert nworkers > 0, "ThreadPoolMixin servers must have at least one worker" - self.thread_pool = ThreadPool(nworkers, + self.thread_pool = ThreadPool( + nworkers, "ThreadPoolMixin HTTP server on %s:%d" - % (self.server_name, self.server_port), daemon) + % (self.server_name, self.server_port), + daemon, + **threadpool_options) def process_request(self, request, client_address): """ @@ -579,9 +947,16 @@ class ThreadPoolMixIn(object): # that we can trap interrupts we need to restore this,.) 
         request.setblocking(1)
         # Queue processing of the request
-        self.thread_pool.queue.put(
+        self.thread_pool.add_task(
             lambda: self.process_request_in_thread(request, client_address))
 
+    def handle_error(self, request, client_address):
+        exc_class, exc, tb = sys.exc_info()
+        if exc_class is ServerExit:
+            # This is actually a request to stop the server
+            raise
+        return super(ThreadPoolMixIn, self).handle_error(request, client_address)
+
     def process_request_in_thread(self, request, client_address):
         """
         The worker thread should call back here to do the rest of the
@@ -624,7 +999,7 @@ class ThreadPoolMixIn(object):
         """
         self.running = False
         self.socket.close()
-        self.thread_pool.shutdown()
+        self.thread_pool.shutdown(60)
 
 class WSGIServerBase(SecureHTTPServer):
     def __init__(self, wsgi_application, server_address,
@@ -647,15 +1022,26 @@ class WSGIServer(ThreadingMixIn, WSGIServerBase):
 class WSGIThreadPoolServer(ThreadPoolMixIn, WSGIServerBase):
     def __init__(self, wsgi_application, server_address,
                  RequestHandlerClass=None, ssl_context=None,
-                 nworkers=10, daemon_threads=False):
+                 nworkers=10, daemon_threads=False,
+                 threadpool_options=None):
         WSGIServerBase.__init__(self, wsgi_application, server_address,
                                 RequestHandlerClass, ssl_context)
-        ThreadPoolMixIn.__init__(self, nworkers, daemon_threads)
+        if threadpool_options is None:
+            threadpool_options = {}
+        ThreadPoolMixIn.__init__(self, nworkers, daemon_threads,
+                                 **threadpool_options)
+
+class ServerExit(SystemExit):
+    """
+    Raised to tell the server to really exit (SystemExit is normally
+    caught).
+    """
 
 def serve(application, host=None, port=None, handler=None, ssl_pem=None,
           ssl_context=None, server_version=None, protocol_version=None,
           start_loop=True, daemon_threads=None, socket_timeout=None,
-          use_threadpool=True, threadpool_workers=10):
+          use_threadpool=True, threadpool_workers=10,
+          threadpool_options=None):
     """
     Serves your ``application`` over HTTP(S) via WSGI interface
 
@@ -684,7 +1070,7 @@ def serve(application, host=None, port=None, handler=None, ssl_pem=None,
         $ openssl genrsa 1024 > host.key
         $ chmod 400 host.key
-        $ openssl req -new -x509 -nodes -sha1 -days 365 \
+        $ openssl req -new -x509 -nodes -sha1 -days 365 \\
             -key host.key > host.cert
         $ cat host.cert host.key > host.pem
         $ chmod 400 host.pem
@@ -744,6 +1130,13 @@ def serve(application, host=None, port=None, handler=None, ssl_pem=None,
         Number of worker threads to create when ``use_threadpool`` is
         true.  This can be a string or an integer value.
+
+    ``threadpool_options``
+
+        A dictionary of options to be used when instantiating the
+        threadpool.  See paste.httpserver.ThreadPool for the available
+        options (``threadpool_workers`` is one option that can also
+        go here).
     """
     is_ssl = False
     if ssl_pem or ssl_context:
@@ -779,7 +1172,8 @@ def serve(application, host=None, port=None, handler=None, ssl_pem=None,
     if converters.asbool(use_threadpool):
         server = WSGIThreadPoolServer(application, server_address, handler,
                                       ssl_context, int(threadpool_workers),
-                                      daemon_threads)
+                                      daemon_threads,
+                                      threadpool_options=threadpool_options)
     else:
         server = WSGIServer(application, server_address, handler, ssl_context)
     if daemon_threads:
@@ -806,10 +1200,85 @@ def serve(application, host=None, port=None, handler=None, ssl_pem=None,
 # For paste.deploy server instantiation (egg:Paste#http)
 # Note: this gets a separate function because it has to expect string
 # arguments (though that's not much of an issue yet, ever?)
-def server_runner(wsgi_app, global_conf, *args, **kwargs):
-    serve(wsgi_app, *args, **kwargs)
+def server_runner(wsgi_app, global_conf, **kwargs):
+    from paste.deploy.converters import asbool
+    for name in ['port', 'socket_timeout', 'threadpool_workers',
+                 'threadpool_hung_thread_limit',
+                 'threadpool_kill_thread_limit',
+                 'threadpool_dying_limit', 'threadpool_spawn_if_under',
+                 'threadpool_max_zombie_threads_before_die',
+                 'threadpool_hung_check_period']:
+        if name in kwargs:
+            kwargs[name] = int(kwargs[name])
+    for name in ['use_threadpool', 'daemon_threads']:
+        if name in kwargs:
+            kwargs[name] = asbool(kwargs[name])
+    threadpool_options = {}
+    for name, value in kwargs.items():
+        if name.startswith('threadpool_') and name != 'threadpool_workers':
+            threadpool_options[name[len('threadpool_'):]] = value
+            del kwargs[name]
+    if ('error_email' not in threadpool_options
+        and 'error_email' in global_conf):
+        threadpool_options['error_email'] = global_conf['error_email']
+    kwargs['threadpool_options'] = threadpool_options
+    serve(wsgi_app, **kwargs)
+
+server_runner.__doc__ = serve.__doc__ + """
+
+    You can also set these threadpool options:
+
+    ``threadpool_hung_thread_limit``:
+
+        The number of seconds a thread can work on a task before it is
+        considered hung (stuck).  Default 30 seconds.
+
+    ``threadpool_kill_thread_limit``:
+
+        The number of seconds a thread can work before you should kill
+        it (assuming it will never finish).  Default 600 seconds
+        (10 minutes).
+
+    ``threadpool_dying_limit``:
+
+        The length of time after killing a thread that it should
+        actually disappear.  If it lives longer than this, it is
+        considered a "zombie".  Note that even in easy situations
+        killing a thread can be very slow.  Default 300 seconds
+        (5 minutes).
+
+    ``threadpool_spawn_if_under``:
+
+        If there are no idle threads and a request comes in, and there
+        are fewer than this number of *busy* threads, then add workers
+        to the pool.  Busy threads are threads that have taken less
+        than ``threadpool_hung_thread_limit`` seconds so far.  So if
+        you get *lots* of requests but they complete in a reasonable
+        amount of time, the requests will simply queue up (adding more
+        threads probably wouldn't speed them up).  But if you have lots
+        of hung threads and one more request comes in, this will add
+        workers to handle it.  Default 5.
+
+    ``threadpool_max_zombie_threads_before_die``:

+        If there are more zombies than this, just kill the process.
+        This is only good if you have a monitor that will automatically
+        restart the server; that can clean up the mess.  Default 0
+        (disabled).
+
+    ``threadpool_hung_check_period``:
+
+        Every X requests, check for hung threads that need to be
+        killed, or for zombie threads that should cause a restart.
+        Default 100 requests.
+
+    ``threadpool_logger``:
+
+        Logging messages will go to the logger named here.
+
+    ``threadpool_error_email`` (or global ``error_email`` setting):
+
+        When threads are killed or the process is restarted, this email
+        address will be contacted (using an SMTP server on localhost).
+"""
 
-server_runner.__doc__ = serve.__doc__
 
 if __name__ == '__main__':
     from paste.wsgilib import dump_environ
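
As a usage sketch (the WSGI app, address, and option values below are
illustrative only), the new knobs can be passed straight through serve()
once this patch is applied:

    from paste.httpserver import serve

    def app(environ, start_response):
        start_response('200 OK', [('Content-type', 'text/plain')])
        return ['hello world\n']

    serve(app, host='127.0.0.1', port=8080,
          threadpool_workers=10,
          threadpool_options=dict(hung_thread_limit=30,
                                  kill_thread_limit=600,
                                  error_email='ops@example.com'))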