summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorAlex Couture-Beil <alex@mofo.ca>2014-04-01 18:07:24 -0700
committerAlex Couture-Beil <alex@mofo.ca>2014-04-01 18:07:24 -0700
commit4d0722aef2f7420f99b25f28e79fecc47b077d2d (patch)
treef045f9d75439648e4b7ddb53a0383968c774f4ff /kafka
parenta81be573b2a410cb4bc6b908a932374fc620bb30 (diff)
downloadkafka-python-4d0722aef2f7420f99b25f28e79fecc47b077d2d.tar.gz
Changed randomization to simply randomize the initial starting partition of the sorted list of partition rather than completely randomizing the initial ordering before round-robin cycling the partitions
Diffstat (limited to 'kafka')
-rw-r--r--kafka/producer.py10
1 files changed, 7 insertions, 3 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 6359aa2..7a7c48f 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -198,9 +198,13 @@ class SimpleProducer(Producer):
if topic not in self.partition_cycles:
if topic not in self.client.topic_partitions:
self.client.load_metadata_for_topics(topic)
- randomly_ordered_partitions = self.client.topic_partitions[topic][:]
- random.shuffle(randomly_ordered_partitions)
- self.partition_cycles[topic] = cycle(randomly_ordered_partitions)
+ self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+
+ # Randomize the initial partition that is returned
+ num_partitions = len(self.client.topic_partitions[topic])
+ for _ in xrange(random.randint(0, num_partitions-1)):
+ self.partition_cycles[topic].next()
+
return self.partition_cycles[topic].next()
def send_messages(self, topic, *msg):