summaryrefslogtreecommitdiff
path: root/paste/httpserver.py
diff options
context:
space:
mode:
authorianb <devnull@localhost>2007-01-30 05:11:25 +0000
committerianb <devnull@localhost>2007-01-30 05:11:25 +0000
commitf3a41d80959005d2ad273aa5c88bad1957bb992e (patch)
tree1ff5206683898a7dbdc2a7c6309d77cc9d1496e7 /paste/httpserver.py
parentff149b45ee82a904541fe8208414f8fd56b7ecaa (diff)
downloadpaste-f3a41d80959005d2ad273aa5c88bad1957bb992e.tar.gz
Allow killing of errant threads through the web application (also add a module to kill threads with ctypes, and a method to kill threads from the httpserver thread pool)
Diffstat (limited to 'paste/httpserver.py')
-rwxr-xr-xpaste/httpserver.py47
1 files changed, 39 insertions, 8 deletions
diff --git a/paste/httpserver.py b/paste/httpserver.py
index a57f60a..7969879 100755
--- a/paste/httpserver.py
+++ b/paste/httpserver.py
@@ -22,6 +22,7 @@ import socket, sys, threading, urlparse, Queue, urllib
import posixpath
import time
import thread
+from itertools import count
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from SocketServer import ThreadingMixIn
from paste.util import converters
@@ -204,8 +205,7 @@ class WSGIHandlerMixin:
# Now that we know what the request was for, we should
# tell the thread pool what its worker is working on
self.server.thread_pool.worker_tracker[thread.get_ident()][1] = self.wsgi_environ
- self.wsgi_environ['paste.httpserver.worker_tracker'] = self.server.thread_pool.worker_tracker
- self.wsgi_environ['paste.httpserver.nworkers'] = self.server.thread_pool.nworkers
+ self.wsgi_environ['paste.httpserver.thread_pool'] = self.server.thread_pool
for k, v in self.headers.items():
key = 'HTTP_' + k.replace("-","_").upper()
@@ -381,18 +381,46 @@ class ThreadPool(object):
self.name = name
self.queue = Queue.Queue()
self.workers = []
+ self.daemon = daemon
+ self._worker_count = count()
for i in range(self.nworkers):
- worker = threading.Thread(target=self.worker_thread_callback,
- name=("worker %d" % i))
- worker.setDaemon(daemon)
- worker.start()
- self.workers.append(worker)
+ self.add_worker_thread()
if not daemon:
atexit.register(self.shutdown)
self.worker_tracker = {}
+ def kill_worker(self, thread_id):
+ """
+ Removes the worker with the given thread_id from the pool, and
+ replaces it with a new worker thread.
+
+ This should only be done for mis-behaving workers.
+ """
+ from paste.util import killthread
+ for thread_obj_id, thread_obj in threading._active.items():
+ if thread_id == thread_obj_id:
+ break
+ else:
+ thread_obj = None
+ killthread.async_raise(thread_id, SystemExit)
+ try:
+ del self.worker_tracker[thread_id]
+ except KeyError:
+ pass
+ if thread_obj in self.workers:
+ self.workers.remove(thread_obj)
+ self.add_worker_thread()
+
+ def add_worker_thread(self):
+ index = self._worker_count.next()
+ worker = threading.Thread(target=self.worker_thread_callback,
+ name=("worker %d" % index))
+ worker.setDaemon(self.daemon)
+ worker.start()
+ self.workers.append(worker)
+
def worker_thread_callback(self):
"""
Worker thread should call this method to get and process queued
@@ -407,7 +435,10 @@ class ThreadPool(object):
try:
runnable()
finally:
- del self.worker_tracker[thread.get_ident()]
+ try:
+ del self.worker_tracker[thread.get_ident()]
+ except KeyError:
+ pass
def shutdown(self):
"""