diff options
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 25 |
1 files changed, 5 insertions, 20 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index fee53f5..ef9a886 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -78,7 +78,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): **configs) return consumer - @kafka_versions("all") def test_simple_consumer(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -90,7 +89,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() - @kafka_versions('all') def test_simple_consumer_smallest_offset_reset(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -102,7 +100,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # messages from beginning. self.assert_message_count([message for message in consumer], 200) - @kafka_versions('all') def test_simple_consumer_largest_offset_reset(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -120,7 +117,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # Since the offset is set to largest we should read all the new messages. self.assert_message_count([message for message in consumer], 200) - @kafka_versions('all') def test_simple_consumer_no_reset(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -132,7 +128,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): with self.assertRaises(OffsetOutOfRangeError): consumer.get_message() - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") + @kafka_versions('>=0.8.1') def test_simple_consumer_load_initial_offsets(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -149,7 +145,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer = self.consumer(auto_commit=False) self.assertEqual(consumer.offsets, {0: 51, 1: 101}) - @kafka_versions("all") def test_simple_consumer__seek(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -180,7 +175,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() - @kafka_versions("all") def test_simple_consumer_blocking(self): consumer = self.consumer() @@ -214,7 +208,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() - @kafka_versions("all") def test_simple_consumer_pending(self): # make sure that we start with no pending messages consumer = self.consumer() @@ -242,7 +235,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEquals(set([0, 1]), set([pending_part1, pending_part2])) consumer.stop() - @kafka_versions("all") def test_multi_process_consumer(self): # Produce 100 messages to partitions 0 and 1 self.send_messages(0, range(0, 100)) @@ -254,7 +246,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() - @kafka_versions("all") def test_multi_process_consumer_blocking(self): consumer = self.consumer(consumer = MultiProcessConsumer) @@ -292,7 +283,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() - @kafka_versions("all") def test_multi_proc_pending(self): self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) @@ -308,7 +298,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") + @kafka_versions('>=0.8.1') def test_multi_process_consumer_load_initial_offsets(self): self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) @@ -326,7 +316,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): auto_commit=False) self.assertEqual(consumer.offsets, {0: 5, 1: 15}) - @kafka_versions("all") def test_large_messages(self): # Produce 10 "normal" size messages small_messages = self.send_messages(0, [ str(x) for x in range(10) ]) @@ -343,7 +332,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() - @kafka_versions("all") def test_huge_messages(self): huge_message, = self.send_messages(0, [ create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)), @@ -374,7 +362,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): big_consumer.stop() - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") + @kafka_versions('>=0.8.1') def test_offset_behavior__resuming_behavior(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -401,7 +389,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer1.stop() consumer2.stop() - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") + @kafka_versions('>=0.8.1') def test_multi_process_offset_behavior__resuming_behavior(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -437,7 +425,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer2.stop() # TODO: Make this a unit test -- should not require integration - @kafka_versions("all") def test_fetch_buffer_size(self): # Test parameters (see issue 135 / PR 136) @@ -455,7 +442,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): messages = [ message for message in consumer ] self.assertEqual(len(messages), 2) - @kafka_versions("all") def test_kafka_consumer(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -476,7 +462,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEqual(len(messages[0]), 100) self.assertEqual(len(messages[1]), 100) - @kafka_versions("all") def test_kafka_consumer__blocking(self): TIMEOUT_MS = 500 consumer = self.kafka_consumer(auto_offset_reset='smallest', @@ -509,7 +494,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEqual(len(messages), 5) self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 ) - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") + @kafka_versions('>=0.8.1') def test_kafka_consumer__offset_commit_resume(self): GROUP_ID = random_string(10).encode('utf-8') |