author    Heikki Nousiainen <htn@aiven.io>   2018-08-14 15:17:23 +0300
committer Jeff Widman <jeff@jeffwidman.com>  2018-09-27 15:22:03 -0700
commit    08c77499a2e8bc79d6788d70ef96d77752ed6325 (patch)
tree      88931e1c6ff33bd0aed7a8297c06b81db1fe6b13
parent    0ca4313170df2657456009af5550942ace9f1a81 (diff)
Support produce with Kafka record headers
-rw-r--r--  README.rst                             4
-rw-r--r--  kafka/producer/future.py              10
-rw-r--r--  kafka/producer/kafka.py               18
-rw-r--r--  kafka/producer/record_accumulator.py  16
-rw-r--r--  test/test_producer.py                 10

5 files changed, 40 insertions, 18 deletions
diff --git a/README.rst b/README.rst
index 28cb7e7..a82573b 100644
--- a/README.rst
+++ b/README.rst
@@ -117,6 +117,10 @@ for more details.
>>> for i in range(1000):
... producer.send('foobar', b'msg %d' % i)
+>>> # Include record headers. The format is a list of tuples with string key
+>>> # and bytes value.
+>>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
+
>>> # Get producer performance metrics
>>> metrics = producer.metrics()
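For orientation, here is a minimal round-trip sketch that is not part of the commit: it produces a record with headers and reads them back, assuming a broker at localhost:9092 running message format 0.11+ and a kafka-python release whose ConsumerRecord exposes a headers attribute.

    # Hedged sketch (assumptions: local broker, headers supported end to end).
    from kafka import KafkaProducer, KafkaConsumer

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    producer.send('foobar',
                  value=b'c29tZSB2YWx1ZQ==',
                  headers=[('content-encoding', b'base64')])
    producer.flush()

    consumer = KafkaConsumer('foobar',
                             bootstrap_servers='localhost:9092',
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=5000)
    for record in consumer:
        # Headers come back as a list of (key, value) tuples.
        print(record.headers)
        break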
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index aa216c4..1c5d6d7 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -29,11 +29,11 @@ class FutureProduceResult(Future):
class FutureRecordMetadata(Future):
- def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size):
+ def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size):
super(FutureRecordMetadata, self).__init__()
self._produce_future = produce_future
# packing args as a tuple is a minor speed optimization
- self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size)
+ self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size)
produce_future.add_callback(self._produce_success)
produce_future.add_errback(self.failure)
@@ -42,7 +42,7 @@ class FutureRecordMetadata(Future):
# Unpacking from args tuple is minor speed optimization
(relative_offset, timestamp_ms, checksum,
- serialized_key_size, serialized_value_size) = self.args
+ serialized_key_size, serialized_value_size, serialized_header_size) = self.args
# None is when Broker does not support the API (<0.10) and
# -1 is when the broker is configured for CREATE_TIME timestamps
@@ -53,7 +53,7 @@ class FutureRecordMetadata(Future):
tp = self._produce_future.topic_partition
metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
checksum, serialized_key_size,
- serialized_value_size)
+ serialized_value_size, serialized_header_size)
self.success(metadata)
def get(self, timeout=None):
@@ -68,4 +68,4 @@ class FutureRecordMetadata(Future):
RecordMetadata = collections.namedtuple(
'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
- 'checksum', 'serialized_key_size', 'serialized_value_size'])
+ 'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
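With this change, the metadata returned by future.get() also carries the summed size of the serialized headers, or -1 when no headers were sent. A hedged usage sketch, assuming a connected producer as above:

    # Hedged sketch: inspect the new serialized_header_size field.
    future = producer.send('foobar', value=b'v',
                           headers=[('k', b'header-bytes')])
    metadata = future.get(timeout=10)
    # 'k' is 1 byte as UTF-8 and b'header-bytes' is 12 bytes, so 13 here;
    # a record sent without headers would report -1.
    print(metadata.serialized_header_size)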
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 24b58fe..4fc7bc6 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -513,7 +513,7 @@ class KafkaProducer(object):
return LegacyRecordBatchBuilder.estimate_size_in_bytes(
magic, self.config['compression_type'], key, value)
- def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
+ def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
"""Publish a message to a topic.
Arguments:
@@ -534,6 +534,8 @@ class KafkaProducer(object):
partition (but if key is None, partition is chosen randomly).
Must be type bytes, or be serializable to bytes via configured
key_serializer.
+ headers (optional): a list of header key value pairs. List items
+ are tuples of str key and bytes value.
timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
to use as the message timestamp. Defaults to current time.
@@ -563,13 +565,18 @@ class KafkaProducer(object):
partition = self._partition(topic, partition, key, value,
key_bytes, value_bytes)
- message_size = self._estimate_size_in_bytes(key_bytes, value_bytes)
+ if headers is None:
+ headers = []
+ assert type(headers) == list
+ assert all(type(item) == tuple and len(item) == 2 and type(item[0]) == str and type(item[1]) == bytes for item in headers)
+
+ message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers)
self._ensure_valid_record_size(message_size)
tp = TopicPartition(topic, partition)
- log.debug("Sending (key=%r value=%r) to %s", key, value, tp)
+ log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)
result = self._accumulator.append(tp, timestamp_ms,
- key_bytes, value_bytes,
+ key_bytes, value_bytes, headers,
self.config['max_block_ms'],
estimated_size=message_size)
future, batch_is_full, new_batch_created = result
@@ -588,7 +595,8 @@ class KafkaProducer(object):
FutureProduceResult(TopicPartition(topic, partition)),
-1, None, None,
len(key_bytes) if key_bytes is not None else -1,
- len(value_bytes) if value_bytes is not None else -1
+ len(value_bytes) if value_bytes is not None else -1,
+ sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1,
).failure(e)
def flush(self, timeout=None):
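The error path above inlines the header-size computation. As a standalone helper (a hypothetical name, not part of the commit) it reduces to summing the UTF-8-encoded key lengths plus the raw value lengths, with -1 as the "no headers" sentinel:

    # Hypothetical helper mirroring the inlined expression above.
    def serialized_header_size(headers):
        if not headers:
            return -1
        return sum(len(key.encode('utf-8')) + len(value)
                   for key, value in headers)

    assert serialized_header_size(None) == -1
    assert serialized_header_size([('content-encoding', b'base64')]) == 22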
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 1cd5413..84b01d1 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -55,8 +55,8 @@ class ProducerBatch(object):
def record_count(self):
return self.records.next_offset()
- def try_append(self, timestamp_ms, key, value):
- metadata = self.records.append(timestamp_ms, key, value)
+ def try_append(self, timestamp_ms, key, value, headers):
+ metadata = self.records.append(timestamp_ms, key, value, headers)
if metadata is None:
return None
@@ -65,7 +65,8 @@ class ProducerBatch(object):
future = FutureRecordMetadata(self.produce_future, metadata.offset,
metadata.timestamp, metadata.crc,
len(key) if key is not None else -1,
- len(value) if value is not None else -1)
+ len(value) if value is not None else -1,
+ sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
return future
def done(self, base_offset=None, timestamp_ms=None, exception=None):
@@ -196,7 +197,7 @@ class RecordAccumulator(object):
self.muted = set()
self._drain_index = 0
- def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
+ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
estimated_size=0):
"""Add a record to the accumulator, return the append result.
@@ -209,6 +210,7 @@ class RecordAccumulator(object):
timestamp_ms (int): The timestamp of the record (epoch ms)
key (bytes): The key for the record
value (bytes): The value for the record
+ headers (List[Tuple[str, bytes]]): The header fields for the record
max_time_to_block_ms (int): The maximum time in milliseconds to
block for buffer memory to be available
@@ -231,7 +233,7 @@ class RecordAccumulator(object):
dq = self._batches[tp]
if dq:
last = dq[-1]
- future = last.try_append(timestamp_ms, key, value)
+ future = last.try_append(timestamp_ms, key, value, headers)
if future is not None:
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False
@@ -246,7 +248,7 @@ class RecordAccumulator(object):
if dq:
last = dq[-1]
- future = last.try_append(timestamp_ms, key, value)
+ future = last.try_append(timestamp_ms, key, value, headers)
if future is not None:
# Somebody else found us a batch, return the one we
# waited for! Hopefully this doesn't happen often...
@@ -261,7 +263,7 @@ class RecordAccumulator(object):
)
batch = ProducerBatch(tp, records, buf)
- future = batch.try_append(timestamp_ms, key, value)
+ future = batch.try_append(timestamp_ms, key, value, headers)
if not future:
raise Exception()
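End to end, the plumbing above lets headers ride along with each record into the shared ProducerBatch for a partition. A hedged usage sketch through the public API (the broker address and linger_ms value are assumptions):

    # Hedged sketch: several records with headers sent to one partition are
    # accumulated into the same batch before the sender drains it.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092', linger_ms=50)
    futures = [
        producer.send('foobar', value=b'msg %d' % i, partition=0,
                      headers=[('seq', str(i).encode('ascii'))])
        for i in range(10)
    ]
    producer.flush()
    offsets = [f.get(timeout=10).offset for f in futures]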
diff --git a/test/test_producer.py b/test/test_producer.py
index 09d184f..176b239 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -91,10 +91,16 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
compression_type=compression)
magic = producer._max_usable_produce_magic()
+ # record headers are supported in 0.11.0
+ if version() < (0, 11, 0):
+ headers = None
+ else:
+ headers = [("Header Key", b"Header Value")]
+
topic = random_string(5)
future = producer.send(
topic,
- value=b"Simple value", key=b"Simple key", timestamp_ms=9999999,
+ value=b"Simple value", key=b"Simple key", headers=headers, timestamp_ms=9999999,
partition=0)
record = future.get(timeout=5)
assert record is not None
@@ -116,6 +122,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
assert record.serialized_key_size == 10
assert record.serialized_value_size == 12
+ if headers:
+ assert record.serialized_header_size == 22
# generated timestamp case is skipped for broker 0.9 and below
if magic == 0:
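The 22 in the assertion above is the summed header size for the single ("Header Key", b"Header Value") pair:

    # Worked check of the expected serialized_header_size.
    key_size = len('Header Key'.encode('utf-8'))    # 10 bytes
    value_size = len(b'Header Value')               # 12 bytes
    assert key_size + value_size == 22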