author    Tincu Gabriel <gabi@aiven.io>    2020-03-25 16:05:36 +0100
committer GitHub <noreply@github.com>      2020-03-25 08:05:36 -0700
commit    f9e0264e0b0f8d92afb6177d51976795e3bdbcd8 (patch)
tree      2b78823a1f340947e6be351337711b7189427d3a
parent    5d4b3ec4d6773740a036edb4103294a503a2a421 (diff)
download  kafka-python-f9e0264e0b0f8d92afb6177d51976795e3bdbcd8.tar.gz
Add `log_start_offset` to message protocol parsing (#2020)
This is in preparation for adding `zstd` support.
-rw-r--r--  kafka/producer/future.py             |  6
-rw-r--r--  kafka/producer/record_accumulator.py |  8
-rw-r--r--  kafka/producer/sender.py             | 29
-rw-r--r--  kafka/protocol/produce.py            | 79
4 files changed, 107 insertions(+), 15 deletions(-)
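
The user-visible effect of the patch: the RecordMetadata resolved by a producer send() future gains a log_start_offset field. A minimal usage sketch (broker address and topic name are illustrative, not part of the commit; the field stays None unless the broker answers with Produce v5 or later):

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092')  # assumed broker
    future = producer.send('my-topic', b'payload')
    metadata = future.get(timeout=10)
    # log_start_offset is populated only for Produce API v5+ responses
    print(metadata.offset, metadata.log_start_offset)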
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index f67db09..07fa4ad 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -38,7 +38,7 @@ class FutureRecordMetadata(Future):
produce_future.add_errback(self.failure)
def _produce_success(self, offset_and_timestamp):
- offset, produce_timestamp_ms = offset_and_timestamp
+ offset, produce_timestamp_ms, log_start_offset = offset_and_timestamp
# Unpacking from args tuple is minor speed optimization
(relative_offset, timestamp_ms, checksum,
@@ -51,7 +51,7 @@ class FutureRecordMetadata(Future):
if offset != -1 and relative_offset is not None:
offset += relative_offset
tp = self._produce_future.topic_partition
- metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
+ metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms, log_start_offset,
checksum, serialized_key_size,
serialized_value_size, serialized_header_size)
self.success(metadata)
@@ -67,5 +67,5 @@ class FutureRecordMetadata(Future):
RecordMetadata = collections.namedtuple(
- 'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
+ 'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp', 'log_start_offset',
'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
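
Since RecordMetadata is a namedtuple, inserting log_start_offset after timestamp changes the positional layout, so anything unpacking it by position must account for the new field. A standalone sketch with illustrative values:

    import collections

    # Mirrors the updated namedtuple in the diff above
    RecordMetadata = collections.namedtuple(
        'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
                           'log_start_offset', 'checksum', 'serialized_key_size',
                           'serialized_value_size', 'serialized_header_size'])

    md = RecordMetadata('my-topic', 0, ('my-topic', 0), 42, 1585147536000,
                        7, None, -1, 7, -1)
    assert md.log_start_offset == 7   # new attribute access
    assert md[5] == 7                 # positional access shifted by one slot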
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 0de5f98..a2aa0e8 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -68,16 +68,16 @@ class ProducerBatch(object):
sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
return future
- def done(self, base_offset=None, timestamp_ms=None, exception=None):
+ def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None, global_error=None):
level = logging.DEBUG if exception is None else logging.WARNING
log.log(level, "Produced messages to topic-partition %s with base offset"
- " %s and error %s.", self.topic_partition, base_offset,
- exception) # trace
+ " %s log start offset %s and error %s.", self.topic_partition, base_offset,
+ log_start_offset, global_error) # trace
if self.produce_future.is_done:
log.warning('Batch is already closed -- ignoring batch.done()')
return
elif exception is None:
- self.produce_future.success((base_offset, timestamp_ms))
+ self.produce_future.success((base_offset, timestamp_ms, log_start_offset))
else:
self.produce_future.failure(exception)
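
The contract between ProducerBatch.done() and FutureRecordMetadata._produce_success() changes in lockstep here: the success payload grows from a 2-tuple to a 3-tuple. A toy illustration of why both sides must move together (values are made up):

    payload = (100, 1585147536000, 7)  # (base_offset, timestamp_ms, log_start_offset)

    offset, timestamp_ms, log_start_offset = payload   # updated unpacking works
    try:
        offset, timestamp_ms = payload                 # pre-patch unpacking now fails
    except ValueError as exc:
        print('stale consumer of the produce future:', exc)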
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 705b58f..35688d3 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -195,15 +195,22 @@ class Sender(threading.Thread):
for topic, partitions in response.topics:
for partition_info in partitions:
+ global_error = None
+ log_start_offset = None
if response.API_VERSION < 2:
partition, error_code, offset = partition_info
ts = None
- else:
+ elif 2 <= response.API_VERSION <= 4:
partition, error_code, offset, ts = partition_info
+ elif 5 <= response.API_VERSION <= 7:
+ partition, error_code, offset, ts, log_start_offset = partition_info
+ else:
+ # the ignored parameter is record_errors, a list of (batch_index: int, batch_index_error_message: str) tuples
+ partition, error_code, offset, ts, log_start_offset, _, global_error = partition_info
tp = TopicPartition(topic, partition)
error = Errors.for_code(error_code)
batch = batches_by_partition[tp]
- self._complete_batch(batch, error, offset, ts)
+ self._complete_batch(batch, error, offset, ts, log_start_offset, global_error)
if response.API_VERSION > 0:
self._sensors.record_throttle_time(response.throttle_time_ms, node=node_id)
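
The width of each decoded partition_info tuple depends on the response version, which is exactly what the branching above handles. A self-contained restatement (the tuples are hand-built stand-ins for decoded ProduceResponse entries):

    def parse_partition_info(api_version, partition_info):
        global_error = None
        log_start_offset = None
        ts = None
        if api_version < 2:
            partition, error_code, offset = partition_info
        elif 2 <= api_version <= 4:
            partition, error_code, offset, ts = partition_info
        elif 5 <= api_version <= 7:
            partition, error_code, offset, ts, log_start_offset = partition_info
        else:
            # v8+ also carries record_errors (ignored here) and a global error_message
            (partition, error_code, offset, ts,
             log_start_offset, _, global_error) = partition_info
        return partition, error_code, offset, ts, log_start_offset, global_error

    assert parse_partition_info(0, (0, 0, 42)) == (0, 0, 42, None, None, None)
    assert parse_partition_info(7, (0, 0, 42, 1585147536000, 7)) == \
        (0, 0, 42, 1585147536000, 7, None)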
@@ -213,7 +220,7 @@ class Sender(threading.Thread):
for batch in batches:
self._complete_batch(batch, None, -1, None)
- def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
+ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None, global_error=None):
"""Complete or retry the given batch of records.
Arguments:
@@ -221,6 +228,8 @@ class Sender(threading.Thread):
error (Exception): The error (or None if none)
base_offset (int): The base offset assigned to the records if successful
timestamp_ms (int, optional): The timestamp returned by the broker for this batch
+ log_start_offset (int, optional): The start offset of the log at the time this produce response was created
+ global_error (str, optional): The summarising error message returned by the broker
"""
# Standardize no-error to None
if error is Errors.NoError:
@@ -232,7 +241,7 @@ class Sender(threading.Thread):
" retrying (%d attempts left). Error: %s",
batch.topic_partition,
self.config['retries'] - batch.attempts - 1,
- error)
+ global_error or error)
self._accumulator.reenqueue(batch)
self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
else:
@@ -240,7 +249,7 @@ class Sender(threading.Thread):
error = error(batch.topic_partition.topic)
# tell the user the result of their request
- batch.done(base_offset, timestamp_ms, error)
+ batch.done(base_offset, timestamp_ms, error, log_start_offset, global_error)
self._accumulator.deallocate(batch)
if error is not None:
self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)
@@ -293,7 +302,15 @@ class Sender(threading.Thread):
produce_records_by_partition[topic][partition] = buf
kwargs = {}
- if self.config['api_version'] >= (0, 11):
+ if self.config['api_version'] >= (2, 1):
+ version = 7
+ elif self.config['api_version'] >= (2, 0):
+ version = 6
+ elif self.config['api_version'] >= (1, 1):
+ version = 5
+ elif self.config['api_version'] >= (1, 0):
+ version = 4
+ elif self.config['api_version'] >= (0, 11):
version = 3
kwargs = dict(transactional_id=None)
elif self.config['api_version'] >= (0, 10):
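
Restated as a pure function, the ladder above maps the negotiated broker version to the highest ProduceRequest version this client will use; brokers older than (0, 10) are collapsed to v0 in this sketch since those branches fall outside the hunk:

    def pick_produce_version(api_version):
        if api_version >= (2, 1):
            return 7
        if api_version >= (2, 0):
            return 6
        if api_version >= (1, 1):
            return 5
        if api_version >= (1, 0):
            return 4
        if api_version >= (0, 11):
            return 3   # first version that carries transactional_id
        if api_version >= (0, 10):
            return 2
        return 0       # simplification: older brokers are not distinguished here

    assert pick_produce_version((2, 3)) == 7
    assert pick_produce_version((1, 1)) == 5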
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index f4032b3..9b3f6bf 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -78,6 +78,50 @@ class ProduceResponse_v5(Response):
)
+class ProduceResponse_v6(Response):
+ """
+ The version number is bumped to indicate that, on quota violation, brokers send out responses before throttling.
+ """
+ API_KEY = 0
+ API_VERSION = 6
+ SCHEMA = ProduceResponse_v5.SCHEMA
+
+
+class ProduceResponse_v7(Response):
+ """
+ V7 bumped up to indicate ZStandard capability. (see KIP-110)
+ """
+ API_KEY = 0
+ API_VERSION = 7
+ SCHEMA = ProduceResponse_v6.SCHEMA
+
+
+class ProduceResponse_v8(Response):
+ """
+ V8 bumped up to add two new fields to the partition response: a record_errors list and an error_message
+ (see KIP-467)
+ """
+ API_KEY = 0
+ API_VERSION = 8
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('error_code', Int16),
+                ('offset', Int64),
+                ('timestamp', Int64),
+                ('log_start_offset', Int64),
+                ('record_errors', Array(
+                    ('batch_index', Int32),
+                    ('batch_index_error_message', String('utf-8'))
+                )),
+                ('error_message', String('utf-8'))
+            ))
+        )),
+        ('throttle_time_ms', Int32)
+    )
+
+
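
An encode/decode round trip through the protocol structs shows these schemas in action. This sketch assumes kafka-python's Struct API (keyword construction, encode(), classmethod decode()) and uses made-up values against the v5 layout:

    from kafka.protocol.produce import ProduceResponse

    resp = ProduceResponse[5](
        topics=[('my-topic', [(0, 0, 42, 1585147536000, 7)])],
        throttle_time_ms=0)
    raw = resp.encode()
    decoded = ProduceResponse[5].decode(raw)
    assert decoded.throttle_time_ms == 0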
class ProduceRequest(Request):
API_KEY = 0
@@ -106,6 +150,7 @@ class ProduceRequest_v1(ProduceRequest):
RESPONSE_TYPE = ProduceResponse_v1
SCHEMA = ProduceRequest_v0.SCHEMA
+
class ProduceRequest_v2(ProduceRequest):
API_VERSION = 2
RESPONSE_TYPE = ProduceResponse_v2
@@ -147,11 +192,41 @@ class ProduceRequest_v5(ProduceRequest):
SCHEMA = ProduceRequest_v4.SCHEMA
+class ProduceRequest_v6(ProduceRequest):
+ """
+ The version number is bumped to indicate that, on quota violation, brokers send out responses before throttling.
+ """
+ API_VERSION = 6
+ RESPONSE_TYPE = ProduceResponse_v6
+ SCHEMA = ProduceRequest_v5.SCHEMA
+
+
+class ProduceRequest_v7(ProduceRequest):
+ """
+ V7 bumped up to indicate ZStandard capability. (see KIP-110)
+ """
+ API_VERSION = 7
+ RESPONSE_TYPE = ProduceResponse_v7
+ SCHEMA = ProduceRequest_v6.SCHEMA
+
+
+class ProduceRequest_v8(ProduceRequest):
+ """
+ V8 bumped up to add two new fields, record_errors and error_message, to the PartitionResponse
+ (see KIP-467)
+ """
+ API_VERSION = 8
+ RESPONSE_TYPE = ProduceResponse_v8
+ SCHEMA = ProduceRequest_v7.SCHEMA
+
+
ProduceRequest = [
ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2,
- ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5
+ ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5,
+ ProduceRequest_v6, ProduceRequest_v7, ProduceRequest_v8,
]
ProduceResponse = [
ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2,
- ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5
+ ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5,
+ ProduceResponse_v6, ProduceResponse_v7, ProduceResponse_v8,
]
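
Because the module-level ProduceRequest and ProduceResponse lists are indexed by API version, callers can pick a class directly, as sender.py does. A sketch with illustrative arguments (field names follow the v3+ request schema):

    from kafka.protocol.produce import ProduceRequest

    req = ProduceRequest[8](
        transactional_id=None,
        required_acks=-1,   # acks=all
        timeout=30000,
        topics=[])
    assert req.API_VERSION == 8
    assert ProduceRequest[8].RESPONSE_TYPE.API_VERSION == 8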