summaryrefslogtreecommitdiff
path: root/kafka/producer/sender.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/sender.py')
-rw-r--r--kafka/producer/sender.py29
1 files changed, 23 insertions, 6 deletions
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 705b58f..35688d3 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -195,15 +195,22 @@ class Sender(threading.Thread):
for topic, partitions in response.topics:
for partition_info in partitions:
+ global_error = None
+ log_start_offset = None
if response.API_VERSION < 2:
partition, error_code, offset = partition_info
ts = None
- else:
+ elif 2 <= response.API_VERSION <= 4:
partition, error_code, offset, ts = partition_info
+ elif 5 <= response.API_VERSION <= 7:
+ partition, error_code, offset, ts, log_start_offset = partition_info
+ else:
+ # the ignored parameter is record_error of type list[(batch_index: int, error_message: str)]
+ partition, error_code, offset, ts, log_start_offset, _, global_error = partition_info
tp = TopicPartition(topic, partition)
error = Errors.for_code(error_code)
batch = batches_by_partition[tp]
- self._complete_batch(batch, error, offset, ts)
+ self._complete_batch(batch, error, offset, ts, log_start_offset, global_error)
if response.API_VERSION > 0:
self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)
@@ -213,7 +220,7 @@ class Sender(threading.Thread):
for batch in batches:
self._complete_batch(batch, None, -1, None)
- def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
+ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None, global_error=None):
"""Complete or retry the given batch of records.
Arguments:
@@ -221,6 +228,8 @@ class Sender(threading.Thread):
error (Exception): The error (or None if none)
base_offset (int): The base offset assigned to the records if successful
timestamp_ms (int, optional): The timestamp returned by the broker for this batch
+ log_start_offset (int): The start offset of the log at the time this produce response was created
+ global_error (str): The summarising error message
"""
# Standardize no-error to None
if error is Errors.NoError:
@@ -232,7 +241,7 @@ class Sender(threading.Thread):
" retrying (%d attempts left). Error: %s",
batch.topic_partition,
self.config['retries'] - batch.attempts - 1,
- error)
+ global_error or error)
self._accumulator.reenqueue(batch)
self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
else:
@@ -240,7 +249,7 @@ class Sender(threading.Thread):
error = error(batch.topic_partition.topic)
# tell the user the result of their request
- batch.done(base_offset, timestamp_ms, error)
+ batch.done(base_offset, timestamp_ms, error, log_start_offset, global_error)
self._accumulator.deallocate(batch)
if error is not None:
self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
@@ -293,7 +302,15 @@ class Sender(threading.Thread):
produce_records_by_partition[topic][partition] = buf
kwargs = {}
- if self.config['api_version'] >= (0, 11):
+ if self.config['api_version'] >= (2, 1):
+ version = 7
+ elif self.config['api_version'] >= (2, 0):
+ version = 6
+ elif self.config['api_version'] >= (1, 1):
+ version = 5
+ elif self.config['api_version'] >= (1, 0):
+ version = 4
+ elif self.config['api_version'] >= (0, 11):
version = 3
kwargs = dict(transactional_id=None)
elif self.config['api_version'] >= (0, 10):