summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTaras Voinarovskyi <voyn1991@gmail.com>2017-09-10 18:45:05 +0300
committerGitHub <noreply@github.com>2017-09-10 18:45:05 +0300
commit370169029a9104f1963227005349f1dc6420a924 (patch)
tree327ba3d95ff3ce6c712c80f19456fb1573125b0a
parent24bf504cd894c85a861c6691e778359220e40323 (diff)
parentd0813ab695c9f5c57a7168220bbfca985d7c70af (diff)
downloadkafka-python-370169029a9104f1963227005349f1dc6420a924.tar.gz
Merge pull request #1200 from buptljy/offset
remove beginning/end offsets request version limit
-rw-r--r--kafka/consumer/group.py8
-rw-r--r--test/test_consumer_integration.py6
2 files changed, 0 insertions, 14 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index a629821..b7fbd83 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -928,10 +928,6 @@ class KafkaConsumer(six.Iterator):
up the offsets by timestamp.
KafkaTimeoutError: If fetch failed in request_timeout_ms.
"""
- if self.config['api_version'] <= (0, 10, 0):
- raise UnsupportedVersionError(
- "offsets_for_times API not supported for cluster version {}"
- .format(self.config['api_version']))
offsets = self._fetcher.beginning_offsets(
partitions, self.config['request_timeout_ms'])
return offsets
@@ -959,10 +955,6 @@ class KafkaConsumer(six.Iterator):
up the offsets by timestamp.
KafkaTimeoutError: If fetch failed in request_timeout_ms
"""
- if self.config['api_version'] <= (0, 10, 0):
- raise UnsupportedVersionError(
- "offsets_for_times API not supported for cluster version {}"
- .format(self.config['api_version']))
offsets = self._fetcher.end_offsets(
partitions, self.config['request_timeout_ms'])
return offsets
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 4b5e78a..17e7401 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -735,12 +735,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
with self.assertRaises(UnsupportedVersionError):
consumer.offsets_for_times({tp: int(time.time())})
- with self.assertRaises(UnsupportedVersionError):
- consumer.beginning_offsets([tp])
-
- with self.assertRaises(UnsupportedVersionError):
- consumer.end_offsets([tp])
-
@kafka_versions('>=0.10.1')
def test_kafka_consumer_offsets_for_times_errors(self):
consumer = self.kafka_consumer()