summaryrefslogtreecommitdiff
path: root/paste/httpserver.py
diff options
context:
space:
mode:
Diffstat (limited to 'paste/httpserver.py')
-rwxr-xr-xpaste/httpserver.py82
1 files changed, 48 insertions, 34 deletions
diff --git a/paste/httpserver.py b/paste/httpserver.py
index 319bc07..1df18cb 100755
--- a/paste/httpserver.py
+++ b/paste/httpserver.py
@@ -597,18 +597,17 @@ class ThreadPool(object):
if not daemon:
atexit.register(self.shutdown)
for i in range(self.nworkers):
- self.add_worker_thread()
+ self.add_worker_thread(message='Initial worker pool')
def add_task(self, task):
"""
Add a task to the queue
"""
- self.logger.debug('Added task %s', task)
+ self.logger.debug('Added task (%i tasks queued)', self.queue.qsize())
if self.hung_check_period:
self.requests_since_last_hung_check += 1
if self.requests_since_last_hung_check > self.hung_check_period:
self.requests_since_last_hung_check = 0
- self.logger.info('Calling periodic kill_hung_threads()')
self.kill_hung_threads()
if not self.idle_workers and self.spawn_if_under:
# spawn_if_under can come into effect...
@@ -629,7 +628,7 @@ class ThreadPool(object):
'workers', busy, self.spawn_if_under-busy)
self._last_added_new_idle_workers = time.time()
for i in range(self.spawn_if_under - busy):
- self.add_worker_thread()
+ self.add_worker_thread(message='Response to lack of idle workers')
else:
self.logger.debug(
'No extra workers needed (%s busy workers)',
@@ -642,6 +641,8 @@ class ThreadPool(object):
self.logger.info(
'Culling %s extra workers (%s idle workers present)',
len(self.workers)-self.nworkers, len(self.idle_workers))
+ self.logger.debug(
+ 'Idle workers: %s', self.idle_workers)
for i in range(len(self.workers) - self.nworkers):
self.queue.put(self.SHUTDOWN)
self.queue.put(task)
@@ -702,7 +703,7 @@ class ThreadPool(object):
if thread_obj in self.workers:
self.workers.remove(thread_obj)
self.dying_threads[thread_id] = (time.time(), thread_obj)
- self.add_worker_thread()
+ self.add_worker_thread(message='Replacement for killed thread %s' % thread_id)
def thread_exists(self, thread_id):
"""
@@ -710,14 +711,13 @@ class ThreadPool(object):
"""
return thread_id in threading._active
- def add_worker_thread(self):
+ def add_worker_thread(self, *args, **kwargs):
index = self._worker_count.next()
worker = threading.Thread(target=self.worker_thread_callback,
+ args=args, kwargs=kwargs,
name=("worker %d" % index))
worker.setDaemon(self.daemon)
worker.start()
- self.workers.append(worker)
- self.logger.debug('Added worker %s', worker)
def kill_hung_threads(self):
"""
@@ -773,7 +773,7 @@ class ThreadPool(object):
ave_time = '%.2fsec' % ave_time
else:
ave_time = 'N/A'
- self.logger.debug(
+ self.logger.info(
"kill_hung_threads status: %s threads (%s working, %s idle, %s starting) "
"ave time %s, max time %.2fsec, killed %s workers"
% (idle_workers + starting_workers + working_workers,
@@ -819,35 +819,30 @@ class ThreadPool(object):
print 'Shutting down', threading.currentThread()
raise ServerExit(3)
- def worker_thread_callback(self):
+ def worker_thread_callback(self, message=None):
"""
Worker thread should call this method to get and process queued
callables.
"""
thread_obj = threading.currentThread()
thread_id = thread_obj.thread_id = thread.get_ident()
+ self.workers.append(thread_obj)
self.idle_workers.append(thread_id)
requests_processed = 0
- while True:
- if self.max_requests and self.max_requests < requests_processed:
- # Replace this thread then die
- self.logger.debug('Thread %s processed %i requests (limit %s); stopping thread'
- % (thread_id, requests_processed, self.max_requests))
- try:
- self.idle_workers.remove(thread_id)
- except ValueError:
- pass
- self.add_worker_thread()
- return
- runnable = self.queue.get()
- if runnable is ThreadPool.SHUTDOWN:
- self.logger.debug('Worker %s asked to SHUTDOWN', thread_id)
- try:
- self.idle_workers.remove(thread_id)
- except ValueError:
- pass
- return
- else:
+ add_replacement_worker = False
+ self.logger.debug('Started new worker %s: %s', thread_id, message)
+ try:
+ while True:
+ if self.max_requests and self.max_requests < requests_processed:
+ # Replace this thread then die
+ self.logger.debug('Thread %s processed %i requests (limit %s); stopping thread'
+ % (thread_id, requests_processed, self.max_requests))
+ add_replacement_worker = True
+ break
+ runnable = self.queue.get()
+ if runnable is ThreadPool.SHUTDOWN:
+ self.logger.debug('Worker %s asked to SHUTDOWN', thread_id)
+ break
try:
self.idle_workers.remove(thread_id)
except ValueError:
@@ -865,16 +860,35 @@ class ThreadPool(object):
print >> sys.stderr, (
'Unexpected exception in worker %r' % runnable)
traceback.print_exc()
+ if thread_id in self.dying_threads:
+ # That last exception was intended to kill me
+ break
finally:
try:
del self.worker_tracker[thread_id]
except KeyError:
pass
sys.exc_clear()
- if thread_id in self.dying_threads:
- # This thread has been killed; so die!
- return
- self.idle_workers.append(thread_id)
+ self.idle_workers.append(thread_id)
+ finally:
+ try:
+ del self.worker_tracker[thread_id]
+ except KeyError:
+ pass
+ try:
+ self.idle_workers.remove(thread_id)
+ except ValueError:
+ pass
+ try:
+ self.workers.remove(thread_obj)
+ except ValueError:
+ pass
+ try:
+ del self.dying_threads[thread_id]
+ except KeyError:
+ pass
+ if add_replacement_worker:
+ self.add_worker_thread(message='Voluntary replacement for thread %s' % thread_id)
def shutdown(self, force_quit_timeout=0):
"""