summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Couture-Beil <alex@mofo.ca>2014-03-11 18:09:52 -0700
committerAlex Couture-Beil <alex@mofo.ca>2014-03-11 18:09:52 -0700
commit92fefb06fa2475c0421dd065d0d15a659bad9d3c (patch)
tree41f9e3bd0d99dd9c32b8f54435eb157c0d0a99c4
parent09c053a1be858c383019c3ac63839d5bc249f622 (diff)
downloadkafka-python-92fefb06fa2475c0421dd065d0d15a659bad9d3c.tar.gz
Modified SimpleProducer to randomize the initial round robin ordering
of partitions to prevent the first message from always being published to partition 0.
-rw-r--r--kafka/producer.py5
1 files changed, 4 insertions, 1 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 12a2934..6359aa2 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -2,6 +2,7 @@ from __future__ import absolute_import
import logging
import time
+import random
from Queue import Empty
from collections import defaultdict
@@ -197,7 +198,9 @@ class SimpleProducer(Producer):
if topic not in self.partition_cycles:
if topic not in self.client.topic_partitions:
self.client.load_metadata_for_topics(topic)
- self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+ randomly_ordered_partitions = self.client.topic_partitions[topic][:]
+ random.shuffle(randomly_ordered_partitions)
+ self.partition_cycles[topic] = cycle(randomly_ordered_partitions)
return self.partition_cycles[topic].next()
def send_messages(self, topic, *msg):