summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py39
1 files changed, 25 insertions, 14 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 6117262..4c21b8c 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -5,6 +5,14 @@ import copy
import errno
import logging
from random import shuffle, uniform
+
+# selectors in stdlib as of py3.4
+try:
+ import selectors # pylint: disable=import-error
+except ImportError:
+ # vendored backport module
+ from .vendor import selectors34 as selectors
+
import socket
import struct
import sys
@@ -138,6 +146,9 @@ class BrokerConnection(object):
api_version_auto_timeout_ms (int): number of milliseconds to throw a
timeout exception from the constructor when checking the broker
api version. Only applies if api_version is None
+ selector (selectors.BaseSelector): Provide a specific selector
+ implementation to use for I/O multiplexing.
+ Default: selectors.DefaultSelector
state_change_callback (callable): function to be called when the
connection state changes from CONNECTING to CONNECTED etc.
metrics (kafka.metrics.Metrics): Optionally provide a metrics
@@ -173,6 +184,7 @@ class BrokerConnection(object):
'ssl_crlfile': None,
'ssl_password': None,
'api_version': (0, 8, 2), # default to most restrictive
+ 'selector': selectors.DefaultSelector,
'state_change_callback': lambda conn: True,
'metrics': None,
'metric_group_prefix': '',
@@ -704,7 +716,7 @@ class BrokerConnection(object):
def recv(self):
"""Non-blocking network receive.
- Return response if available
+ Return list of (response, future)
"""
if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING:
log.warning('%s cannot recv: socket not connected', self)
@@ -727,17 +739,16 @@ class BrokerConnection(object):
self.config['request_timeout_ms']))
return ()
- for response in responses:
+ # augment respones w/ correlation_id, future, and timestamp
+ for i in range(len(responses)):
(correlation_id, future, timestamp) = self.in_flight_requests.popleft()
- if isinstance(response, Errors.KafkaError):
- self.close(response)
- break
-
+ latency_ms = (time.time() - timestamp) * 1000
if self._sensors:
- self._sensors.request_time.record((time.time() - timestamp) * 1000)
+ self._sensors.request_time.record(latency_ms)
- log.debug('%s Response %d: %s', self, correlation_id, response)
- future.success(response)
+ response = responses[i]
+ log.debug('%s Response %d (%s ms): %s', self, correlation_id, latency_ms, response)
+ responses[i] = (response, future)
return responses
@@ -899,12 +910,12 @@ class BrokerConnection(object):
# request was unrecognized
mr = self.send(MetadataRequest[0]([]))
- if self._sock:
- self._sock.setblocking(True)
+ selector = self.config['selector']()
+ selector.register(self._sock, selectors.EVENT_READ)
while not (f.is_done and mr.is_done):
- self.recv()
- if self._sock:
- self._sock.setblocking(False)
+ for response, future in self.recv():
+ future.success(response)
+ selector.select(1)
if f.succeeded():
if isinstance(request, ApiVersionRequest[0]):