summaryrefslogtreecommitdiff
path: root/kazoo/protocol/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'kazoo/protocol/connection.py')
-rw-r--r--kazoo/protocol/connection.py31
1 files changed, 21 insertions, 10 deletions
diff --git a/kazoo/protocol/connection.py b/kazoo/protocol/connection.py
index fc3586c..70de241 100644
--- a/kazoo/protocol/connection.py
+++ b/kazoo/protocol/connection.py
@@ -561,7 +561,6 @@ class ConnectionHandler(object):
def _connect_attempt(self, host, hostip, port, retry):
client = self.client
KazooTimeoutError = self.handler.timeout_exception
- close_connection = False
self._socket = None
@@ -582,13 +581,14 @@ class ConnectionHandler(object):
connect_timeout = connect_timeout / 1000.0
retry.reset()
self.ping_outstanding.clear()
+ last_send = time.time()
with self._socket_error_handling():
- while not close_connection:
+ while True:
# Watch for something to read or send
- jitter_time = random.randint(0, 40) / 100.0
+ jitter_time = random.randint(1, 40) / 100.0
+ deadline = last_send + read_timeout / 2.0 - jitter_time
# Ensure our timeout is positive
- timeout = max([read_timeout / 2.0 - jitter_time,
- jitter_time])
+ timeout = max([deadline - time.time(), jitter_time])
s = self.handler.select([self._socket, self._read_sock],
[], [], timeout)[0]
@@ -597,12 +597,23 @@ class ConnectionHandler(object):
self.ping_outstanding.clear()
raise ConnectionDropped(
"outstanding heartbeat ping not received")
- self._send_ping(connect_timeout)
- elif s[0] == self._socket:
- response = self._read_socket(read_timeout)
- close_connection = response == CLOSE_RESPONSE
else:
- self._send_request(read_timeout, connect_timeout)
+ if self._socket in s:
+ response = self._read_socket(read_timeout)
+ if response == CLOSE_RESPONSE:
+ break
+ # Check if any requests need sending before proceeding
+ # to process more responses. Otherwise the responses
+ # may choke out the requests. See PR#633.
+ if self._read_sock in s:
+ self._send_request(read_timeout, connect_timeout)
+ # Requests act as implicit pings.
+ last_send = time.time()
+ continue
+
+ if time.time() >= deadline:
+ self._send_ping(connect_timeout)
+ last_send = time.time()
self.logger.info('Closing connection to %s:%s', host, port)
client._session_callback(KeeperState.CLOSED)
return STOP_CONNECTING