summaryrefslogtreecommitdiff
path: root/test/test_producer_integration.py
diff options
context:
space:
mode:
author    Dana Powers <dana.powers@rd.io>  2015-12-10 12:21:37 -0800
committer Dana Powers <dana.powers@rd.io>  2015-12-10 13:31:59 -0800
commit    4bd20aad2b7a0710458545009328f8729c89fee1 (patch)
tree      eab1c86b1e63418b8d7f7fe13e66d354c3178082 /test/test_producer_integration.py
parent    1856063f4e0c36a8ec6266358d82432adf879170 (diff)
download  kafka-python-kafka_version_tests.tar.gz
Refactor kafka_versions to support arbitrary operators (> >= < <= ! =) [branch: kafka_version_tests]
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r--  test/test_producer_integration.py  21
1 file changed, 1 insertion, 20 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index c99ed63..34963d3 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -38,7 +38,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
cls.server.close()
cls.zk.close()
- @kafka_versions("all")
def test_produce_many_simple(self):
start_offset = self.current_offset(self.topic, 0)
@@ -56,7 +55,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
100,
)
- @kafka_versions("all")
def test_produce_10k_simple(self):
start_offset = self.current_offset(self.topic, 0)
@@ -67,7 +65,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
10000,
)
- @kafka_versions("all")
def test_produce_many_gzip(self):
start_offset = self.current_offset(self.topic, 0)
@@ -82,7 +79,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
200,
)
- @kafka_versions("all")
def test_produce_many_snappy(self):
self.skipTest("All snappy integration tests fail with nosnappyjava")
start_offset = self.current_offset(self.topic, 0)
@@ -95,7 +91,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
200,
)
- @kafka_versions("all")
def test_produce_mixed(self):
start_offset = self.current_offset(self.topic, 0)
@@ -113,7 +108,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assert_produce_request(messages, start_offset, msg_count)
- @kafka_versions("all")
def test_produce_100k_gzipped(self):
start_offset = self.current_offset(self.topic, 0)
@@ -139,7 +133,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# SimpleProducer Tests #
############################
- @kafka_versions("all")
def test_simple_producer(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -164,7 +157,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
- @kafka_versions("all")
def test_produce__new_topic_fails_with_reasonable_error(self):
new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8')
producer = SimpleProducer(self.client, random_start=False)
@@ -174,7 +166,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
LeaderNotAvailableError)):
producer.send_messages(new_topic, self.msg("one"))
- @kafka_versions("all")
def test_producer_random_order(self):
producer = SimpleProducer(self.client, random_start=True)
resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
@@ -184,7 +175,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assertEqual(resp1[0].partition, resp3[0].partition)
self.assertNotEqual(resp1[0].partition, resp2[0].partition)
- @kafka_versions("all")
def test_producer_ordered_start(self):
producer = SimpleProducer(self.client, random_start=False)
resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
@@ -195,7 +185,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assertEqual(resp2[0].partition, 1)
self.assertEqual(resp3[0].partition, 0)
- @kafka_versions("all")
def test_async_simple_producer(self):
partition = self.client.get_partition_ids_for_topic(self.topic)[0]
start_offset = self.current_offset(self.topic, partition)
@@ -210,7 +199,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
- @kafka_versions("all")
def test_batched_simple_producer__triggers_by_message(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -278,7 +266,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
- @kafka_versions("all")
def test_batched_simple_producer__triggers_by_time(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -339,7 +326,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# KeyedProducer Tests #
############################
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ @kafka_versions('>=0.8.1')
def test_keyedproducer_null_payload(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -361,7 +348,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
- @kafka_versions("all")
def test_round_robin_partitioner(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -382,7 +368,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
- @kafka_versions("all")
def test_hashed_partitioner(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -414,7 +399,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
- @kafka_versions("all")
def test_async_keyed_producer(self):
partition = self.client.get_partition_ids_for_topic(self.topic)[0]
start_offset = self.current_offset(self.topic, partition)
@@ -436,7 +420,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# Producer ACK Tests #
############################
- @kafka_versions("all")
def test_acks_none(self):
partition = self.client.get_partition_ids_for_topic(self.topic)[0]
start_offset = self.current_offset(self.topic, partition)
@@ -454,7 +437,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
producer.stop()
- @kafka_versions("all")
def test_acks_local_write(self):
partition = self.client.get_partition_ids_for_topic(self.topic)[0]
start_offset = self.current_offset(self.topic, partition)
@@ -470,7 +452,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
- @kafka_versions("all")
def test_acks_cluster_commit(self):
partition = self.client.get_partition_ids_for_topic(self.topic)[0]
start_offset = self.current_offset(self.topic, partition)