diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-05 12:50:43 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-06 19:27:56 -0700 |
commit | 0f1579b047fc63c09596897cc1c83730bd0ddb94 (patch) | |
tree | fa412a8027b5624132700dc76df8dc97cda0ffae /kafka/producer/base.py | |
parent | 9712f613c9e7e4b0436f501b513249eab4edc4e9 (diff) | |
download | kafka-python-0f1579b047fc63c09596897cc1c83730bd0ddb94.tar.gz |
Log retries and failed messages in async producer (configurable as full messages or hash())
Diffstat (limited to 'kafka/producer/base.py')
-rw-r--r-- | kafka/producer/base.py | 36 |
1 files changed, 24 insertions, 12 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 2f47d87..cd14ab6 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -36,12 +36,14 @@ ASYNC_QUEUE_PUT_TIMEOUT = 0 ASYNC_RETRY_LIMIT = None ASYNC_RETRY_BACKOFF_MS = 100 ASYNC_RETRY_ON_TIMEOUTS = True +ASYNC_LOG_MESSAGES_ON_ERROR = True STOP_ASYNC_PRODUCER = -1 def _send_upstream(queue, client, codec, batch_time, batch_size, - req_acks, ack_timeout, retry_options, stop_event): + req_acks, ack_timeout, retry_options, stop_event, + log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR): """ Listen on the queue for a specified number of messages or till a specified timeout and send them upstream to the brokers in one @@ -123,6 +125,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, if error_cls: _handle_error(error_cls, orig_req) + log.error('Error sending ProduceRequest to %s:%d with msgs %s', + orig_req.topic, orig_req.partition, + orig_req.messages if log_messages_on_error + else hash(orig_req.messages)) if not reqs_to_retry: request_tries = {} @@ -147,6 +153,13 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, or (count < retry_options.limit)) ) + # Log messages we are going to retry + for orig_req in request_tries.keys(): + log.info('Retrying ProduceRequest to %s:%d with msgs %s', + orig_req.topic, orig_req.partition, + orig_req.messages if log_messages_on_error + else hash(orig_req.messages)) + class Producer(object): """ @@ -185,7 +198,8 @@ class Producer(object): async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS, async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS, async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, - async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT): + async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT, + async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR): if batch_send: async = True @@ -218,16 +232,14 @@ class Producer(object): backoff_ms=async_retry_backoff_ms, retry_on_timeouts=async_retry_on_timeouts) self.thread_stop_event = Event() - self.thread = Thread(target=_send_upstream, - args=(self.queue, - self.client.copy(), - self.codec, - batch_send_every_t, - batch_send_every_n, - self.req_acks, - self.ack_timeout, - async_retry_options, - self.thread_stop_event)) + self.thread = Thread( + target=_send_upstream, + args=(self.queue, self.client.copy(), self.codec, + batch_send_every_t, batch_send_every_n, + self.req_acks, self.ack_timeout, + async_retry_options, self.thread_stop_event), + kwargs={'log_messages_on_error': async_log_messages_on_error} + ) # Thread will die if main thread exits self.thread.daemon = True |