diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-25 15:59:35 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-25 16:30:32 -0800 |
commit | a667a4b3be03ed75cd225223678bdc6fda0a8016 (patch) | |
tree | de0b5c175862c9af34d048195f581bd3974f79f4 | |
parent | 71a9e65e58151c841cf3e0880de070169ca79c60 (diff) | |
download | kafka-python-a667a4b3be03ed75cd225223678bdc6fda0a8016.tar.gz |
Small KafkaClient.check_version() improvements
- filter connection failure logging during version check
- raise UnrecognizedBrokerVersion if we cant id broker
-rw-r--r-- | kafka/client_async.py | 23 | ||||
-rw-r--r-- | kafka/common.py | 4 |
2 files changed, 24 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index b5b3761..af414e2 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -586,6 +586,15 @@ class KafkaClient(object): OffsetFetchRequest_v0, GroupCoordinatorRequest) from .protocol.metadata import MetadataRequest + # Socket errors are logged as exceptions and can alarm users. Mute them + from logging import Filter + class ConnFilter(Filter): + def filter(self, record): + if record.funcName in ('recv', 'send'): + return False + return True + log_filter = ConnFilter() + test_cases = [ ('0.9', ListGroupsRequest()), ('0.8.2', GroupCoordinatorRequest('kafka-python-default-group')), @@ -593,18 +602,20 @@ class KafkaClient(object): ('0.8.0', MetadataRequest([])), ] + logging.getLogger('kafka.conn').addFilter(log_filter) for version, request in test_cases: connect() f = self.send(node_id, request) - time.sleep(0.5) - self.send(node_id, MetadataRequest([])) + time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes + metadata = self.send(node_id, MetadataRequest([])) self.poll(future=f) + self.poll(future=metadata) assert f.is_done if f.succeeded(): log.info('Broker version identifed as %s', version) - return version + break if six.PY2: assert isinstance(f.exception.args[0], socket.error) @@ -615,6 +626,12 @@ class KafkaClient(object): version, request.__class__.__name__) continue + else: + raise Errors.UnrecognizedBrokerVersion() + + logging.getLogger('kafka.conn').removeFilter(log_filter) + return version + def wakeup(self): os.write(self._wake_w, b'x') diff --git a/kafka/common.py b/kafka/common.py index 84cf719..3fb5ab2 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -132,6 +132,10 @@ class StaleMetadata(KafkaError): invalid_metadata = True +class UnrecognizedBrokerVersion(KafkaError): + pass + + class BrokerResponseError(KafkaError): errno = None message = None |