summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer.py15
1 files changed, 11 insertions, 4 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 7a7c48f..8f35963 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -181,14 +181,20 @@ class SimpleProducer(Producer):
batch_send - If True, messages are sent in batches
batch_send_every_n - If set, messages are sent in batches of this size
batch_send_every_t - If set, messages are sent after this timeout
+ random_start - If True, randomize the initial partition which the
first message block will be published to, otherwise
if False, the first message block will always publish
to partition 0 before cycling through each partition
"""
def __init__(self, client, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+ batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+ random_start=False):
self.partition_cycles = {}
+ self.random_start = random_start
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, batch_send,
batch_send_every_n,
@@ -201,9 +207,10 @@ class SimpleProducer(Producer):
self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
# Randomize the initial partition that is returned
- num_partitions = len(self.client.topic_partitions[topic])
- for _ in xrange(random.randint(0, num_partitions-1)):
- self.partition_cycles[topic].next()
+ if self.random_start:
+ num_partitions = len(self.client.topic_partitions[topic])
+ for _ in xrange(random.randint(0, num_partitions-1)):
+ self.partition_cycles[topic].next()
return self.partition_cycles[topic].next()