From 38d89d90941047cfcba3790ceb1d1998ed66dac4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 19 Jun 2016 09:26:43 -0700 Subject: Randomize order of topics/partitions processed by fetcher to improve balance --- kafka/consumer/fetcher.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'kafka') diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index e5a165e..9c06aba 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -3,6 +3,7 @@ from __future__ import absolute_import import collections import copy import logging +import random import time import six @@ -607,7 +608,10 @@ class Fetcher(six.Iterator): for partition, offset, _ in partitions: fetch_offsets[TopicPartition(topic, partition)] = offset + # randomized ordering should improve balance for short-lived consumers + random.shuffle(response.topics) for topic, partitions in response.topics: + random.shuffle(partitions) for partition, error_code, highwater, messages in partitions: tp = TopicPartition(topic, partition) error_type = Errors.for_code(error_code) -- cgit v1.2.1