summaryrefslogtreecommitdiff
path: root/test/test_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_integration.py')
-rw-r--r--test/test_integration.py129
1 files changed, 120 insertions, 9 deletions
diff --git a/test/test_integration.py b/test/test_integration.py
index d607b73..7908a34 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -1,5 +1,6 @@
import logging
import unittest
+from datetime import datetime
from kafka import * # noqa
from kafka.common import * # noqa
@@ -273,7 +274,7 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(messages[0].message.value, "three")
-class TestSimpleConsumer(unittest.TestCase):
+class TestConsumer(unittest.TestCase):
@classmethod
def setUpClass(cls): # noqa
cls.zk = ZookeeperFixture.instance()
@@ -288,9 +289,9 @@ class TestSimpleConsumer(unittest.TestCase):
cls.server2.close()
cls.zk.close()
- def test_consumer(self):
+ def test_simple_consumer(self):
# Produce 100 messages to partition 0
- produce1 = ProduceRequest("test_consumer", 0, messages=[
+ produce1 = ProduceRequest("test_simple_consumer", 0, messages=[
create_message("Test message 0 %d" % i) for i in range(100)
])
@@ -299,7 +300,7 @@ class TestSimpleConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# Produce 100 messages to partition 1
- produce2 = ProduceRequest("test_consumer", 1, messages=[
+ produce2 = ProduceRequest("test_simple_consumer", 1, messages=[
create_message("Test message 1 %d" % i) for i in range(100)
])
@@ -308,7 +309,7 @@ class TestSimpleConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# Start a consumer
- consumer = SimpleConsumer(self.client, "group1", "test_consumer")
+ consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer")
all_messages = []
for message in consumer:
all_messages.append(message)
@@ -331,29 +332,139 @@ class TestSimpleConsumer(unittest.TestCase):
self.assertEquals(len(all_messages), 13)
+ # Blocking API
+ start = datetime.now()
+ messages = consumer.get_messages(block=True, timeout=5)
+ diff = (datetime.now() - start).total_seconds()
+ self.assertGreaterEqual(diff, 5)
+ self.assertEqual(len(messages), 0)
+
+ # Send 10 messages
+ produce = ProduceRequest("test_simple_consumer", 0, messages=[
+ create_message("Test message 0 %d" % i) for i in range(10)
+ ])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 100)
+
+ # Fetch 5 messages
+ messages = consumer.get_messages(count=5, block=True, timeout=5)
+ self.assertEqual(len(messages), 5)
+
+ # Fetch 10 messages
+ start = datetime.now()
+ messages = consumer.get_messages(count=10, block=True, timeout=5)
+ self.assertEqual(len(messages), 5)
+ diff = (datetime.now() - start).total_seconds()
+ self.assertGreaterEqual(diff, 5)
+
consumer.stop()
- def test_pending(self):
+ def test_simple_consumer_pending(self):
# Produce 10 messages to partition 0 and 1
- produce1 = ProduceRequest("test_pending", 0, messages=[
+ produce1 = ProduceRequest("test_simple_pending", 0, messages=[
+ create_message("Test message 0 %d" % i) for i in range(10)
+ ])
+ for resp in self.client.send_produce_request([produce1]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ produce2 = ProduceRequest("test_simple_pending", 1, messages=[
+ create_message("Test message 1 %d" % i) for i in range(10)
+ ])
+ for resp in self.client.send_produce_request([produce2]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ consumer = SimpleConsumer(self.client, "group1", "test_simple_pending")
+ self.assertEquals(consumer.pending(), 20)
+ self.assertEquals(consumer.pending(partitions=[0]), 10)
+ self.assertEquals(consumer.pending(partitions=[1]), 10)
+ consumer.stop()
+
+ def test_multi_process_consumer(self):
+ # Produce 100 messages to partition 0
+ produce1 = ProduceRequest("test_mpconsumer", 0, messages=[
+ create_message("Test message 0 %d" % i) for i in range(100)
+ ])
+
+ for resp in self.client.send_produce_request([produce1]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ # Produce 100 messages to partition 1
+ produce2 = ProduceRequest("test_mpconsumer", 1, messages=[
+ create_message("Test message 1 %d" % i) for i in range(100)
+ ])
+
+ for resp in self.client.send_produce_request([produce2]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ # Start a consumer
+ consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer")
+ all_messages = []
+ for message in consumer:
+ all_messages.append(message)
+
+ self.assertEquals(len(all_messages), 200)
+ # Make sure there are no duplicates
+ self.assertEquals(len(all_messages), len(set(all_messages)))
+
+ # Blocking API
+ start = datetime.now()
+ messages = consumer.get_messages(block=True, timeout=5)
+ diff = (datetime.now() - start).total_seconds()
+ self.assertGreaterEqual(diff, 5)
+ self.assertEqual(len(messages), 0)
+
+ # Send 10 messages
+ produce = ProduceRequest("test_mpconsumer", 0, messages=[
create_message("Test message 0 %d" % i) for i in range(10)
])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 100)
+
+ # Fetch 5 messages
+ messages = consumer.get_messages(count=5, block=True, timeout=5)
+ self.assertEqual(len(messages), 5)
+
+ # Fetch 10 messages
+ start = datetime.now()
+ messages = consumer.get_messages(count=10, block=True, timeout=5)
+ self.assertEqual(len(messages), 5)
+ diff = (datetime.now() - start).total_seconds()
+ self.assertGreaterEqual(diff, 5)
+
+ consumer.stop()
+
+ def test_multi_proc_pending(self):
+ # Produce 10 messages to partition 0 and 1
+ produce1 = ProduceRequest("test_mppending", 0, messages=[
+ create_message("Test message 0 %d" % i) for i in range(10)
+ ])
+
for resp in self.client.send_produce_request([produce1]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- produce2 = ProduceRequest("test_pending", 1, messages=[
+ produce2 = ProduceRequest("test_mppending", 1, messages=[
create_message("Test message 1 %d" % i) for i in range(10)
])
+
for resp in self.client.send_produce_request([produce2]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- consumer = SimpleConsumer(self.client, "group1", "test_pending")
+ consumer = MultiProcessConsumer(self.client, "group1", "test_mppending")
self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10)
self.assertEquals(consumer.pending(partitions=[1]), 10)
+
consumer.stop()