summaryrefslogtreecommitdiff
path: root/test/test_consumer_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--test/test_consumer_integration.py31
1 files changed, 24 insertions, 7 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index a1d9515..b1d1a59 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,3 +1,4 @@
+import os
import unittest
from datetime import datetime
@@ -7,10 +8,12 @@ from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
from fixtures import ZookeeperFixture, KafkaFixture
from testutil import *
-@unittest.skipIf(skip_integration(), 'Skipping Integration')
class TestConsumerIntegration(KafkaIntegrationTestCase):
@classmethod
def setUpClass(cls):
+ if not os.environ.get('KAFKA_VERSION'):
+ return
+
cls.zk = ZookeeperFixture.instance()
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
@@ -19,6 +22,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
@classmethod
def tearDownClass(cls): # noqa
+ if not os.environ.get('KAFKA_VERSION'):
+ return
+
cls.server1.close()
cls.server2.close()
cls.zk.close()
@@ -38,6 +44,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Make sure there are no duplicates
self.assertEquals(len(set(messages)), num_messages)
+ @kafka_versions("all")
def test_simple_consumer(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -51,6 +58,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
+ @kafka_versions("all")
def test_simple_consumer__seek(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -69,6 +77,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
+ @kafka_versions("all")
def test_simple_consumer_blocking(self):
consumer = SimpleConsumer(self.client, "group1",
self.topic,
@@ -96,6 +105,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
+ @kafka_versions("all")
def test_simple_consumer_pending(self):
# Produce 10 messages to partitions 0 and 1
self.send_messages(0, range(0, 10))
@@ -110,6 +120,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
+ @kafka_versions("all")
def test_multi_process_consumer(self):
# Produce 100 messages to partitions 0 and 1
self.send_messages(0, range(0, 100))
@@ -121,6 +132,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
+ @kafka_versions("all")
def test_multi_process_consumer_blocking(self):
consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False)
@@ -148,6 +160,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
+ @kafka_versions("all")
def test_multi_proc_pending(self):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
@@ -160,6 +173,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
+ @kafka_versions("all")
def test_large_messages(self):
# Produce 10 "normal" size messages
small_messages = self.send_messages(0, [ str(x) for x in range(10) ])
@@ -177,6 +191,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
+ @kafka_versions("all")
def test_huge_messages(self):
huge_message, = self.send_messages(0, [
create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)),
@@ -213,23 +228,25 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
msgs2 = self.send_messages(1, range(100, 200))
# Start a consumer
- consumer = SimpleConsumer(self.client, "group1",
+ consumer1 = SimpleConsumer(self.client, "group1",
self.topic, auto_commit=True,
+ auto_commit_every_t=600,
auto_commit_every_n=20,
iter_timeout=0)
# Grab the first 195 messages
- output_msgs1 = [ consumer.get_message().message.value for _ in xrange(195) ]
+ output_msgs1 = [ consumer1.get_message().message.value for _ in xrange(195) ]
self.assert_message_count(output_msgs1, 195)
- consumer.stop()
# The offset should be at 180
- consumer = SimpleConsumer(self.client, "group1",
+ consumer2 = SimpleConsumer(self.client, "group1",
self.topic, auto_commit=True,
+ auto_commit_every_t=600,
auto_commit_every_n=20,
iter_timeout=0)
# 180-200
- self.assert_message_count([ message for message in consumer ], 20)
+ self.assert_message_count([ message for message in consumer2 ], 20)
- consumer.stop()
+ consumer1.stop()
+ consumer2.stop()