author     pjenvey <devnull@localhost>  2006-07-19 23:35:03 +0000
committer  pjenvey <devnull@localhost>  2006-07-19 23:35:03 +0000
commit     64045b248cf57e7607265994a1dcf92986a54f36 (patch)
tree       80b0cc64822cff8573c45236e38f692b6b2d5409 /paste/httpserver.py
parent     2089aac3fff40df4544df6f78fb02d227b48d8ee (diff)
download   paste-64045b248cf57e7607265994a1dcf92986a54f36.tar.gz
added WSGIThreadPoolServer. Provides much better performance than
WSGIServer. paste.httpserver.serve now uses the thread-pooled server by
default. Users can set the number of worker threads (defaults to 10) or
switch back to the normal WSGIServer with config file toggles.

submitted by: james@jamestaylor.org (Thanks!)
resolves: #112
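For quick reference, a minimal usage sketch of the new options (the trivial
application, host, and port values below are illustrative, not part of the
commit):

    from paste.httpserver import serve

    def hello_app(environ, start_response):
        # A trivial WSGI application, used only to exercise the server.
        start_response('200 OK', [('Content-Type', 'text/plain')])
        return ['hello from the thread pool\n']

    # serve() now defaults to the thread-pooled server; both new keyword
    # arguments are shown explicitly.  Passing use_threadpool=False falls
    # back to the old one-thread-per-request WSGIServer.
    serve(hello_app, host='127.0.0.1', port='8080',
          use_threadpool=True, threadpool_workers=10)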
Diffstat (limited to 'paste/httpserver.py')
-rwxr-xr-x  paste/httpserver.py  155
1 file changed, 146 insertions(+), 9 deletions(-)
diff --git a/paste/httpserver.py b/paste/httpserver.py
index 7d91056..4fcf35e 100755
--- a/paste/httpserver.py
+++ b/paste/httpserver.py
@@ -17,9 +17,10 @@ if pyOpenSSL is installed, it also provides SSL capabilities.
# @@: add support for chunked encoding, this is not a 1.1 server
# till this is completed.
+import errno, socket, sys, threading, urlparse, Queue
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from SocketServer import ThreadingMixIn
-import urlparse, sys, socket
+from paste.deploy import converters
__all__ = ['WSGIHandlerMixin','WSGIServer','WSGIHandler', 'serve']
__version__ = "0.5"
@@ -326,9 +327,118 @@ class WSGIHandler(WSGIHandlerMixin, BaseHTTPRequestHandler):
except SocketErrors, exce:
self.wsgi_connection_drop(exce)
-class WSGIServer(ThreadingMixIn, SecureHTTPServer):
- daemon_threads = False
+class ThreadPool(object):
+ """
+ Generic thread pool with a queue of callables to consume.
+ """
+ SHUTDOWN = object()
+
+ def __init__(self, nworkers, name="ThreadPool"):
+ """
+ Create thread pool with `nworkers` worker threads.
+ """
+ self.nworkers = nworkers
+ self.name = name
+ self.queue = Queue.Queue()
+ self.workers = []
+ for i in range(self.nworkers):
+ worker = threading.Thread(target=self.worker_thread_callback,
+ name=("%s worker %d" % (self.name, i)))
+ worker.start()
+ self.workers.append(worker)
+
+ def worker_thread_callback(self):
+ """
+ Worker thread should call this method to get and process queued
+ callables.
+ """
+ while True:
+ runnable = self.queue.get()
+ if runnable is ThreadPool.SHUTDOWN:
+ return
+ else:
+ runnable()
+
+ def shutdown(self):
+ """
+ Shutdown the queue (after finishing any pending requests).
+ """
+ # Add a shutdown request for every worker
+ for i in range(self.nworkers):
+ self.queue.put(ThreadPool.SHUTDOWN)
+ # Wait for each thread to terminate
+ for worker in self.workers:
+ worker.join()
+
+class ThreadPoolMixIn:
+ """
+ Mix-in class to process requests from a thread pool
+ """
+ def __init__(self, nworkers):
+ # Create and start the workers
+ self.running = True
+        assert nworkers > 0, "ThreadPoolMixIn servers must have at least one worker"
+        self.thread_pool = ThreadPool(nworkers,
+            "ThreadPoolMixIn HTTP server on %s:%d"
+            % (self.server_name, self.server_port))
+
+ def process_request(self, request, client_address):
+ """
+        Queue the request to be processed by one of the thread pool threads.
+        """
+        # Set the socket back to blocking mode with no timeout, since the
+        # thread pool may take a little while to get back to it.  (Blocking
+        # is the default, but we set a timeout on the parent socket so that
+        # we can trap interrupts, so it has to be restored here.)
+ request.setblocking(1)
+ # Queue processing of the request
+ self.thread_pool.queue.put(
+ lambda: self.process_request_in_thread(request, client_address))
+
+ def process_request_in_thread(self, request, client_address):
+ """
+ The worker thread should call back here to do the rest of the
+        request processing. Error handling normally done in 'handle_request'
+ must be done here.
+ """
+ try:
+ self.finish_request(request, client_address)
+ self.close_request(request)
+ except:
+ self.handle_error(request, client_address)
+            self.close_request(request)
+
+    def serve_forever(self):
+ """
+ Overrides `serve_forever` to shut the threadpool down cleanly.
+ """
+ try:
+ while self.running:
+ try:
+ self.handle_request()
+ except socket.timeout:
+ # Timeout is expected, gives interrupts a chance to
+                    # propagate, just keep handling
+ pass
+ finally:
+ self.thread_pool.shutdown()
+
+ def server_activate(self):
+ """
+ Overrides server_activate to set timeout on our listener socket.
+ """
+ # We set the timeout here so that we can trap interrupts on windows
+ self.socket.settimeout(1)
+ self.socket.listen(self.request_queue_size)
+
+ def server_close(self):
+ """
+ Finish pending requests and shutdown the server.
+ """
+ self.running = False
+ self.socket.close()
+
+class WSGIServerBase(SecureHTTPServer):
def __init__(self, wsgi_application, server_address,
RequestHandlerClass=None, ssl_context=None):
SecureHTTPServer.__init__(self, server_address,
@@ -343,9 +453,20 @@ class WSGIServer(ThreadingMixIn, SecureHTTPServer):
conn.settimeout(self.wsgi_socket_timeout)
return (conn, info)
+class WSGIServer(ThreadingMixIn, WSGIServerBase):
+ daemon_threads = False
+
+class WSGIThreadPoolServer(ThreadPoolMixIn, WSGIServerBase):
+ def __init__(self, wsgi_application, server_address,
+ RequestHandlerClass=None, ssl_context=None, nworkers=10):
+ WSGIServerBase.__init__(self, wsgi_application, server_address,
+ RequestHandlerClass, ssl_context)
+ ThreadPoolMixIn.__init__(self, nworkers)
+
def serve(application, host=None, port=None, handler=None, ssl_pem=None,
server_version=None, protocol_version=None, start_loop=True,
- daemon_threads=None, socket_timeout=None):
+ daemon_threads=None, socket_timeout=None, use_threadpool=True,
+ threadpool_workers=10):
"""
Serves your ``application`` over HTTP(S) via WSGI interface
@@ -414,6 +535,17 @@ def serve(application, host=None, port=None, handler=None, ssl_pem=None,
disconnect, but at a later time it might follow the RFC a bit
more closely.
+ ``use_threadpool``
+
+    Serve requests from a pool of worker threads (``threadpool_workers``)
+ rather than creating a new thread for each request. This can
+ substantially reduce latency since there is a high cost associated
+ with thread creation.
+
+ ``threadpool_workers``
+
+ Number of worker threads to create when ``use_threadpool`` is true. This
+ can be a string or an integer value.
"""
ssl_context = None
if ssl_pem:
@@ -440,13 +572,19 @@ def serve(application, host=None, port=None, handler=None, ssl_pem=None,
assert protocol_version in ('HTTP/0.9','HTTP/1.0','HTTP/1.1')
handler.protocol_version = protocol_version
- server = WSGIServer(application, server_address, handler, ssl_context)
- if daemon_threads:
- server.daemon_threads = daemon_threads
+
+ if converters.asbool(use_threadpool):
+ server = WSGIThreadPoolServer(application, server_address, handler,
+ ssl_context, int(threadpool_workers))
+ else:
+ server = WSGIServer(application, server_address, handler, ssl_context)
+ if daemon_threads:
+ server.daemon_threads = daemon_threads
+
if socket_timeout:
server.wsgi_socket_timeout = int(socket_timeout)
- if start_loop:
+ if converters.asbool(start_loop):
print "serving on %s:%s" % server.server_address
try:
server.serve_forever()
@@ -472,4 +610,3 @@ if __name__ == '__main__':
#serve(dump_environ, ssl_pem="test.pem")
serve(dump_environ, server_version="Wombles/1.0",
protocol_version="HTTP/1.1", port="8888")
-
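
As a rough sketch of how the ThreadPool class added above can be driven
directly (the pool size and the squaring callables are made up for
illustration; ThreadPool is defined at module level in paste/httpserver.py,
though it is not listed in __all__):

    from paste.httpserver import ThreadPool

    results = []
    pool = ThreadPool(4, name="demo pool")
    # Work is handed to the pool by putting callables on its queue; the
    # worker threads pop them off and invoke them.
    for i in range(8):
        pool.queue.put(lambda i=i: results.append(i * i))
    # shutdown() enqueues one SHUTDOWN marker per worker and joins the
    # threads, so everything queued before it is processed first.
    pool.shutdown()
    print sorted(results)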