summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/common.py17
-rw-r--r--kafka/producer/base.py92
-rw-r--r--test/test_producer.py8
3 files changed, 54 insertions, 63 deletions
diff --git a/kafka/common.py b/kafka/common.py
index cbb4013..50f8a77 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -82,6 +82,8 @@ RetryOptions = namedtuple("RetryOptions",
["limit", "backoff_ms", "retry_on_timeouts"])
+
+
#################
# Exceptions #
#################
@@ -228,3 +230,18 @@ def check_error(response):
if response.error:
error_class = kafka_errors.get(response.error, UnknownError)
raise error_class(response)
+
+
+RETRY_BACKOFF_ERROR_TYPES = (
+ KafkaUnavailableError, LeaderNotAvailableError,
+ ConnectionError, FailedPayloadsError
+)
+
+
+RETRY_REFRESH_ERROR_TYPES = (
+ NotLeaderForPartitionError, UnknownTopicOrPartitionError,
+ LeaderNotAvailableError, ConnectionError
+)
+
+
+RETRY_ERROR_TYPES = list(set(RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES))
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 1d5e045..a989e3f 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -15,12 +15,12 @@ from threading import Thread, Event
import six
from kafka.common import (
- ProduceRequest, TopicAndPartition,
- UnsupportedCodecError, FailedPayloadsError, RetryOptions,
- RequestTimedOutError, KafkaUnavailableError, LeaderNotAvailableError,
- UnknownTopicOrPartitionError, NotLeaderForPartitionError, ConnectionError,
- InvalidMessageError, MessageSizeTooLargeError
+ ProduceRequest, TopicAndPartition, RetryOptions,
+ UnsupportedCodecError, FailedPayloadsError, RequestTimedOutError
)
+from kafka.common import (
+ RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
+
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
from kafka.util import kafka_bytestring
@@ -88,70 +88,42 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
acks=req_acks,
timeout=ack_timeout)
- except RequestTimedOutError as ex:
- # should retry only if user is fine with duplicates
- if retry_options.retry_on_timeouts:
- reqs_to_retry = reqs
-
- except KafkaUnavailableError as ex:
- # backoff + retry
- do_backoff(retry_options)
- reqs_to_retry = get_requests_for_retry(reqs, retry_options)
-
- except (NotLeaderForPartitionError, UnknownTopicOrPartitionError) as ex:
- # refresh + retry
- client.load_metadata_for_topics()
- reqs_to_retry = get_requests_for_retry(reqs, retry_options)
-
- except (LeaderNotAvailableError, ConnectionError) as ex:
- # backoff + refresh + retry
- do_backoff(retry_options)
- client.load_metadata_for_topics()
- reqs_to_retry = get_requests_for_retry(reqs, retry_options)
-
- except FailedPayloadsError as ex:
- # retry only failed messages with backoff
- failed_reqs = ex.failed_payloads
- do_backoff(retry_options)
- reqs_to_retry = get_requests_for_retry(failed_reqs, retry_options)
-
- except (InvalidMessageError, MessageSizeTooLargeError) as ex:
- # "bad" messages, doesn't make sense to retry
- log.exception("Message error when sending: %s" % type(ex))
-
- except Exception as ex:
- log.exception("Unable to send message: %s" % type(ex))
-
- finally:
- reqs = []
+ except tuple(RETRY_ERROR_TYPES) as ex:
- if reqs_to_retry:
- reqs = reqs_to_retry
+ # by default, retry all sent messages
+ reqs_to_retry = reqs
+ if type(ex) == FailedPayloadsError:
+ reqs_to_retry = ex.failed_payloads
-def get_requests_for_retry(requests, retry_options):
- log.exception("Failed payloads count %s" % len(requests))
+ elif (type(ex) == RequestTimedOutError and
+ not retry_options.retry_on_timeouts):
+ reqs_to_retry = []
- # if no limit, retry all failed messages until success
- if retry_options.limit is None:
- return requests
+ # filter reqs_to_retry if there's a retry limit
+ if retry_options.limit and retry_options.limit > 0:
+ reqs_to_retry = [req._replace(retries=req.retries+1)
+ for req in reqs_to_retry
+ if req.retries < retry_options.limit]
- # makes sense to check failed reqs only if we have a limit > 0
- reqs_to_retry = []
- if retry_options.limit > 0:
- for req in requests:
- if req.retries < retry_options.limit:
- updated_req = req._replace(retries=req.retries+1)
- reqs_to_retry.append(updated_req)
+ # doing backoff before next retry
+ if (reqs_to_retry and type(ex) in RETRY_BACKOFF_ERROR_TYPES
+ and retry_options.backoff_ms):
+ log.warning("Doing backoff for %s(ms)." % retry_options.backoff_ms)
+ time.sleep(float(retry_options.backoff_ms) / 1000)
- return reqs_to_retry
+ # refresh topic metadata before next retry
+ if reqs_to_retry and type(ex) in RETRY_REFRESH_ERROR_TYPES:
+ client.load_metadata_for_topics()
+ except Exception as ex:
+ log.exception("Unable to send message: %s" % type(ex))
-def do_backoff(retry_options):
- if retry_options.backoff_ms:
- log.warning("Doing backoff for %s(ms)." % retry_options.backoff_ms)
- time.sleep(float(retry_options.backoff_ms) / 1000)
+ finally:
+ reqs = []
+ if reqs_to_retry:
+ reqs = reqs_to_retry
class Producer(object):
diff --git a/test/test_producer.py b/test/test_producer.py
index c9bdc47..c0dc873 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -61,6 +61,9 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
def _run_process(self, retries_limit=3, sleep_timeout=1):
# run _send_upstream process with the queue
stop_event = threading.Event()
+ retry_options = RetryOptions(limit=retries_limit,
+ backoff_ms=50,
+ retry_on_timeouts=False)
self.thread = threading.Thread(
target=_send_upstream,
args=(self.queue, self.client, CODEC_NONE,
@@ -68,8 +71,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
3, # batch length
Producer.ACK_AFTER_LOCAL_WRITE,
Producer.DEFAULT_ACK_TIMEOUT,
- RetryOptions(limit=retries_limit, backoff_ms=50,
- retry_on_timeouts=True),
+ retry_options,
stop_event))
self.thread.daemon = True
self.thread.start()
@@ -121,7 +123,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
# lets create a queue and add 10 messages for 10 different partitions
# to show how retries should work ideally
for i in range(10):
- self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+ self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i))
def send_side_effect(reqs, *args, **kwargs):
raise FailedPayloadsError(reqs)