diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-05-22 00:31:16 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-05-22 09:51:37 -0700 |
commit | 795cb9b29fa05d4425f807f54dfa639c125fc0dd (patch) | |
tree | 7fba03e95f26185c126aa95d1acdd2af5d2ad925 | |
parent | 7f4a9361ea168a0e1073801d0b86868de47d1de2 (diff) | |
download | kafka-python-795cb9b29fa05d4425f807f54dfa639c125fc0dd.tar.gz |
KAFKA-3025: Message v1 -- add timetamp and use relative offset in compressed messagesets
-rw-r--r-- | kafka/consumer/fetcher.py | 24 | ||||
-rw-r--r-- | kafka/producer/buffer.py | 6 | ||||
-rw-r--r-- | kafka/producer/future.py | 18 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 15 | ||||
-rw-r--r-- | kafka/producer/record_accumulator.py | 32 | ||||
-rw-r--r-- | kafka/producer/sender.py | 9 | ||||
-rw-r--r-- | kafka/protocol/message.py | 78 |
7 files changed, 132 insertions, 50 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 3a5e37e..bf59775 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -19,7 +19,7 @@ log = logging.getLogger(__name__) ConsumerRecord = collections.namedtuple("ConsumerRecord", - ["topic", "partition", "offset", "key", "value"]) + ["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value"]) class NoOffsetForPartitionError(Errors.KafkaError): @@ -351,17 +351,33 @@ class Fetcher(six.Iterator): position) return dict(drained) - def _unpack_message_set(self, tp, messages): + def _unpack_message_set(self, tp, messages, relative_offset=0): try: for offset, size, msg in messages: if self.config['check_crcs'] and not msg.validate_crc(): raise Errors.InvalidMessageError(msg) elif msg.is_compressed(): - for record in self._unpack_message_set(tp, msg.decompress()): + mset = msg.decompress() + # new format uses relative offsets for compressed messages + if msg.magic > 0: + last_offset, _, _ = mset[-1] + relative = offset - last_offset + else: + relative = 0 + for record in self._unpack_message_set(tp, mset, relative): yield record else: + # Message v1 adds timestamp + if msg.magic > 0: + timestamp = msg.timestamp + timestamp_type = msg.timestamp_type + else: + timestamp = timestamp_type = None key, value = self._deserialize(msg) - yield ConsumerRecord(tp.topic, tp.partition, offset, key, value) + yield ConsumerRecord(tp.topic, tp.partition, + offset + relative_offset, + timestamp, timestamp_type, + key, value) # If unpacking raises StopIteration, it is erroneously # caught by the generator. We want all exceptions to be raised # back to the user. See Issue 545 diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index b2ac747..ba9b5db 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -29,7 +29,7 @@ class MessageSetBuffer(object): 'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY), 'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4), } - def __init__(self, buf, batch_size, compression_type=None): + def __init__(self, buf, batch_size, compression_type=None, message_version=0): if compression_type is not None: assert compression_type in self._COMPRESSORS, 'Unrecognized compression type' checker, encoder, attributes = self._COMPRESSORS[compression_type] @@ -40,6 +40,7 @@ class MessageSetBuffer(object): self._compressor = None self._compression_attributes = None + self._message_version = message_version self._buffer = buf # Init MessageSetSize to 0 -- update on close self._buffer.seek(0) @@ -85,7 +86,8 @@ class MessageSetBuffer(object): # TODO: avoid copies with bytearray / memoryview self._buffer.seek(4) msg = Message(self._compressor(self._buffer.read()), - attributes=self._compression_attributes) + attributes=self._compression_attributes, + magic=self._message_version) encoded = msg.encode() self._buffer.seek(4) self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg diff --git a/kafka/producer/future.py b/kafka/producer/future.py index 35520d8..acf4255 100644 --- a/kafka/producer/future.py +++ b/kafka/producer/future.py @@ -29,16 +29,21 @@ class FutureProduceResult(Future): class FutureRecordMetadata(Future): - def __init__(self, produce_future, relative_offset): + def __init__(self, produce_future, relative_offset, timestamp_ms): super(FutureRecordMetadata, self).__init__() self._produce_future = produce_future self.relative_offset = relative_offset + self.timestamp_ms = timestamp_ms produce_future.add_callback(self._produce_success) produce_future.add_errback(self.failure) - def _produce_success(self, base_offset): + def _produce_success(self, offset_and_timestamp): + base_offset, timestamp_ms = offset_and_timestamp + if timestamp_ms is None: + timestamp_ms = self.timestamp_ms self.success(RecordMetadata(self._produce_future.topic_partition, - base_offset, self.relative_offset)) + base_offset, timestamp_ms, + self.relative_offset)) def get(self, timeout=None): if not self.is_done and not self._produce_future.await(timeout): @@ -51,12 +56,13 @@ class FutureRecordMetadata(Future): class RecordMetadata(collections.namedtuple( - 'RecordMetadata', 'topic partition topic_partition offset')): - def __new__(cls, tp, base_offset, relative_offset=None): + 'RecordMetadata', 'topic partition topic_partition offset timestamp')): + def __new__(cls, tp, base_offset, timestamp, relative_offset=None): offset = base_offset if relative_offset is not None and base_offset != -1: offset += relative_offset - return super(RecordMetadata, cls).__new__(cls, tp.topic, tp.partition, tp, offset) + return super(RecordMetadata, cls).__new__(cls, tp.topic, tp.partition, + tp, offset, timestamp) def __str__(self): return 'RecordMetadata(topic=%s, partition=%s, offset=%s)' % ( diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 7e8f625..7aa24b3 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -347,7 +347,7 @@ class KafkaProducer(object): max_wait = self.config['max_block_ms'] / 1000.0 return self._wait_on_metadata(topic, max_wait) - def send(self, topic, value=None, key=None, partition=None): + def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None): """Publish a message to a topic. Arguments: @@ -368,6 +368,8 @@ class KafkaProducer(object): partition (but if key is None, partition is chosen randomly). Must be type bytes, or be serializable to bytes via configured key_serializer. + timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC) + to use as the message timestamp. Defaults to current time. Returns: FutureRecordMetadata: resolves to RecordMetadata @@ -396,8 +398,11 @@ class KafkaProducer(object): self._ensure_valid_record_size(message_size) tp = TopicPartition(topic, partition) + if timestamp_ms is None: + timestamp_ms = int(time.time() * 1000) log.debug("Sending (key=%s value=%s) to %s", key, value, tp) - result = self._accumulator.append(tp, key_bytes, value_bytes, + result = self._accumulator.append(tp, timestamp_ms, + key_bytes, value_bytes, self.config['max_block_ms']) future, batch_is_full, new_batch_created = result if batch_is_full or new_batch_created: @@ -416,8 +421,10 @@ class KafkaProducer(object): except Exception as e: log.debug("Exception occurred during message send: %s", e) return FutureRecordMetadata( - FutureProduceResult(TopicPartition(topic, partition)), - -1).failure(e) + FutureProduceResult( + TopicPartition(topic, partition)), + -1, None + ).failure(e) def flush(self, timeout=None): """ diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 9eb0e95..4434b18 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -36,7 +36,7 @@ class AtomicInteger(object): class RecordBatch(object): - def __init__(self, tp, records): + def __init__(self, tp, records, message_version=0): self.record_count = 0 #self.max_record_size = 0 # for metrics only now = time.time() @@ -46,22 +46,25 @@ class RecordBatch(object): self.last_attempt = now self.last_append = now self.records = records + self.message_version = message_version self.topic_partition = tp self.produce_future = FutureProduceResult(tp) self._retry = False - def try_append(self, key, value): + def try_append(self, timestamp_ms, key, value): if not self.records.has_room_for(key, value): return None - self.records.append(self.record_count, Message(value, key=key)) + msg = Message(value, key=key, magic=self.message_version) + self.records.append(self.record_count, msg) # self.max_record_size = max(self.max_record_size, Record.record_size(key, value)) # for metrics only self.last_append = time.time() - future = FutureRecordMetadata(self.produce_future, self.record_count) + future = FutureRecordMetadata(self.produce_future, self.record_count, + timestamp_ms) self.record_count += 1 return future - def done(self, base_offset=None, exception=None): + def done(self, base_offset=None, timestamp_ms=None, exception=None): log.debug("Produced messages to topic-partition %s with base offset" " %s and error %s.", self.topic_partition, base_offset, exception) # trace @@ -69,7 +72,7 @@ class RecordBatch(object): log.warning('Batch is already closed -- ignoring batch.done()') return elif exception is None: - self.produce_future.success(base_offset) + self.produce_future.success((base_offset, timestamp_ms)) else: self.produce_future.failure(exception) @@ -78,7 +81,7 @@ class RecordBatch(object): if ((self.records.is_full() and request_timeout_ms < since_append_ms) or (request_timeout_ms < (since_append_ms + linger_ms))): self.records.close() - self.done(-1, Errors.KafkaTimeoutError( + self.done(-1, None, Errors.KafkaTimeoutError( "Batch containing %s record(s) expired due to timeout while" " requesting metadata from brokers for %s", self.record_count, self.topic_partition)) @@ -137,6 +140,7 @@ class RecordAccumulator(object): 'compression_type': None, 'linger_ms': 0, 'retry_backoff_ms': 100, + 'message_version': 0, } def __init__(self, **configs): @@ -155,7 +159,7 @@ class RecordAccumulator(object): self.config['batch_size']) self._incomplete = IncompleteRecordBatches() - def append(self, tp, key, value, max_time_to_block_ms): + def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms): """Add a record to the accumulator, return the append result. The append result will contain the future metadata, and flag for @@ -164,6 +168,7 @@ class RecordAccumulator(object): Arguments: tp (TopicPartition): The topic/partition to which this record is being sent + timestamp_ms (int): The timestamp of the record (epoch ms) key (bytes): The key for the record value (bytes): The value for the record max_time_to_block_ms (int): The maximum time in milliseconds to @@ -188,7 +193,7 @@ class RecordAccumulator(object): dq = self._batches[tp] if dq: last = dq[-1] - future = last.try_append(key, value) + future = last.try_append(timestamp_ms, key, value) if future is not None: batch_is_full = len(dq) > 1 or last.records.is_full() return future, batch_is_full, False @@ -211,7 +216,7 @@ class RecordAccumulator(object): if dq: last = dq[-1] - future = last.try_append(key, value) + future = last.try_append(timestamp_ms, key, value) if future is not None: # Somebody else found us a batch, return the one we # waited for! Hopefully this doesn't happen often... @@ -220,9 +225,10 @@ class RecordAccumulator(object): return future, batch_is_full, False records = MessageSetBuffer(buf, self.config['batch_size'], - self.config['compression_type']) - batch = RecordBatch(tp, records) - future = batch.try_append(key, value) + self.config['compression_type'], + self.config['message_version']) + batch = RecordBatch(tp, records, self.config['message_version']) + future = batch.try_append(timestamp_ms, key, value) if not future: raise Exception() diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index bf7c163..9c36c9b 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -163,7 +163,7 @@ class Sender(threading.Thread): def _failed_produce(self, batches, node_id, error): log.debug("Error sending produce request to node %d: %s", node_id, error) # trace for batch in batches: - self._complete_batch(batch, error, -1) + self._complete_batch(batch, error, -1, None) def _handle_produce_response(self, batches, response): """Handle a produce response.""" @@ -183,15 +183,16 @@ class Sender(threading.Thread): else: # this is the acks = 0 case, just complete all requests for batch in batches: - self._complete_batch(batch, None, -1) + self._complete_batch(batch, None, -1, None) - def _complete_batch(self, batch, error, base_offset): + def _complete_batch(self, batch, error, base_offset, timestamp_ms=None): """Complete or retry the given batch of records. Arguments: batch (RecordBatch): The record batch error (Exception): The error (or None if none) base_offset (int): The base offset assigned to the records if successful + timestamp_ms (int, optional): The timestamp returned by the broker for this batch """ # Standardize no-error to None if error is Errors.NoError: @@ -210,7 +211,7 @@ class Sender(threading.Thread): error = error(batch.topic_partition.topic) # tell the user the result of their request - batch.done(base_offset, error) + batch.done(base_offset, timestamp_ms, error) self._accumulator.deallocate(batch) if getattr(error, 'invalid_metadata', False): diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 8458ac5..473ca56 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -1,4 +1,5 @@ import io +import time from ..codec import (has_gzip, has_snappy, has_lz4, gzip_decode, snappy_decode, lz4_decode) @@ -11,22 +12,39 @@ from ..util import crc32 class Message(Struct): - SCHEMA = Schema( - ('crc', Int32), - ('magic', Int8), - ('attributes', Int8), - ('key', Bytes), - ('value', Bytes) - ) - CODEC_MASK = 0x03 + SCHEMAS = [ + Schema( + ('crc', Int32), + ('magic', Int8), + ('attributes', Int8), + ('key', Bytes), + ('value', Bytes)), + Schema( + ('crc', Int32), + ('magic', Int8), + ('attributes', Int8), + ('timestamp', Int64), + ('key', Bytes), + ('value', Bytes)), + ] + SCHEMA = SCHEMAS[1] + CODEC_MASK = 0x07 CODEC_GZIP = 0x01 CODEC_SNAPPY = 0x02 CODEC_LZ4 = 0x03 - HEADER_SIZE = 14 # crc(4), magic(1), attributes(1), key+value size(4*2) + TIMESTAMP_TYPE_MASK = 0x08 + HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2) - def __init__(self, value, key=None, magic=0, attributes=0, crc=0): + def __init__(self, value, key=None, magic=0, attributes=0, crc=0, + timestamp=None): assert value is None or isinstance(value, bytes), 'value must be bytes' assert key is None or isinstance(key, bytes), 'key must be bytes' + assert magic > 0 or timestamp is None, 'timestamp not supported in v0' + + # Default timestamp to now for v1 messages + if magic > 0 and timestamp is None: + timestamp = int(time.time() * 1000) + self.timestamp = timestamp self.crc = crc self.magic = magic self.attributes = attributes @@ -34,22 +52,48 @@ class Message(Struct): self.value = value self.encode = self._encode_self + @property + def timestamp_type(self): + """0 for CreateTime; 1 for LogAppendTime; None if unsupported. + + Value is determined by broker; produced messages should always set to 0 + Requires Kafka >= 0.10 / message version >= 1 + """ + if self.magic == 0: + return None + return self.attributes & self.TIMESTAMP_TYPE_MASK + def _encode_self(self, recalc_crc=True): - message = Message.SCHEMA.encode( - (self.crc, self.magic, self.attributes, self.key, self.value) - ) + version = self.magic + if version == 1: + fields = (self.crc, self.magic, self.attributes, self.timestamp, self.key, self.value) + elif version == 0: + fields = (self.crc, self.magic, self.attributes, self.key, self.value) + else: + raise ValueError('Unrecognized message version: %s' % version) + message = Message.SCHEMAS[version].encode(fields) if not recalc_crc: return message self.crc = crc32(message[4:]) - return self.SCHEMA.fields[0].encode(self.crc) + message[4:] + crc_field = self.SCHEMAS[version].fields[0] + return crc_field.encode(self.crc) + message[4:] @classmethod def decode(cls, data): if isinstance(data, bytes): data = io.BytesIO(data) - fields = [field.decode(data) for field in cls.SCHEMA.fields] - return cls(fields[4], key=fields[3], - magic=fields[1], attributes=fields[2], crc=fields[0]) + # Partial decode required to determine message version + base_fields = cls.SCHEMAS[0].fields[0:3] + crc, magic, attributes = [field.decode(data) for field in base_fields] + remaining = cls.SCHEMAS[magic].fields[3:] + fields = [field.decode(data) for field in remaining] + if magic == 1: + timestamp = fields[0] + else: + timestamp = None + return cls(fields[-1], key=fields[-2], + magic=magic, attributes=attributes, crc=crc, + timestamp=timestamp) def validate_crc(self): raw_msg = self._encode_self(recalc_crc=False) |