diff options
author | David Arthur <mumrah@gmail.com> | 2013-07-26 08:39:44 -0400 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-07-26 08:39:44 -0400 |
commit | e297a7a8c8f9de341b0e91346236455d12ae7f82 (patch) | |
tree | f4db35ab88fa39684d4ab680428adf815000eaa5 /test/test_integration.py | |
parent | 5684af438e6cf871540aa8ea8b556737f56e9798 (diff) | |
parent | 1d278f0f60cb0a7b76fbc6b80c8e112a0deb2e0c (diff) | |
download | kafka-python-e297a7a8c8f9de341b0e91346236455d12ae7f82.tar.gz |
Merge branch 'issue-35'
Conflicts:
kafka/__init__.py
kafka/consumer.py
test/test_integration.py
Diffstat (limited to 'test/test_integration.py')
-rw-r--r-- | test/test_integration.py | 129 |
1 files changed, 120 insertions, 9 deletions
diff --git a/test/test_integration.py b/test/test_integration.py index 6d96b6a..3c524cf 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -1,6 +1,7 @@ import logging import unittest import time +from datetime import datetime from kafka import * # noqa from kafka.common import * # noqa @@ -544,7 +545,7 @@ class TestKafkaClient(unittest.TestCase): producer.stop() -class TestSimpleConsumer(unittest.TestCase): +class TestConsumer(unittest.TestCase): @classmethod def setUpClass(cls): # noqa cls.zk = ZookeeperFixture.instance() @@ -559,9 +560,9 @@ class TestSimpleConsumer(unittest.TestCase): cls.server2.close() cls.zk.close() - def test_consumer(self): + def test_simple_consumer(self): # Produce 100 messages to partition 0 - produce1 = ProduceRequest("test_consumer", 0, messages=[ + produce1 = ProduceRequest("test_simple_consumer", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(100) ]) @@ -570,7 +571,7 @@ class TestSimpleConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Produce 100 messages to partition 1 - produce2 = ProduceRequest("test_consumer", 1, messages=[ + produce2 = ProduceRequest("test_simple_consumer", 1, messages=[ create_message("Test message 1 %d" % i) for i in range(100) ]) @@ -579,7 +580,7 @@ class TestSimpleConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = SimpleConsumer(self.client, "group1", "test_consumer") + consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer") all_messages = [] for message in consumer: all_messages.append(message) @@ -602,29 +603,139 @@ class TestSimpleConsumer(unittest.TestCase): self.assertEquals(len(all_messages), 13) + # Blocking API + start = datetime.now() + messages = consumer.get_messages(block=True, timeout=5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + self.assertEqual(len(messages), 0) + + # Send 10 messages + produce = ProduceRequest("test_simple_consumer", 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(10) + ]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 100) + + # Fetch 5 messages + messages = consumer.get_messages(count=5, block=True, timeout=5) + self.assertEqual(len(messages), 5) + + # Fetch 10 messages + start = datetime.now() + messages = consumer.get_messages(count=10, block=True, timeout=5) + self.assertEqual(len(messages), 5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + consumer.stop() - def test_pending(self): + def test_simple_consumer_pending(self): # Produce 10 messages to partition 0 and 1 - produce1 = ProduceRequest("test_pending", 0, messages=[ + produce1 = ProduceRequest("test_simple_pending", 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + produce2 = ProduceRequest("test_simple_pending", 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + consumer = SimpleConsumer(self.client, "group1", "test_simple_pending") + self.assertEquals(consumer.pending(), 20) + self.assertEquals(consumer.pending(partitions=[0]), 10) + self.assertEquals(consumer.pending(partitions=[1]), 10) + consumer.stop() + + def test_multi_process_consumer(self): + # Produce 100 messages to partition 0 + produce1 = ProduceRequest("test_mpconsumer", 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Produce 100 messages to partition 1 + produce2 = ProduceRequest("test_mpconsumer", 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Start a consumer + consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer") + all_messages = [] + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 200) + # Make sure there are no duplicates + self.assertEquals(len(all_messages), len(set(all_messages))) + + # Blocking API + start = datetime.now() + messages = consumer.get_messages(block=True, timeout=5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + self.assertEqual(len(messages), 0) + + # Send 10 messages + produce = ProduceRequest("test_mpconsumer", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 100) + + # Fetch 5 messages + messages = consumer.get_messages(count=5, block=True, timeout=5) + self.assertEqual(len(messages), 5) + + # Fetch 10 messages + start = datetime.now() + messages = consumer.get_messages(count=10, block=True, timeout=5) + self.assertEqual(len(messages), 5) + diff = (datetime.now() - start).total_seconds() + self.assertGreaterEqual(diff, 5) + + consumer.stop() + + def test_multi_proc_pending(self): + # Produce 10 messages to partition 0 and 1 + produce1 = ProduceRequest("test_mppending", 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce1]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_pending", 1, messages=[ + produce2 = ProduceRequest("test_mppending", 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) + for resp in self.client.send_produce_request([produce2]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = SimpleConsumer(self.client, "group1", "test_pending") + consumer = MultiProcessConsumer(self.client, "group1", "test_mppending") self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) + consumer.stop() |