# Copyright (C) 2010 Vinay Sajip. All Rights Reserved.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose and without fee is hereby granted,
# provided that the above copyright notice appear in all copies and that
# both that copyright notice and this permission notice appear in
# supporting documentation, and that the name of Vinay Sajip
# not be used in advertising or publicity pertaining to distribution
# of the software without specific, written prior permission.
# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
"""
This module contains classes which help you work with queues. A typical
application is when you want to log from performance-critical threads, but
where the handlers you want to use are slow (for example,
:class:`~logging.handlers.SMTPHandler`). In that case, you can create a queue,
pass it to a :class:`QueueHandler` instance and use that instance with your
loggers. Elsewhere, you can instantiate a :class:`QueueListener` with the same
queue and some slow handlers, and call :meth:`~QueueListener.start` on it.
This will start monitoring the queue on a separate thread and call all the
configured handlers *on that thread*, so that your logging thread is not held
up by the slow handlers.

Note that as well as in-process queues, you can use these classes with queues
from the :mod:`multiprocessing` module.

**N.B.** This is part of the standard library since Python 3.2, so the
version here is for use with earlier Python versions.
"""
import logging
try:
    import Queue as queue
except ImportError:
    import queue
import threading


class QueueHandler(logging.Handler):
    """
    This handler sends events to a queue. Typically, it would be used together
    with a multiprocessing Queue to centralise logging to file in one process
    (in a multi-process application), so as to avoid file write contention
    between processes.

    :param queue: The queue to send `LogRecords` to.
    """

    def __init__(self, queue):
        """
        Initialise an instance, using the passed queue.
        """
        logging.Handler.__init__(self)
        self.queue = queue

    def enqueue(self, record):
        """
        Enqueue a record.

        The base implementation uses :meth:`~queue.Queue.put_nowait`. You may
        want to override this method if you want to use blocking, timeouts or
        custom queue implementations.

        :param record: The record to enqueue.
        """
        self.queue.put_nowait(record)

    def prepare(self, record):
        """
        Prepares a record for queuing. The object returned by this method is
        enqueued.

        The base implementation formats the record to merge the message
        and arguments, and removes unpickleable items from the record
        in-place.

        You might want to override this method if you want to convert
        the record to a dict or JSON string, or send a modified copy
        of the record while leaving the original intact.

        :param record: The record to prepare.
        :return: The same record object, modified in place.
        """
        # The format operation gets traceback text into record.exc_text
        # (if there's exception data), and also puts the message into
        # record.message. We can then use this to replace the original
        # msg + args, as these might be unpickleable. We also zap the
        # exc_info attribute, as it's no longer needed and, if not None,
        # will typically not be pickleable.
        self.format(record)
        record.msg = record.message
        record.args = None
        record.exc_info = None
        return record

    def emit(self, record):
        """
        Emit a record.

        Writes the LogRecord to the queue, preparing it for pickling first.
        Any error raised while preparing or enqueuing is passed to
        :meth:`handleError` rather than propagated.

        :param record: The record to emit.
        """
        try:
            self.enqueue(self.prepare(record))
        except (KeyboardInterrupt, SystemExit):
            # Kept explicit so that interpreter-exit requests are never
            # swallowed, whatever their place in the exception hierarchy.
            raise
        except Exception:
            self.handleError(record)


class QueueListener(object):
    """
    This class implements an internal threaded listener which watches for
    LogRecords being added to a queue, removes them and passes them to a
    list of handlers for processing.

    :param queue: The queue to listen to.
    :param handlers: The handlers to invoke on everything received from
                     the queue.
    """
    # Object placed on the queue by stop() to tell the worker to terminate.
    _sentinel = None

    def __init__(self, queue, *handlers):
        """
        Initialise an instance with the specified queue and
        handlers.
        """
        self.queue = queue
        self.handlers = handlers
        self._stop = threading.Event()
        self._thread = None

    def dequeue(self, block):
        """
        Dequeue a record and return it, optionally blocking.

        The base implementation uses :meth:`~queue.Queue.get`. You may want to
        override this method if you want to use timeouts or work with custom
        queue implementations.

        :param block: Whether to block if the queue is empty. If `False` and
                      the queue is empty, an :class:`~queue.Empty` exception
                      will be thrown.
        """
        return self.queue.get(block)

    def start(self):
        """
        Start the listener.

        This starts up a background thread to monitor the queue for
        LogRecords to process. The stop flag is cleared first so that a
        listener which has been stopped can be started again.
        """
        # Without this, a restarted worker would see the flag left set by
        # the previous stop() and exit as soon as the queue was empty.
        self._stop.clear()
        self._thread = t = threading.Thread(target=self._monitor)
        t.daemon = True
        t.start()

    def prepare(self, record):
        """
        Prepare a record for handling.

        This method just returns the passed-in record. You may want to
        override this method if you need to do any custom marshalling or
        manipulation of the record before passing it to the handlers.

        :param record: The record to prepare.
        """
        return record

    def handle(self, record):
        """
        Handle a record.

        This just loops through the handlers offering them the record
        to handle.

        :param record: The record to handle.
        """
        record = self.prepare(record)
        for handler in self.handlers:
            handler.handle(record)

    def _monitor(self):
        """
        Monitor the queue for records, and ask the handler
        to deal with them.

        This method runs on a separate, internal thread.
        The thread will terminate if it sees a sentinel object in the queue.
        Every item taken from the queue, including the sentinel, is
        acknowledged with ``task_done()`` when the queue provides it, so
        that ``join()`` on the queue does not block forever.
        """
        q = self.queue
        has_task_done = hasattr(q, 'task_done')
        while not self._stop.is_set():
            try:
                record = self.dequeue(True)
                if record is self._sentinel:
                    # The sentinel counts as a queued task too.
                    if has_task_done:
                        q.task_done()
                    break
                self.handle(record)
                if has_task_done:
                    q.task_done()
            except queue.Empty:
                # Only possible with a dequeue() override that times out;
                # loop round and re-check the stop flag.
                pass
        # There might still be records in the queue.
        while True:
            try:
                record = self.dequeue(False)
                if record is self._sentinel:
                    if has_task_done:
                        q.task_done()
                    break
                self.handle(record)
                if has_task_done:
                    q.task_done()
            except queue.Empty:
                break

    def enqueue_sentinel(self):
        """
        Writes a sentinel to the queue to tell the listener to quit. This
        implementation uses ``put_nowait()``.  You may want to override this
        method if you want to use timeouts or work with custom queue
        implementations.
        """
        self.queue.put_nowait(self._sentinel)

    def stop(self):
        """
        Stop the listener.

        This asks the thread to terminate, and then waits for it to do so.
        Calling this when the listener is not running (never started, or
        already stopped) does nothing.

        Note that if you don't call this before your application exits, there
        may be some records still left on the queue, which won't be processed.
        """
        if self._thread is None:
            return
        # Enqueue the sentinel *before* raising the stop flag: if the flag
        # were set first, the worker could notice it, find the queue empty
        # and exit before the sentinel arrived, leaving the sentinel behind
        # on the queue. The flag is still set if enqueuing fails.
        try:
            self.enqueue_sentinel()
        finally:
            self._stop.set()
        self._thread.join()
        self._thread = None