summaryrefslogtreecommitdiff
path: root/paste/httpserver.py
diff options
context:
space:
mode:
authorianb <devnull@localhost>2007-04-27 21:38:56 +0000
committerianb <devnull@localhost>2007-04-27 21:38:56 +0000
commit1a4a60600040d9f61e5bc617f380ebf3b4a36acf (patch)
tree36d74b57264748275a13dd79a094ad38d862ff28 /paste/httpserver.py
parent260f3173670f03cc2102240e7a37ee28883e7fbb (diff)
downloadpaste-1a4a60600040d9f61e5bc617f380ebf3b4a36acf.tar.gz
Cleaned up the worker lifecycle logic, fixing some bugs in how workers are tracked. Track why worker threads are started, and include this information in the logs.
Diffstat (limited to 'paste/httpserver.py')
-rwxr-xr-xpaste/httpserver.py82
1 files changed, 48 insertions, 34 deletions
diff --git a/paste/httpserver.py b/paste/httpserver.py
index 319bc07..1df18cb 100755
--- a/paste/httpserver.py
+++ b/paste/httpserver.py
@@ -597,18 +597,17 @@ class ThreadPool(object):
if not daemon:
atexit.register(self.shutdown)
for i in range(self.nworkers):
- self.add_worker_thread()
+ self.add_worker_thread(message='Initial worker pool')
def add_task(self, task):
"""
Add a task to the queue
"""
- self.logger.debug('Added task %s', task)
+ self.logger.debug('Added task (%i tasks queued)', self.queue.qsize())
if self.hung_check_period:
self.requests_since_last_hung_check += 1
if self.requests_since_last_hung_check > self.hung_check_period:
self.requests_since_last_hung_check = 0
- self.logger.info('Calling periodic kill_hung_threads()')
self.kill_hung_threads()
if not self.idle_workers and self.spawn_if_under:
# spawn_if_under can come into effect...
@@ -629,7 +628,7 @@ class ThreadPool(object):
'workers', busy, self.spawn_if_under-busy)
self._last_added_new_idle_workers = time.time()
for i in range(self.spawn_if_under - busy):
- self.add_worker_thread()
+ self.add_worker_thread(message='Response to lack of idle workers')
else:
self.logger.debug(
'No extra workers needed (%s busy workers)',
@@ -642,6 +641,8 @@ class ThreadPool(object):
self.logger.info(
'Culling %s extra workers (%s idle workers present)',
len(self.workers)-self.nworkers, len(self.idle_workers))
+ self.logger.debug(
+ 'Idle workers: %s', self.idle_workers)
for i in range(len(self.workers) - self.nworkers):
self.queue.put(self.SHUTDOWN)
self.queue.put(task)
@@ -702,7 +703,7 @@ class ThreadPool(object):
if thread_obj in self.workers:
self.workers.remove(thread_obj)
self.dying_threads[thread_id] = (time.time(), thread_obj)
- self.add_worker_thread()
+ self.add_worker_thread(message='Replacement for killed thread %s' % thread_id)
def thread_exists(self, thread_id):
"""
@@ -710,14 +711,13 @@ class ThreadPool(object):
"""
return thread_id in threading._active
- def add_worker_thread(self):
+ def add_worker_thread(self, *args, **kwargs):
index = self._worker_count.next()
worker = threading.Thread(target=self.worker_thread_callback,
+ args=args, kwargs=kwargs,
name=("worker %d" % index))
worker.setDaemon(self.daemon)
worker.start()
- self.workers.append(worker)
- self.logger.debug('Added worker %s', worker)
def kill_hung_threads(self):
"""
@@ -773,7 +773,7 @@ class ThreadPool(object):
ave_time = '%.2fsec' % ave_time
else:
ave_time = 'N/A'
- self.logger.debug(
+ self.logger.info(
"kill_hung_threads status: %s threads (%s working, %s idle, %s starting) "
"ave time %s, max time %.2fsec, killed %s workers"
% (idle_workers + starting_workers + working_workers,
@@ -819,35 +819,30 @@ class ThreadPool(object):
print 'Shutting down', threading.currentThread()
raise ServerExit(3)
- def worker_thread_callback(self):
+ def worker_thread_callback(self, message=None):
"""
Worker thread should call this method to get and process queued
callables.
"""
thread_obj = threading.currentThread()
thread_id = thread_obj.thread_id = thread.get_ident()
+ self.workers.append(thread_obj)
self.idle_workers.append(thread_id)
requests_processed = 0
- while True:
- if self.max_requests and self.max_requests < requests_processed:
- # Replace this thread then die
- self.logger.debug('Thread %s processed %i requests (limit %s); stopping thread'
- % (thread_id, requests_processed, self.max_requests))
- try:
- self.idle_workers.remove(thread_id)
- except ValueError:
- pass
- self.add_worker_thread()
- return
- runnable = self.queue.get()
- if runnable is ThreadPool.SHUTDOWN:
- self.logger.debug('Worker %s asked to SHUTDOWN', thread_id)
- try:
- self.idle_workers.remove(thread_id)
- except ValueError:
- pass
- return
- else:
+ add_replacement_worker = False
+ self.logger.debug('Started new worker %s: %s', thread_id, message)
+ try:
+ while True:
+ if self.max_requests and self.max_requests < requests_processed:
+ # Replace this thread then die
+ self.logger.debug('Thread %s processed %i requests (limit %s); stopping thread'
+ % (thread_id, requests_processed, self.max_requests))
+ add_replacement_worker = True
+ break
+ runnable = self.queue.get()
+ if runnable is ThreadPool.SHUTDOWN:
+ self.logger.debug('Worker %s asked to SHUTDOWN', thread_id)
+ break
try:
self.idle_workers.remove(thread_id)
except ValueError:
@@ -865,16 +860,35 @@ class ThreadPool(object):
print >> sys.stderr, (
'Unexpected exception in worker %r' % runnable)
traceback.print_exc()
+ if thread_id in self.dying_threads:
+ # That last exception was intended to kill me
+ break
finally:
try:
del self.worker_tracker[thread_id]
except KeyError:
pass
sys.exc_clear()
- if thread_id in self.dying_threads:
- # This thread has been killed; so die!
- return
- self.idle_workers.append(thread_id)
+ self.idle_workers.append(thread_id)
+ finally:
+ try:
+ del self.worker_tracker[thread_id]
+ except KeyError:
+ pass
+ try:
+ self.idle_workers.remove(thread_id)
+ except ValueError:
+ pass
+ try:
+ self.workers.remove(thread_obj)
+ except ValueError:
+ pass
+ try:
+ del self.dying_threads[thread_id]
+ except KeyError:
+ pass
+ if add_replacement_worker:
+ self.add_worker_thread(message='Voluntary replacement for thread %s' % thread_id)
def shutdown(self, force_quit_timeout=0):
"""