diff options
Diffstat (limited to 'kafka/producer/future.py')
-rw-r--r-- | kafka/producer/future.py | 18 |
1 files changed, 12 insertions, 6 deletions
diff --git a/kafka/producer/future.py b/kafka/producer/future.py index 35520d8..acf4255 100644 --- a/kafka/producer/future.py +++ b/kafka/producer/future.py @@ -29,16 +29,21 @@ class FutureProduceResult(Future): class FutureRecordMetadata(Future): - def __init__(self, produce_future, relative_offset): + def __init__(self, produce_future, relative_offset, timestamp_ms): super(FutureRecordMetadata, self).__init__() self._produce_future = produce_future self.relative_offset = relative_offset + self.timestamp_ms = timestamp_ms produce_future.add_callback(self._produce_success) produce_future.add_errback(self.failure) - def _produce_success(self, base_offset): + def _produce_success(self, offset_and_timestamp): + base_offset, timestamp_ms = offset_and_timestamp + if timestamp_ms is None: + timestamp_ms = self.timestamp_ms self.success(RecordMetadata(self._produce_future.topic_partition, - base_offset, self.relative_offset)) + base_offset, timestamp_ms, + self.relative_offset)) def get(self, timeout=None): if not self.is_done and not self._produce_future.await(timeout): @@ -51,12 +56,13 @@ class FutureRecordMetadata(Future): class RecordMetadata(collections.namedtuple( - 'RecordMetadata', 'topic partition topic_partition offset')): - def __new__(cls, tp, base_offset, relative_offset=None): + 'RecordMetadata', 'topic partition topic_partition offset timestamp')): + def __new__(cls, tp, base_offset, timestamp, relative_offset=None): offset = base_offset if relative_offset is not None and base_offset != -1: offset += relative_offset - return super(RecordMetadata, cls).__new__(cls, tp.topic, tp.partition, tp, offset) + return super(RecordMetadata, cls).__new__(cls, tp.topic, tp.partition, + tp, offset, timestamp) def __str__(self): return 'RecordMetadata(topic=%s, partition=%s, offset=%s)' % ( |