Diffstat (limited to 'paste/httpserver.py')
-rwxr-xr-x  paste/httpserver.py  519
1 file changed, 494 insertions(+), 25 deletions(-)
diff --git a/paste/httpserver.py b/paste/httpserver.py
index db6e25a..1a39c06 100755
--- a/paste/httpserver.py
+++ b/paste/httpserver.py
@@ -23,10 +23,17 @@ import socket, sys, threading, urlparse, Queue, urllib
import posixpath
import time
import thread
+import os
from itertools import count
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from SocketServer import ThreadingMixIn
from paste.util import converters
+import logging
+try:
+ from paste.util import killthread
+except ImportError:
+ # Not available, probably no ctypes
+ killthread = None
__all__ = ['WSGIHandlerMixin', 'WSGIServer', 'WSGIHandler', 'serve']
__version__ = "0.5"
@@ -466,10 +473,60 @@ class LimitedLengthFile(object):
class ThreadPool(object):
"""
Generic thread pool with a queue of callables to consume.
+
+ Keeps a notion of the status of its worker threads:
+
+ idle: worker thread with nothing to do
+
+ busy: worker thread doing its job
+
+ hung: worker thread that's been doing a job for too long
+
+ dying: a hung thread that has been killed, but hasn't quite died
+ yet.
+
+ zombie: a thread that we tried to kill, but that is still alive
+ more than dying_limit seconds after the kill.
+
+ At any time you can call track_threads to get a dictionary with
+ these keys and lists of thread_ids that fall in that status. All
+ keys will be present, even if they point to empty lists.
+
+ Hung threads are threads that have been busy for more than
+ hung_thread_limit seconds. Hung threads are killed when they live
+ longer than kill_thread_limit seconds. A thread is then
+ considered dying for dying_limit seconds; if it is still alive
+ after that it is considered a zombie.
+
+ When there are no idle workers and a request comes in, another
+ worker *may* be spawned. If there are fewer than spawn_if_under
+ threads in the busy state, another thread will be spawned. So if
+ the limit is 5, and there are 4 hung threads and 6 busy threads,
+ no thread will be spawned.
+
+ When there are more than max_zombie_threads_before_die zombie
+ threads, a SystemExit exception will be raised, stopping the
+ server. Use 0 or None to never raise this exception. Zombie
+ threads *should* get cleaned up, but killing threads is not
+ necessarily reliable. This is turned off by default, since it is
+ only a good idea if you've deployed the server with some process
+ watching from above (something similar to daemontools or zdaemon).
+
"""
+
+
SHUTDOWN = object()
- def __init__(self, nworkers, name="ThreadPool", daemon=False):
+ def __init__(self, nworkers, name="ThreadPool", daemon=False,
+ hung_thread_limit=30, # when a thread is marked "hung"
+ kill_thread_limit=600, # when you kill that hung thread
+ dying_limit=300, # seconds that a kill should take to go into effect (longer than this and the thread is a "zombie")
+ spawn_if_under=5, # spawn if there's too many hung threads
+ max_zombie_threads_before_die=0, # when to give up on the process
+ hung_check_period=100, # every 100 requests check for hung workers
+ logger=None, # Place to log messages to
+ error_email=None, # Person(s) to notify if serious problem occurs
+ ):
"""
Create thread pool with `nworkers` worker threads.
"""
@@ -478,6 +535,12 @@ class ThreadPool(object):
self.queue = Queue.Queue()
self.workers = []
self.daemon = daemon
+ if logger is None:
+ logger = logging.getLogger('paste.httpserver.ThreadPool')
+ if isinstance(logger, basestring):
+ logger = logging.getLogger(logger)
+ self.logger = logger
+ self.error_email = error_email
self._worker_count = count()
for i in range(self.nworkers):
self.add_worker_thread()
@@ -485,7 +548,111 @@ class ThreadPool(object):
if not daemon:
atexit.register(self.shutdown)
+ assert (not kill_thread_limit
+ or kill_thread_limit >= hung_thread_limit), (
+ "kill_thread_limit (%s) should be higher than hung_thread_limit (%s)"
+ % (kill_thread_limit, hung_thread_limit))
+ if not killthread:
+ kill_thread_limit = 0
+ self.logger.info(
+ "Cannot use kill_thread_limit as ctypes/killthread is not available")
+ self.kill_thread_limit = kill_thread_limit
+ self.dying_limit = dying_limit
+ self.hung_thread_limit = hung_thread_limit
+ assert spawn_if_under <= nworkers, (
+ "spawn_if_under (%s) should be less than nworkers (%s)"
+ % (spawn_if_under, nworkers))
+ self.spawn_if_under = spawn_if_under
+ self.max_zombie_threads_before_die = max_zombie_threads_before_die
+ self.hung_check_period = hung_check_period
+ self.requests_since_last_hung_check = 0
+ # Used to keep track of what worker is doing what:
self.worker_tracker = {}
+ # Used to keep track of the workers not doing anything:
+ self.idle_workers = []
+ # Used to keep track of threads that have been killed, but maybe aren't dead yet:
+ self.dying_threads = {}
+
+ def add_task(self, task):
+ """
+ Add a task to the queue
+ """
+ self.logger.debug('Added task %s', task)
+ if self.hung_check_period:
+ self.requests_since_last_hung_check += 1
+ if self.requests_since_last_hung_check > self.hung_check_period:
+ self.requests_since_last_hung_check = 0
+ self.logger.info('Calling periodic kill_hung_threads()')
+ self.kill_hung_threads()
+ if not self.idle_workers and self.spawn_if_under:
+ # spawn_if_under can come into effect...
+ busy = 0
+ now = time.time()
+ self.logger.debug('No idle workers for task; checking if we need to make more workers')
+ for worker in self.workers:
+ if not hasattr(worker, 'thread_id'):
+ # Not initialized
+ continue
+ if worker.thread_id in self.worker_tracker:
+ time_started, info = self.worker_tracker[worker.thread_id]
+ if now - time_started < self.hung_thread_limit:
+ busy += 1
+ if busy < self.spawn_if_under:
+ self.logger.info(
+ 'No idle workers, and only %s busy workers; adding %s more '
+ 'workers', busy, self.spawn_if_under-busy)
+ for i in range(self.spawn_if_under - busy):
+ self.add_worker_thread()
+ else:
+ self.logger.debug(
+ 'No extra workers needed (%s busy workers)',
+ busy)
+ if (len(self.workers) > self.nworkers
+ and len(self.idle_workers) > 3):
+ # We've spawned workers in the past, but they aren't needed
+ # anymore; kill off some
+ self.logger.info(
+ 'Culling %s extra workers (%s idle workers present)',
+ len(self.workers)-self.nworkers, len(self.idle_workers))
+ for i in range(len(self.workers) - self.nworkers):
+ self.queue.put(self.SHUTDOWN)
+ self.queue.put(task)
+
+ def track_threads(self):
+ """
+ Return a dict summarizing the threads in the pool (as
+ described in the ThreadPool docstring).
+ """
+ result = dict(idle=[], busy=[], hung=[], dying=[], zombie=[])
+ now = time.time()
+ for worker in self.workers:
+ if not hasattr(worker, 'thread_id'):
+ # The worker hasn't fully started up; we should just
+ # ignore it
+ continue
+ if worker.thread_id in self.worker_tracker:
+ time_started, info = self.worker_tracker[worker.thread_id]
+ if now - time_started > self.hung_thread_limit:
+ result['hung'].append(worker)
+ else:
+ result['busy'].append(worker)
+ else:
+ result['idle'].append(worker)
+ for thread_id, (time_killed, worker) in self.dying_threads.items():
+ if not self.thread_exists(thread_id):
+ # Cull dying threads that are actually dead and gone
+ self.logger.info('Killed thread %s no longer around',
+ thread_id)
+ try:
+ del self.dying_threads[thread_id]
+ except KeyError:
+ pass
+ continue
+ if now - time_killed > self.dying_limit:
+ result['zombie'].append(worker)
+ else:
+ result['dying'].append(worker)
+ return result
def kill_worker(self, thread_id):
"""
@@ -494,21 +661,27 @@ class ThreadPool(object):
This should only be done for mis-behaving workers.
"""
- from paste.util import killthread
- for thread_obj_id, thread_obj in threading._active.items():
- if thread_id == thread_obj_id:
- break
- else:
- thread_obj = None
+ if killthread is None:
+ raise RuntimeError(
+ "Cannot kill worker; killthread/ctypes not available")
+ thread_obj = threading._active.get(thread_id)
killthread.async_raise(thread_id, SystemExit)
try:
del self.worker_tracker[thread_id]
except KeyError:
pass
+ self.logger.info('Killing thread %s', thread_id)
if thread_obj in self.workers:
self.workers.remove(thread_obj)
+ self.dying_threads[thread_id] = (time.time(), thread_obj)
self.add_worker_thread()
+ def thread_exists(self, thread_id):
+ """
+ Returns true if a thread with this id is still running
+ """
+ return thread_id in threading._active
+
def add_worker_thread(self):
index = self._worker_count.next()
worker = threading.Thread(target=self.worker_thread_callback,
@@ -516,18 +689,108 @@ class ThreadPool(object):
worker.setDaemon(self.daemon)
worker.start()
self.workers.append(worker)
+ self.logger.debug('Added worker %s', worker)
+
+ def kill_hung_threads(self):
+ """
+ Tries to kill any hung threads
+ """
+ if not self.kill_thread_limit:
+ # No killing should occur
+ return
+ now = time.time()
+ for worker in self.workers:
+ if not hasattr(worker, 'thread_id'):
+ # Not setup yet
+ continue
+ if worker.thread_id not in self.worker_tracker:
+ # Must be idle
+ continue
+ time_started, info = self.worker_tracker[worker.thread_id]
+ if now - time_started > self.kill_thread_limit:
+ self.logger.warning(
+ 'Thread %s hung (working on task for %i seconds)',
+ worker.thread_id, now - time_started)
+ try:
+ import pprint
+ info_desc = pprint.pformat(info)
+ except:
+ from cStringIO import StringIO  # not guaranteed at module level
+ out = StringIO()
+ traceback.print_exc(file=out)
+ info_desc = 'Error:\n%s' % out.getvalue()
+ self.notify_problem(
+ "Killing worker thread (id=%(thread_id)s) because it has been \n"
+ "working on task for %(time)s seconds (limit is %(limit)s)\n"
+ "Info on task:\n"
+ "%(info)s"
+ % dict(thread_id=worker.thread_id,
+ time=now - time_started,
+ limit=self.kill_thread_limit,
+ info=info_desc))
+ self.kill_worker(worker.thread_id)
+ self.check_max_zombies()
+
+ def check_max_zombies(self):
+ """
+ Check if we've reached max_zombie_threads_before_die; if so
+ then kill the entire process.
+ """
+ if not self.max_zombie_threads_before_die:
+ return
+ found = []
+ now = time.time()
+ for thread_id, (time_killed, worker) in self.dying_threads.items():
+ if not self.thread_exists(thread_id):
+ # Cull dying threads that are actually dead and gone
+ try:
+ del self.dying_threads[thread_id]
+ except KeyError:
+ pass
+ continue
+ if now - time_killed > self.dying_limit:
+ found.append(thread_id)
+ if found:
+ self.logger.info('Found %s zombie threads', len(found))
+ if len(found) > self.max_zombie_threads_before_die:
+ self.logger.fatal(
+ 'Exiting process because %s zombie threads exceed the limit of %s',
+ len(found), self.max_zombie_threads_before_die)
+ self.notify_problem(
+ "Exiting process because %(found)s zombie threads "
+ "(more than limit of %(limit)s)\n"
+ "Bad threads (ids):\n"
+ " %(ids)s\n"
+ % dict(found=len(found),
+ limit=self.max_zombie_threads_before_die,
+ ids="\n ".join(map(str, found))),
+ subject="Process restart (too many zombie threads)")
+ self.shutdown(10)
+ print 'Shutting down', threading.currentThread()
+ raise ServerExit(3)
def worker_thread_callback(self):
"""
Worker thread should call this method to get and process queued
callables.
"""
+ thread_obj = threading.currentThread()
+ thread_id = thread_obj.thread_id = thread.get_ident()
+ self.idle_workers.append(thread_id)
while True:
runnable = self.queue.get()
if runnable is ThreadPool.SHUTDOWN:
+ self.logger.debug('Worker %s asked to SHUTDOWN', thread_id)
+ try:
+ self.idle_workers.remove(thread_id)
+ except ValueError:
+ pass
return
else:
- self.worker_tracker[thread.get_ident()] = [time.time(), None]
+ try:
+ self.idle_workers.remove(thread_id)
+ except ValueError:
+ pass
+ self.worker_tracker[thread_id] = [time.time(), None]
try:
try:
runnable()
@@ -541,33 +804,138 @@ class ThreadPool(object):
traceback.print_exc()
finally:
try:
- del self.worker_tracker[thread.get_ident()]
+ del self.worker_tracker[thread_id]
except KeyError:
pass
sys.exc_clear()
+ if thread_id in self.dying_threads:
+ # This thread has been killed; so die!
+ return
+ self.idle_workers.append(thread_id)
- def shutdown(self):
+ def shutdown(self, force_quit_timeout=0):
"""
Shutdown the queue (after finishing any pending requests).
"""
+ self.logger.info('Shutting down threadpool')
# Add a shutdown request for every worker
- for i in range(self.nworkers):
+ for i in range(len(self.workers)):
self.queue.put(ThreadPool.SHUTDOWN)
# Wait for each thread to terminate
+ hung_workers = []
for worker in self.workers:
- worker.join()
+ worker.join(0.5)
+ if worker.isAlive():
+ hung_workers.append(worker)
+ zombies = []
+ for thread_id in self.dying_threads:
+ if self.thread_exists(thread_id):
+ zombies.append(thread_id)
+ if hung_workers or zombies:
+ self.logger.info("%s workers didn't stop properly, and %s zombies",
+ len(hung_workers), len(zombies))
+ if hung_workers:
+ for worker in hung_workers:
+ self.kill_worker(worker.thread_id)
+ self.logger.info('Workers killed forcefully')
+ if force_quit_timeout:
+ hung = []
+ timed_out = False
+ need_force_quit = bool(zombies)
+ for worker in self.workers:
+ if not timed_out and worker.isAlive():
+ timed_out = True
+ worker.join(force_quit_timeout)
+ if worker.isAlive():
+ print "Worker %s won't die" % worker
+ need_force_quit = True
+ if need_force_quit:
+ import atexit
+ # Remove the threading atexit callback
+ for callback in list(atexit._exithandlers):
+ func = getattr(callback[0], 'im_func', None)
+ if not func:
+ continue
+ globs = getattr(func, 'func_globals', {})
+ mod = globs.get('__name__')
+ if mod == 'threading':
+ atexit._exithandlers.remove(callback)
+ atexit._run_exitfuncs()
+ print 'Forcefully exiting process'
+ os._exit(3)
+ else:
+ self.logger.info('All workers eventually killed')
+ else:
+ self.logger.info('All workers stopped')
+
+ def notify_problem(self, msg, subject=None, spawn_thread=True):
+ """
+ Called when there's a substantial problem. msg contains the
+ body of the notification, subject the summary.
+ If spawn_thread is true, then the email will be sent in
+ another thread (so this doesn't block).
+ """
+ if not self.error_email:
+ return
+ if spawn_thread:
+ t = threading.Thread(
+ target=self.notify_problem,
+ args=(msg, subject, False))
+ t.start()
+ return
+ from_address = 'errors@localhost'
+ if not subject:
+ subject = msg.strip().splitlines()[0]
+ subject = subject[:50]
+ subject = '[http threadpool] %s' % subject
+ headers = [
+ "To: %s" % self.error_email,
+ "From: %s" % from_address,
+ "Subject: %s" % subject,
+ ]
+ try:
+ system = ' '.join(os.uname())
+ except:
+ system = '(unknown)'
+ body = (
+ "An error has occurred in the paste.httpserver.ThreadPool\n"
+ "Error:\n"
+ " %(msg)s\n"
+ "Occurred at: %(time)s\n"
+ "PID: %(pid)s\n"
+ "System: %(system)s\n"
+ "Server .py file: %(file)s\n"
+ % dict(msg=msg,
+ time=time.strftime("%c"),
+ pid=os.getpid(),
+ system=system,
+ file=os.path.abspath(__file__),
+ ))
+ message = '\n'.join(headers) + "\n\n" + body
+ import smtplib
+ server = smtplib.SMTP('localhost')
+ error_emails = [
+ e.strip() for e in self.error_email.split(",")
+ if e.strip()]
+ server.sendmail(from_address, error_emails, message)
+ server.quit()
+ print 'email sent to', error_emails, message
+
class ThreadPoolMixIn(object):
"""
Mix-in class to process requests from a thread pool
"""
- def __init__(self, nworkers, daemon=False):
+ def __init__(self, nworkers, daemon=False, **threadpool_options):
# Create and start the workers
self.running = True
assert nworkers > 0, "ThreadPoolMixin servers must have at least one worker"
- self.thread_pool = ThreadPool(nworkers,
+ self.thread_pool = ThreadPool(
+ nworkers,
"ThreadPoolMixin HTTP server on %s:%d"
- % (self.server_name, self.server_port), daemon)
+ % (self.server_name, self.server_port),
+ daemon,
+ **threadpool_options)
def process_request(self, request, client_address):
"""
@@ -579,9 +947,16 @@ class ThreadPoolMixIn(object):
# that we can trap interrupts we need to restore this.)
request.setblocking(1)
# Queue processing of the request
- self.thread_pool.queue.put(
+ self.thread_pool.add_task(
lambda: self.process_request_in_thread(request, client_address))
+ def handle_error(self, request, client_address):
+ exc_class, exc, tb = sys.exc_info()
+ if exc_class is ServerExit:
+ # This is actually a request to stop the server
+ raise
+ return super(ThreadPoolMixIn, self).handle_error(request, client_address)
+
def process_request_in_thread(self, request, client_address):
"""
The worker thread should call back here to do the rest of the
@@ -624,7 +999,7 @@ class ThreadPoolMixIn(object):
"""
self.running = False
self.socket.close()
- self.thread_pool.shutdown()
+ self.thread_pool.shutdown(60)
class WSGIServerBase(SecureHTTPServer):
def __init__(self, wsgi_application, server_address,
@@ -647,15 +1022,26 @@ class WSGIServer(ThreadingMixIn, WSGIServerBase):
class WSGIThreadPoolServer(ThreadPoolMixIn, WSGIServerBase):
def __init__(self, wsgi_application, server_address,
RequestHandlerClass=None, ssl_context=None,
- nworkers=10, daemon_threads=False):
+ nworkers=10, daemon_threads=False,
+ threadpool_options=None):
WSGIServerBase.__init__(self, wsgi_application, server_address,
RequestHandlerClass, ssl_context)
- ThreadPoolMixIn.__init__(self, nworkers, daemon_threads)
+ if threadpool_options is None:
+ threadpool_options = {}
+ ThreadPoolMixIn.__init__(self, nworkers, daemon_threads,
+ **threadpool_options)
+
+class ServerExit(SystemExit):
+ """
+ Raised to tell the server to really exit (SystemExit is normally
+ caught)
+ """
def serve(application, host=None, port=None, handler=None, ssl_pem=None,
ssl_context=None, server_version=None, protocol_version=None,
start_loop=True, daemon_threads=None, socket_timeout=None,
- use_threadpool=True, threadpool_workers=10):
+ use_threadpool=True, threadpool_workers=10,
+ threadpool_options=None):
"""
Serves your ``application`` over HTTP(S) via WSGI interface
@@ -684,7 +1070,7 @@ def serve(application, host=None, port=None, handler=None, ssl_pem=None,
$ openssl genrsa 1024 > host.key
$ chmod 400 host.key
- $ openssl req -new -x509 -nodes -sha1 -days 365 \
+ $ openssl req -new -x509 -nodes -sha1 -days 365 \\
-key host.key > host.cert
$ cat host.cert host.key > host.pem
$ chmod 400 host.pem
@@ -744,6 +1130,13 @@ def serve(application, host=None, port=None, handler=None, ssl_pem=None,
Number of worker threads to create when ``use_threadpool`` is true. This
can be a string or an integer value.
+
+ ``threadpool_options``
+
+ A dictionary of options to be used when instantiating the
+ threadpool. See paste.httpserver.ThreadPool for the specific
+ options (``threadpool_workers`` is handled separately and should
+ not go in this dictionary).
"""
is_ssl = False
if ssl_pem or ssl_context:
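
A hedged sketch of wiring these options through serve(); the application, address, and option values are illustrative assumptions, not recommendations.

    # Hedged sketch: passing threadpool options through serve().
    from paste.httpserver import serve

    def app(environ, start_response):
        start_response('200 OK', [('Content-Type', 'text/plain')])
        return ['hello\n']

    # Blocks and serves requests until interrupted.
    serve(app, host='127.0.0.1', port=8080,
          use_threadpool=True, threadpool_workers=10,
          threadpool_options=dict(
              hung_thread_limit=30,
              kill_thread_limit=600,
              error_email='ops@example.com'))  # hypothetical address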
@@ -779,7 +1172,8 @@ def serve(application, host=None, port=None, handler=None, ssl_pem=None,
if converters.asbool(use_threadpool):
server = WSGIThreadPoolServer(application, server_address, handler,
ssl_context, int(threadpool_workers),
- daemon_threads)
+ daemon_threads,
+ threadpool_options=threadpool_options)
else:
server = WSGIServer(application, server_address, handler, ssl_context)
if daemon_threads:
@@ -806,10 +1200,85 @@ def serve(application, host=None, port=None, handler=None, ssl_pem=None,
# For paste.deploy server instantiation (egg:Paste#http)
# Note: this gets a separate function because it has to expect string
# arguments (though that's not much of an issue yet, ever?)
-def server_runner(wsgi_app, global_conf, *args, **kwargs):
- serve(wsgi_app, *args, **kwargs)
+def server_runner(wsgi_app, global_conf, **kwargs):
+ from paste.deploy.converters import asbool
+ for name in ['port', 'socket_timeout', 'threadpool_workers',
+ 'threadpool_hung_thread_limit',
+ 'threadpool_kill_thread_limit',
+ 'threadpool_dying_limit', 'threadpool_spawn_if_under',
+ 'threadpool_max_zombie_threads_before_die',
+ 'threadpool_hung_check_period']:
+ if name in kwargs:
+ kwargs[name] = int(kwargs[name])
+ for name in ['use_threadpool', 'daemon_threads']:
+ if name in kwargs:
+ kwargs[name] = asbool(kwargs[name])
+ threadpool_options = {}
+ for name, value in kwargs.items():
+ if name.startswith('threadpool_') and name != 'threadpool_workers':
+ threadpool_options[name[len('threadpool_'):]] = value
+ del kwargs[name]
+ if ('error_email' not in threadpool_options
+ and 'error_email' in global_conf):
+ threadpool_options['error_email'] = global_conf['error_email']
+ kwargs['threadpool_options'] = threadpool_options
+ serve(wsgi_app, **kwargs)
+
+server_runner.__doc__ = serve.__doc__ + """
+
+ You can also set these threadpool options:
+
+ ``threadpool_hung_thread_limit``:
+
+ The number of seconds a thread can work on a task before it is
+ considered hung (stuck). Default 30 seconds.
+
+ ``threadpool_kill_thread_limit``:
+
+ The number of seconds a thread can work before you should kill it
+ (assuming it will never finish). Default 600 seconds (10 minutes).
+
+ ``threadpool_dying_limit``:
+
+ The length of time after killing a thread that it should actually
+ disappear. If it lives longer than this, it is considered a
+ "zombie". Note that even in easy situations killing a thread can
+ be very slow. Default 300 seconds (5 minutes).
+
+ ``threadpool_spawn_if_under``:
+
+ If there are no idle threads and a request comes in, and there are
+ fewer than this number of *busy* threads, then add workers to the
+ pool. Busy threads are threads that have taken less than
+ ``threadpool_hung_thread_limit`` seconds so far. So if you get
+ *lots* of requests but they complete in a reasonable amount of time,
+ the requests will simply queue up (adding more threads probably
+ wouldn't speed them up). But if you have lots of hung threads and
+ one more request comes in, this will add workers to handle it.
+ Default 5.
+
+ ``threadpool_max_zombie_threads_before_die``:
+
+ If there are more zombies than this, kill the whole process. This is
+ only a good idea if you have a monitor that will automatically
+ restart the server, which cleans up the mess. Default 0 (disabled).
+
+ ``threadpool_hung_check_period``:
+
+ Every X requests, check for hung threads that need to be killed,
+ or for zombie threads that should cause a restart. Default 100
+ requests.
+
+ ``threadpool_logger``:
+
+ Logging messages will go to the logger named here.
+
+ ``threadpool_error_email`` (or global ``error_email`` setting):
+
+ When threads are killed or the process restarted, this email
+ address will be contacted (using an SMTP server on localhost).
+"""
-server_runner.__doc__ = serve.__doc__
if __name__ == '__main__':
from paste.wsgilib import dump_environ
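
For completeness, a hedged sketch of driving server_runner the way paste.deploy would for egg:Paste#http. The app and all values are illustrative; paste.deploy is assumed to be installed, since server_runner imports its asbool converter.

    # Hedged sketch: string-valued options as paste.deploy would pass
    # them; server_runner coerces them and builds threadpool_options.
    from paste.httpserver import server_runner

    def app(environ, start_response):
        start_response('200 OK', [('Content-Type', 'text/plain')])
        return ['ok\n']

    server_runner(app, {},  # empty global_conf; could carry error_email
                  host='127.0.0.1', port='8080',
                  threadpool_workers='10',
                  threadpool_hung_thread_limit='30',
                  threadpool_kill_thread_limit='600',
                  threadpool_max_zombie_threads_before_die='5')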