diff options
-rw-r--r-- | kafka/common.py | 17 | ||||
-rw-r--r-- | kafka/producer/base.py | 92 | ||||
-rw-r--r-- | test/test_producer.py | 8 |
3 files changed, 54 insertions, 63 deletions
diff --git a/kafka/common.py b/kafka/common.py index cbb4013..50f8a77 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -82,6 +82,8 @@ RetryOptions = namedtuple("RetryOptions", ["limit", "backoff_ms", "retry_on_timeouts"]) + + ################# # Exceptions # ################# @@ -228,3 +230,18 @@ def check_error(response): if response.error: error_class = kafka_errors.get(response.error, UnknownError) raise error_class(response) + + +RETRY_BACKOFF_ERROR_TYPES = ( + KafkaUnavailableError, LeaderNotAvailableError, + ConnectionError, FailedPayloadsError +) + + +RETRY_REFRESH_ERROR_TYPES = ( + NotLeaderForPartitionError, UnknownTopicOrPartitionError, + LeaderNotAvailableError, ConnectionError +) + + +RETRY_ERROR_TYPES = list(set(RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES)) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 1d5e045..a989e3f 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -15,12 +15,12 @@ from threading import Thread, Event import six from kafka.common import ( - ProduceRequest, TopicAndPartition, - UnsupportedCodecError, FailedPayloadsError, RetryOptions, - RequestTimedOutError, KafkaUnavailableError, LeaderNotAvailableError, - UnknownTopicOrPartitionError, NotLeaderForPartitionError, ConnectionError, - InvalidMessageError, MessageSizeTooLargeError + ProduceRequest, TopicAndPartition, RetryOptions, + UnsupportedCodecError, FailedPayloadsError, RequestTimedOutError ) +from kafka.common import ( + RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES) + from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set from kafka.util import kafka_bytestring @@ -88,70 +88,42 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, acks=req_acks, timeout=ack_timeout) - except RequestTimedOutError as ex: - # should retry only if user is fine with duplicates - if retry_options.retry_on_timeouts: - reqs_to_retry = reqs - - except KafkaUnavailableError as ex: - # backoff + retry - do_backoff(retry_options) - reqs_to_retry = get_requests_for_retry(reqs, retry_options) - - except (NotLeaderForPartitionError, UnknownTopicOrPartitionError) as ex: - # refresh + retry - client.load_metadata_for_topics() - reqs_to_retry = get_requests_for_retry(reqs, retry_options) - - except (LeaderNotAvailableError, ConnectionError) as ex: - # backoff + refresh + retry - do_backoff(retry_options) - client.load_metadata_for_topics() - reqs_to_retry = get_requests_for_retry(reqs, retry_options) - - except FailedPayloadsError as ex: - # retry only failed messages with backoff - failed_reqs = ex.failed_payloads - do_backoff(retry_options) - reqs_to_retry = get_requests_for_retry(failed_reqs, retry_options) - - except (InvalidMessageError, MessageSizeTooLargeError) as ex: - # "bad" messages, doesn't make sense to retry - log.exception("Message error when sending: %s" % type(ex)) - - except Exception as ex: - log.exception("Unable to send message: %s" % type(ex)) - - finally: - reqs = [] + except tuple(RETRY_ERROR_TYPES) as ex: - if reqs_to_retry: - reqs = reqs_to_retry + # by default, retry all sent messages + reqs_to_retry = reqs + if type(ex) == FailedPayloadsError: + reqs_to_retry = ex.failed_payloads -def get_requests_for_retry(requests, retry_options): - log.exception("Failed payloads count %s" % len(requests)) + elif (type(ex) == RequestTimedOutError and + not retry_options.retry_on_timeouts): + reqs_to_retry = [] - # if no limit, retry all failed messages until success - if retry_options.limit is None: - return requests + # filter reqs_to_retry if there's a retry limit + if retry_options.limit and retry_options.limit > 0: + reqs_to_retry = [req._replace(retries=req.retries+1) + for req in reqs_to_retry + if req.retries < retry_options.limit] - # makes sense to check failed reqs only if we have a limit > 0 - reqs_to_retry = [] - if retry_options.limit > 0: - for req in requests: - if req.retries < retry_options.limit: - updated_req = req._replace(retries=req.retries+1) - reqs_to_retry.append(updated_req) + # doing backoff before next retry + if (reqs_to_retry and type(ex) in RETRY_BACKOFF_ERROR_TYPES + and retry_options.backoff_ms): + log.warning("Doing backoff for %s(ms)." % retry_options.backoff_ms) + time.sleep(float(retry_options.backoff_ms) / 1000) - return reqs_to_retry + # refresh topic metadata before next retry + if reqs_to_retry and type(ex) in RETRY_REFRESH_ERROR_TYPES: + client.load_metadata_for_topics() + except Exception as ex: + log.exception("Unable to send message: %s" % type(ex)) -def do_backoff(retry_options): - if retry_options.backoff_ms: - log.warning("Doing backoff for %s(ms)." % retry_options.backoff_ms) - time.sleep(float(retry_options.backoff_ms) / 1000) + finally: + reqs = [] + if reqs_to_retry: + reqs = reqs_to_retry class Producer(object): diff --git a/test/test_producer.py b/test/test_producer.py index c9bdc47..c0dc873 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -61,6 +61,9 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): def _run_process(self, retries_limit=3, sleep_timeout=1): # run _send_upstream process with the queue stop_event = threading.Event() + retry_options = RetryOptions(limit=retries_limit, + backoff_ms=50, + retry_on_timeouts=False) self.thread = threading.Thread( target=_send_upstream, args=(self.queue, self.client, CODEC_NONE, @@ -68,8 +71,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): 3, # batch length Producer.ACK_AFTER_LOCAL_WRITE, Producer.DEFAULT_ACK_TIMEOUT, - RetryOptions(limit=retries_limit, backoff_ms=50, - retry_on_timeouts=True), + retry_options, stop_event)) self.thread.daemon = True self.thread.start() @@ -121,7 +123,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): # lets create a queue and add 10 messages for 10 different partitions # to show how retries should work ideally for i in range(10): - self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) + self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i)) def send_side_effect(reqs, *args, **kwargs): raise FailedPayloadsError(reqs) |