author     Dana Powers <dana.powers@rd.io>  2015-06-05 12:50:43 -0700
committer  Dana Powers <dana.powers@rd.io>  2015-06-06 19:27:56 -0700
commit     0f1579b047fc63c09596897cc1c83730bd0ddb94 (patch)
tree       fa412a8027b5624132700dc76df8dc97cda0ffae /kafka/producer/base.py
parent     9712f613c9e7e4b0436f501b513249eab4edc4e9 (diff)
download   kafka-python-0f1579b047fc63c09596897cc1c83730bd0ddb94.tar.gz
Log retries and failed messages in async producer (configurable as full messages or hash())
Diffstat (limited to 'kafka/producer/base.py')
-rw-r--r--  kafka/producer/base.py  36
1 file changed, 24 insertions, 12 deletions
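To put the change in context: the commit adds an ASYNC_LOG_MESSAGES_ON_ERROR default, threaded through Producer.__init__ as async_log_messages_on_error, so that failed and retried ProduceRequests are logged either with their full message tuples or only with hash() of them. A minimal usage sketch follows; it assumes the KafkaClient constructor and Producer.send_messages() of kafka-python at this point in history, which are not part of this diff:

    from kafka.client import KafkaClient
    from kafka.producer.base import Producer

    client = KafkaClient('localhost:9092')   # assumed broker address

    # batch_send=True flips the producer into async mode (see the
    # "if batch_send: async = True" branch in the diff below); with
    # async_log_messages_on_error=False the error/retry log lines carry
    # hash(messages) instead of the raw payloads.
    producer = Producer(client,
                        batch_send=True,
                        async_log_messages_on_error=False)
    producer.send_messages('my-topic', 0, b'payload we do not want in logs')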
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 2f47d87..cd14ab6 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -36,12 +36,14 @@ ASYNC_QUEUE_PUT_TIMEOUT = 0
ASYNC_RETRY_LIMIT = None
ASYNC_RETRY_BACKOFF_MS = 100
ASYNC_RETRY_ON_TIMEOUTS = True
+ASYNC_LOG_MESSAGES_ON_ERROR = True
STOP_ASYNC_PRODUCER = -1
def _send_upstream(queue, client, codec, batch_time, batch_size,
-                   req_acks, ack_timeout, retry_options, stop_event):
+                   req_acks, ack_timeout, retry_options, stop_event,
+                   log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR):
    """
    Listen on the queue for a specified number of messages or till
    a specified timeout and send them upstream to the brokers in one
@@ -123,6 +125,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
            if error_cls:
                _handle_error(error_cls, orig_req)
+                log.error('Error sending ProduceRequest to %s:%d with msgs %s',
+                          orig_req.topic, orig_req.partition,
+                          orig_req.messages if log_messages_on_error
+                          else hash(orig_req.messages))
        if not reqs_to_retry:
            request_tries = {}
@@ -147,6 +153,13 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
                 or (count < retry_options.limit))
        )
+        # Log messages we are going to retry
+        for orig_req in request_tries.keys():
+            log.info('Retrying ProduceRequest to %s:%d with msgs %s',
+                     orig_req.topic, orig_req.partition,
+                     orig_req.messages if log_messages_on_error
+                     else hash(orig_req.messages))
+
class Producer(object):
"""
@@ -185,7 +198,8 @@ class Producer(object):
                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
                 async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
-                 async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
+                 async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
+                 async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR):
        if batch_send:
            async = True
@@ -218,16 +232,14 @@ class Producer(object):
                backoff_ms=async_retry_backoff_ms,
                retry_on_timeouts=async_retry_on_timeouts)
            self.thread_stop_event = Event()
-            self.thread = Thread(target=_send_upstream,
-                                 args=(self.queue,
-                                       self.client.copy(),
-                                       self.codec,
-                                       batch_send_every_t,
-                                       batch_send_every_n,
-                                       self.req_acks,
-                                       self.ack_timeout,
-                                       async_retry_options,
-                                       self.thread_stop_event))
+            self.thread = Thread(
+                target=_send_upstream,
+                args=(self.queue, self.client.copy(), self.codec,
+                      batch_send_every_t, batch_send_every_n,
+                      self.req_acks, self.ack_timeout,
+                      async_retry_options, self.thread_stop_event),
+                kwargs={'log_messages_on_error': async_log_messages_on_error}
+            )
            # Thread will die if main thread exits
            self.thread.daemon = True
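The log-or-hash choice itself is easy to exercise outside the producer; a standalone sketch (the logger setup and sample message tuple are illustrative only, not from this diff):

    import logging

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger('kafka.producer.base')

    log_messages_on_error = False
    messages = ('msg-1', 'msg-2')   # stand-in for orig_req.messages

    # Same conditional the diff adds: full payloads when permitted,
    # otherwise only a hash() that can still be correlated across
    # the error and retry log lines.
    log.info('Retrying ProduceRequest to %s:%d with msgs %s',
             'my-topic', 0,
             messages if log_messages_on_error else hash(messages))

Logging hash(messages) keeps payload contents, which may be large or sensitive, out of the logs while still letting the failure and retry entries for the same request be matched up.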