diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-06-29 11:20:08 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-06-29 11:20:08 -0700 |
commit | 6b5a0687996bdb8f40e210da504aecb7f8c12141 (patch) | |
tree | adb3769f8774a3987fa79c8ea836905b7f75f489 /kafka/consumer/fetcher.py | |
parent | db47136671a05283d9801c5a3ec74b3e0f38004e (diff) | |
download | kafka-python-6b5a0687996bdb8f40e210da504aecb7f8c12141.tar.gz |
Randomize order of topics/partitions processed by fetcher to improve balance (#732)
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 4 |
1 files changed, 4 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index e5a165e..9c06aba 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -3,6 +3,7 @@ from __future__ import absolute_import import collections import copy import logging +import random import time import six @@ -607,7 +608,10 @@ class Fetcher(six.Iterator): for partition, offset, _ in partitions: fetch_offsets[TopicPartition(topic, partition)] = offset + # randomized ordering should improve balance for short-lived consumers + random.shuffle(response.topics) for topic, partitions in response.topics: + random.shuffle(partitions) for partition, error_code, highwater, messages in partitions: tp = TopicPartition(topic, partition) error_type = Errors.for_code(error_code) |