summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Brown <jbrown@easypost.com>2016-04-25 10:56:58 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-25 18:56:00 -0700
commit161fa6d76b8220954eb52554e4bebc470308172d (patch)
tree32b0252ed7daf5990060a190f74971dc56cbf88a
parent22dd002800839fd0788648e8308104bb012d96b7 (diff)
downloadkafka-python-161fa6d76b8220954eb52554e4bebc470308172d.tar.gz
handle unexpected reads in client_async
Should fix #661.
-rw-r--r--kafka/client_async.py19
1 files changed, 19 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index ea2621e..6f5d1fe 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -448,6 +448,25 @@ class KafkaClient(object):
continue
conn = key.data
processed.add(conn)
+
+ if not conn.in_flight_requests:
+ # if we got an EVENT_READ but there were no in-flight requests, one of
+ # two things has happened:
+ #
+ # 1. The remote end closed the connection (because it died, or because
+ # a firewall timed out, or whatever)
+ # 2. The protocol is out of sync.
+ #
+ # either way, we can no longer safely use this connection
+ #
+ # Do a 1-byte read to clear the READ flag, and then close the conn
+ unexpected_data = key.fileobj.recv(1)
+ if unexpected_data: # anything other than a 0-byte read means protocol issues
+ log.warning('Protocol out of sync on %r, closing', conn)
+ conn.close()
+ continue
+
+ # Accumulate as many responses as the connection has pending
while conn.in_flight_requests:
response = conn.recv() # Note: conn.recv runs callbacks / errbacks