diff options
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 31 |
1 files changed, 24 insertions, 7 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index a1d9515..b1d1a59 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -1,3 +1,4 @@ +import os import unittest from datetime import datetime @@ -7,10 +8,12 @@ from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES from fixtures import ZookeeperFixture, KafkaFixture from testutil import * -@unittest.skipIf(skip_integration(), 'Skipping Integration') class TestConsumerIntegration(KafkaIntegrationTestCase): @classmethod def setUpClass(cls): + if not os.environ.get('KAFKA_VERSION'): + return + cls.zk = ZookeeperFixture.instance() cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) @@ -19,6 +22,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): @classmethod def tearDownClass(cls): # noqa + if not os.environ.get('KAFKA_VERSION'): + return + cls.server1.close() cls.server2.close() cls.zk.close() @@ -38,6 +44,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # Make sure there are no duplicates self.assertEquals(len(set(messages)), num_messages) + @kafka_versions("all") def test_simple_consumer(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -51,6 +58,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") def test_simple_consumer__seek(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -69,6 +77,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") def test_simple_consumer_blocking(self): consumer = SimpleConsumer(self.client, "group1", self.topic, @@ -96,6 +105,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") def test_simple_consumer_pending(self): # Produce 10 messages to partitions 0 and 1 self.send_messages(0, range(0, 10)) @@ -110,6 +120,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") def test_multi_process_consumer(self): # Produce 100 messages to partitions 0 and 1 self.send_messages(0, range(0, 100)) @@ -121,6 +132,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") def test_multi_process_consumer_blocking(self): consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False) @@ -148,6 +160,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") def test_multi_proc_pending(self): self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) @@ -160,6 +173,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") def test_large_messages(self): # Produce 10 "normal" size messages small_messages = self.send_messages(0, [ str(x) for x in range(10) ]) @@ -177,6 +191,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") def test_huge_messages(self): huge_message, = self.send_messages(0, [ create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)), @@ -213,23 +228,25 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): msgs2 = self.send_messages(1, range(100, 200)) # Start a consumer - consumer = SimpleConsumer(self.client, "group1", + consumer1 = SimpleConsumer(self.client, "group1", self.topic, auto_commit=True, + auto_commit_every_t=600, auto_commit_every_n=20, iter_timeout=0) # Grab the first 195 messages - output_msgs1 = [ consumer.get_message().message.value for _ in xrange(195) ] + output_msgs1 = [ consumer1.get_message().message.value for _ in xrange(195) ] self.assert_message_count(output_msgs1, 195) - consumer.stop() # The offset should be at 180 - consumer = SimpleConsumer(self.client, "group1", + consumer2 = SimpleConsumer(self.client, "group1", self.topic, auto_commit=True, + auto_commit_every_t=600, auto_commit_every_n=20, iter_timeout=0) # 180-200 - self.assert_message_count([ message for message in consumer ], 20) + self.assert_message_count([ message for message in consumer2 ], 20) - consumer.stop() + consumer1.stop() + consumer2.stop() |