From c8be93b44bb0939dd512a72be578d42a4d7426b7 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 16 Feb 2016 12:35:28 -0800 Subject: Add RangePartitionAssignor (and use as default); add assignor tests --- kafka/consumer/group.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'kafka/consumer/group.py') diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 4174b07..d4ddc2d 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -10,6 +10,7 @@ from kafka.client_async import KafkaClient from kafka.consumer.fetcher import Fetcher from kafka.consumer.subscription_state import SubscriptionState from kafka.coordinator.consumer import ConsumerCoordinator +from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.protocol.offset import OffsetResetStrategy from kafka.version import __version__ @@ -98,7 +99,8 @@ class KafkaConsumer(six.Iterator): brokers or partitions. Default: 300000 partition_assignment_strategy (list): List of objects to use to distribute partition ownership amongst consumer instances when - group management is used. Default: [RoundRobinPartitionAssignor] + group management is used. + Default: [RangePartitionAssignor, RoundRobinPartitionAssignor] heartbeat_interval_ms (int): The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -146,7 +148,7 @@ class KafkaConsumer(six.Iterator): 'auto_commit_interval_ms': 5000, 'check_crcs': True, 'metadata_max_age_ms': 5 * 60 * 1000, - 'partition_assignment_strategy': (RoundRobinPartitionAssignor,), + 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor), 'heartbeat_interval_ms': 3000, 'session_timeout_ms': 30000, 'send_buffer_bytes': 128 * 1024, -- cgit v1.2.1