summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py45
1 files changed, 19 insertions, 26 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 5a1d624..d70e4f2 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -152,8 +152,8 @@ class KafkaClient(object):
conn = self._conns[node_id]
return conn.state is ConnectionStates.DISCONNECTED and not conn.blacked_out()
- def _initiate_connect(self, node_id):
- """Initiate a connection to the given node (must be in metadata)"""
+ def _maybe_connect(self, node_id):
+ """Idempotent non-blocking connection attempt to the given node id."""
if node_id not in self._conns:
broker = self.cluster.broker_metadata(node_id)
assert broker, 'Broker id %s not in current metadata' % node_id
@@ -164,22 +164,21 @@ class KafkaClient(object):
host, port, afi = get_ip_port_afi(broker.host)
self._conns[node_id] = BrokerConnection(host, broker.port, afi,
**self.config)
- return self._finish_connect(node_id)
-
- def _finish_connect(self, node_id):
- assert node_id in self._conns, '%s is not in current conns' % node_id
state = self._conns[node_id].connect()
if state is ConnectionStates.CONNECTING:
self._connecting.add(node_id)
+
+ # Whether CONNECTED or DISCONNECTED, we need to remove from connecting
elif node_id in self._connecting:
log.debug("Node %s connection state is %s", node_id, state)
self._connecting.remove(node_id)
+ # Connection failures imply that our metadata is stale, so let's refresh
if state is ConnectionStates.DISCONNECTED:
log.warning("Node %s connect failed -- refreshing metadata", node_id)
self.cluster.request_update()
- return state
+ return self._conns[node_id].connected()
def ready(self, node_id):
"""Check whether a node is connected and ok to send more requests.
@@ -190,19 +189,15 @@ class KafkaClient(object):
Returns:
bool: True if we are ready to send to the given node
"""
- if self.is_ready(node_id):
- return True
-
- if self._can_connect(node_id):
- # if we are interested in sending to a node
- # and we don't have a connection to it, initiate one
- self._initiate_connect(node_id)
-
- if node_id in self._connecting:
- self._finish_connect(node_id)
-
+ self._maybe_connect(node_id)
return self.is_ready(node_id)
+ def connected(self, node_id):
+ """Return True iff the node_id is connected."""
+ if node_id not in self._conns:
+ return False
+ return self._conns[node_id].connected()
+
def close(self, node_id=None):
"""Closes the connection to a particular node (if there is one).
@@ -295,15 +290,13 @@ class KafkaClient(object):
request (Struct): request object (not-encoded)
Raises:
- NodeNotReadyError: if node_id is not ready
+ AssertionError: if node_id is not in current cluster metadata
Returns:
- Future: resolves to Response struct
+ Future: resolves to Response struct or Error
"""
- if not self._can_send_request(node_id):
- raise Errors.NodeNotReadyError("Attempt to send a request to node"
- " which is not ready (node id %s)."
- % node_id)
+ if not self._maybe_connect(node_id):
+ return Future().failure(Errors.NodeNotReadyError(node_id))
# Every request gets a response, except one special case:
expect_response = True
@@ -341,7 +334,7 @@ class KafkaClient(object):
# Attempt to complete pending connections
for node_id in list(self._connecting):
- self._finish_connect(node_id)
+ self._maybe_connect(node_id)
# Send a metadata request if needed
metadata_timeout_ms = self._maybe_refresh_metadata()
@@ -557,7 +550,7 @@ class KafkaClient(object):
elif self._can_connect(node_id):
log.debug("Initializing connection to node %s for metadata request", node_id)
- self._initiate_connect(node_id)
+ self._maybe_connect(node_id)
return 0