summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-11 21:48:31 -0700
committerJeff Widman <jeff@jeffwidman.com>2019-03-12 13:36:52 -0700
commit2a91ca1a8fd767d2e1f9981c7431ce31dcbddf00 (patch)
tree0a7df1ada8e4e502c90215d9b54bf16684b3b8e1 /test
parent8c0792581d8a38822c01b40f5d3926c659b0c439 (diff)
downloadkafka-python-2a91ca1a8fd767d2e1f9981c7431ce31dcbddf00.tar.gz
Synchronize puts to KafkaConsumer protocol buffer during async sends
Diffstat (limited to 'test')
-rw-r--r--test/test_conn.py28
1 files changed, 24 insertions, 4 deletions
diff --git a/test/test_conn.py b/test/test_conn.py
index 27d77be..953c112 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -112,8 +112,8 @@ def test_send_connecting(conn):
def test_send_max_ifr(conn):
conn.state = ConnectionStates.CONNECTED
max_ifrs = conn.config['max_in_flight_requests_per_connection']
- for _ in range(max_ifrs):
- conn.in_flight_requests.append('foo')
+ for i in range(max_ifrs):
+ conn.in_flight_requests[i] = 'foo'
f = conn.send('foobar')
assert f.failed() is True
assert isinstance(f.exception, Errors.TooManyInFlightRequests)
@@ -170,9 +170,9 @@ def test_send_error(_socket, conn):
def test_can_send_more(conn):
assert conn.can_send_more() is True
max_ifrs = conn.config['max_in_flight_requests_per_connection']
- for _ in range(max_ifrs):
+ for i in range(max_ifrs):
assert conn.can_send_more() is True
- conn.in_flight_requests.append('foo')
+ conn.in_flight_requests[i] = 'foo'
assert conn.can_send_more() is False
@@ -311,3 +311,23 @@ def test_relookup_on_failure():
assert conn._sock_afi == afi2
assert conn._sock_addr == sockaddr2
conn.close()
+
+
+def test_requests_timed_out(conn):
+ with mock.patch("time.time", return_value=0):
+ # No in-flight requests, not timed out
+ assert not conn.requests_timed_out()
+
+ # Single request, timestamp = now (0)
+ conn.in_flight_requests[0] = ('foo', 0)
+ assert not conn.requests_timed_out()
+
+ # Add another request w/ timestamp > request_timeout ago
+ request_timeout = conn.config['request_timeout_ms']
+ expired_timestamp = 0 - request_timeout - 1
+ conn.in_flight_requests[1] = ('bar', expired_timestamp)
+ assert conn.requests_timed_out()
+
+ # Drop the expired request and we should be good to go again
+ conn.in_flight_requests.pop(1)
+ assert not conn.requests_timed_out()