summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Dana Powers <dana.powers@gmail.com> 2014-05-06 21:03:10 -0700
committer: Dana Powers <dana.powers@gmail.com> 2014-05-06 21:03:10 -0700
commit: 3b18043821f37242bde2b186684fa05d36c61921 (patch)
tree: f7203196a2803b0ef4c5e1eeb3bf474c4e49776c
parent: 914c2e686acd3ad8f8c6dca6bf714e0b166dcbc3 (diff)
parent: 1fd35a558f96df5a0400dbfd07b7adce97fa050e (diff)
download: kafka-python-3b18043821f37242bde2b186684fa05d36c61921.tar.gz
Merge pull request #139 from alexcb/master
SimpleProducer randomization of initial round robin ordering
-rw-r--r-- kafka/producer.py 16
-rw-r--r-- test/test_integration.py 81
2 files changed, 73 insertions, 24 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 12a2934..8f35963 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -2,6 +2,7 @@ from __future__ import absolute_import
import logging
import time
+import random
from Queue import Empty
from collections import defaultdict
@@ -180,14 +181,20 @@ class SimpleProducer(Producer):
batch_send - If True, messages are send in batches
batch_send_every_n - If set, messages are send in batches of this size
batch_send_every_t - If set, messages are send after this timeout
+ random_start - If true, randomize the initial partition which the
+ first message block will be published to, otherwise
+ if false, the first message block will always publish
+ to partition 0 before cycling through each partition
"""
def __init__(self, client, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+ batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+ random_start=False):
self.partition_cycles = {}
+ self.random_start = random_start
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, batch_send,
batch_send_every_n,
@@ -198,6 +205,13 @@ class SimpleProducer(Producer):
if topic not in self.client.topic_partitions:
self.client.load_metadata_for_topics(topic)
self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+
+ # Randomize the initial partition that is returned
+ if self.random_start:
+ num_partitions = len(self.client.topic_partitions[topic])
+ for _ in xrange(random.randint(0, num_partitions-1)):
+ self.partition_cycles[topic].next()
+
return self.partition_cycles[topic].next()
def send_messages(self, topic, *msg):
diff --git a/test/test_integration.py b/test/test_integration.py
index 3d6ccf6..4087df7 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -287,21 +287,26 @@ class TestKafkaClient(KafkaTestCase):
producer = SimpleProducer(self.client)
resp = producer.send_messages(self.topic, "one", "two")
- # Will go to partition 0
+ partition_for_first_batch = resp[0].partition
+
self.assertEquals(len(resp), 1)
self.assertEquals(resp[0].error, 0)
self.assertEquals(resp[0].offset, 0) # offset of first msg
- # Will go to partition 1
+ # ensure this partition is different from the first partition
resp = producer.send_messages(self.topic, "three")
+ partition_for_second_batch = resp[0].partition
+ self.assertNotEquals(partition_for_first_batch, partition_for_second_batch)
+
self.assertEquals(len(resp), 1)
self.assertEquals(resp[0].error, 0)
self.assertEquals(resp[0].offset, 0) # offset of first msg
- fetch1 = FetchRequest(self.topic, 0, 0, 1024)
- fetch2 = FetchRequest(self.topic, 1, 0, 1024)
- fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
- fetch2])
+ fetch_requests = (
+ FetchRequest(self.topic, partition_for_first_batch, 0, 1024),
+ FetchRequest(self.topic, partition_for_second_batch, 0, 1024),
+ )
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request(fetch_requests)
self.assertEquals(fetch_resp1.error, 0)
self.assertEquals(fetch_resp1.highwaterMark, 2)
messages = list(fetch_resp1.messages)
@@ -314,11 +319,12 @@ class TestKafkaClient(KafkaTestCase):
self.assertEquals(len(messages), 1)
self.assertEquals(messages[0].message.value, "three")
- # Will go to partition 0
+ # Will go to same partition as first batch
resp = producer.send_messages(self.topic, "four", "five")
self.assertEquals(len(resp), 1)
self.assertEquals(resp[0].error, 0)
self.assertEquals(resp[0].offset, 2) # offset of first msg
+ self.assertEquals(resp[0].partition, partition_for_first_batch)
producer.stop()
@@ -396,14 +402,25 @@ class TestKafkaClient(KafkaTestCase):
resp = producer.send_messages(self.topic, "one")
self.assertEquals(len(resp), 0)
- fetch = FetchRequest(self.topic, 0, 0, 1024)
- fetch_resp = self.client.send_fetch_request([fetch])
+ # fetch from both partitions
+ fetch_requests = (
+ FetchRequest(self.topic, 0, 0, 1024),
+ FetchRequest(self.topic, 1, 0, 1024),
+ )
+ fetch_resps = self.client.send_fetch_request(fetch_requests)
- self.assertEquals(fetch_resp[0].error, 0)
- self.assertEquals(fetch_resp[0].highwaterMark, 1)
- self.assertEquals(fetch_resp[0].partition, 0)
+ # determine which partition was selected (due to random round-robin)
+ published_to_resp = max(fetch_resps, key=lambda x: x.highwaterMark)
+ not_published_to_resp = min(fetch_resps, key=lambda x: x.highwaterMark)
+ self.assertNotEquals(published_to_resp.partition, not_published_to_resp.partition)
- messages = list(fetch_resp[0].messages)
+ self.assertEquals(published_to_resp.error, 0)
+ self.assertEquals(published_to_resp.highwaterMark, 1)
+
+ self.assertEquals(not_published_to_resp.error, 0)
+ self.assertEquals(not_published_to_resp.highwaterMark, 0)
+
+ messages = list(published_to_resp.messages)
self.assertEquals(len(messages), 1)
self.assertEquals(messages[0].message.value, "one")
@@ -415,12 +432,14 @@ class TestKafkaClient(KafkaTestCase):
resp = producer.send_messages(self.topic, "one")
self.assertEquals(len(resp), 1)
- fetch = FetchRequest(self.topic, 0, 0, 1024)
+ partition = resp[0].partition
+
+ fetch = FetchRequest(self.topic, partition, 0, 1024)
fetch_resp = self.client.send_fetch_request([fetch])
self.assertEquals(fetch_resp[0].error, 0)
self.assertEquals(fetch_resp[0].highwaterMark, 1)
- self.assertEquals(fetch_resp[0].partition, 0)
+ self.assertEquals(fetch_resp[0].partition, partition)
messages = list(fetch_resp[0].messages)
self.assertEquals(len(messages), 1)
@@ -435,12 +454,14 @@ class TestKafkaClient(KafkaTestCase):
resp = producer.send_messages(self.topic, "one")
self.assertEquals(len(resp), 1)
- fetch = FetchRequest(self.topic, 0, 0, 1024)
+ partition = resp[0].partition
+
+ fetch = FetchRequest(self.topic, partition, 0, 1024)
fetch_resp = self.client.send_fetch_request([fetch])
self.assertEquals(fetch_resp[0].error, 0)
self.assertEquals(fetch_resp[0].highwaterMark, 1)
- self.assertEquals(fetch_resp[0].partition, 0)
+ self.assertEquals(fetch_resp[0].partition, partition)
messages = list(fetch_resp[0].messages)
self.assertEquals(len(messages), 1)
@@ -456,17 +477,31 @@ class TestKafkaClient(KafkaTestCase):
# Give it some time
time.sleep(2)
- fetch = FetchRequest(self.topic, 0, 0, 1024)
- fetch_resp = self.client.send_fetch_request([fetch])
+ # fetch from both partitions
+ fetch_requests = (
+ FetchRequest(self.topic, 0, 0, 1024),
+ FetchRequest(self.topic, 1, 0, 1024),
+ )
+ fetch_resps = self.client.send_fetch_request(fetch_requests)
- self.assertEquals(fetch_resp[0].error, 0)
- self.assertEquals(fetch_resp[0].highwaterMark, 1)
- self.assertEquals(fetch_resp[0].partition, 0)
+ # determine which partition was selected (due to random round-robin)
+ published_to_resp = max(fetch_resps, key=lambda x: x.highwaterMark)
+ not_published_to_resp = min(fetch_resps, key=lambda x: x.highwaterMark)
+ self.assertNotEquals(published_to_resp.partition, not_published_to_resp.partition)
- messages = list(fetch_resp[0].messages)
+ self.assertEquals(published_to_resp.error, 0)
+ self.assertEquals(published_to_resp.highwaterMark, 1)
+
+ self.assertEquals(not_published_to_resp.error, 0)
+ self.assertEquals(not_published_to_resp.highwaterMark, 0)
+
+ messages = list(published_to_resp.messages)
self.assertEquals(len(messages), 1)
self.assertEquals(messages[0].message.value, "one")
+ messages = list(not_published_to_resp.messages)
+ self.assertEquals(len(messages), 0)
+
producer.stop()
def test_async_keyed_producer(self):