diff options
author | Taras Voinarovskyi <voyn1991@gmail.com> | 2017-09-10 18:45:05 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-10 18:45:05 +0300 |
commit | 370169029a9104f1963227005349f1dc6420a924 (patch) | |
tree | 327ba3d95ff3ce6c712c80f19456fb1573125b0a | |
parent | 24bf504cd894c85a861c6691e778359220e40323 (diff) | |
parent | d0813ab695c9f5c57a7168220bbfca985d7c70af (diff) | |
download | kafka-python-370169029a9104f1963227005349f1dc6420a924.tar.gz |
Merge pull request #1200 from buptljy/offset
remove beginning/end offsets request version limit
-rw-r--r-- | kafka/consumer/group.py | 8 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 6 |
2 files changed, 0 insertions, 14 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index a629821..b7fbd83 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -928,10 +928,6 @@ class KafkaConsumer(six.Iterator): up the offsets by timestamp. KafkaTimeoutError: If fetch failed in request_timeout_ms. """ - if self.config['api_version'] <= (0, 10, 0): - raise UnsupportedVersionError( - "offsets_for_times API not supported for cluster version {}" - .format(self.config['api_version'])) offsets = self._fetcher.beginning_offsets( partitions, self.config['request_timeout_ms']) return offsets @@ -959,10 +955,6 @@ class KafkaConsumer(six.Iterator): up the offsets by timestamp. KafkaTimeoutError: If fetch failed in request_timeout_ms """ - if self.config['api_version'] <= (0, 10, 0): - raise UnsupportedVersionError( - "offsets_for_times API not supported for cluster version {}" - .format(self.config['api_version'])) offsets = self._fetcher.end_offsets( partitions, self.config['request_timeout_ms']) return offsets diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 4b5e78a..17e7401 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -735,12 +735,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): with self.assertRaises(UnsupportedVersionError): consumer.offsets_for_times({tp: int(time.time())}) - with self.assertRaises(UnsupportedVersionError): - consumer.beginning_offsets([tp]) - - with self.assertRaises(UnsupportedVersionError): - consumer.end_offsets([tp]) - @kafka_versions('>=0.10.1') def test_kafka_consumer_offsets_for_times_errors(self): consumer = self.kafka_consumer() |