summaryrefslogtreecommitdiff
path: root/test/test_consumer_integration.py
diff options
context:
space:
mode:
authorTaras Voinarovskiy <voyn1991@gmail.com>2017-07-30 15:42:27 +0000
committerTaras Voinarovskiy <voyn1991@gmail.com>2017-08-07 09:34:08 +0000
commit39f0e50b9441609e9dce4e60a1ab2c3f16680476 (patch)
tree2b94ed93bec5ae4f072360c5072cc22b0685f8a1 /test/test_consumer_integration.py
parentda25df6d3c6380e27bf638f3620613d05ac9fd03 (diff)
downloadkafka-python-39f0e50b9441609e9dce4e60a1ab2c3f16680476.tar.gz
Added basic support for offsets_for_times API. Still needs to group by nodes and send in parallel.
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--test/test_consumer_integration.py46
1 files changed, 45 insertions, 1 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 193a570..218ed2c 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,12 +1,14 @@
import logging
import os
+import time
from six.moves import xrange
import six
from . import unittest
from kafka import (
- KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, create_gzip_message
+ KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message,
+ create_gzip_message, KafkaProducer
)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from kafka.errors import ConsumerFetchSizeTooSmall, OffsetOutOfRangeError
@@ -88,6 +90,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
**configs)
return consumer
+ def kafka_producer(self, **configs):
+ brokers = '%s:%d' % (self.server.host, self.server.port)
+ producer = KafkaProducer(
+ bootstrap_servers=brokers, **configs)
+ return producer
+
def test_simple_consumer(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -624,3 +632,39 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
fetched_msgs = [next(consumer) for i in range(10)]
self.assertEqual(len(fetched_msgs), 10)
+
+ @kafka_versions('>=0.10.1')
+ def test_kafka_consumer_offsets_for_time(self):
+ late_time = int(time.time())
+ middle_time = late_time - 1
+ early_time = late_time - 2
+ tp = TopicPartition(self.topic, 0)
+
+ kafka_producer = self.kafka_producer()
+ early_msg = kafka_producer.send(
+ self.topic, partition=0, value=b"first",
+ timestamp_ms=early_time).get()
+ late_msg = kafka_producer.send(
+ self.topic, partition=0, value=b"last",
+ timestamp_ms=late_time).get()
+
+ consumer = self.kafka_consumer()
+ offsets = consumer.offsets_for_times({tp: early_time})
+ self.assertEqual(offsets[tp].offset, early_msg.offset)
+ self.assertEqual(offsets[tp].timestamp, early_time)
+
+ offsets = consumer.offsets_for_times({tp: middle_time})
+ self.assertEqual(offsets[tp].offset, late_msg.offset)
+ self.assertEqual(offsets[tp].timestamp, late_time)
+
+ offsets = consumer.offsets_for_times({tp: late_time})
+ self.assertEqual(offsets[tp].offset, late_msg.offset)
+ self.assertEqual(offsets[tp].timestamp, late_time)
+
+ @kafka_versions('<0.10.1')
+ def test_kafka_consumer_offsets_for_time_old(self):
+ consumer = self.kafka_consumer()
+ tp = TopicPartition(self.topic, 0)
+
+ with self.assertRaises():
+ consumer.offsets_for_times({tp: int(time.time())})