Diffstat (limited to 'kafka/producer/record_accumulator.py')
-rw-r--r--  kafka/producer/record_accumulator.py  |  8
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 0de5f98..a2aa0e8 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -68,16 +68,16 @@ class ProducerBatch(object):
sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
return future
- def done(self, base_offset=None, timestamp_ms=None, exception=None):
+ def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None, global_error=None):
level = logging.DEBUG if exception is None else logging.WARNING
log.log(level, "Produced messages to topic-partition %s with base offset"
- " %s and error %s.", self.topic_partition, base_offset,
- exception) # trace
+ " %s log start offset %s and error %s.", self.topic_partition, base_offset,
+ log_start_offset, global_error) # trace
if self.produce_future.is_done:
log.warning('Batch is already closed -- ignoring batch.done()')
return
elif exception is None:
- self.produce_future.success((base_offset, timestamp_ms))
+ self.produce_future.success((base_offset, timestamp_ms, log_start_offset))
else:
self.produce_future.failure(exception)
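
The change above extends ProducerBatch.done() with log_start_offset and global_error keyword arguments and threads log_start_offset into the tuple passed to produce_future.success(). The following is a minimal, self-contained sketch of that behavior, not the library's actual classes: _FakeFuture and _SketchBatch are stand-ins invented for illustration, and the offsets in the usage example are made up.

import logging

log = logging.getLogger(__name__)


class _FakeFuture(object):
    """Hypothetical stand-in for kafka-python's produce future."""
    def __init__(self):
        self.is_done = False
        self.value = None
        self.exception = None

    def success(self, value):
        self.is_done = True
        self.value = value

    def failure(self, exception):
        self.is_done = True
        self.exception = exception


class _SketchBatch(object):
    """Simplified batch illustrating the extended done() signature."""
    def __init__(self, topic_partition):
        self.topic_partition = topic_partition
        self.produce_future = _FakeFuture()

    def done(self, base_offset=None, timestamp_ms=None, exception=None,
             log_start_offset=None, global_error=None):
        level = logging.DEBUG if exception is None else logging.WARNING
        log.log(level, "Produced messages to topic-partition %s with base offset"
                " %s log start offset %s and error %s.", self.topic_partition,
                base_offset, log_start_offset, global_error)
        if self.produce_future.is_done:
            log.warning('Batch is already closed -- ignoring batch.done()')
            return
        elif exception is None:
            # Success now also carries the partition's log start offset.
            self.produce_future.success((base_offset, timestamp_ms, log_start_offset))
        else:
            self.produce_future.failure(exception)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    batch = _SketchBatch(('my-topic', 0))
    # Example values chosen arbitrarily for the sketch.
    batch.done(base_offset=42, timestamp_ms=1700000000000, log_start_offset=7)
    print(batch.produce_future.value)  # (42, 1700000000000, 7)

Callers that previously unpacked a two-tuple (base_offset, timestamp_ms) from the future would need to account for the third element after this change.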