diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-06-27 17:47:47 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-27 17:47:47 +0530 |
commit | 36b5f8154304a7fef437795250885230dff835b1 (patch) | |
tree | c3bd8ca4d4f36db0475ef3ab90becbebf3685ae6 /test/test_integration.py | |
parent | b3fece508dff6a4fe8c31bc7c2282c114676646b (diff) | |
download | kafka-python-36b5f8154304a7fef437795250885230dff835b1.tar.gz |
Test cases for multi-process consumer and blocking APIs
Diffstat (limited to 'test/test_integration.py')
-rw-r--r-- | test/test_integration.py | 129 |
1 files changed, 120 insertions, 9 deletions
diff --git a/test/test_integration.py b/test/test_integration.py index d607b73..7908a34 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -1,5 +1,6 @@ import logging import unittest +from datetime import datetime from kafka import * # noqa from kafka.common import * # noqa @@ -273,7 +274,7 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(messages[0].message.value, "three") -class TestSimpleConsumer(unittest.TestCase): +class TestConsumer(unittest.TestCase): @classmethod def setUpClass(cls): # noqa cls.zk = ZookeeperFixture.instance() @@ -288,9 +289,9 @@ class TestSimpleConsumer(unittest.TestCase): cls.server2.close() cls.zk.close() - def test_consumer(self): + def test_simple_consumer(self): # Produce 100 messages to partition 0 - produce1 = ProduceRequest("test_consumer", 0, messages=[ + produce1 = ProduceRequest("test_simple_consumer", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(100) ]) @@ -299,7 +300,7 @@ class TestSimpleConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Produce 100 messages to partition 1 - produce2 = ProduceRequest("test_consumer", 1, messages=[ + produce2 = ProduceRequest("test_simple_consumer", 1, messages=[ create_message("Test message 1 %d" % i) for i in range(100) ]) @@ -308,7 +309,7 @@ class TestSimpleConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = SimpleConsumer(self.client, "group1", "test_consumer") + consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer") all_messages = [] for message in consumer: all_messages.append(message) @@ -331,29 +332,139 @@ class TestSimpleConsumer(unittest.TestCase): self.assertEquals(len(all_messages), 13) + # Blocking API + start = datetime.now() + messages = consumer.get_messages(block=True, timeout=5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + self.assertEqual(len(messages), 0) + + # Send 10 messages + produce = ProduceRequest("test_simple_consumer", 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(10) + ]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 100) + + # Fetch 5 messages + messages = consumer.get_messages(count=5, block=True, timeout=5) + self.assertEqual(len(messages), 5) + + # Fetch 10 messages + start = datetime.now() + messages = consumer.get_messages(count=10, block=True, timeout=5) + self.assertEqual(len(messages), 5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + consumer.stop() - def test_pending(self): + def test_simple_consumer_pending(self): # Produce 10 messages to partition 0 and 1 - produce1 = ProduceRequest("test_pending", 0, messages=[ + produce1 = ProduceRequest("test_simple_pending", 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + produce2 = ProduceRequest("test_simple_pending", 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + consumer = SimpleConsumer(self.client, "group1", "test_simple_pending") + self.assertEquals(consumer.pending(), 20) + self.assertEquals(consumer.pending(partitions=[0]), 10) + self.assertEquals(consumer.pending(partitions=[1]), 10) + consumer.stop() + + def test_multi_process_consumer(self): + # Produce 100 messages to partition 0 + produce1 = ProduceRequest("test_mpconsumer", 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Produce 100 messages to partition 1 + produce2 = ProduceRequest("test_mpconsumer", 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Start a consumer + consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer") + all_messages = [] + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 200) + # Make sure there are no duplicates + self.assertEquals(len(all_messages), len(set(all_messages))) + + # Blocking API + start = datetime.now() + messages = consumer.get_messages(block=True, timeout=5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + self.assertEqual(len(messages), 0) + + # Send 10 messages + produce = ProduceRequest("test_mpconsumer", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 100) + + # Fetch 5 messages + messages = consumer.get_messages(count=5, block=True, timeout=5) + self.assertEqual(len(messages), 5) + + # Fetch 10 messages + start = datetime.now() + messages = consumer.get_messages(count=10, block=True, timeout=5) + self.assertEqual(len(messages), 5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + + consumer.stop() + + def test_multi_proc_pending(self): + # Produce 10 messages to partition 0 and 1 + produce1 = ProduceRequest("test_mppending", 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce1]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_pending", 1, messages=[ + produce2 = ProduceRequest("test_mppending", 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) + for resp in self.client.send_produce_request([produce2]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = SimpleConsumer(self.client, "group1", "test_pending") + consumer = MultiProcessConsumer(self.client, "group1", "test_mppending") self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) + consumer.stop() |