summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-12-02 14:21:13 -0800
committerDana Powers <dana.powers@gmail.com>2015-12-02 14:21:13 -0800
commit58e2ab41a76518c433d7375a24191018b85ced85 (patch)
tree4d1fdb80c101bdc2ed0b333ea75d81ddb4fba67c /kafka/client.py
parentcdcaea6f944df10941522ebcb08946bf34c357db (diff)
parentc2adeeab057b825c8cccae67aac822be02293211 (diff)
downloadkafka-python-58e2ab41a76518c433d7375a24191018b85ced85.tar.gz
Merge pull request #473 from ecanzonieri/use_unblocking_io_for_aware_requests
Use unblocking io for broker aware requests
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py48
1 files changed, 30 insertions, 18 deletions
diff --git a/kafka/client.py b/kafka/client.py
index c05e142..64b814b 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -2,6 +2,7 @@ import collections
import copy
import functools
import logging
+import select
import time
import kafka.common
@@ -177,6 +178,10 @@ class KafkaClient(object):
# For each broker, send the list of request payloads
# and collect the responses and errors
broker_failures = []
+
+ # For each KafkaConnection keep the real socket so that we can use
+ # a select to perform unblocking I/O
+ connections_by_socket = {}
for broker, payloads in payloads_by_broker.items():
requestId = self._next_id()
log.debug('Request %s to %s: %s', requestId, broker, payloads)
@@ -210,27 +215,34 @@ class KafkaClient(object):
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = None
continue
+ else:
+ connections_by_socket[conn.get_connected_socket()] = (conn, broker)
- try:
- response = conn.recv(requestId)
- except ConnectionError as e:
- broker_failures.append(broker)
- log.warning('ConnectionError attempting to receive a '
- 'response to request %s from server %s: %s',
- requestId, broker, e)
+ conn = None
+ while connections_by_socket:
+ sockets = connections_by_socket.keys()
+ rlist, _, _ = select.select(sockets, [], [], None)
+ conn, broker = connections_by_socket.pop(rlist[0])
+ try:
+ response = conn.recv(requestId)
+ except ConnectionError as e:
+ broker_failures.append(broker)
+ log.warning('ConnectionError attempting to receive a '
+ 'response to request %s from server %s: %s',
+ requestId, broker, e)
- for payload in payloads:
- topic_partition = (payload.topic, payload.partition)
- responses[topic_partition] = FailedPayloadsError(payload)
+ for payload in payloads_by_broker[broker]:
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = FailedPayloadsError(payload)
- else:
- _resps = []
- for payload_response in decoder_fn(response):
- topic_partition = (payload_response.topic,
- payload_response.partition)
- responses[topic_partition] = payload_response
- _resps.append(payload_response)
- log.debug('Response %s: %s', requestId, _resps)
+ else:
+ _resps = []
+ for payload_response in decoder_fn(response):
+ topic_partition = (payload_response.topic,
+ payload_response.partition)
+ responses[topic_partition] = payload_response
+ _resps.append(payload_response)
+ log.debug('Response %s: %s', requestId, _resps)
# Connection errors generally mean stale metadata
# although sometimes it means incorrect api request