summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-05-03 08:08:06 -0700
committerGitHub <noreply@github.com>2017-05-03 08:08:06 -0700
commita2b5ddc37568b285929fc45c17ab19348c320012 (patch)
treefcf7859995fcfd2618f38bbb618bdb356995f685
parent83617b956d43609c8b8d63489585c3f5837f90ee (diff)
downloadkafka-python-a2b5ddc37568b285929fc45c17ab19348c320012.tar.gz
Improve error message when expiring batches in KafkaProducer (#1077)
-rw-r--r--kafka/producer/record_accumulator.py18
1 files changed, 11 insertions, 7 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 965ddbe..fd081aa 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -101,15 +101,19 @@ class RecordBatch(object):
since_backoff = now - (self.last_attempt + retry_backoff_ms / 1000.0)
timeout = request_timeout_ms / 1000.0
- if ((not self.in_retry() and is_full and timeout < since_append) or
- (not self.in_retry() and timeout < since_ready) or
- (self.in_retry() and timeout < since_backoff)):
-
+ error = None
+ if not self.in_retry() and is_full and timeout < since_append:
+ error = "%d ms has passed since last append" % since_append
+ elif not self.in_retry() and timeout < since_ready:
+ error = "%d ms has passed since batch creation plus linger time" % since_ready
+ elif self.in_retry() and timeout < since_backoff:
+ error = "%d ms has passed since last attempt plus backoff time" % since_backoff
+
+ if error:
self.records.close()
self.done(-1, None, Errors.KafkaTimeoutError(
- "Batch containing %s record(s) expired due to timeout while"
- " requesting metadata from brokers for %s", self.record_count,
- self.topic_partition))
+ "Batch for %s containing %s record(s) expired: %s" % (
+ self.topic_partition, self.record_count, error)))
return True
return False