summaryrefslogtreecommitdiff
path: root/test/test_integration.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-07-11 05:58:54 -0700
committerDavid Arthur <mumrah@gmail.com>2013-07-11 05:58:54 -0700
commit5684af438e6cf871540aa8ea8b556737f56e9798 (patch)
treef9bb1f6046943946236ceae61d9266a8c14bdcfe /test/test_integration.py
parentffdc08aeec040862d522914a480c135626a19e69 (diff)
parentd2df8f54637490b1dbe858066b74710b57186016 (diff)
downloadkafka-python-5684af438e6cf871540aa8ea8b556737f56e9798.tar.gz
Merge pull request #33 from mahendra/asyncproducer
Support for async producer Merged locally, tests pass, +1
Diffstat (limited to 'test/test_integration.py')
-rw-r--r--test/test_integration.py277
1 files changed, 274 insertions, 3 deletions
diff --git a/test/test_integration.py b/test/test_integration.py
index d607b73..6d96b6a 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -1,5 +1,6 @@
import logging
import unittest
+import time
from kafka import * # noqa
from kafka.common import * # noqa
@@ -254,12 +255,23 @@ class TestKafkaClient(unittest.TestCase):
def test_simple_producer(self):
producer = SimpleProducer(self.client, "test_simple_producer")
- producer.send_messages("one", "two")
- producer.send_messages("three")
+ resp = producer.send_messages("one", "two")
+
+ # Will go to partition 0
+ self.assertEquals(len(resp), 1)
+ self.assertEquals(resp[0].error, 0)
+ self.assertEquals(resp[0].offset, 0) # offset of first msg
+
+ # Will go to partition 1
+ resp = producer.send_messages("three")
+ self.assertEquals(len(resp), 1)
+ self.assertEquals(resp[0].error, 0)
+ self.assertEquals(resp[0].offset, 0) # offset of first msg
fetch1 = FetchRequest("test_simple_producer", 0, 0, 1024)
fetch2 = FetchRequest("test_simple_producer", 1, 0, 1024)
- fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2])
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+ fetch2])
self.assertEquals(fetch_resp1.error, 0)
self.assertEquals(fetch_resp1.highwaterMark, 2)
messages = list(fetch_resp1.messages)
@@ -272,6 +284,265 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(len(messages), 1)
self.assertEquals(messages[0].message.value, "three")
+ # Will go to partition 0
+ resp = producer.send_messages("four", "five")
+ self.assertEquals(len(resp), 1)
+ self.assertEquals(resp[0].error, 0)
+ self.assertEquals(resp[0].offset, 2) # offset of first msg
+
+ producer.stop()
+
+ def test_round_robin_partitioner(self):
+ producer = KeyedProducer(self.client, "test_round_robin_partitioner",
+ partitioner=RoundRobinPartitioner)
+ producer.send("key1", "one")
+ producer.send("key2", "two")
+ producer.send("key3", "three")
+ producer.send("key4", "four")
+
+ fetch1 = FetchRequest("test_round_robin_partitioner", 0, 0, 1024)
+ fetch2 = FetchRequest("test_round_robin_partitioner", 1, 0, 1024)
+
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+ fetch2])
+
+ self.assertEquals(fetch_resp1.error, 0)
+ self.assertEquals(fetch_resp1.highwaterMark, 2)
+ self.assertEquals(fetch_resp1.partition, 0)
+
+ messages = list(fetch_resp1.messages)
+ self.assertEquals(len(messages), 2)
+ self.assertEquals(messages[0].message.value, "one")
+ self.assertEquals(messages[1].message.value, "three")
+
+ self.assertEquals(fetch_resp2.error, 0)
+ self.assertEquals(fetch_resp2.highwaterMark, 2)
+ self.assertEquals(fetch_resp2.partition, 1)
+
+ messages = list(fetch_resp2.messages)
+ self.assertEquals(len(messages), 2)
+ self.assertEquals(messages[0].message.value, "two")
+ self.assertEquals(messages[1].message.value, "four")
+
+ producer.stop()
+
+ def test_hashed_partitioner(self):
+ producer = KeyedProducer(self.client, "test_hash_partitioner",
+ partitioner=HashedPartitioner)
+ producer.send(1, "one")
+ producer.send(2, "two")
+ producer.send(3, "three")
+ producer.send(4, "four")
+
+ fetch1 = FetchRequest("test_hash_partitioner", 0, 0, 1024)
+ fetch2 = FetchRequest("test_hash_partitioner", 1, 0, 1024)
+
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+ fetch2])
+
+ self.assertEquals(fetch_resp1.error, 0)
+ self.assertEquals(fetch_resp1.highwaterMark, 2)
+ self.assertEquals(fetch_resp1.partition, 0)
+
+ messages = list(fetch_resp1.messages)
+ self.assertEquals(len(messages), 2)
+ self.assertEquals(messages[0].message.value, "two")
+ self.assertEquals(messages[1].message.value, "four")
+
+ self.assertEquals(fetch_resp2.error, 0)
+ self.assertEquals(fetch_resp2.highwaterMark, 2)
+ self.assertEquals(fetch_resp2.partition, 1)
+
+ messages = list(fetch_resp2.messages)
+ self.assertEquals(len(messages), 2)
+ self.assertEquals(messages[0].message.value, "one")
+ self.assertEquals(messages[1].message.value, "three")
+
+ producer.stop()
+
+ def test_acks_none(self):
+ producer = SimpleProducer(self.client, "test_acks_none",
+ req_acks=SimpleProducer.ACK_NOT_REQUIRED)
+ resp = producer.send_messages("one")
+ self.assertEquals(len(resp), 0)
+
+ fetch = FetchRequest("test_acks_none", 0, 0, 1024)
+ fetch_resp = self.client.send_fetch_request([fetch])
+
+ self.assertEquals(fetch_resp[0].error, 0)
+ self.assertEquals(fetch_resp[0].highwaterMark, 1)
+ self.assertEquals(fetch_resp[0].partition, 0)
+
+ messages = list(fetch_resp[0].messages)
+ self.assertEquals(len(messages), 1)
+ self.assertEquals(messages[0].message.value, "one")
+
+ producer.stop()
+
+ def test_acks_local_write(self):
+ producer = SimpleProducer(self.client, "test_acks_local_write",
+ req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
+ resp = producer.send_messages("one")
+ self.assertEquals(len(resp), 1)
+
+ fetch = FetchRequest("test_acks_local_write", 0, 0, 1024)
+ fetch_resp = self.client.send_fetch_request([fetch])
+
+ self.assertEquals(fetch_resp[0].error, 0)
+ self.assertEquals(fetch_resp[0].highwaterMark, 1)
+ self.assertEquals(fetch_resp[0].partition, 0)
+
+ messages = list(fetch_resp[0].messages)
+ self.assertEquals(len(messages), 1)
+ self.assertEquals(messages[0].message.value, "one")
+
+ producer.stop()
+
+ def test_acks_cluster_commit(self):
+ producer = SimpleProducer(self.client, "test_acks_cluster_commit",
+ req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)
+ resp = producer.send_messages("one")
+ self.assertEquals(len(resp), 1)
+
+ fetch = FetchRequest("test_acks_cluster_commit", 0, 0, 1024)
+ fetch_resp = self.client.send_fetch_request([fetch])
+
+ self.assertEquals(fetch_resp[0].error, 0)
+ self.assertEquals(fetch_resp[0].highwaterMark, 1)
+ self.assertEquals(fetch_resp[0].partition, 0)
+
+ messages = list(fetch_resp[0].messages)
+ self.assertEquals(len(messages), 1)
+ self.assertEquals(messages[0].message.value, "one")
+
+ producer.stop()
+
+ def test_async_simple_producer(self):
+ producer = SimpleProducer(self.client, "test_async_simple_producer",
+ async=True)
+
+ resp = producer.send_messages("one")
+ self.assertEquals(len(resp), 0)
+
+ # Give it some time
+ time.sleep(2)
+
+ fetch = FetchRequest("test_async_simple_producer", 0, 0, 1024)
+ fetch_resp = self.client.send_fetch_request([fetch])
+
+ self.assertEquals(fetch_resp[0].error, 0)
+ self.assertEquals(fetch_resp[0].highwaterMark, 1)
+ self.assertEquals(fetch_resp[0].partition, 0)
+
+ messages = list(fetch_resp[0].messages)
+ self.assertEquals(len(messages), 1)
+ self.assertEquals(messages[0].message.value, "one")
+
+ producer.stop()
+
+ def test_async_keyed_producer(self):
+ producer = KeyedProducer(self.client, "test_async_keyed_producer",
+ async=True)
+
+ resp = producer.send("key1", "one")
+ self.assertEquals(len(resp), 0)
+
+ # Give it some time
+ time.sleep(2)
+
+ fetch = FetchRequest("test_async_keyed_producer", 0, 0, 1024)
+ fetch_resp = self.client.send_fetch_request([fetch])
+
+ self.assertEquals(fetch_resp[0].error, 0)
+ self.assertEquals(fetch_resp[0].highwaterMark, 1)
+ self.assertEquals(fetch_resp[0].partition, 0)
+
+ messages = list(fetch_resp[0].messages)
+ self.assertEquals(len(messages), 1)
+ self.assertEquals(messages[0].message.value, "one")
+
+ producer.stop()
+
+ def test_batched_simple_producer(self):
+ producer = SimpleProducer(self.client, "test_batched_simple_producer",
+ batch_send=True,
+ batch_send_every_n=10,
+ batch_send_every_t=20)
+
+ # Send 5 messages and do a fetch
+ msgs = ["message-%d" % i for i in range(0, 5)]
+ resp = producer.send_messages(*msgs)
+
+ # Batch mode is async. No ack
+ self.assertEquals(len(resp), 0)
+
+ # Give it some time
+ time.sleep(2)
+
+ fetch1 = FetchRequest("test_batched_simple_producer", 0, 0, 1024)
+ fetch2 = FetchRequest("test_batched_simple_producer", 1, 0, 1024)
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+ fetch2])
+
+ self.assertEquals(fetch_resp1.error, 0)
+ messages = list(fetch_resp1.messages)
+ self.assertEquals(len(messages), 0)
+
+ self.assertEquals(fetch_resp2.error, 0)
+ messages = list(fetch_resp2.messages)
+ self.assertEquals(len(messages), 0)
+
+ # Send 5 more messages, wait for 2 seconds and do a fetch
+ msgs = ["message-%d" % i for i in range(5, 10)]
+ resp = producer.send_messages(*msgs)
+
+ # Give it some time
+ time.sleep(2)
+
+ fetch1 = FetchRequest("test_batched_simple_producer", 0, 0, 1024)
+ fetch2 = FetchRequest("test_batched_simple_producer", 1, 0, 1024)
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+ fetch2])
+
+ self.assertEquals(fetch_resp1.error, 0)
+ messages = list(fetch_resp1.messages)
+ self.assertEquals(len(messages), 5)
+
+ self.assertEquals(fetch_resp2.error, 0)
+ messages = list(fetch_resp2.messages)
+ self.assertEquals(len(messages), 5)
+
+ # Send 7 messages and wait for 20 seconds
+ msgs = ["message-%d" % i for i in range(10, 15)]
+ resp = producer.send_messages(*msgs)
+ msgs = ["message-%d" % i for i in range(15, 17)]
+ resp = producer.send_messages(*msgs)
+
+ fetch1 = FetchRequest("test_batched_simple_producer", 0, 5, 1024)
+ fetch2 = FetchRequest("test_batched_simple_producer", 1, 5, 1024)
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+ fetch2])
+
+ self.assertEquals(fetch_resp1.error, 0)
+ self.assertEquals(fetch_resp2.error, 0)
+ messages = list(fetch_resp1.messages) + list(fetch_resp2.messages)
+ self.assertEquals(len(messages), 0)
+
+ # Give it some time
+ time.sleep(22)
+
+ fetch1 = FetchRequest("test_batched_simple_producer", 0, 5, 1024)
+ fetch2 = FetchRequest("test_batched_simple_producer", 1, 5, 1024)
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+ fetch2])
+
+ self.assertEquals(fetch_resp1.error, 0)
+ self.assertEquals(fetch_resp2.error, 0)
+ messages = list(fetch_resp1.messages) + list(fetch_resp2.messages)
+ self.assertEquals(len(messages), 7)
+
+ producer.stop()
+
class TestSimpleConsumer(unittest.TestCase):
@classmethod