summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-03-24 10:23:28 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-04 09:37:56 -0700
commit0a942176a6030c3fd9e77a3a8f5a63f85f376f14 (patch)
tree4aaa4a086a3416c0069a1c2f7328311ae00befca /kafka/conn.py
parent6188b7bd4b08b043b7e4925360347f06f80f555e (diff)
downloadkafka-python-0a942176a6030c3fd9e77a3a8f5a63f85f376f14.tar.gz
Improve Broker connection handling of not-ready nodes
- simplify connect state logic - add connecting() method to check state - add BrokerConnection details to exceptions - return NodeNotReady as Future if still connecting
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py50
1 files changed, 26 insertions, 24 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 0ce469d..2b82b6d 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -106,24 +106,22 @@ class BrokerConnection(object):
# in non-blocking mode, use repeated calls to socket.connect_ex
# to check connection status
request_timeout = self.config['request_timeout_ms'] / 1000.0
- if time.time() > request_timeout + self.last_attempt:
+ try:
+ ret = self._sock.connect_ex((self.host, self.port))
+ except socket.error as ret:
+ pass
+ if not ret or ret == errno.EISCONN:
+ self.state = ConnectionStates.CONNECTED
+ elif ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022):
+ log.error('Connect attempt to %s returned error %s.'
+ ' Disconnecting.', self, ret)
+ self.close()
+ self.last_failure = time.time()
+ elif time.time() > request_timeout + self.last_attempt:
log.error('Connection attempt to %s timed out', self)
self.close() # error=TimeoutError ?
self.last_failure = time.time()
- else:
- try:
- ret = self._sock.connect_ex((self.host, self.port))
- except socket.error as ret:
- pass
- if not ret or ret == errno.EISCONN:
- self.state = ConnectionStates.CONNECTED
- # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
- elif ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022):
- log.error('Connect attempt to %s returned error %s.'
- ' Disconnecting.', self, ret)
- self.close()
- self.last_failure = time.time()
return self.state
def blacked_out(self):
@@ -141,6 +139,10 @@ class BrokerConnection(object):
"""Return True iff socket is connected."""
return self.state is ConnectionStates.CONNECTED
+ def connecting(self):
+ """Return True iff socket is in intermediate connecting state."""
+ return self.state is ConnectionStates.CONNECTING
+
def close(self, error=None):
"""Close socket and fail all in-flight-requests.
@@ -158,7 +160,7 @@ class BrokerConnection(object):
self._rbuffer.seek(0)
self._rbuffer.truncate()
if error is None:
- error = Errors.ConnectionError()
+ error = Errors.ConnectionError(str(self))
while self.in_flight_requests:
ifr = self.in_flight_requests.popleft()
ifr.future.failure(error)
@@ -169,10 +171,12 @@ class BrokerConnection(object):
Can block on network if request is larger than send_buffer_bytes
"""
future = Future()
- if not self.connected():
- return future.failure(Errors.ConnectionError())
- if not self.can_send_more():
- return future.failure(Errors.TooManyInFlightRequests())
+ if self.connecting():
+ return future.failure(Errors.NodeNotReadyError(str(self)))
+ elif not self.connected():
+ return future.failure(Errors.ConnectionError(str(self)))
+ elif not self.can_send_more():
+ return future.failure(Errors.TooManyInFlightRequests(str(self)))
correlation_id = self._next_correlation_id()
header = RequestHeader(request,
correlation_id=correlation_id,
@@ -191,7 +195,7 @@ class BrokerConnection(object):
self._sock.setblocking(False)
except (AssertionError, ConnectionError) as e:
log.exception("Error sending %s to %s", request, self)
- error = Errors.ConnectionError(e)
+ error = Errors.ConnectionError("%s: %s" % (str(self), e))
self.close(error=error)
return future.failure(error)
log.debug('%s Request %d: %s', self, correlation_id, request)
@@ -324,11 +328,9 @@ class BrokerConnection(object):
' initialized on the broker')
elif ifr.correlation_id != recv_correlation_id:
-
-
error = Errors.CorrelationIdError(
- 'Correlation ids do not match: sent %d, recv %d'
- % (ifr.correlation_id, recv_correlation_id))
+ '%s: Correlation ids do not match: sent %d, recv %d'
+ % (str(self), ifr.correlation_id, recv_correlation_id))
ifr.future.failure(error)
self.close()
self._processing = False