summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Bangert <ben@groovie.org>2012-08-09 14:53:13 -0700
committerBen Bangert <ben@groovie.org>2012-08-09 14:53:13 -0700
commit7842a3e947b06ac1d7441b26601fa54a9b9f8897 (patch)
treeed4feeecdad34743f5b4fa6d7a7228a27fc01836
parent9cedc30d91dfdd07e2d6acfc2c84216088a69e7a (diff)
downloadkazoo-7842a3e947b06ac1d7441b26601fa54a9b9f8897.tar.gz
Fashion a logging handler appropriate based on the handler used for KazooClient.
Fixes #16.
-rw-r--r--kazoo/client.py52
-rw-r--r--kazoo/klog.py138
2 files changed, 143 insertions, 47 deletions
diff --git a/kazoo/client.py b/kazoo/client.py
index 8fc0f6f..068a395 100644
--- a/kazoo/client.py
+++ b/kazoo/client.py
@@ -1,7 +1,6 @@
"""Kazoo Zookeeper Client"""
import inspect
import logging
-import os
from collections import namedtuple
from functools import partial
from os.path import split
@@ -19,7 +18,7 @@ from kazoo.recipe.party import Party
from kazoo.recipe.party import ShallowParty
from kazoo.recipe.election import Election
from kazoo.retry import KazooRetry
-from kazoo.handlers.util import thread
+from kazoo.klog import setup_logging
log = logging.getLogger(__name__)
@@ -27,51 +26,6 @@ ZK_OPEN_ACL_UNSAFE = {"perms": zookeeper.PERM_ALL, "scheme": "world",
"id": "anyone"}
-## Zookeeper Logging Setup
-# Setup Zookeeper logging thread
-_logging_pipe = os.pipe()
-zookeeper.set_log_stream(os.fdopen(_logging_pipe[1], 'w'))
-
-
-@thread
-def _loggingthread():
- """Zookeeper logging redirect
-
- Zookeeper by default logs directly out. This thread handles reading
- off the pipe that the above `set_log_stream` call designates so
- that the Zookeeper logging output can be turned into Python logging
- statements under the `Zookeeper` name.
-
- """
- r, w = _logging_pipe
- log = logging.getLogger('ZooKeeper').log
- f = os.fdopen(r)
- levels = dict(ZOO_INFO=logging.INFO,
- ZOO_WARN=logging.WARNING,
- ZOO_ERROR=logging.ERROR,
- ZOO_DEBUG=logging.DEBUG,
- )
- while 1:
- line = f.readline().strip()
- try:
- if '@' in line:
- level, message = line.split('@', 1)
- level = levels.get(level.split(':')[-1])
-
- if 'Exceeded deadline by' in line and level == logging.WARNING:
- level = logging.DEBUG
-
- else:
- level = None
-
- if level is None:
- log(logging.INFO, line)
- else:
- log(level, message)
- except Exception as v:
- logging.getLogger('ZooKeeper').exception("Logging error: %s", v)
-
-
## Client State and Event objects
class KazooState(object):
@@ -392,6 +346,10 @@ class KazooClient(object):
from kazoo.recipe.watchers import ChildrenWatch
from kazoo.recipe.watchers import DataWatch
+ # Check for logging
+ using_gevent = 'gevent' in handler.name
+ setup_logging(use_gevent=using_gevent)
+
# Check for chroot
chroot_check = hosts.split('/', 1)
if len(chroot_check) == 2:
diff --git a/kazoo/klog.py b/kazoo/klog.py
new file mode 100644
index 0000000..4add0b4
--- /dev/null
+++ b/kazoo/klog.py
@@ -0,0 +1,138 @@
+"""Kazoo Logging for Zookeeper
+
+Zookeeper logging redirects that fashion the appropriate logging setup based
+on the handler used for the :class:`~kazoo.client.KazooClient`.
+
+"""
+import os
+import logging
+
+import zookeeper
+
+from kazoo.handlers.util import get_realthread
+
+zk_log = logging.getLogger('ZooKeeper')
+_logging_setup = False
+
+
+def setup_logging(use_gevent=False):
+ global _logging_setup
+
+ if _logging_setup:
+ return
+
+ if use_gevent:
+ _setup_gevent_logging()
+ else:
+ _setup_threading_logging()
+ _logging_setup = True
+
+
+def _setup_gevent_logging():
+ """Setup greenlet based logging redirects"""
+ import gevent
+ from kazoo.handlers.gevent import (_pipe, _using_libevent)
+ _logging_pipe = _pipe()
+ zookeeper.set_log_stream(os.fdopen(_logging_pipe[1], 'w'))
+
+ if _using_libevent:
+ gevent.core.event(gevent.core.EV_READ | gevent.core.EV_PERSIST,
+ _logging_pipe[0], _log_pipe_reader).add()
+ else:
+ gevent.spawn(_logging_greenlet, _logging_pipe)
+
+
+def _setup_threading_logging():
+ """Setup threading based logging redirects"""
+ _logging_pipe = os.pipe()
+ zookeeper.set_log_stream(os.fdopen(_logging_pipe[1], 'w'))
+
+ thread = get_realthread()
+ thread.start_new_thread(_loggingthread, (_logging_pipe,))
+
+
+def _process_message(line):
+ """Line processor used by all loggers"""
+ log = zk_log.log
+ levels = dict(ZOO_INFO=logging.INFO,
+ ZOO_WARN=logging.WARNING,
+ ZOO_ERROR=logging.ERROR,
+ ZOO_DEBUG=logging.DEBUG,
+ )
+ try:
+ if '@' in line:
+ level, message = line.split('@', 1)
+ level = levels.get(level.split(':')[-1])
+
+ if 'Exceeded deadline by' in line and level == logging.WARNING:
+ level = logging.DEBUG
+
+ else:
+ level = None
+
+ if level is None:
+ log(logging.INFO, line)
+ else:
+ log(level, message)
+ except Exception as v:
+ zk_log.exception("Logging error: %s", v)
+
+
+def _log_pipe_reader(event, evtype):
+ """Logging callback for new gevent 0.13 pipe notifications"""
+ try:
+ data = []
+ char = os.read(event.fd, 1)
+ while char != '\n':
+ data.append(char)
+ char = os.read(event.fd, 1)
+ line = ''.join(data).strip()
+ if not line:
+ return
+ _process_message(line)
+ except Exception:
+ pass
+
+
+def _logging_greenlet(logging_pipe):
+ """Zookeeper logging redirect
+
+ This greenlet based logger waits for the pipe to get data, then reads
+ lines off it and processes them.
+
+ Used for gevent 1.0 and above.
+
+ """
+ from gevent.socket import wait_read
+ r, w = logging_pipe
+ while 1:
+ wait_read(r)
+ data = []
+ char = os.read(r, 1)
+ while char != '\n':
+ data.append(char)
+ char = os.read(r, 1)
+ line = ''.join(data).strip()
+ if not line:
+ return
+ _process_message(line)
+
+
+def _loggingthread(logging_pipe):
+ """Zookeeper logging redirect
+
+ Zookeeper by default logs directly out. This thread handles reading
+ off the pipe that the above `set_log_stream` call designates so
+ that the Zookeeper logging output can be turned into Python logging
+ statements under the `Zookeeper` name.
+
+ """
+ r, w = logging_pipe
+ f = os.fdopen(r)
+ while 1:
+ line = f.readline().strip()
+
+ # Skip empty lines
+ if not line:
+ continue
+ _process_message(line)