diff options
author | Ben Bangert <ben@groovie.org> | 2012-08-09 14:53:13 -0700 |
---|---|---|
committer | Ben Bangert <ben@groovie.org> | 2012-08-09 14:53:13 -0700 |
commit | 7842a3e947b06ac1d7441b26601fa54a9b9f8897 (patch) | |
tree | ed4feeecdad34743f5b4fa6d7a7228a27fc01836 | |
parent | 9cedc30d91dfdd07e2d6acfc2c84216088a69e7a (diff) | |
download | kazoo-7842a3e947b06ac1d7441b26601fa54a9b9f8897.tar.gz |
Fashion a logging handler appropriate based on the handler used for KazooClient.
Fixes #16.
-rw-r--r-- | kazoo/client.py | 52 | ||||
-rw-r--r-- | kazoo/klog.py | 138 |
2 files changed, 143 insertions, 47 deletions
diff --git a/kazoo/client.py b/kazoo/client.py index 8fc0f6f..068a395 100644 --- a/kazoo/client.py +++ b/kazoo/client.py @@ -1,7 +1,6 @@ """Kazoo Zookeeper Client""" import inspect import logging -import os from collections import namedtuple from functools import partial from os.path import split @@ -19,7 +18,7 @@ from kazoo.recipe.party import Party from kazoo.recipe.party import ShallowParty from kazoo.recipe.election import Election from kazoo.retry import KazooRetry -from kazoo.handlers.util import thread +from kazoo.klog import setup_logging log = logging.getLogger(__name__) @@ -27,51 +26,6 @@ ZK_OPEN_ACL_UNSAFE = {"perms": zookeeper.PERM_ALL, "scheme": "world", "id": "anyone"} -## Zookeeper Logging Setup -# Setup Zookeeper logging thread -_logging_pipe = os.pipe() -zookeeper.set_log_stream(os.fdopen(_logging_pipe[1], 'w')) - - -@thread -def _loggingthread(): - """Zookeeper logging redirect - - Zookeeper by default logs directly out. This thread handles reading - off the pipe that the above `set_log_stream` call designates so - that the Zookeeper logging output can be turned into Python logging - statements under the `Zookeeper` name. - - """ - r, w = _logging_pipe - log = logging.getLogger('ZooKeeper').log - f = os.fdopen(r) - levels = dict(ZOO_INFO=logging.INFO, - ZOO_WARN=logging.WARNING, - ZOO_ERROR=logging.ERROR, - ZOO_DEBUG=logging.DEBUG, - ) - while 1: - line = f.readline().strip() - try: - if '@' in line: - level, message = line.split('@', 1) - level = levels.get(level.split(':')[-1]) - - if 'Exceeded deadline by' in line and level == logging.WARNING: - level = logging.DEBUG - - else: - level = None - - if level is None: - log(logging.INFO, line) - else: - log(level, message) - except Exception as v: - logging.getLogger('ZooKeeper').exception("Logging error: %s", v) - - ## Client State and Event objects class KazooState(object): @@ -392,6 +346,10 @@ class KazooClient(object): from kazoo.recipe.watchers import ChildrenWatch from kazoo.recipe.watchers import DataWatch + # Check for logging + using_gevent = 'gevent' in handler.name + setup_logging(use_gevent=using_gevent) + # Check for chroot chroot_check = hosts.split('/', 1) if len(chroot_check) == 2: diff --git a/kazoo/klog.py b/kazoo/klog.py new file mode 100644 index 0000000..4add0b4 --- /dev/null +++ b/kazoo/klog.py @@ -0,0 +1,138 @@ +"""Kazoo Logging for Zookeeper + +Zookeeper logging redirects that fashion the appropriate logging setup based +on the handler used for the :class:`~kazoo.client.KazooClient`. + +""" +import os +import logging + +import zookeeper + +from kazoo.handlers.util import get_realthread + +zk_log = logging.getLogger('ZooKeeper') +_logging_setup = False + + +def setup_logging(use_gevent=False): + global _logging_setup + + if _logging_setup: + return + + if use_gevent: + _setup_gevent_logging() + else: + _setup_threading_logging() + _logging_setup = True + + +def _setup_gevent_logging(): + """Setup greenlet based logging redirects""" + import gevent + from kazoo.handlers.gevent import (_pipe, _using_libevent) + _logging_pipe = _pipe() + zookeeper.set_log_stream(os.fdopen(_logging_pipe[1], 'w')) + + if _using_libevent: + gevent.core.event(gevent.core.EV_READ | gevent.core.EV_PERSIST, + _logging_pipe[0], _log_pipe_reader).add() + else: + gevent.spawn(_logging_greenlet, _logging_pipe) + + +def _setup_threading_logging(): + """Setup threading based logging redirects""" + _logging_pipe = os.pipe() + zookeeper.set_log_stream(os.fdopen(_logging_pipe[1], 'w')) + + thread = get_realthread() + thread.start_new_thread(_loggingthread, (_logging_pipe,)) + + +def _process_message(line): + """Line processor used by all loggers""" + log = zk_log.log + levels = dict(ZOO_INFO=logging.INFO, + ZOO_WARN=logging.WARNING, + ZOO_ERROR=logging.ERROR, + ZOO_DEBUG=logging.DEBUG, + ) + try: + if '@' in line: + level, message = line.split('@', 1) + level = levels.get(level.split(':')[-1]) + + if 'Exceeded deadline by' in line and level == logging.WARNING: + level = logging.DEBUG + + else: + level = None + + if level is None: + log(logging.INFO, line) + else: + log(level, message) + except Exception as v: + zk_log.exception("Logging error: %s", v) + + +def _log_pipe_reader(event, evtype): + """Logging callback for new gevent 0.13 pipe notifications""" + try: + data = [] + char = os.read(event.fd, 1) + while char != '\n': + data.append(char) + char = os.read(event.fd, 1) + line = ''.join(data).strip() + if not line: + return + _process_message(line) + except Exception: + pass + + +def _logging_greenlet(logging_pipe): + """Zookeeper logging redirect + + This greenlet based logger waits for the pipe to get data, then reads + lines off it and processes them. + + Used for gevent 1.0 and above. + + """ + from gevent.socket import wait_read + r, w = logging_pipe + while 1: + wait_read(r) + data = [] + char = os.read(r, 1) + while char != '\n': + data.append(char) + char = os.read(r, 1) + line = ''.join(data).strip() + if not line: + return + _process_message(line) + + +def _loggingthread(logging_pipe): + """Zookeeper logging redirect + + Zookeeper by default logs directly out. This thread handles reading + off the pipe that the above `set_log_stream` call designates so + that the Zookeeper logging output can be turned into Python logging + statements under the `Zookeeper` name. + + """ + r, w = logging_pipe + f = os.fdopen(r) + while 1: + line = f.readline().strip() + + # Skip empty lines + if not line: + continue + _process_message(line) |