# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php

"""
threadedprint.py
================

:author: Ian Bicking
:date: 12 Jul 2004

Multi-threaded printing; allows the output produced via print to be
separated according to the thread.

To use this, you must install the catcher, like::

    threadedprint.install()

The installation optionally takes one of three parameters:

default
    The default destination for print statements (e.g., ``sys.stdout``).
factory
    A function that will produce the stream for a thread, given the
    thread's name.
paramwriter
    Instead of writing to a file-like stream, this function will be
    called like ``paramwriter(thread_name, text)`` for every write.

Alternatively, ``leave_stdout=True`` makes whatever ``sys.stdout`` is at
installation time the default destination (it cannot be combined with
``default``).

The thread name is the value of ``threading.current_thread().name``, a
string (typically something like Thread-N).

You can also submit file-like objects for specific threads, which will
override any of these parameters.  To do this, call ``register(stream,
[threadName])``.  ``threadName`` is optional, and if not provided the
stream will be registered for the current thread.

If no specific stream is registered for a thread, and no default has
been provided, then an error will occur when anything is written to
``sys.stdout`` (or printed).

Note: the stream's ``write`` method will be called in the thread the
text came from, so you should consider thread safety, especially if
multiple threads share the same writer.

Note: if you want access to the original standard out, use
``sys.__stdout__``.

You may also uninstall this, via::

    threadedprint.uninstall()

The same machinery exists for ``sys.stdin`` through ``install_stdin()``,
``uninstall_stdin()``, ``register_stdin()`` and ``deregister_stdin()``.

TODO
----

* Something with ``sys.stderr``.
* Some default handlers.  Maybe something that hooks into `logging`.
* Possibly cache the results of ``factory`` calls.  This would be a
  semantic change.

"""

import threading
import sys
from paste.util import filemixin


def _require_at_most_one(default, factory, paramwriter):
    """Raise ``AssertionError`` if more than one fallback option is given."""
    given = [opt for opt in (default, factory, paramwriter)
             if opt is not None]
    if len(given) > 1:
        # Explicit raise (not ``assert``) so the check survives ``python -O``.
        raise AssertionError(
            "You can only provide one of default, factory, or paramwriter")


def _read_from(stream, size):
    """Read from ``stream``; ``size=None`` means "read everything".

    ``None`` is not forwarded because some file objects (notably Python 2
    ``file``) reject a ``None`` size argument.
    """
    if size is None:
        return stream.read()
    return stream.read(size)


class PrintCatcher(filemixin.FileMixin):
    """A ``sys.stdout`` replacement that dispatches writes per thread.

    A stream registered for the current thread's name (see ``register``)
    receives the text.  Otherwise the fallback selected at construction
    time is used: ``default`` (a stream), ``factory`` (called with the
    thread name, must return a stream), ``paramwriter`` (called as
    ``paramwriter(thread_name, text)``), or -- if none was given -- an
    ``AssertionError`` is raised.
    """

    def __init__(self, default=None, factory=None, paramwriter=None,
                 leave_stdout=False):
        """Select the fallback used for threads without a registered stream.

        At most one of ``default``, ``factory`` and ``paramwriter`` may be
        given.  ``leave_stdout=True`` uses the current ``sys.stdout`` as
        ``default`` and therefore cannot be combined with ``default``.

        Raises ``AssertionError`` on conflicting arguments.
        """
        _require_at_most_one(default, factory, paramwriter)
        if leave_stdout:
            if default is not None:
                raise AssertionError(
                    "You cannot pass in both default (%r) and "
                    "leave_stdout=True" % (default,))
            default = sys.stdout
        # Compare against None (as the validation above does) so that a
        # falsy-but-valid object is not silently ignored.
        if default is not None:
            self._defaultfunc = self._writedefault
        elif factory is not None:
            self._defaultfunc = self._writefactory
        elif paramwriter is not None:
            self._defaultfunc = self._writeparam
        else:
            self._defaultfunc = self._writeerror
        self._default = default
        self._factory = factory
        self._paramwriter = paramwriter
        # Maps thread name -> stream registered for that thread.
        self._catchers = {}

    def write(self, v, currentThread=threading.current_thread):
        """Write ``v`` to the current thread's stream, or to the fallback."""
        name = currentThread().name
        catchers = self._catchers
        if name in catchers:
            catchers[name].write(v)
        else:
            self._defaultfunc(name, v)

    def seek(self, *args):
        """Seek on the current thread's stream, else on the default stream.

        Returns whatever the underlying ``seek`` returns.  Only the
        ``default`` stream is consulted as a fallback here (not ``factory``
        or ``paramwriter``).
        """
        # Weird, but Google App Engine is seeking on stdout
        name = threading.current_thread().name
        catchers = self._catchers
        if name in catchers:
            return catchers[name].seek(*args)
        return self._default.seek(*args)

    def read(self, *args):
        """Read from the current thread's stream, else the default stream.

        Returns the data produced by the underlying ``read``.  Only the
        ``default`` stream is consulted as a fallback here.
        """
        name = threading.current_thread().name
        catchers = self._catchers
        if name in catchers:
            return catchers[name].read(*args)
        return self._default.read(*args)

    def _writedefault(self, name, v):
        """Fallback: write to the ``default`` stream."""
        self._default.write(v)

    def _writefactory(self, name, v):
        """Fallback: write to the stream ``factory`` returns for ``name``."""
        self._factory(name).write(v)

    def _writeparam(self, name, v):
        """Fallback: hand the thread name and text to ``paramwriter``."""
        self._paramwriter(name, v)

    def _writeerror(self, name, v):
        """Fallback when nothing is configured: always raises."""
        raise AssertionError(
            "There is no PrintCatcher output stream for the thread %r"
            % (name,))

    def register(self, catcher, name=None,
                 currentThread=threading.current_thread):
        """Route output of thread ``name`` (default: current) to ``catcher``."""
        if name is None:
            name = currentThread().name
        self._catchers[name] = catcher

    def deregister(self, name=None,
                   currentThread=threading.current_thread):
        """Remove the stream registered for ``name`` (default: current thread).

        Raises ``AssertionError`` if no stream is registered for that name.
        """
        if name is None:
            name = currentThread().name
        if name not in self._catchers:
            raise AssertionError(
                "There is no PrintCatcher catcher for the thread %r"
                % (name,))
        del self._catchers[name]


_printcatcher = None
_oldstdout = None


def install(**kw):
    """Replace ``sys.stdout`` with a ``PrintCatcher`` built from ``**kw``.

    Does nothing if the installed catcher is already ``sys.stdout``.  The
    previous ``sys.stdout`` is remembered so ``uninstall()`` can restore
    it, and the module-level ``register``/``deregister`` are rebound to the
    new catcher's methods.
    """
    global _printcatcher, _oldstdout, register, deregister
    if _printcatcher is None or sys.stdout is not _printcatcher:
        _oldstdout = sys.stdout
        _printcatcher = sys.stdout = PrintCatcher(**kw)
        register = _printcatcher.register
        deregister = _printcatcher.deregister


def uninstall():
    """Restore the ``sys.stdout`` saved by ``install()``, if installed."""
    global _printcatcher, _oldstdout, register, deregister
    if _printcatcher is not None:
        sys.stdout = _oldstdout
        _printcatcher = _oldstdout = None
        register = not_installed_error
        deregister = not_installed_error


def not_installed_error(*args, **kw):
    """Stand-in for ``register``/``deregister`` while nothing is installed."""
    raise AssertionError(
        "threadedprint has not yet been installed (call "
        "threadedprint.install())")


register = deregister = not_installed_error


class StdinCatcher(filemixin.FileMixin):
    """A ``sys.stdin`` replacement that dispatches reads per thread.

    A stream registered for the current thread's name is read first.
    Otherwise the fallback selected at construction time is used:
    ``default`` (a stream), ``factory`` (called with the thread name, must
    return a stream), ``paramwriter`` (called as
    ``paramwriter(thread_name, size)``; its return value is the data), or
    -- if none was given -- an ``AssertionError`` is raised.
    """

    def __init__(self, default=None, factory=None, paramwriter=None):
        """Select the fallback used for threads without a registered stream.

        At most one of the three options may be given; otherwise
        ``AssertionError`` is raised.
        """
        _require_at_most_one(default, factory, paramwriter)
        if default is not None:
            self._defaultfunc = self._readdefault
        elif factory is not None:
            self._defaultfunc = self._readfactory
        elif paramwriter is not None:
            self._defaultfunc = self._readparam
        else:
            self._defaultfunc = self._readerror
        self._default = default
        self._factory = factory
        self._paramwriter = paramwriter
        # Maps thread name -> stream registered for that thread.
        self._catchers = {}

    def read(self, size=None, currentThread=threading.current_thread):
        """Return data read from the current thread's stream or the fallback.

        ``size=None`` reads everything the underlying stream offers.
        """
        name = currentThread().name
        catchers = self._catchers
        if name in catchers:
            return _read_from(catchers[name], size)
        return self._defaultfunc(name, size)

    def _readdefault(self, name, size):
        """Fallback: read from the ``default`` stream."""
        return _read_from(self._default, size)

    def _readfactory(self, name, size):
        """Fallback: read from the stream ``factory`` returns for ``name``."""
        return _read_from(self._factory(name), size)

    def _readparam(self, name, size):
        """Fallback: ask the ``paramwriter`` callable for the data."""
        # The constructor stores the callable as ``_paramwriter``.
        return self._paramwriter(name, size)

    def _readerror(self, name, size):
        """Fallback when nothing is configured: always raises."""
        raise AssertionError(
            "There is no StdinCatcher input stream for the thread %r"
            % (name,))

    def register(self, catcher, name=None,
                 currentThread=threading.current_thread):
        """Serve reads of thread ``name`` (default: current) from ``catcher``."""
        if name is None:
            name = currentThread().name
        self._catchers[name] = catcher

    def deregister(self, catcher=None, name=None,
                   currentThread=threading.current_thread):
        """Remove the stream registered for ``name`` (default: current thread).

        ``catcher`` is accepted only for backward compatibility with the
        historical signature; it is not used.

        Raises ``AssertionError`` if no stream is registered for that name.
        """
        if name is None:
            name = currentThread().name
        if name not in self._catchers:
            raise AssertionError(
                "There is no StdinCatcher catcher for the thread %r"
                % (name,))
        del self._catchers[name]


_stdincatcher = None
_oldstdin = None


def install_stdin(**kw):
    """Replace ``sys.stdin`` with a ``StdinCatcher`` built from ``**kw``.

    Does nothing if a catcher is already installed.  The previous
    ``sys.stdin`` is remembered so ``uninstall_stdin()`` can restore it,
    and ``register_stdin``/``deregister_stdin`` are rebound to the new
    catcher's methods.
    """
    global _stdincatcher, _oldstdin, register_stdin, deregister_stdin
    if _stdincatcher is None:
        _oldstdin = sys.stdin
        _stdincatcher = sys.stdin = StdinCatcher(**kw)
        register_stdin = _stdincatcher.register
        deregister_stdin = _stdincatcher.deregister


def uninstall_stdin():
    """Restore the ``sys.stdin`` saved by ``install_stdin()``, if installed."""
    global _stdincatcher, _oldstdin, register_stdin, deregister_stdin
    if _stdincatcher is not None:
        sys.stdin = _oldstdin
        _stdincatcher = _oldstdin = None
        register_stdin = deregister_stdin = not_installed_error_stdin


def not_installed_error_stdin(*args, **kw):
    """Stand-in for the stdin register functions while nothing is installed."""
    raise AssertionError(
        "threadedprint has not yet been installed for stdin (call "
        "threadedprint.install_stdin())")


# Mirror ``register``/``deregister``: give a clear error before installation
# instead of a NameError/AttributeError.
register_stdin = deregister_stdin = not_installed_error_stdin