author     Dana Powers <dana.powers@gmail.com>  2016-05-22 00:31:16 -0700
committer  Dana Powers <dana.powers@gmail.com>  2016-05-22 09:51:37 -0700
commit     795cb9b29fa05d4425f807f54dfa639c125fc0dd (patch)
tree       7fba03e95f26185c126aa95d1acdd2af5d2ad925
parent     7f4a9361ea168a0e1073801d0b86868de47d1de2 (diff)
KAFKA-3025: Message v1 -- add timestamp and use relative offsets in compressed message sets
-rw-r--r--  kafka/consumer/fetcher.py             24
-rw-r--r--  kafka/producer/buffer.py               6
-rw-r--r--  kafka/producer/future.py              18
-rw-r--r--  kafka/producer/kafka.py               15
-rw-r--r--  kafka/producer/record_accumulator.py  32
-rw-r--r--  kafka/producer/sender.py               9
-rw-r--r--  kafka/protocol/message.py             78
7 files changed, 132 insertions(+), 50 deletions(-)
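
For orientation: message format v1 inserts an 8-byte timestamp between the attributes byte and the key, and compressed wrapper messages now record the absolute offset of their last inner message while inner messages carry relative offsets. A minimal byte-layout sketch, not the library's encoder (crc is left at 0 here for brevity; the real code computes it over everything after the crc field):

    import struct
    import time

    def sketch_v0(key, value):
        # v0 layout: crc(4) magic(1) attributes(1) keylen(4) key valuelen(4) value
        return (struct.pack('>iBBi', 0, 0, 0, len(key)) + key
                + struct.pack('>i', len(value)) + value)

    def sketch_v1(key, value, timestamp_ms=None):
        # v1 inserts an 8-byte timestamp between attributes and key
        if timestamp_ms is None:
            timestamp_ms = int(time.time() * 1000)
        return (struct.pack('>iBBqi', 0, 1, 0, timestamp_ms, len(key)) + key
                + struct.pack('>i', len(value)) + value)

    # matches the HEADER_SIZE change from 14 to 22 bytes in kafka/protocol/message.py
    assert len(sketch_v1(b'k', b'v')) - len(sketch_v0(b'k', b'v')) == 8
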
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 3a5e37e..bf59775 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -19,7 +19,7 @@ log = logging.getLogger(__name__)
ConsumerRecord = collections.namedtuple("ConsumerRecord",
- ["topic", "partition", "offset", "key", "value"])
+ ["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value"])
class NoOffsetForPartitionError(Errors.KafkaError):
@@ -351,17 +351,33 @@ class Fetcher(six.Iterator):
position)
return dict(drained)
- def _unpack_message_set(self, tp, messages):
+ def _unpack_message_set(self, tp, messages, relative_offset=0):
try:
for offset, size, msg in messages:
if self.config['check_crcs'] and not msg.validate_crc():
raise Errors.InvalidMessageError(msg)
elif msg.is_compressed():
- for record in self._unpack_message_set(tp, msg.decompress()):
+ mset = msg.decompress()
+ # new format uses relative offsets for compressed messages
+ if msg.magic > 0:
+ last_offset, _, _ = mset[-1]
+ relative = offset - last_offset
+ else:
+ relative = 0
+ for record in self._unpack_message_set(tp, mset, relative):
yield record
else:
+ # Message v1 adds timestamp
+ if msg.magic > 0:
+ timestamp = msg.timestamp
+ timestamp_type = msg.timestamp_type
+ else:
+ timestamp = timestamp_type = None
key, value = self._deserialize(msg)
- yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+ yield ConsumerRecord(tp.topic, tp.partition,
+ offset + relative_offset,
+ timestamp, timestamp_type,
+ key, value)
# If unpacking raises StopIteration, it is erroneously
# caught by the generator. We want all exceptions to be raised
# back to the user. See Issue 545
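
A quick sanity check of the relative-offset arithmetic above: a v1 wrapper message is stored at the absolute offset of its last inner message, and inner messages carry offsets relative to the start of the set, so the consumer reconstructs absolute offsets as in this standalone sketch (the function name is illustrative, not part of the Fetcher API):

    def absolute_offsets(wrapper_offset, inner_offsets):
        # wrapper_offset minus the last inner offset gives the absolute offset
        # of the first inner message; add each relative offset to recover the rest
        base = wrapper_offset - inner_offsets[-1]
        return [base + rel for rel in inner_offsets]

    # wrapper at offset 105 holding three messages with relative offsets 0..2
    assert absolute_offsets(105, [0, 1, 2]) == [103, 104, 105]
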
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index b2ac747..ba9b5db 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -29,7 +29,7 @@ class MessageSetBuffer(object):
'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
}
- def __init__(self, buf, batch_size, compression_type=None):
+ def __init__(self, buf, batch_size, compression_type=None, message_version=0):
if compression_type is not None:
assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
checker, encoder, attributes = self._COMPRESSORS[compression_type]
@@ -40,6 +40,7 @@ class MessageSetBuffer(object):
self._compressor = None
self._compression_attributes = None
+ self._message_version = message_version
self._buffer = buf
# Init MessageSetSize to 0 -- update on close
self._buffer.seek(0)
@@ -85,7 +86,8 @@ class MessageSetBuffer(object):
# TODO: avoid copies with bytearray / memoryview
self._buffer.seek(4)
msg = Message(self._compressor(self._buffer.read()),
- attributes=self._compression_attributes)
+ attributes=self._compression_attributes,
+ magic=self._message_version)
encoded = msg.encode()
self._buffer.seek(4)
self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg
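
On close, the buffered message set is compressed and wrapped in a single Message whose magic byte matches the batch's message_version, so a v1 batch produces a v1 wrapper (which therefore also carries a timestamp). A minimal sketch using the library's Message class, assuming gzip support is available; the payload bytes are a placeholder:

    from kafka.codec import gzip_encode
    from kafka.protocol.message import Message

    inner_set = b'...encoded inner message set bytes...'  # placeholder payload
    wrapper = Message(gzip_encode(inner_set),
                      attributes=Message.CODEC_GZIP,
                      magic=1)  # message_version of the batch
    encoded = wrapper.encode()
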
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index 35520d8..acf4255 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -29,16 +29,21 @@ class FutureProduceResult(Future):
class FutureRecordMetadata(Future):
- def __init__(self, produce_future, relative_offset):
+ def __init__(self, produce_future, relative_offset, timestamp_ms):
super(FutureRecordMetadata, self).__init__()
self._produce_future = produce_future
self.relative_offset = relative_offset
+ self.timestamp_ms = timestamp_ms
produce_future.add_callback(self._produce_success)
produce_future.add_errback(self.failure)
- def _produce_success(self, base_offset):
+ def _produce_success(self, offset_and_timestamp):
+ base_offset, timestamp_ms = offset_and_timestamp
+ if timestamp_ms is None:
+ timestamp_ms = self.timestamp_ms
self.success(RecordMetadata(self._produce_future.topic_partition,
- base_offset, self.relative_offset))
+ base_offset, timestamp_ms,
+ self.relative_offset))
def get(self, timeout=None):
if not self.is_done and not self._produce_future.await(timeout):
@@ -51,12 +56,13 @@ class FutureRecordMetadata(Future):
class RecordMetadata(collections.namedtuple(
- 'RecordMetadata', 'topic partition topic_partition offset')):
- def __new__(cls, tp, base_offset, relative_offset=None):
+ 'RecordMetadata', 'topic partition topic_partition offset timestamp')):
+ def __new__(cls, tp, base_offset, timestamp, relative_offset=None):
offset = base_offset
if relative_offset is not None and base_offset != -1:
offset += relative_offset
- return super(RecordMetadata, cls).__new__(cls, tp.topic, tp.partition, tp, offset)
+ return super(RecordMetadata, cls).__new__(cls, tp.topic, tp.partition,
+ tp, offset, timestamp)
def __str__(self):
return 'RecordMetadata(topic=%s, partition=%s, offset=%s)' % (
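
The produce future now resolves with a (base_offset, timestamp) pair; when the broker returns no timestamp, the client falls back to the timestamp_ms recorded at send time, and the relative offset is added to the base offset as before. A reduced sketch of that resolution logic, with plain values standing in for the real Future machinery:

    def resolve(base_offset, broker_timestamp_ms, send_timestamp_ms, relative_offset):
        # broker timestamp (LogAppendTime) wins when present, else keep CreateTime
        timestamp_ms = (broker_timestamp_ms if broker_timestamp_ms is not None
                        else send_timestamp_ms)
        offset = base_offset + relative_offset if base_offset != -1 else base_offset
        return offset, timestamp_ms

    assert resolve(500, None, 1463900000000, 3) == (503, 1463900000000)
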
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 7e8f625..7aa24b3 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -347,7 +347,7 @@ class KafkaProducer(object):
max_wait = self.config['max_block_ms'] / 1000.0
return self._wait_on_metadata(topic, max_wait)
- def send(self, topic, value=None, key=None, partition=None):
+ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
"""Publish a message to a topic.
Arguments:
@@ -368,6 +368,8 @@ class KafkaProducer(object):
partition (but if key is None, partition is chosen randomly).
Must be type bytes, or be serializable to bytes via configured
key_serializer.
+ timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
+ to use as the message timestamp. Defaults to current time.
Returns:
FutureRecordMetadata: resolves to RecordMetadata
@@ -396,8 +398,11 @@ class KafkaProducer(object):
self._ensure_valid_record_size(message_size)
tp = TopicPartition(topic, partition)
+ if timestamp_ms is None:
+ timestamp_ms = int(time.time() * 1000)
log.debug("Sending (key=%s value=%s) to %s", key, value, tp)
- result = self._accumulator.append(tp, key_bytes, value_bytes,
+ result = self._accumulator.append(tp, timestamp_ms,
+ key_bytes, value_bytes,
self.config['max_block_ms'])
future, batch_is_full, new_batch_created = result
if batch_is_full or new_batch_created:
@@ -416,8 +421,10 @@ class KafkaProducer(object):
except Exception as e:
log.debug("Exception occurred during message send: %s", e)
return FutureRecordMetadata(
- FutureProduceResult(TopicPartition(topic, partition)),
- -1).failure(e)
+ FutureProduceResult(
+ TopicPartition(topic, partition)),
+ -1, None
+ ).failure(e)
def flush(self, timeout=None):
"""
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 9eb0e95..4434b18 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -36,7 +36,7 @@ class AtomicInteger(object):
class RecordBatch(object):
- def __init__(self, tp, records):
+ def __init__(self, tp, records, message_version=0):
self.record_count = 0
#self.max_record_size = 0 # for metrics only
now = time.time()
@@ -46,22 +46,25 @@ class RecordBatch(object):
self.last_attempt = now
self.last_append = now
self.records = records
+ self.message_version = message_version
self.topic_partition = tp
self.produce_future = FutureProduceResult(tp)
self._retry = False
- def try_append(self, key, value):
+ def try_append(self, timestamp_ms, key, value):
if not self.records.has_room_for(key, value):
return None
- self.records.append(self.record_count, Message(value, key=key))
+ msg = Message(value, key=key, magic=self.message_version)
+ self.records.append(self.record_count, msg)
# self.max_record_size = max(self.max_record_size, Record.record_size(key, value)) # for metrics only
self.last_append = time.time()
- future = FutureRecordMetadata(self.produce_future, self.record_count)
+ future = FutureRecordMetadata(self.produce_future, self.record_count,
+ timestamp_ms)
self.record_count += 1
return future
- def done(self, base_offset=None, exception=None):
+ def done(self, base_offset=None, timestamp_ms=None, exception=None):
log.debug("Produced messages to topic-partition %s with base offset"
" %s and error %s.", self.topic_partition, base_offset,
exception) # trace
@@ -69,7 +72,7 @@ class RecordBatch(object):
log.warning('Batch is already closed -- ignoring batch.done()')
return
elif exception is None:
- self.produce_future.success(base_offset)
+ self.produce_future.success((base_offset, timestamp_ms))
else:
self.produce_future.failure(exception)
@@ -78,7 +81,7 @@ class RecordBatch(object):
if ((self.records.is_full() and request_timeout_ms < since_append_ms)
or (request_timeout_ms < (since_append_ms + linger_ms))):
self.records.close()
- self.done(-1, Errors.KafkaTimeoutError(
+ self.done(-1, None, Errors.KafkaTimeoutError(
"Batch containing %s record(s) expired due to timeout while"
" requesting metadata from brokers for %s", self.record_count,
self.topic_partition))
@@ -137,6 +140,7 @@ class RecordAccumulator(object):
'compression_type': None,
'linger_ms': 0,
'retry_backoff_ms': 100,
+ 'message_version': 0,
}
def __init__(self, **configs):
@@ -155,7 +159,7 @@ class RecordAccumulator(object):
self.config['batch_size'])
self._incomplete = IncompleteRecordBatches()
- def append(self, tp, key, value, max_time_to_block_ms):
+ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms):
"""Add a record to the accumulator, return the append result.
The append result will contain the future metadata, and flag for
@@ -164,6 +168,7 @@ class RecordAccumulator(object):
Arguments:
tp (TopicPartition): The topic/partition to which this record is
being sent
+ timestamp_ms (int): The timestamp of the record (epoch ms)
key (bytes): The key for the record
value (bytes): The value for the record
max_time_to_block_ms (int): The maximum time in milliseconds to
@@ -188,7 +193,7 @@ class RecordAccumulator(object):
dq = self._batches[tp]
if dq:
last = dq[-1]
- future = last.try_append(key, value)
+ future = last.try_append(timestamp_ms, key, value)
if future is not None:
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False
@@ -211,7 +216,7 @@ class RecordAccumulator(object):
if dq:
last = dq[-1]
- future = last.try_append(key, value)
+ future = last.try_append(timestamp_ms, key, value)
if future is not None:
# Somebody else found us a batch, return the one we
# waited for! Hopefully this doesn't happen often...
@@ -220,9 +225,10 @@ class RecordAccumulator(object):
return future, batch_is_full, False
records = MessageSetBuffer(buf, self.config['batch_size'],
- self.config['compression_type'])
- batch = RecordBatch(tp, records)
- future = batch.try_append(key, value)
+ self.config['compression_type'],
+ self.config['message_version'])
+ batch = RecordBatch(tp, records, self.config['message_version'])
+ future = batch.try_append(timestamp_ms, key, value)
if not future:
raise Exception()
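
try_append now threads the record's timestamp through to the FutureRecordMetadata and returns None when the current batch has no room, signalling the accumulator to allocate a new batch. A toy stand-in for that contract (deliberately not the real RecordBatch; capacity is counted in records rather than bytes, and a bare relative offset stands in for the future):

    class ToyBatch(object):
        def __init__(self, capacity):
            self.capacity = capacity
            self.records = []

        def try_append(self, timestamp_ms, key, value):
            if len(self.records) >= self.capacity:
                return None                      # full: caller opens a new batch
            relative_offset = len(self.records)  # position within this batch
            self.records.append((timestamp_ms, key, value))
            # the real code returns FutureRecordMetadata(produce_future,
            # relative_offset, timestamp_ms)
            return relative_offset

    batch = ToyBatch(capacity=2)
    assert batch.try_append(1463900000000, b'k1', b'v1') == 0
    assert batch.try_append(1463900000001, b'k2', b'v2') == 1
    assert batch.try_append(1463900000002, b'k3', b'v3') is None
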
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index bf7c163..9c36c9b 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -163,7 +163,7 @@ class Sender(threading.Thread):
def _failed_produce(self, batches, node_id, error):
log.debug("Error sending produce request to node %d: %s", node_id, error) # trace
for batch in batches:
- self._complete_batch(batch, error, -1)
+ self._complete_batch(batch, error, -1, None)
def _handle_produce_response(self, batches, response):
"""Handle a produce response."""
@@ -183,15 +183,16 @@ class Sender(threading.Thread):
else:
# this is the acks = 0 case, just complete all requests
for batch in batches:
- self._complete_batch(batch, None, -1)
+ self._complete_batch(batch, None, -1, None)
- def _complete_batch(self, batch, error, base_offset):
+ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
"""Complete or retry the given batch of records.
Arguments:
batch (RecordBatch): The record batch
error (Exception): The error (or None if none)
base_offset (int): The base offset assigned to the records if successful
+ timestamp_ms (int, optional): The timestamp returned by the broker for this batch
"""
# Standardize no-error to None
if error is Errors.NoError:
@@ -210,7 +211,7 @@ class Sender(threading.Thread):
error = error(batch.topic_partition.topic)
# tell the user the result of their request
- batch.done(base_offset, error)
+ batch.done(base_offset, timestamp_ms, error)
self._accumulator.deallocate(batch)
if getattr(error, 'invalid_metadata', False):
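
The sender now forwards a per-batch timestamp (the broker's LogAppendTime, when the produce response supplies one) into batch.done(); error paths and the acks=0 case pass None, so the producer-side timestamp is reported instead. A reduced sketch of the hand-off, ignoring the retry path (names are illustrative, not the Sender class):

    def complete_batch(accumulator, batch, error, base_offset, timestamp_ms=None):
        # resolve the batch with its assigned offset, the broker timestamp
        # (None for acks=0, failures, or brokers that do not return one),
        # and any error, then release its buffer back to the pool
        batch.done(base_offset, timestamp_ms, error)
        accumulator.deallocate(batch)
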
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 8458ac5..473ca56 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -1,4 +1,5 @@
import io
+import time
from ..codec import (has_gzip, has_snappy, has_lz4,
gzip_decode, snappy_decode, lz4_decode)
@@ -11,22 +12,39 @@ from ..util import crc32
class Message(Struct):
- SCHEMA = Schema(
- ('crc', Int32),
- ('magic', Int8),
- ('attributes', Int8),
- ('key', Bytes),
- ('value', Bytes)
- )
- CODEC_MASK = 0x03
+ SCHEMAS = [
+ Schema(
+ ('crc', Int32),
+ ('magic', Int8),
+ ('attributes', Int8),
+ ('key', Bytes),
+ ('value', Bytes)),
+ Schema(
+ ('crc', Int32),
+ ('magic', Int8),
+ ('attributes', Int8),
+ ('timestamp', Int64),
+ ('key', Bytes),
+ ('value', Bytes)),
+ ]
+ SCHEMA = SCHEMAS[1]
+ CODEC_MASK = 0x07
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
CODEC_LZ4 = 0x03
- HEADER_SIZE = 14 # crc(4), magic(1), attributes(1), key+value size(4*2)
+ TIMESTAMP_TYPE_MASK = 0x08
+ HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)
- def __init__(self, value, key=None, magic=0, attributes=0, crc=0):
+ def __init__(self, value, key=None, magic=0, attributes=0, crc=0,
+ timestamp=None):
assert value is None or isinstance(value, bytes), 'value must be bytes'
assert key is None or isinstance(key, bytes), 'key must be bytes'
+ assert magic > 0 or timestamp is None, 'timestamp not supported in v0'
+
+ # Default timestamp to now for v1 messages
+ if magic > 0 and timestamp is None:
+ timestamp = int(time.time() * 1000)
+ self.timestamp = timestamp
self.crc = crc
self.magic = magic
self.attributes = attributes
@@ -34,22 +52,48 @@ class Message(Struct):
self.value = value
self.encode = self._encode_self
+ @property
+ def timestamp_type(self):
+ """0 for CreateTime; 1 for LogAppendTime; None if unsupported.
+
+ Value is determined by broker; produced messages should always set to 0
+ Requires Kafka >= 0.10 / message version >= 1
+ """
+ if self.magic == 0:
+ return None
+ return self.attributes & self.TIMESTAMP_TYPE_MASK
+
def _encode_self(self, recalc_crc=True):
- message = Message.SCHEMA.encode(
- (self.crc, self.magic, self.attributes, self.key, self.value)
- )
+ version = self.magic
+ if version == 1:
+ fields = (self.crc, self.magic, self.attributes, self.timestamp, self.key, self.value)
+ elif version == 0:
+ fields = (self.crc, self.magic, self.attributes, self.key, self.value)
+ else:
+ raise ValueError('Unrecognized message version: %s' % version)
+ message = Message.SCHEMAS[version].encode(fields)
if not recalc_crc:
return message
self.crc = crc32(message[4:])
- return self.SCHEMA.fields[0].encode(self.crc) + message[4:]
+ crc_field = self.SCHEMAS[version].fields[0]
+ return crc_field.encode(self.crc) + message[4:]
@classmethod
def decode(cls, data):
if isinstance(data, bytes):
data = io.BytesIO(data)
- fields = [field.decode(data) for field in cls.SCHEMA.fields]
- return cls(fields[4], key=fields[3],
- magic=fields[1], attributes=fields[2], crc=fields[0])
+ # Partial decode required to determine message version
+ base_fields = cls.SCHEMAS[0].fields[0:3]
+ crc, magic, attributes = [field.decode(data) for field in base_fields]
+ remaining = cls.SCHEMAS[magic].fields[3:]
+ fields = [field.decode(data) for field in remaining]
+ if magic == 1:
+ timestamp = fields[0]
+ else:
+ timestamp = None
+ return cls(fields[-1], key=fields[-2],
+ magic=magic, attributes=attributes, crc=crc,
+ timestamp=timestamp)
def validate_crc(self):
raw_msg = self._encode_self(recalc_crc=False)
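
The version-aware encode/decode paths above can be exercised directly: a v1 Message round-trips its timestamp, while a v0 Message reports None for timestamp and timestamp_type. A brief usage sketch against the library's own class (the timestamp value is arbitrary):

    from kafka.protocol.message import Message

    v1 = Message(b'value', key=b'key', magic=1, timestamp=1463900000000)
    decoded = Message.decode(v1.encode())
    assert decoded.timestamp == 1463900000000
    assert decoded.timestamp_type == 0  # CreateTime; brokers may rewrite to LogAppendTime

    v0 = Message(b'value', key=b'key')  # magic defaults to 0
    assert Message.decode(v0.encode()).timestamp is None
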