diff options
Diffstat (limited to 'kafka/producer/record_accumulator.py')
-rw-r--r-- | kafka/producer/record_accumulator.py | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 0de5f98..a2aa0e8 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -68,16 +68,16 @@ class ProducerBatch(object): sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1) return future - def done(self, base_offset=None, timestamp_ms=None, exception=None): + def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None, global_error=None): level = logging.DEBUG if exception is None else logging.WARNING log.log(level, "Produced messages to topic-partition %s with base offset" - " %s and error %s.", self.topic_partition, base_offset, - exception) # trace + " %s log start offset %s and error %s.", self.topic_partition, base_offset, + log_start_offset, global_error) # trace if self.produce_future.is_done: log.warning('Batch is already closed -- ignoring batch.done()') return elif exception is None: - self.produce_future.success((base_offset, timestamp_ms)) + self.produce_future.success((base_offset, timestamp_ms, log_start_offset)) else: self.produce_future.failure(exception) |