From 8fff81468df640c0c1fc5daeb8fd8dd980c15c0c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 10 Dec 2015 18:38:34 -0800 Subject: Move Request / Response logging from KafkaClient to BrokerConnection and reenable kafka.conn debug logging in tests --- kafka/client.py | 2 -- kafka/conn.py | 6 ++++-- test/test_conn.py | 18 ------------------ test/testutil.py | 3 --- 4 files changed, 4 insertions(+), 25 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index ca737c4..e66190d 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -144,7 +144,6 @@ class KafkaClient(object): response = conn.recv() if response is not None: decoded = decoder_fn(response) - log.debug('Response %s: %s', correlation_id, decoded) return decoded raise KafkaUnavailableError('All servers failed to process request') @@ -250,7 +249,6 @@ class KafkaClient(object): 'from server %s', correlation_id, broker) continue - log.debug('Response %s: %s', correlation_id, response) for payload_response in decoder_fn(response): topic_partition = (str(payload_response.topic), payload_response.partition) diff --git a/kafka/conn.py b/kafka/conn.py index 9907cb1..bd399a9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -75,11 +75,12 @@ class BrokerConnection(local): self._write_fd.write(message) self._write_fd.flush() except socket.error: - log.exception("Error in BrokerConnection.send()") + log.exception("Error in BrokerConnection.send(): %s", request) self.close() return None if expect_response: self.in_flight_requests.append((self.correlation_id, request.RESPONSE_TYPE)) + log.debug('Request %d: %s', self.correlation_id, request) return self.correlation_id def recv(self, timeout=None): @@ -100,9 +101,10 @@ class BrokerConnection(local): raise RuntimeError('Correlation ids do not match!') response = response_type.decode(self._read_fd) except (RuntimeError, socket.error, struct.error): - log.exception("Error in BrokerConnection.recv()") + log.exception("Error in BrokerConnection.recv() for request %d", correlation_id) self.close() return None + log.debug('Response %d: %s', correlation_id, response) return response def next_correlation_id_recv(self): diff --git a/test/test_conn.py b/test/test_conn.py index 1bdfc1e..684ffe5 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -1,4 +1,3 @@ -import logging import socket import struct from threading import Thread @@ -12,9 +11,6 @@ from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SE class ConnTest(unittest.TestCase): def setUp(self): - # kafka.conn debug logging is verbose, so only enable in conn tests - logging.getLogger('kafka.conn').setLevel(logging.DEBUG) - self.config = { 'host': 'localhost', 'port': 9090, @@ -50,11 +46,6 @@ class ConnTest(unittest.TestCase): # Reset any mock counts caused by __init__ self.MockCreateConn.reset_mock() - def tearDown(self): - # Return connection logging to INFO - logging.getLogger('kafka.conn').setLevel(logging.INFO) - - def test_collect_hosts__happy_path(self): hosts = "localhost:1234,localhost" results = collect_hosts(hosts) @@ -193,15 +184,6 @@ class ConnTest(unittest.TestCase): class TestKafkaConnection(unittest.TestCase): - - def setUp(self): - # kafka.conn debug logging is verbose, so only enable in conn tests - logging.getLogger('kafka.conn').setLevel(logging.DEBUG) - - def tearDown(self): - # Return connection logging to INFO - logging.getLogger('kafka.conn').setLevel(logging.INFO) - @mock.patch('socket.create_connection') def test_copy(self, socket): """KafkaConnection copies work as expected""" diff --git a/test/testutil.py b/test/testutil.py index 5c6ea1b..98fe805 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -112,6 +112,3 @@ class Timer(object): logging.basicConfig(level=logging.DEBUG) logging.getLogger('test.fixtures').setLevel(logging.ERROR) logging.getLogger('test.service').setLevel(logging.ERROR) - -# kafka.conn debug logging is verbose, disable in tests by default -logging.getLogger('kafka.conn').setLevel(logging.INFO) -- cgit v1.2.1