summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDamien Diederen <dd@crosstwine.com>2020-11-17 16:16:11 +0100
committerJeff Widman <jeff@jeffwidman.com>2020-12-13 12:28:37 -0800
commit89e0660371df940a4c15f5f6ab4c540bbd109d20 (patch)
tree207bb1bdcf9b738403991e3e078666618ae0a2c6
parentb2f7a4670c627bab26183f7f9a0e08f12d98774f (diff)
downloadkazoo-89e0660371df940a4c15f5f6ab4c540bbd109d20.tar.gz
fix(core): do not allow responses to choke request and ping processing
Without this patch, a single select event is processed by iteration in the 'ConnectionHandler' event loop. In a scenario where the client issues a large number of async requests with an important amplification factor, e.g. 'get_children_async' on a large node, it is possible for the 'select' operation to almost always return a "response ready" socket--as the server is often able to process, serialize and ship a new reponse while Kazoo processes the previous one. That response socket often (always?) ends up at the beginning of the list returned by 'select'. As only 'select_result[0]' is processed in the loop, this can cause the client to ignore the "request ready" FD for a long time, during which no requests or pings are sent. In effect, asynchronously "browsing" a large tree of nodes can stretch that duration to the point where it exceeds the timeout--causing the client to lose its session. This patch considers both descriptors after 'select', and also arranges for pings to be sent in case it encounters an "unending" stream of responses to requests which were sent earlier.
-rw-r--r--kazoo/protocol/connection.py31
1 files changed, 21 insertions, 10 deletions
diff --git a/kazoo/protocol/connection.py b/kazoo/protocol/connection.py
index fc3586c..70de241 100644
--- a/kazoo/protocol/connection.py
+++ b/kazoo/protocol/connection.py
@@ -561,7 +561,6 @@ class ConnectionHandler(object):
def _connect_attempt(self, host, hostip, port, retry):
client = self.client
KazooTimeoutError = self.handler.timeout_exception
- close_connection = False
self._socket = None
@@ -582,13 +581,14 @@ class ConnectionHandler(object):
connect_timeout = connect_timeout / 1000.0
retry.reset()
self.ping_outstanding.clear()
+ last_send = time.time()
with self._socket_error_handling():
- while not close_connection:
+ while True:
# Watch for something to read or send
- jitter_time = random.randint(0, 40) / 100.0
+ jitter_time = random.randint(1, 40) / 100.0
+ deadline = last_send + read_timeout / 2.0 - jitter_time
# Ensure our timeout is positive
- timeout = max([read_timeout / 2.0 - jitter_time,
- jitter_time])
+ timeout = max([deadline - time.time(), jitter_time])
s = self.handler.select([self._socket, self._read_sock],
[], [], timeout)[0]
@@ -597,12 +597,23 @@ class ConnectionHandler(object):
self.ping_outstanding.clear()
raise ConnectionDropped(
"outstanding heartbeat ping not received")
- self._send_ping(connect_timeout)
- elif s[0] == self._socket:
- response = self._read_socket(read_timeout)
- close_connection = response == CLOSE_RESPONSE
else:
- self._send_request(read_timeout, connect_timeout)
+ if self._socket in s:
+ response = self._read_socket(read_timeout)
+ if response == CLOSE_RESPONSE:
+ break
+ # Check if any requests need sending before proceeding
+ # to process more responses. Otherwise the responses
+ # may choke out the requests. See PR#633.
+ if self._read_sock in s:
+ self._send_request(read_timeout, connect_timeout)
+ # Requests act as implicit pings.
+ last_send = time.time()
+ continue
+
+ if time.time() >= deadline:
+ self._send_ping(connect_timeout)
+ last_send = time.time()
self.logger.info('Closing connection to %s:%s', host, port)
client._session_callback(KeeperState.CLOSED)
return STOP_CONNECTING