diff options
author | Mark Roberts <wizzat@gmail.com> | 2014-05-06 21:54:02 -0700 |
---|---|---|
committer | Mark Roberts <wizzat@gmail.com> | 2014-05-06 21:54:02 -0700 |
commit | b053da2203f2597801609824b510cfcc2e821d63 (patch) | |
tree | c0601359f26e5d9e88f5638a68241520297972f1 /kafka | |
parent | efcf58b84214aeda6cf79319f182407cde7833a6 (diff) | |
parent | 3b18043821f37242bde2b186684fa05d36c61921 (diff) | |
download | kafka-python-b053da2203f2597801609824b510cfcc2e821d63.tar.gz |
Merge branch 'master' into add_tests
kafka/client.py contained duplicate copies of same refactor, merged.
Move test/test_integration.py changes into test/test_producer_integration.
Conflicts:
kafka/client.py
servers/0.8.0/kafka-src
test/test_integration.py
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/producer.py | 16 |
1 files changed, 15 insertions, 1 deletions
diff --git a/kafka/producer.py b/kafka/producer.py index 12a2934..8f35963 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -2,6 +2,7 @@ from __future__ import absolute_import import logging import time +import random from Queue import Empty from collections import defaultdict @@ -180,14 +181,20 @@ class SimpleProducer(Producer): batch_send - If True, messages are send in batches batch_send_every_n - If set, messages are send in batches of this size batch_send_every_t - If set, messages are send after this timeout + random_start - If true, randomize the initial partition which the + the first message block will be published to, otherwise + if false, the first message block will always publish + to partition 0 before cycling through each partition """ def __init__(self, client, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + random_start=False): self.partition_cycles = {} + self.random_start = random_start super(SimpleProducer, self).__init__(client, async, req_acks, ack_timeout, batch_send, batch_send_every_n, @@ -198,6 +205,13 @@ class SimpleProducer(Producer): if topic not in self.client.topic_partitions: self.client.load_metadata_for_topics(topic) self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic]) + + # Randomize the initial partition that is returned + if self.random_start: + num_partitions = len(self.client.topic_partitions[topic]) + for _ in xrange(random.randint(0, num_partitions-1)): + self.partition_cycles[topic].next() + return self.partition_cycles[topic].next() def send_messages(self, topic, *msg): |