diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 39 |
1 files changed, 25 insertions, 14 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 6117262..4c21b8c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -5,6 +5,14 @@ import copy import errno import logging from random import shuffle, uniform + +# selectors in stdlib as of py3.4 +try: + import selectors # pylint: disable=import-error +except ImportError: + # vendored backport module + from .vendor import selectors34 as selectors + import socket import struct import sys @@ -138,6 +146,9 @@ class BrokerConnection(object): api_version_auto_timeout_ms (int): number of milliseconds to throw a timeout exception from the constructor when checking the broker api version. Only applies if api_version is None + selector (selectors.BaseSelector): Provide a specific selector + implementation to use for I/O multiplexing. + Default: selectors.DefaultSelector state_change_callback (callable): function to be called when the connection state changes from CONNECTING to CONNECTED etc. metrics (kafka.metrics.Metrics): Optionally provide a metrics @@ -173,6 +184,7 @@ class BrokerConnection(object): 'ssl_crlfile': None, 'ssl_password': None, 'api_version': (0, 8, 2), # default to most restrictive + 'selector': selectors.DefaultSelector, 'state_change_callback': lambda conn: True, 'metrics': None, 'metric_group_prefix': '', @@ -704,7 +716,7 @@ class BrokerConnection(object): def recv(self): """Non-blocking network receive. - Return response if available + Return list of (response, future) """ if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING: log.warning('%s cannot recv: socket not connected', self) @@ -727,17 +739,16 @@ class BrokerConnection(object): self.config['request_timeout_ms'])) return () - for response in responses: + # augment respones w/ correlation_id, future, and timestamp + for i in range(len(responses)): (correlation_id, future, timestamp) = self.in_flight_requests.popleft() - if isinstance(response, Errors.KafkaError): - self.close(response) - break - + latency_ms = (time.time() - timestamp) * 1000 if self._sensors: - self._sensors.request_time.record((time.time() - timestamp) * 1000) + self._sensors.request_time.record(latency_ms) - log.debug('%s Response %d: %s', self, correlation_id, response) - future.success(response) + response = responses[i] + log.debug('%s Response %d (%s ms): %s', self, correlation_id, latency_ms, response) + responses[i] = (response, future) return responses @@ -899,12 +910,12 @@ class BrokerConnection(object): # request was unrecognized mr = self.send(MetadataRequest[0]([])) - if self._sock: - self._sock.setblocking(True) + selector = self.config['selector']() + selector.register(self._sock, selectors.EVENT_READ) while not (f.is_done and mr.is_done): - self.recv() - if self._sock: - self._sock.setblocking(False) + for response, future in self.recv(): + future.success(response) + selector.select(1) if f.succeeded(): if isinstance(request, ApiVersionRequest[0]): |