summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEnrico Canzonieri <ecanzonieri@gmail.com>2015-10-24 16:50:46 -0700
committerEnrico Canzonieri <enrico@yelp.com>2015-11-10 17:58:04 -0800
commitc2adeeab057b825c8cccae67aac822be02293211 (patch)
treeee745273c341f5bb482ebe98fda89496bee97b7c
parent04920bb89f9d73e4028dbd487719975c65954592 (diff)
downloadkafka-python-c2adeeab057b825c8cccae67aac822be02293211.tar.gz
Add tests. Bug fix. Rename socket_conn dict.
-rw-r--r--kafka/client.py14
-rw-r--r--test/test_conn.py17
2 files changed, 24 insertions, 7 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 68277ed..6603a47 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -179,9 +179,9 @@ class KafkaClient(object):
# and collect the responses and errors
broker_failures = []
- # For each KafkaConnection we store the real socket so that we can use
+ # For each KafkaConnection keep the real socket so that we can use
# a select to perform unblocking I/O
- socket_connection = {}
+ connections_by_socket = {}
for broker, payloads in payloads_by_broker.items():
requestId = self._next_id()
log.debug('Request %s to %s: %s', requestId, broker, payloads)
@@ -216,13 +216,13 @@ class KafkaClient(object):
responses[topic_partition] = None
continue
else:
- socket_connection[conn.get_connected_socket()] = (conn, broker)
+ connections_by_socket[conn.get_connected_socket()] = (conn, broker)
conn = None
- while socket_connection:
- sockets = socket_connection.keys()
+ while connections_by_socket:
+ sockets = connections_by_socket.keys()
rlist, _, _ = select.select(sockets, [], [], None)
- conn, broker = socket_connection.pop(rlist[0])
+ conn, broker = connections_by_socket.pop(rlist[0])
try:
response = conn.recv(requestId)
except ConnectionError as e:
@@ -231,7 +231,7 @@ class KafkaClient(object):
'response to request %s from server %s: %s',
requestId, broker, e)
- for payload in payloads:
+ for payload in payloads_by_broker[broker]:
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = FailedPayloadsError(payload)
diff --git a/test/test_conn.py b/test/test_conn.py
index 2b70344..1bdfc1e 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -165,6 +165,23 @@ class ConnTest(unittest.TestCase):
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2'])
+ def test_get_connected_socket(self):
+ s = self.conn.get_connected_socket()
+
+ self.assertEqual(s, self.MockCreateConn())
+
+ def test_get_connected_socket_on_dirty_conn(self):
+ # Dirty the connection
+ try:
+ self.conn._raise_connection_error()
+ except ConnectionError:
+ pass
+
+ # Test that get_connected_socket tries to connect
+ self.assertEqual(self.MockCreateConn.call_count, 0)
+ self.conn.get_connected_socket()
+ self.assertEqual(self.MockCreateConn.call_count, 1)
+
def test_close__object_is_reusable(self):
# test that sending to a closed connection