summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEnrico Canzonieri <ecanzonieri@gmail.com>2015-10-12 00:11:18 -0700
committerEnrico Canzonieri <enrico@yelp.com>2015-11-10 17:57:57 -0800
commit04920bb89f9d73e4028dbd487719975c65954592 (patch)
treed02675d919e29670a01c385c3eccb48ce1baf804
parente99a934bab1d551d07dd0c6365f6a730028489f3 (diff)
downloadkafka-python-04920bb89f9d73e4028dbd487719975c65954592.tar.gz
Unblocking broker aware request
-rw-r--r--kafka/client.py48
-rw-r--r--kafka/conn.py5
2 files changed, 35 insertions, 18 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 13777a4..68277ed 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -2,6 +2,7 @@ import collections
import copy
import functools
import logging
+import select
import time
import kafka.common
@@ -177,6 +178,10 @@ class KafkaClient(object):
# For each broker, send the list of request payloads
# and collect the responses and errors
broker_failures = []
+
+ # For each KafkaConnection we store the real socket so that we can use
+ # a select to perform unblocking I/O
+ socket_connection = {}
for broker, payloads in payloads_by_broker.items():
requestId = self._next_id()
log.debug('Request %s to %s: %s', requestId, broker, payloads)
@@ -210,27 +215,34 @@ class KafkaClient(object):
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = None
continue
+ else:
+ socket_connection[conn.get_connected_socket()] = (conn, broker)
- try:
- response = conn.recv(requestId)
- except ConnectionError as e:
- broker_failures.append(broker)
- log.warning('ConnectionError attempting to receive a '
- 'response to request %s from server %s: %s',
- requestId, broker, e)
+ conn = None
+ while socket_connection:
+ sockets = socket_connection.keys()
+ rlist, _, _ = select.select(sockets, [], [], None)
+ conn, broker = socket_connection.pop(rlist[0])
+ try:
+ response = conn.recv(requestId)
+ except ConnectionError as e:
+ broker_failures.append(broker)
+ log.warning('ConnectionError attempting to receive a '
+ 'response to request %s from server %s: %s',
+ requestId, broker, e)
- for payload in payloads:
- topic_partition = (payload.topic, payload.partition)
- responses[topic_partition] = FailedPayloadsError(payload)
+ for payload in payloads:
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = FailedPayloadsError(payload)
- else:
- _resps = []
- for payload_response in decoder_fn(response):
- topic_partition = (payload_response.topic,
- payload_response.partition)
- responses[topic_partition] = payload_response
- _resps.append(payload_response)
- log.debug('Response %s: %s', requestId, _resps)
+ else:
+ _resps = []
+ for payload_response in decoder_fn(response):
+ topic_partition = (payload_response.topic,
+ payload_response.partition)
+ responses[topic_partition] = payload_response
+ _resps.append(payload_response)
+ log.debug('Response %s: %s', requestId, _resps)
# Connection errors generally mean stale metadata
# although sometimes it means incorrect api request
diff --git a/kafka/conn.py b/kafka/conn.py
index 432e10b..f1a12dc 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -118,6 +118,11 @@ class KafkaConnection(local):
# TODO multiplex socket communication to allow for multi-threaded clients
+ def get_connected_socket(self):
+ if not self._sock:
+ self.reinit()
+ return self._sock
+
def send(self, request_id, payload):
"""
Send a request to Kafka