diff options
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r-- | test/test_producer_integration.py | 21 |
1 files changed, 1 insertions, 20 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index c99ed63..34963d3 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -38,7 +38,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): cls.server.close() cls.zk.close() - @kafka_versions("all") def test_produce_many_simple(self): start_offset = self.current_offset(self.topic, 0) @@ -56,7 +55,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 100, ) - @kafka_versions("all") def test_produce_10k_simple(self): start_offset = self.current_offset(self.topic, 0) @@ -67,7 +65,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 10000, ) - @kafka_versions("all") def test_produce_many_gzip(self): start_offset = self.current_offset(self.topic, 0) @@ -82,7 +79,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 200, ) - @kafka_versions("all") def test_produce_many_snappy(self): self.skipTest("All snappy integration tests fail with nosnappyjava") start_offset = self.current_offset(self.topic, 0) @@ -95,7 +91,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 200, ) - @kafka_versions("all") def test_produce_mixed(self): start_offset = self.current_offset(self.topic, 0) @@ -113,7 +108,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assert_produce_request(messages, start_offset, msg_count) - @kafka_versions("all") def test_produce_100k_gzipped(self): start_offset = self.current_offset(self.topic, 0) @@ -139,7 +133,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # SimpleProducer Tests # ############################ - @kafka_versions("all") def test_simple_producer(self): partitions = self.client.get_partition_ids_for_topic(self.topic) start_offsets = [self.current_offset(self.topic, p) for p in partitions] @@ -164,7 +157,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() - @kafka_versions("all") def test_produce__new_topic_fails_with_reasonable_error(self): new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8') producer = SimpleProducer(self.client, random_start=False) @@ -174,7 +166,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): LeaderNotAvailableError)): producer.send_messages(new_topic, self.msg("one")) - @kafka_versions("all") def test_producer_random_order(self): producer = SimpleProducer(self.client, random_start=True) resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) @@ -184,7 +175,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assertEqual(resp1[0].partition, resp3[0].partition) self.assertNotEqual(resp1[0].partition, resp2[0].partition) - @kafka_versions("all") def test_producer_ordered_start(self): producer = SimpleProducer(self.client, random_start=False) resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) @@ -195,7 +185,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assertEqual(resp2[0].partition, 1) self.assertEqual(resp3[0].partition, 0) - @kafka_versions("all") def test_async_simple_producer(self): partition = self.client.get_partition_ids_for_topic(self.topic)[0] start_offset = self.current_offset(self.topic, partition) @@ -210,7 +199,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ]) - @kafka_versions("all") def test_batched_simple_producer__triggers_by_message(self): partitions = self.client.get_partition_ids_for_topic(self.topic) start_offsets = [self.current_offset(self.topic, p) for p in partitions] @@ -278,7 +266,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() - @kafka_versions("all") def test_batched_simple_producer__triggers_by_time(self): partitions = self.client.get_partition_ids_for_topic(self.topic) start_offsets = [self.current_offset(self.topic, p) for p in partitions] @@ -339,7 +326,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # KeyedProducer Tests # ############################ - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") + @kafka_versions('>=0.8.1') def test_keyedproducer_null_payload(self): partitions = self.client.get_partition_ids_for_topic(self.topic) start_offsets = [self.current_offset(self.topic, p) for p in partitions] @@ -361,7 +348,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() - @kafka_versions("all") def test_round_robin_partitioner(self): partitions = self.client.get_partition_ids_for_topic(self.topic) start_offsets = [self.current_offset(self.topic, p) for p in partitions] @@ -382,7 +368,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() - @kafka_versions("all") def test_hashed_partitioner(self): partitions = self.client.get_partition_ids_for_topic(self.topic) start_offsets = [self.current_offset(self.topic, p) for p in partitions] @@ -414,7 +399,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() - @kafka_versions("all") def test_async_keyed_producer(self): partition = self.client.get_partition_ids_for_topic(self.topic)[0] start_offset = self.current_offset(self.topic, partition) @@ -436,7 +420,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # Producer ACK Tests # ############################ - @kafka_versions("all") def test_acks_none(self): partition = self.client.get_partition_ids_for_topic(self.topic)[0] start_offset = self.current_offset(self.topic, partition) @@ -454,7 +437,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ]) producer.stop() - @kafka_versions("all") def test_acks_local_write(self): partition = self.client.get_partition_ids_for_topic(self.topic)[0] start_offset = self.current_offset(self.topic, partition) @@ -470,7 +452,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() - @kafka_versions("all") def test_acks_cluster_commit(self): partition = self.client.get_partition_ids_for_topic(self.topic)[0] start_offset = self.current_offset(self.topic, partition) |