summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-06-19 22:27:10 -0700
committerDana Powers <dana.powers@gmail.com>2017-06-18 23:30:50 -0700
commitc12e04bc0c30c627d849d0057e4a460edc5ffafe (patch)
tree47b5c83ad5affaeac6847f3a0886c2d8714e90de
parentbbbac3dc3678df069ef72ecfea62d435bc519a07 (diff)
downloadkafka-python-c12e04bc0c30c627d849d0057e4a460edc5ffafe.tar.gz
Use randomized exponential backoff policy for BrokerConnection
-rw-r--r--kafka/client_async.py29
-rw-r--r--kafka/conn.py32
-rw-r--r--kafka/consumer/group.py5
-rw-r--r--kafka/producer/kafka.py5
4 files changed, 49 insertions, 22 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index d8c2389..44bc9af 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -67,6 +67,10 @@ class KafkaClient(object):
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
+ reconnect_backoff_max (int): If higher than reconnect_backoff_ms,
+ node reconnect backoff will increase on each consecutive failure
+ up to this maximum. The actual backoff is chosen randomly from
+ an exponentially increasing range. Default: 60000.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 40000.
retry_backoff_ms (int): Milliseconds to backoff when retrying on
@@ -137,6 +141,7 @@ class KafkaClient(object):
'request_timeout_ms': 40000,
'connections_max_idle_ms': 9 * 60 * 1000,
'reconnect_backoff_ms': 50,
+ 'reconnect_backoff_max': 60000,
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
@@ -432,15 +437,7 @@ class KafkaClient(object):
"""
if node_id not in self._conns:
return 0
-
- conn = self._conns[node_id]
- time_waited_ms = time.time() - (conn.last_attempt or 0)
- if conn.disconnected():
- return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0)
- elif conn.connecting():
- return 0
- else:
- return 999999999
+ return self._conns[node_id].connection_delay()
def is_ready(self, node_id, metadata_priority=True):
"""Check whether a node is ready to send more requests.
@@ -655,12 +652,10 @@ class KafkaClient(object):
def least_loaded_node(self):
"""Choose the node with fewest outstanding requests, with fallbacks.
- This method will prefer a node with an existing connection, but will
- potentially choose a node for which we don't yet have a connection if
- all existing connections are in use. This method will never choose a
- node that was disconnected within the reconnect backoff period.
- If all else fails, the method will attempt to bootstrap again using the
- bootstrap_servers list.
+ This method will prefer a node with an existing connection and no
+ in-flight-requests. If no such node is found, a node will be chosen
+ randomly from disconnected nodes that are not "blacked out" (i.e.,
+ are not subject to a reconnect backoff).
Returns:
node_id or None if no suitable node was found
@@ -695,10 +690,6 @@ class KafkaClient(object):
elif 'bootstrap' in self._conns:
return 'bootstrap'
- # Last option: try to bootstrap again
- # this should only happen if no prior bootstrap has been successful
- log.error('No nodes found in metadata -- retrying bootstrap')
- self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
return None
def set_topics(self, topics):
diff --git a/kafka/conn.py b/kafka/conn.py
index 12bd08d..687b748 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -5,7 +5,7 @@ import copy
import errno
import logging
import io
-from random import shuffle
+from random import randint, shuffle
import socket
import time
import traceback
@@ -140,6 +140,7 @@ class BrokerConnection(object):
'node_id': 0,
'request_timeout_ms': 40000,
'reconnect_backoff_ms': 50,
+ 'reconnect_backoff_max': 60000,
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
@@ -199,6 +200,7 @@ class BrokerConnection(object):
assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
self.state = ConnectionStates.DISCONNECTED
+ self._reset_reconnect_backoff()
self._sock = None
self._ssl_context = None
if self.config['ssl_context'] is not None:
@@ -305,6 +307,7 @@ class BrokerConnection(object):
else:
log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
+ self._reset_reconnect_backoff()
self.config['state_change_callback'](self)
# Connection failed
@@ -340,6 +343,7 @@ class BrokerConnection(object):
log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
+ self._reset_reconnect_backoff()
self.config['state_change_callback'](self)
return self.state
@@ -475,11 +479,19 @@ class BrokerConnection(object):
re-establish a connection yet
"""
if self.state is ConnectionStates.DISCONNECTED:
- backoff = self.config['reconnect_backoff_ms'] / 1000.0
- if time.time() < self.last_attempt + backoff:
+ if time.time() < self.last_attempt + self._reconnect_backoff:
return True
return False
+    def connection_delay(self):
+        """Return how long to wait before using this connection.
+
+        Returns:
+            The remaining reconnect backoff in milliseconds (never negative)
+            while disconnected, 0 while a connection attempt is in progress,
+            and 999999999 otherwise.
+        """
+        # time.time() and self._reconnect_backoff are both in seconds.
+        time_waited = time.time() - (self.last_attempt or 0)
+        if self.disconnected():
+            # NOTE(review): converted to milliseconds on the assumption that
+            # callers expect ms (the original local was named
+            # time_waited_ms) -- confirm against KafkaClient.connection_delay.
+            return max(self._reconnect_backoff - time_waited, 0) * 1000
+        elif self.connecting():
+            return 0
+        else:
+            return 999999999
+
def connected(self):
"""Return True iff socket is connected."""
return self.state is ConnectionStates.CONNECTED
@@ -495,6 +507,19 @@ class BrokerConnection(object):
"""Return True iff socket is closed"""
return self.state is ConnectionStates.DISCONNECTED
+    def _reset_reconnect_backoff(self):
+        """Clear the failure count and restore the base reconnect backoff.
+
+        The configured ``reconnect_backoff_ms`` is divided by 1000, so the
+        stored ``_reconnect_backoff`` is expressed in seconds.
+        """
+        base_backoff_ms = self.config['reconnect_backoff_ms']
+        self._reconnect_backoff = base_backoff_ms / 1000.0
+        self._failures = 0
+
+    def _update_reconnect_backoff(self):
+        """Record a connection failure and pick the next reconnect backoff.
+
+        Does nothing unless ``reconnect_backoff_max`` is greater than
+        ``reconnect_backoff_ms``. Otherwise the failure count is incremented
+        and a random whole number of milliseconds is drawn between the base
+        backoff and ``base * 2 ** (failures - 1)`` (capped at the configured
+        maximum); the result is stored in seconds.
+        """
+        base_ms = self.config['reconnect_backoff_ms']
+        cap_ms = self.config['reconnect_backoff_max']
+        if not cap_ms > base_ms:
+            # Exponential backoff is disabled; keep the current backoff.
+            return
+        self._failures += 1
+        # Upper bound doubles with each consecutive failure, up to the cap.
+        self._reconnect_backoff = base_ms * 2 ** (self._failures - 1)
+        self._reconnect_backoff = min(self._reconnect_backoff, cap_ms)
+        # Randomize within [base, upper bound], then convert ms -> seconds.
+        self._reconnect_backoff = randint(base_ms, self._reconnect_backoff)
+        self._reconnect_backoff /= 1000.0
+        log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)
+
def close(self, error=None):
"""Close socket and fail all in-flight-requests.
@@ -512,6 +537,7 @@ class BrokerConnection(object):
log.info('%s: Closing connection. %s', self, error or '')
self.state = ConnectionStates.DISCONNECTING
self.config['state_change_callback'](self)
+ self._update_reconnect_backoff()
if self._sock:
self._sock.close()
self._sock = None
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 15a8947..2512139 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -91,6 +91,10 @@ class KafkaConsumer(six.Iterator):
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
+ reconnect_backoff_max (int): If higher than reconnect_backoff_ms,
+ node reconnect backoff will increase on each consecutive failure
+ up to this maximum. The actual backoff is chosen randomly from
+ an exponentially increasing range. Default: 60000.
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per
broker connection. Default: 5.
@@ -230,6 +234,7 @@ class KafkaConsumer(six.Iterator):
'request_timeout_ms': 40 * 1000,
'retry_backoff_ms': 100,
'reconnect_backoff_ms': 50,
+ 'reconnect_backoff_max': 60000,
'max_in_flight_requests_per_connection': 5,
'auto_offset_reset': 'latest',
'enable_auto_commit': True,
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 51c2182..482628d 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -199,6 +199,10 @@ class KafkaProducer(object):
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
+ reconnect_backoff_max (int): If higher than reconnect_backoff_ms,
+ node reconnect backoff will increase on each consecutive failure
+ up to this maximum. The actual backoff is chosen randomly from
+ an exponentially increasing range. Default: 60000.
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per
broker connection. Default: 5.
@@ -276,6 +280,7 @@ class KafkaProducer(object):
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'reconnect_backoff_ms': 50,
+ 'reconnect_backoff_max': 60000,
'max_in_flight_requests_per_connection': 5,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,