diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-06-19 09:26:43 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-06-19 09:26:43 -0700 |
commit | 38d89d90941047cfcba3790ceb1d1998ed66dac4 (patch) | |
tree | 7f5b07d4d11263442db8c677ce3b126363f3f67c | |
parent | 461ccbd9ecf06722c9ff73f6ed439be4b8391672 (diff) | |
download | kafka-python-fetcher_randomize_order.tar.gz |
Randomize order of topics/partitions processed by fetcher to improve balancefetcher_randomize_order
-rw-r--r-- | kafka/consumer/fetcher.py | 4 |
1 files changed, 4 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index e5a165e..9c06aba 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -3,6 +3,7 @@ from __future__ import absolute_import import collections import copy import logging +import random import time import six @@ -607,7 +608,10 @@ class Fetcher(six.Iterator): for partition, offset, _ in partitions: fetch_offsets[TopicPartition(topic, partition)] = offset + # randomized ordering should improve balance for short-lived consumers + random.shuffle(response.topics) for topic, partitions in response.topics: + random.shuffle(partitions) for partition, error_code, highwater, messages in partitions: tp = TopicPartition(topic, partition) error_type = Errors.for_code(error_code) |