author     Dana Powers <dana.powers@gmail.com>   2017-04-04 18:07:44 -0700
committer  Dana Powers <dana.powers@gmail.com>   2017-04-05 23:14:23 -0700
commit     e5117a2b61ceea86dbb46f6510fc4a878f669487 (patch)
tree       4df566314f7cb072b9d87034d3615db4ad17540d
parent     29e699d940df5fa3ae3ee77cc57e9f90da1396c7 (diff)
download   kafka-python-timeout_idle_connections.tar.gz

Add support for connections_max_idle_ms (branch: timeout_idle_connections)
-rw-r--r--  kafka/client_async.py       95
-rw-r--r--  kafka/producer/kafka.py      2
-rw-r--r--  test/test_client_async.py   38
3 files changed, 131 insertions(+), 4 deletions(-)
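
This commit adds a connections_max_idle_ms setting to KafkaClient, backed by a new
IdleConnectionManager that tracks when each connection was last active and closes
sockets that have sat idle past the limit. A minimal usage sketch (the values are
illustrative and not part of the diff):

    from kafka.client_async import KafkaClient

    # Close any connection that has been idle for more than 9 minutes
    # (the new default); the bootstrap address here is illustrative.
    client = KafkaClient(bootstrap_servers='localhost:9092',
                         connections_max_idle_ms=9 * 60 * 1000)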
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 16ebb99..e1b10b3 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -135,6 +135,7 @@ class KafkaClient(object):
'bootstrap_servers': 'localhost',
'client_id': 'kafka-python-' + __version__,
'request_timeout_ms': 40000,
+ 'connections_max_idle_ms': 9 * 60 * 1000,
'reconnect_backoff_ms': 50,
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
@@ -194,6 +195,7 @@ class KafkaClient(object):
self._wake_r.setblocking(False)
self._wake_lock = threading.Lock()
self._selector.register(self._wake_r, selectors.EVENT_READ)
+ self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
self._closed = False
self._sensors = None
if self.config['metrics']:
@@ -291,6 +293,8 @@ class KafkaClient(object):
if self._sensors:
self._sensors.connection_created.record()
+ self._idle_expiry_manager.update(node_id)
+
if 'bootstrap' in self._conns and node_id != 'bootstrap':
bootstrap = self._conns.pop('bootstrap')
# XXX: make conn.close() require error to cause refresh
@@ -308,7 +312,13 @@ class KafkaClient(object):
pass
if self._sensors:
self._sensors.connection_closed.record()
- if self._refresh_on_disconnects and not self._closed:
+
+ idle_disconnect = False
+ if self._idle_expiry_manager.is_expired(node_id):
+ idle_disconnect = True
+ self._idle_expiry_manager.remove(node_id)
+
+ if self._refresh_on_disconnects and not self._closed and not idle_disconnect:
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()
@@ -514,10 +524,12 @@ class KafkaClient(object):
if future and future.is_done:
timeout = 0
else:
+ idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms()
timeout = min(
timeout_ms,
metadata_timeout_ms,
self._delayed_tasks.next_at() * 1000,
+ idle_connection_timeout_ms,
self.config['request_timeout_ms'])
timeout = max(0, timeout / 1000.0) # avoid negative timeouts
@@ -572,6 +584,8 @@ class KafkaClient(object):
conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests'))
continue
+ self._idle_expiry_manager.update(conn.node_id)
+
# Accumulate as many responses as the connection has pending
while conn.in_flight_requests:
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
@@ -601,6 +615,7 @@ class KafkaClient(object):
if self._sensors:
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
+ self._maybe_close_oldest_connection()
return responses
def in_flight_request_count(self, node_id=None):
@@ -846,6 +861,14 @@ class KafkaClient(object):
except socket.error:
break
+ def _maybe_close_oldest_connection(self):
+ expired_connection = self._idle_expiry_manager.poll_expired_connection()
+ if expired_connection:
+ conn_id, ts = expired_connection
+ idle_ms = (time.time() - ts) * 1000
+ log.info('Closing idle connection %s, last active %d ms ago', conn_id, idle_ms)
+ self.close(node_id=conn_id)
+
class DelayedTaskQueue(object):
# see https://docs.python.org/2/library/heapq.html
@@ -920,6 +943,76 @@ class DelayedTaskQueue(object):
return ready_tasks
+# OrderedDict requires python2.7+
+try:
+ from collections import OrderedDict
+except ImportError:
+ # If we don't have OrderedDict, we'll fall back to dict with O(n) priority reads
+ OrderedDict = dict
+
+
+class IdleConnectionManager(object):
+ def __init__(self, connections_max_idle_ms):
+ if connections_max_idle_ms > 0:
+ self.connections_max_idle = connections_max_idle_ms / 1000
+ else:
+ self.connections_max_idle = float('inf')
+ self.next_idle_close_check_time = None
+ self.update_next_idle_close_check_time(time.time())
+ self.lru_connections = OrderedDict()
+
+ def update(self, conn_id):
+ # order should reflect last-update
+ if conn_id in self.lru_connections:
+ del self.lru_connections[conn_id]
+ self.lru_connections[conn_id] = time.time()
+
+ def remove(self, conn_id):
+ if conn_id in self.lru_connections:
+ del self.lru_connections[conn_id]
+
+ def is_expired(self, conn_id):
+ if conn_id not in self.lru_connections:
+ return None
+ return time.time() >= self.lru_connections[conn_id] + self.connections_max_idle
+
+ def next_check_ms(self):
+ now = time.time()
+ if not self.lru_connections:
+ return float('inf')
+ elif self.next_idle_close_check_time <= now:
+ return 0
+ else:
+ return int((self.next_idle_close_check_time - now) * 1000)
+
+ def update_next_idle_close_check_time(self, ts):
+ self.next_idle_close_check_time = ts + self.connections_max_idle
+
+ def poll_expired_connection(self):
+ if time.time() < self.next_idle_close_check_time:
+ return None
+
+ if not len(self.lru_connections):
+ return None
+
+ oldest_conn_id = None
+ oldest_ts = None
+ if OrderedDict is dict:
+ for conn_id, ts in self.lru_connections.items():
+ if oldest_conn_id is None or ts < oldest_ts:
+ oldest_conn_id = conn_id
+ oldest_ts = ts
+ else:
+ (oldest_conn_id, oldest_ts) = next(iter(self.lru_connections.items()))
+
+ self.update_next_idle_close_check_time(oldest_ts)
+
+ if time.time() >= oldest_ts + self.connections_max_idle:
+ return (oldest_conn_id, oldest_ts)
+ else:
+ return None
+
+
class KafkaClientMetrics(object):
def __init__(self, metrics, metric_group_prefix, conns):
self.metrics = metrics
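
For context on how the helper above behaves, here is a small standalone sketch of
IdleConnectionManager (not part of the commit; the timeout and node ID are
illustrative). In the client itself, update() is called when a connection is
established and whenever a socket is read, and poll_expired_connection() is driven
from _maybe_close_oldest_connection() at the end of each poll:

    import time
    from kafka.client_async import IdleConnectionManager

    idle = IdleConnectionManager(5 * 1000)    # expire connections idle > 5 seconds
    idle.update('node-1')                     # record activity for node-1
    print(idle.next_check_ms())               # ~5000: ms until the next expiry check

    time.sleep(6)
    print(idle.is_expired('node-1'))          # True: no activity for > 5 seconds
    expired = idle.poll_expired_connection()  # ('node-1', <last-active timestamp>)
    if expired:
        conn_id, last_active = expired
        idle.remove(conn_id)                  # the client would also close the conn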
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 91e253b..22f60bd 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -266,7 +266,7 @@ class KafkaProducer(object):
'linger_ms': 0,
'partitioner': DefaultPartitioner(),
'buffer_memory': 33554432,
- 'connections_max_idle_ms': 600000, # not implemented yet
+ 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
'max_block_ms': 60000,
'max_request_size': 1048576,
'metadata_max_age_ms': 300000,
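
The producer default drops from 600000 ms to 9 * 60 * 1000 (540000 ms). This matches
the 9-minute default used by the Java clients, which appears to be chosen to stay
below the broker-side connections.max.idle.ms default of 10 minutes, so clients close
idle sockets before the broker drops them. As a quick check (for reference only):

    client_default_ms = 9 * 60 * 1000    # 540000 ms = 9 minutes
    broker_default_ms = 10 * 60 * 1000   # 600000 ms = 10 minutes (connections.max.idle.ms)
    assert client_default_ms < broker_default_ms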
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 8f6ac3f..d4e6d37 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -1,3 +1,5 @@
+from __future__ import absolute_import, division
+
# selectors in stdlib as of py3.4
try:
import selectors # pylint: disable=import-error
@@ -10,7 +12,7 @@ import time
import pytest
-from kafka.client_async import KafkaClient
+from kafka.client_async import KafkaClient, IdleConnectionManager
from kafka.conn import ConnectionStates
import kafka.errors as Errors
from kafka.future import Future
@@ -319,7 +321,10 @@ def client(mocker):
mocker.patch.object(KafkaClient, '_bootstrap')
_poll = mocker.patch.object(KafkaClient, '_poll')
- cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9))
+ cli = KafkaClient(request_timeout_ms=9999999,
+ reconnect_backoff_ms=2222,
+ connections_max_idle_ms=float('inf'),
+ api_version=(0, 9))
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
tasks.return_value = 9999999
@@ -395,3 +400,32 @@ def test_schedule():
def test_unschedule():
pass
+
+
+def test_idle_connection_manager(mocker):
+ t = mocker.patch.object(time, 'time')
+ t.return_value = 0
+
+ idle = IdleConnectionManager(100)
+ assert idle.next_check_ms() == float('inf')
+
+ idle.update('foo')
+ assert not idle.is_expired('foo')
+ assert idle.poll_expired_connection() is None
+ assert idle.next_check_ms() == 100
+
+ t.return_value = 90 / 1000
+ assert not idle.is_expired('foo')
+ assert idle.poll_expired_connection() is None
+ assert idle.next_check_ms() == 10
+
+ t.return_value = 100 / 1000
+ assert idle.is_expired('foo')
+ assert idle.next_check_ms() == 0
+
+ conn_id, conn_ts = idle.poll_expired_connection()
+ assert conn_id == 'foo'
+ assert conn_ts == 0
+
+ idle.remove('foo')
+ assert idle.next_check_ms() == float('inf')
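
One detail worth calling out in the updated client fixture: passing
connections_max_idle_ms=float('inf') effectively disables idle expiry, so the
existing client tests never hit the new close path. A quick sketch of that behavior
(not part of the commit):

    from kafka.client_async import IdleConnectionManager

    # With an infinite timeout, connections are tracked but never expire.
    idle = IdleConnectionManager(float('inf'))
    idle.update('node-1')
    assert not idle.is_expired('node-1')
    assert idle.poll_expired_connection() is None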