diff options
author | Enrico Canzonieri <ecanzonieri@gmail.com> | 2015-10-24 16:50:46 -0700 |
---|---|---|
committer | Enrico Canzonieri <enrico@yelp.com> | 2015-11-10 17:58:04 -0800 |
commit | c2adeeab057b825c8cccae67aac822be02293211 (patch) | |
tree | ee745273c341f5bb482ebe98fda89496bee97b7c | |
parent | 04920bb89f9d73e4028dbd487719975c65954592 (diff) | |
download | kafka-python-c2adeeab057b825c8cccae67aac822be02293211.tar.gz |
Add tests. Bug fix. Rename socket_conn dict.
-rw-r--r-- | kafka/client.py | 14 | ||||
-rw-r--r-- | test/test_conn.py | 17 |
2 files changed, 24 insertions, 7 deletions
diff --git a/kafka/client.py b/kafka/client.py index 68277ed..6603a47 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -179,9 +179,9 @@ class KafkaClient(object): # and collect the responses and errors broker_failures = [] - # For each KafkaConnection we store the real socket so that we can use + # For each KafkaConnection keep the real socket so that we can use # a select to perform unblocking I/O - socket_connection = {} + connections_by_socket = {} for broker, payloads in payloads_by_broker.items(): requestId = self._next_id() log.debug('Request %s to %s: %s', requestId, broker, payloads) @@ -216,13 +216,13 @@ class KafkaClient(object): responses[topic_partition] = None continue else: - socket_connection[conn.get_connected_socket()] = (conn, broker) + connections_by_socket[conn.get_connected_socket()] = (conn, broker) conn = None - while socket_connection: - sockets = socket_connection.keys() + while connections_by_socket: + sockets = connections_by_socket.keys() rlist, _, _ = select.select(sockets, [], [], None) - conn, broker = socket_connection.pop(rlist[0]) + conn, broker = connections_by_socket.pop(rlist[0]) try: response = conn.recv(requestId) except ConnectionError as e: @@ -231,7 +231,7 @@ class KafkaClient(object): 'response to request %s from server %s: %s', requestId, broker, e) - for payload in payloads: + for payload in payloads_by_broker[broker]: topic_partition = (payload.topic, payload.partition) responses[topic_partition] = FailedPayloadsError(payload) diff --git a/test/test_conn.py b/test/test_conn.py index 2b70344..1bdfc1e 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -165,6 +165,23 @@ class ConnTest(unittest.TestCase): self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload']) self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2']) + def test_get_connected_socket(self): + s = self.conn.get_connected_socket() + + self.assertEqual(s, self.MockCreateConn()) + + def test_get_connected_socket_on_dirty_conn(self): + # Dirty the connection + try: + self.conn._raise_connection_error() + except ConnectionError: + pass + + # Test that get_connected_socket tries to connect + self.assertEqual(self.MockCreateConn.call_count, 0) + self.conn.get_connected_socket() + self.assertEqual(self.MockCreateConn.call_count, 1) + def test_close__object_is_reusable(self): # test that sending to a closed connection |