summaryrefslogtreecommitdiff
path: root/test/test_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_integration.py')
-rw-r--r--test/test_integration.py129
1 files changed, 120 insertions, 9 deletions
diff --git a/test/test_integration.py b/test/test_integration.py
index 6d96b6a..3c524cf 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -1,6 +1,7 @@
import logging
import unittest
import time
+from datetime import datetime
from kafka import * # noqa
from kafka.common import * # noqa
@@ -544,7 +545,7 @@ class TestKafkaClient(unittest.TestCase):
producer.stop()
-class TestSimpleConsumer(unittest.TestCase):
+class TestConsumer(unittest.TestCase):
@classmethod
def setUpClass(cls): # noqa
cls.zk = ZookeeperFixture.instance()
@@ -559,9 +560,9 @@ class TestSimpleConsumer(unittest.TestCase):
cls.server2.close()
cls.zk.close()
- def test_consumer(self):
+ def test_simple_consumer(self):
# Produce 100 messages to partition 0
- produce1 = ProduceRequest("test_consumer", 0, messages=[
+ produce1 = ProduceRequest("test_simple_consumer", 0, messages=[
create_message("Test message 0 %d" % i) for i in range(100)
])
@@ -570,7 +571,7 @@ class TestSimpleConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# Produce 100 messages to partition 1
- produce2 = ProduceRequest("test_consumer", 1, messages=[
+ produce2 = ProduceRequest("test_simple_consumer", 1, messages=[
create_message("Test message 1 %d" % i) for i in range(100)
])
@@ -579,7 +580,7 @@ class TestSimpleConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# Start a consumer
- consumer = SimpleConsumer(self.client, "group1", "test_consumer")
+ consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer")
all_messages = []
for message in consumer:
all_messages.append(message)
@@ -602,29 +603,139 @@ class TestSimpleConsumer(unittest.TestCase):
self.assertEquals(len(all_messages), 13)
+ # Blocking API
+ start = datetime.now()
+ messages = consumer.get_messages(block=True, timeout=5)
+ diff = (datetime.now() - start).total_seconds()
+ self.assertGreaterEqual(diff, 5)
+ self.assertEqual(len(messages), 0)
+
+ # Send 10 messages
+ produce = ProduceRequest("test_simple_consumer", 0, messages=[
+ create_message("Test message 0 %d" % i) for i in range(10)
+ ])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 100)
+
+ # Fetch 5 messages
+ messages = consumer.get_messages(count=5, block=True, timeout=5)
+ self.assertEqual(len(messages), 5)
+
+ # Fetch 10 messages
+ start = datetime.now()
+ messages = consumer.get_messages(count=10, block=True, timeout=5)
+ self.assertEqual(len(messages), 5)
+ diff = (datetime.now() - start).total_seconds()
+ self.assertGreaterEqual(diff, 5)
+
consumer.stop()
- def test_pending(self):
+ def test_simple_consumer_pending(self):
# Produce 10 messages to partition 0 and 1
- produce1 = ProduceRequest("test_pending", 0, messages=[
+ produce1 = ProduceRequest("test_simple_pending", 0, messages=[
+ create_message("Test message 0 %d" % i) for i in range(10)
+ ])
+ for resp in self.client.send_produce_request([produce1]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ produce2 = ProduceRequest("test_simple_pending", 1, messages=[
+ create_message("Test message 1 %d" % i) for i in range(10)
+ ])
+ for resp in self.client.send_produce_request([produce2]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ consumer = SimpleConsumer(self.client, "group1", "test_simple_pending")
+ self.assertEquals(consumer.pending(), 20)
+ self.assertEquals(consumer.pending(partitions=[0]), 10)
+ self.assertEquals(consumer.pending(partitions=[1]), 10)
+ consumer.stop()
+
+ def test_multi_process_consumer(self):
+ # Produce 100 messages to partition 0
+ produce1 = ProduceRequest("test_mpconsumer", 0, messages=[
+ create_message("Test message 0 %d" % i) for i in range(100)
+ ])
+
+ for resp in self.client.send_produce_request([produce1]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ # Produce 100 messages to partition 1
+ produce2 = ProduceRequest("test_mpconsumer", 1, messages=[
+ create_message("Test message 1 %d" % i) for i in range(100)
+ ])
+
+ for resp in self.client.send_produce_request([produce2]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ # Start a consumer
+ consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer")
+ all_messages = []
+ for message in consumer:
+ all_messages.append(message)
+
+ self.assertEquals(len(all_messages), 200)
+ # Make sure there are no duplicates
+ self.assertEquals(len(all_messages), len(set(all_messages)))
+
+ # Blocking API
+ start = datetime.now()
+ messages = consumer.get_messages(block=True, timeout=5)
+ diff = (datetime.now() - start).total_seconds()
+ self.assertGreaterEqual(diff, 5)
+ self.assertEqual(len(messages), 0)
+
+ # Send 10 messages
+ produce = ProduceRequest("test_mpconsumer", 0, messages=[
create_message("Test message 0 %d" % i) for i in range(10)
])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 100)
+
+ # Fetch 5 messages
+ messages = consumer.get_messages(count=5, block=True, timeout=5)
+ self.assertEqual(len(messages), 5)
+
+ # Fetch 10 messages
+ start = datetime.now()
+ messages = consumer.get_messages(count=10, block=True, timeout=5)
+ self.assertEqual(len(messages), 5)
+ diff = (datetime.now() - start).total_seconds()
+ self.assertGreaterEqual(diff, 5)
+
+ consumer.stop()
+
+ def test_multi_proc_pending(self):
+ # Produce 10 messages to partition 0 and 1
+ produce1 = ProduceRequest("test_mppending", 0, messages=[
+ create_message("Test message 0 %d" % i) for i in range(10)
+ ])
+
for resp in self.client.send_produce_request([produce1]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- produce2 = ProduceRequest("test_pending", 1, messages=[
+ produce2 = ProduceRequest("test_mppending", 1, messages=[
create_message("Test message 1 %d" % i) for i in range(10)
])
+
for resp in self.client.send_produce_request([produce2]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- consumer = SimpleConsumer(self.client, "group1", "test_pending")
+ consumer = MultiProcessConsumer(self.client, "group1", "test_mppending")
self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10)
self.assertEquals(consumer.pending(partitions=[1]), 10)
+
consumer.stop()