diff options
| author | ianb <devnull@localhost> | 2007-01-30 05:11:25 +0000 |
|---|---|---|
| committer | ianb <devnull@localhost> | 2007-01-30 05:11:25 +0000 |
| commit | f3a41d80959005d2ad273aa5c88bad1957bb992e (patch) | |
| tree | 1ff5206683898a7dbdc2a7c6309d77cc9d1496e7 /paste/httpserver.py | |
| parent | ff149b45ee82a904541fe8208414f8fd56b7ecaa (diff) | |
| download | paste-f3a41d80959005d2ad273aa5c88bad1957bb992e.tar.gz | |
Allow killing of errant threads through the web application (also add a module to kill threads with ctypes, and a method to kill threads from the httpserver thread pool)
Diffstat (limited to 'paste/httpserver.py')
| -rwxr-xr-x | paste/httpserver.py | 47 |
1 files changed, 39 insertions, 8 deletions
diff --git a/paste/httpserver.py b/paste/httpserver.py index a57f60a..7969879 100755 --- a/paste/httpserver.py +++ b/paste/httpserver.py @@ -22,6 +22,7 @@ import socket, sys, threading, urlparse, Queue, urllib import posixpath import time import thread +from itertools import count from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer from SocketServer import ThreadingMixIn from paste.util import converters @@ -204,8 +205,7 @@ class WSGIHandlerMixin: # Now that we know what the request was for, we should # tell the thread pool what its worker is working on self.server.thread_pool.worker_tracker[thread.get_ident()][1] = self.wsgi_environ - self.wsgi_environ['paste.httpserver.worker_tracker'] = self.server.thread_pool.worker_tracker - self.wsgi_environ['paste.httpserver.nworkers'] = self.server.thread_pool.nworkers + self.wsgi_environ['paste.httpserver.thread_pool'] = self.server.thread_pool for k, v in self.headers.items(): key = 'HTTP_' + k.replace("-","_").upper() @@ -381,18 +381,46 @@ class ThreadPool(object): self.name = name self.queue = Queue.Queue() self.workers = [] + self.daemon = daemon + self._worker_count = count() for i in range(self.nworkers): - worker = threading.Thread(target=self.worker_thread_callback, - name=("worker %d" % i)) - worker.setDaemon(daemon) - worker.start() - self.workers.append(worker) + self.add_worker_thread() if not daemon: atexit.register(self.shutdown) self.worker_tracker = {} + def kill_worker(self, thread_id): + """ + Removes the worker with the given thread_id from the pool, and + replaces it with a new worker thread. + + This should only be done for mis-behaving workers. + """ + from paste.util import killthread + for thread_obj_id, thread_obj in threading._active.items(): + if thread_id == thread_obj_id: + break + else: + thread_obj = None + killthread.async_raise(thread_id, SystemExit) + try: + del self.worker_tracker[thread_id] + except KeyError: + pass + if thread_obj in self.workers: + self.workers.remove(thread_obj) + self.add_worker_thread() + + def add_worker_thread(self): + index = self._worker_count.next() + worker = threading.Thread(target=self.worker_thread_callback, + name=("worker %d" % index)) + worker.setDaemon(self.daemon) + worker.start() + self.workers.append(worker) + def worker_thread_callback(self): """ Worker thread should call this method to get and process queued @@ -407,7 +435,10 @@ class ThreadPool(object): try: runnable() finally: - del self.worker_tracker[thread.get_ident()] + try: + del self.worker_tracker[thread.get_ident()] + except KeyError: + pass def shutdown(self): """ |
