summaryrefslogtreecommitdiff
path: root/test/test_client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_client_async.py')
-rw-r--r--test/test_client_async.py38
1 files changed, 36 insertions, 2 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 8f6ac3f..d4e6d37 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -1,3 +1,5 @@
+from __future__ import absolute_import, division
+
# selectors in stdlib as of py3.4
try:
import selectors # pylint: disable=import-error
@@ -10,7 +12,7 @@ import time
import pytest
-from kafka.client_async import KafkaClient
+from kafka.client_async import KafkaClient, IdleConnectionManager
from kafka.conn import ConnectionStates
import kafka.errors as Errors
from kafka.future import Future
@@ -319,7 +321,10 @@ def client(mocker):
mocker.patch.object(KafkaClient, '_bootstrap')
_poll = mocker.patch.object(KafkaClient, '_poll')
- cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9))
+ cli = KafkaClient(request_timeout_ms=9999999,
+ reconnect_backoff_ms=2222,
+ connections_max_idle_ms=float('inf'),
+ api_version=(0, 9))
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
tasks.return_value = 9999999
@@ -395,3 +400,32 @@ def test_schedule():
def test_unschedule():
pass
+
+
+def test_idle_connection_manager(mocker):
+ t = mocker.patch.object(time, 'time')
+ t.return_value = 0
+
+ idle = IdleConnectionManager(100)
+ assert idle.next_check_ms() == float('inf')
+
+ idle.update('foo')
+ assert not idle.is_expired('foo')
+ assert idle.poll_expired_connection() is None
+ assert idle.next_check_ms() == 100
+
+ t.return_value = 90 / 1000
+ assert not idle.is_expired('foo')
+ assert idle.poll_expired_connection() is None
+ assert idle.next_check_ms() == 10
+
+ t.return_value = 100 / 1000
+ assert idle.is_expired('foo')
+ assert idle.next_check_ms() == 0
+
+ conn_id, conn_ts = idle.poll_expired_connection()
+ assert conn_id == 'foo'
+ assert conn_ts == 0
+
+ idle.remove('foo')
+ assert idle.next_check_ms() == float('inf')