diff options
Diffstat (limited to 'kafka/producer/sender.py')
-rw-r--r-- | kafka/producer/sender.py | 29 |
1 files changed, 23 insertions, 6 deletions
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 705b58f..35688d3 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -195,15 +195,22 @@ class Sender(threading.Thread): for topic, partitions in response.topics: for partition_info in partitions: + global_error = None + log_start_offset = None if response.API_VERSION < 2: partition, error_code, offset = partition_info ts = None - else: + elif 2 <= response.API_VERSION <= 4: partition, error_code, offset, ts = partition_info + elif 5 <= response.API_VERSION <= 7: + partition, error_code, offset, ts, log_start_offset = partition_info + else: + # the ignored parameter is record_error of type list[(batch_index: int, error_message: str)] + partition, error_code, offset, ts, log_start_offset, _, global_error = partition_info tp = TopicPartition(topic, partition) error = Errors.for_code(error_code) batch = batches_by_partition[tp] - self._complete_batch(batch, error, offset, ts) + self._complete_batch(batch, error, offset, ts, log_start_offset, global_error) if response.API_VERSION > 0: self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id) @@ -213,7 +220,7 @@ class Sender(threading.Thread): for batch in batches: self._complete_batch(batch, None, -1, None) - def _complete_batch(self, batch, error, base_offset, timestamp_ms=None): + def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None, global_error=None): """Complete or retry the given batch of records. Arguments: @@ -221,6 +228,8 @@ class Sender(threading.Thread): error (Exception): The error (or None if none) base_offset (int): The base offset assigned to the records if successful timestamp_ms (int, optional): The timestamp returned by the broker for this batch + log_start_offset (int): The start offset of the log at the time this produce response was created + global_error (str): The summarising error message """ # Standardize no-error to None if error is Errors.NoError: @@ -232,7 +241,7 @@ class Sender(threading.Thread): " retrying (%d attempts left). Error: %s", batch.topic_partition, self.config['retries'] - batch.attempts - 1, - error) + global_error or error) self._accumulator.reenqueue(batch) self._sensors.record_retries(batch.topic_partition.topic, batch.record_count) else: @@ -240,7 +249,7 @@ class Sender(threading.Thread): error = error(batch.topic_partition.topic) # tell the user the result of their request - batch.done(base_offset, timestamp_ms, error) + batch.done(base_offset, timestamp_ms, error, log_start_offset, global_error) self._accumulator.deallocate(batch) if error is not None: self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) @@ -293,7 +302,15 @@ class Sender(threading.Thread): produce_records_by_partition[topic][partition] = buf kwargs = {} - if self.config['api_version'] >= (0, 11): + if self.config['api_version'] >= (2, 1): + version = 7 + elif self.config['api_version'] >= (2, 0): + version = 6 + elif self.config['api_version'] >= (1, 1): + version = 5 + elif self.config['api_version'] >= (1, 0): + version = 4 + elif self.config['api_version'] >= (0, 11): version = 3 kwargs = dict(transactional_id=None) elif self.config['api_version'] >= (0, 10): |