summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-25 15:59:35 -0800
committerDana Powers <dana.powers@rd.io>2016-01-25 16:30:32 -0800
commita667a4b3be03ed75cd225223678bdc6fda0a8016 (patch)
treede0b5c175862c9af34d048195f581bd3974f79f4
parent71a9e65e58151c841cf3e0880de070169ca79c60 (diff)
downloadkafka-python-a667a4b3be03ed75cd225223678bdc6fda0a8016.tar.gz
Small KafkaClient.check_version() improvements
- filter connection failure logging during version check - raise UnrecognizedBrokerVersion if we cant id broker
-rw-r--r--kafka/client_async.py23
-rw-r--r--kafka/common.py4
2 files changed, 24 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index b5b3761..af414e2 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -586,6 +586,15 @@ class KafkaClient(object):
OffsetFetchRequest_v0, GroupCoordinatorRequest)
from .protocol.metadata import MetadataRequest
+ # Socket errors are logged as exceptions and can alarm users. Mute them
+ from logging import Filter
+ class ConnFilter(Filter):
+ def filter(self, record):
+ if record.funcName in ('recv', 'send'):
+ return False
+ return True
+ log_filter = ConnFilter()
+
test_cases = [
('0.9', ListGroupsRequest()),
('0.8.2', GroupCoordinatorRequest('kafka-python-default-group')),
@@ -593,18 +602,20 @@ class KafkaClient(object):
('0.8.0', MetadataRequest([])),
]
+ logging.getLogger('kafka.conn').addFilter(log_filter)
for version, request in test_cases:
connect()
f = self.send(node_id, request)
- time.sleep(0.5)
- self.send(node_id, MetadataRequest([]))
+ time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes
+ metadata = self.send(node_id, MetadataRequest([]))
self.poll(future=f)
+ self.poll(future=metadata)
assert f.is_done
if f.succeeded():
log.info('Broker version identifed as %s', version)
- return version
+ break
if six.PY2:
assert isinstance(f.exception.args[0], socket.error)
@@ -615,6 +626,12 @@ class KafkaClient(object):
version, request.__class__.__name__)
continue
+ else:
+ raise Errors.UnrecognizedBrokerVersion()
+
+ logging.getLogger('kafka.conn').removeFilter(log_filter)
+ return version
+
def wakeup(self):
os.write(self._wake_w, b'x')
diff --git a/kafka/common.py b/kafka/common.py
index 84cf719..3fb5ab2 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -132,6 +132,10 @@ class StaleMetadata(KafkaError):
invalid_metadata = True
+class UnrecognizedBrokerVersion(KafkaError):
+ pass
+
+
class BrokerResponseError(KafkaError):
errno = None
message = None