summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-03-13 23:52:58 -0700
committerDana Powers <dana.powers@gmail.com>2016-03-14 00:37:56 -0700
commit479e70872e986eaa25773da3aa1277e12f681eb2 (patch)
treeca45338f0bade32a2294057c1eab68ba51bdbe5a
parent0330036bef996815c5ef384ab6803697816e4189 (diff)
downloadkafka-python-topic_partition_type_error.tar.gz
Raise TypeError in KafkaConsumer when partition is not a TopicPartitiontopic_partition_type_error
-rw-r--r--kafka/consumer/group.py17
1 files changed, 17 insertions, 0 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 9db4b5d..a4381a9 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -7,6 +7,7 @@ import time
import six
from kafka.client_async import KafkaClient
+from kafka.common import TopicPartition
from kafka.consumer.fetcher import Fetcher
from kafka.consumer.subscription_state import SubscriptionState
from kafka.coordinator.consumer import ConsumerCoordinator
@@ -344,6 +345,8 @@ class KafkaConsumer(six.Iterator):
"""
assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
assert self.config['group_id'] is not None, 'Requires group_id'
+ if not isinstance(partition, TopicPartition):
+ raise TypeError('partition must be a TopicPartition namedtuple')
if self._subscription.is_assigned(partition):
committed = self._subscription.assignment[partition].committed
if committed is None:
@@ -474,6 +477,8 @@ class KafkaConsumer(six.Iterator):
Returns:
int: offset
"""
+ if not isinstance(partition, TopicPartition):
+ raise TypeError('partition must be a TopicPartition namedtuple')
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
offset = self._subscription.assignment[partition].position
if offset is None:
@@ -500,6 +505,8 @@ class KafkaConsumer(six.Iterator):
Returns:
int or None: offset if available
"""
+ if not isinstance(partition, TopicPartition):
+ raise TypeError('partition must be a TopicPartition namedtuple')
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
return self._subscription.assignment[partition].highwater
@@ -514,6 +521,8 @@ class KafkaConsumer(six.Iterator):
Arguments:
*partitions (TopicPartition): partitions to pause
"""
+ if not all([isinstance(p, TopicPartition) for p in partitions]):
+ raise TypeError('partitions must be TopicPartition namedtuples')
for partition in partitions:
log.debug("Pausing partition %s", partition)
self._subscription.pause(partition)
@@ -524,6 +533,8 @@ class KafkaConsumer(six.Iterator):
Arguments:
*partitions (TopicPartition): partitions to resume
"""
+ if not all([isinstance(p, TopicPartition) for p in partitions]):
+ raise TypeError('partitions must be TopicPartition namedtuples')
for partition in partitions:
log.debug("Resuming partition %s", partition)
self._subscription.resume(partition)
@@ -545,6 +556,8 @@ class KafkaConsumer(six.Iterator):
AssertionError: if offset is not an int >= 0; or if partition is not
currently assigned.
"""
+ if not isinstance(partition, TopicPartition):
+ raise TypeError('partition must be a TopicPartition namedtuple')
assert isinstance(offset, int) and offset >= 0, 'Offset must be >= 0'
assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
log.debug("Seeking to offset %s for partition %s", offset, partition)
@@ -561,6 +574,8 @@ class KafkaConsumer(six.Iterator):
AssertionError: if any partition is not currently assigned, or if
no partitions are assigned
"""
+ if not all([isinstance(p, TopicPartition) for p in partitions]):
+ raise TypeError('partitions must be TopicPartition namedtuples')
if not partitions:
partitions = self._subscription.assigned_partitions()
assert partitions, 'No partitions are currently assigned'
@@ -583,6 +598,8 @@ class KafkaConsumer(six.Iterator):
AssertionError: if any partition is not currently assigned, or if
no partitions are assigned
"""
+ if not all([isinstance(p, TopicPartition) for p in partitions]):
+ raise TypeError('partitions must be TopicPartition namedtuples')
if not partitions:
partitions = self._subscription.assigned_partitions()
assert partitions, 'No partitions are currently assigned'