diff options
author | Alex Couture-Beil <alex@mofo.ca> | 2014-03-11 18:09:52 -0700 |
---|---|---|
committer | Alex Couture-Beil <alex@mofo.ca> | 2014-03-11 18:09:52 -0700 |
commit | 92fefb06fa2475c0421dd065d0d15a659bad9d3c (patch) | |
tree | 41f9e3bd0d99dd9c32b8f54435eb157c0d0a99c4 | |
parent | 09c053a1be858c383019c3ac63839d5bc249f622 (diff) | |
download | kafka-python-92fefb06fa2475c0421dd065d0d15a659bad9d3c.tar.gz |
Modified SimpleProducer to randomize the initial round robin ordering
of partitions to prevent the first message from always being published
to partition 0.
-rw-r--r-- | kafka/producer.py | 5 |
1 files changed, 4 insertions, 1 deletions
diff --git a/kafka/producer.py b/kafka/producer.py index 12a2934..6359aa2 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -2,6 +2,7 @@ from __future__ import absolute_import import logging import time +import random from Queue import Empty from collections import defaultdict @@ -197,7 +198,9 @@ class SimpleProducer(Producer): if topic not in self.partition_cycles: if topic not in self.client.topic_partitions: self.client.load_metadata_for_topics(topic) - self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic]) + randomly_ordered_partitions = self.client.topic_partitions[topic][:] + random.shuffle(randomly_ordered_partitions) + self.partition_cycles[topic] = cycle(randomly_ordered_partitions) return self.partition_cycles[topic].next() def send_messages(self, topic, *msg): |