diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/consumer/fetcher.py | 4 |
1 files changed, 4 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index e5a165e..9c06aba 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -3,6 +3,7 @@ from __future__ import absolute_import import collections import copy import logging +import random import time import six @@ -607,7 +608,10 @@ class Fetcher(six.Iterator): for partition, offset, _ in partitions: fetch_offsets[TopicPartition(topic, partition)] = offset + # randomized ordering should improve balance for short-lived consumers + random.shuffle(response.topics) for topic, partitions in response.topics: + random.shuffle(partitions) for partition, error_code, highwater, messages in partitions: tp = TopicPartition(topic, partition) error_type = Errors.for_code(error_code) |