From 479e70872e986eaa25773da3aa1277e12f681eb2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 13 Mar 2016 23:52:58 -0700 Subject: Raise TypeError in KafkaConsumer when partition is not a TopicPartition --- kafka/consumer/group.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 9db4b5d..a4381a9 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -7,6 +7,7 @@ import time import six from kafka.client_async import KafkaClient +from kafka.common import TopicPartition from kafka.consumer.fetcher import Fetcher from kafka.consumer.subscription_state import SubscriptionState from kafka.coordinator.consumer import ConsumerCoordinator @@ -344,6 +345,8 @@ class KafkaConsumer(six.Iterator): """ assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1' assert self.config['group_id'] is not None, 'Requires group_id' + if not isinstance(partition, TopicPartition): + raise TypeError('partition must be a TopicPartition namedtuple') if self._subscription.is_assigned(partition): committed = self._subscription.assignment[partition].committed if committed is None: @@ -474,6 +477,8 @@ class KafkaConsumer(six.Iterator): Returns: int: offset """ + if not isinstance(partition, TopicPartition): + raise TypeError('partition must be a TopicPartition namedtuple') assert self._subscription.is_assigned(partition), 'Partition is not assigned' offset = self._subscription.assignment[partition].position if offset is None: @@ -500,6 +505,8 @@ class KafkaConsumer(six.Iterator): Returns: int or None: offset if available """ + if not isinstance(partition, TopicPartition): + raise TypeError('partition must be a TopicPartition namedtuple') assert self._subscription.is_assigned(partition), 'Partition is not assigned' return self._subscription.assignment[partition].highwater @@ -514,6 +521,8 @@ class KafkaConsumer(six.Iterator): Arguments: *partitions (TopicPartition): partitions to pause """ + if not all([isinstance(p, TopicPartition) for p in partitions]): + raise TypeError('partitions must be TopicPartition namedtuples') for partition in partitions: log.debug("Pausing partition %s", partition) self._subscription.pause(partition) @@ -524,6 +533,8 @@ class KafkaConsumer(six.Iterator): Arguments: *partitions (TopicPartition): partitions to resume """ + if not all([isinstance(p, TopicPartition) for p in partitions]): + raise TypeError('partitions must be TopicPartition namedtuples') for partition in partitions: log.debug("Resuming partition %s", partition) self._subscription.resume(partition) @@ -545,6 +556,8 @@ class KafkaConsumer(six.Iterator): AssertionError: if offset is not an int >= 0; or if partition is not currently assigned. """ + if not isinstance(partition, TopicPartition): + raise TypeError('partition must be a TopicPartition namedtuple') assert isinstance(offset, int) and offset >= 0, 'Offset must be >= 0' assert partition in self._subscription.assigned_partitions(), 'Unassigned partition' log.debug("Seeking to offset %s for partition %s", offset, partition) @@ -561,6 +574,8 @@ class KafkaConsumer(six.Iterator): AssertionError: if any partition is not currently assigned, or if no partitions are assigned """ + if not all([isinstance(p, TopicPartition) for p in partitions]): + raise TypeError('partitions must be TopicPartition namedtuples') if not partitions: partitions = self._subscription.assigned_partitions() assert partitions, 'No partitions are currently assigned' @@ -583,6 +598,8 @@ class KafkaConsumer(six.Iterator): AssertionError: if any partition is not currently assigned, or if no partitions are assigned """ + if not all([isinstance(p, TopicPartition) for p in partitions]): + raise TypeError('partitions must be TopicPartition namedtuples') if not partitions: partitions = self._subscription.assigned_partitions() assert partitions, 'No partitions are currently assigned' -- cgit v1.2.1