diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-10 12:21:37 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-10 13:31:59 -0800 |
commit | 4bd20aad2b7a0710458545009328f8729c89fee1 (patch) | |
tree | eab1c86b1e63418b8d7f7fe13e66d354c3178082 | |
parent | 1856063f4e0c36a8ec6266358d82432adf879170 (diff) | |
download | kafka-python-kafka_version_tests.tar.gz |
Refactor kafka_versions to support arbitrary operators (> >= < <= ! =)kafka_version_tests
-rw-r--r-- | test/test_client_integration.py | 5 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 25 | ||||
-rw-r--r-- | test/test_failover_integration.py | 10 | ||||
-rw-r--r-- | test/test_producer_integration.py | 21 | ||||
-rw-r--r-- | test/testutil.py | 42 |
5 files changed, 47 insertions, 56 deletions
diff --git a/test/test_client_integration.py b/test/test_client_integration.py index 8853350..6872dbf 100644 --- a/test/test_client_integration.py +++ b/test/test_client_integration.py @@ -27,7 +27,6 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): cls.server.close() cls.zk.close() - @kafka_versions("all") def test_consume_none(self): fetch = FetchRequest(self.bytes_topic, 0, 0, 1024) @@ -39,7 +38,6 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): messages = list(fetch_resp.messages) self.assertEqual(len(messages), 0) - @kafka_versions("all") def test_ensure_topic_exists(self): # assume that self.topic was created by setUp @@ -50,7 +48,6 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): with self.assertRaises(KafkaTimeoutError): self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0) - @kafka_versions('all') def test_send_produce_request_maintains_request_response_order(self): self.client.ensure_topic_exists(b'foo') @@ -83,7 +80,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): # Offset Tests # #################### - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") + @kafka_versions('>=0.8.1') def test_commit_fetch_offsets(self): req = OffsetCommitRequest(self.bytes_topic, 0, 42, b"metadata") (resp,) = self.client.send_offset_commit_request(b"group", [req]) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index fee53f5..ef9a886 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -78,7 +78,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): **configs) return consumer - @kafka_versions("all") def test_simple_consumer(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -90,7 +89,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() - @kafka_versions('all') def test_simple_consumer_smallest_offset_reset(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -102,7 +100,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # messages from beginning. self.assert_message_count([message for message in consumer], 200) - @kafka_versions('all') def test_simple_consumer_largest_offset_reset(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -120,7 +117,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): # Since the offset is set to largest we should read all the new messages. self.assert_message_count([message for message in consumer], 200) - @kafka_versions('all') def test_simple_consumer_no_reset(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -132,7 +128,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): with self.assertRaises(OffsetOutOfRangeError): consumer.get_message() - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") + @kafka_versions('>=0.8.1') def test_simple_consumer_load_initial_offsets(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -149,7 +145,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer = self.consumer(auto_commit=False) self.assertEqual(consumer.offsets, {0: 51, 1: 101}) - @kafka_versions("all") def test_simple_consumer__seek(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -180,7 +175,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() - @kafka_versions("all") def test_simple_consumer_blocking(self): consumer = self.consumer() @@ -214,7 +208,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() - @kafka_versions("all") def test_simple_consumer_pending(self): # make sure that we start with no pending messages consumer = self.consumer() @@ -242,7 +235,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEquals(set([0, 1]), set([pending_part1, pending_part2])) consumer.stop() - @kafka_versions("all") def test_multi_process_consumer(self): # Produce 100 messages to partitions 0 and 1 self.send_messages(0, range(0, 100)) @@ -254,7 +246,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() - @kafka_versions("all") def test_multi_process_consumer_blocking(self): consumer = self.consumer(consumer = MultiProcessConsumer) @@ -292,7 +283,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() - @kafka_versions("all") def test_multi_proc_pending(self): self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) @@ -308,7 +298,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") + @kafka_versions('>=0.8.1') def test_multi_process_consumer_load_initial_offsets(self): self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) @@ -326,7 +316,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): auto_commit=False) self.assertEqual(consumer.offsets, {0: 5, 1: 15}) - @kafka_versions("all") def test_large_messages(self): # Produce 10 "normal" size messages small_messages = self.send_messages(0, [ str(x) for x in range(10) ]) @@ -343,7 +332,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() - @kafka_versions("all") def test_huge_messages(self): huge_message, = self.send_messages(0, [ create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)), @@ -374,7 +362,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): big_consumer.stop() - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") + @kafka_versions('>=0.8.1') def test_offset_behavior__resuming_behavior(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -401,7 +389,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer1.stop() consumer2.stop() - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") + @kafka_versions('>=0.8.1') def test_multi_process_offset_behavior__resuming_behavior(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -437,7 +425,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer2.stop() # TODO: Make this a unit test -- should not require integration - @kafka_versions("all") def test_fetch_buffer_size(self): # Test parameters (see issue 135 / PR 136) @@ -455,7 +442,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): messages = [ message for message in consumer ] self.assertEqual(len(messages), 2) - @kafka_versions("all") def test_kafka_consumer(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -476,7 +462,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEqual(len(messages[0]), 100) self.assertEqual(len(messages[1]), 100) - @kafka_versions("all") def test_kafka_consumer__blocking(self): TIMEOUT_MS = 500 consumer = self.kafka_consumer(auto_offset_reset='smallest', @@ -509,7 +494,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEqual(len(messages), 5) self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 ) - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1") + @kafka_versions('>=0.8.1') def test_kafka_consumer__offset_commit_resume(self): GROUP_ID = random_string(10).encode('utf-8') diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 91779d7..28d671a 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -8,9 +8,7 @@ from kafka.producer.base import Producer from kafka.util import kafka_bytestring from test.fixtures import ZookeeperFixture, KafkaFixture -from test.testutil import ( - KafkaIntegrationTestCase, kafka_versions, random_string -) +from test.testutil import KafkaIntegrationTestCase, random_string log = logging.getLogger(__name__) @@ -21,7 +19,7 @@ class TestFailover(KafkaIntegrationTestCase): def setUp(self): if not os.environ.get('KAFKA_VERSION'): - return + self.skipTest('integration test requires KAFKA_VERSION') zk_chroot = random_string(10) replicas = 3 @@ -46,7 +44,6 @@ class TestFailover(KafkaIntegrationTestCase): broker.close() self.zk.close() - @kafka_versions("all") def test_switch_leader(self): topic = self.topic partition = 0 @@ -94,7 +91,6 @@ class TestFailover(KafkaIntegrationTestCase): self.assert_message_count(topic, 201, partitions=(partition,), at_least=True) - @kafka_versions("all") def test_switch_leader_async(self): topic = self.topic partition = 0 @@ -142,7 +138,6 @@ class TestFailover(KafkaIntegrationTestCase): self.assert_message_count(topic, 21, partitions=(partition + 1,), at_least=True) - @kafka_versions("all") def test_switch_leader_keyed_producer(self): topic = self.topic @@ -180,7 +175,6 @@ class TestFailover(KafkaIntegrationTestCase): msg = random_string(10).encode('utf-8') producer.send_messages(topic, key, msg) - @kafka_versions("all") def test_switch_leader_simple_consumer(self): producer = Producer(self.client, async=False) consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10) diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index c99ed63..34963d3 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -38,7 +38,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): cls.server.close() cls.zk.close() - @kafka_versions("all") def test_produce_many_simple(self): start_offset = self.current_offset(self.topic, 0) @@ -56,7 +55,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 100, ) - @kafka_versions("all") def test_produce_10k_simple(self): start_offset = self.current_offset(self.topic, 0) @@ -67,7 +65,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 10000, ) - @kafka_versions("all") def test_produce_many_gzip(self): start_offset = self.current_offset(self.topic, 0) @@ -82,7 +79,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 200, ) - @kafka_versions("all") def test_produce_many_snappy(self): self.skipTest("All snappy integration tests fail with nosnappyjava") start_offset = self.current_offset(self.topic, 0) @@ -95,7 +91,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 200, ) - @kafka_versions("all") def test_produce_mixed(self): start_offset = self.current_offset(self.topic, 0) @@ -113,7 +108,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assert_produce_request(messages, start_offset, msg_count) - @kafka_versions("all") def test_produce_100k_gzipped(self): start_offset = self.current_offset(self.topic, 0) @@ -139,7 +133,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # SimpleProducer Tests # ############################ - @kafka_versions("all") def test_simple_producer(self): partitions = self.client.get_partition_ids_for_topic(self.topic) start_offsets = [self.current_offset(self.topic, p) for p in partitions] @@ -164,7 +157,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() - @kafka_versions("all") def test_produce__new_topic_fails_with_reasonable_error(self): new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8') producer = SimpleProducer(self.client, random_start=False) @@ -174,7 +166,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): LeaderNotAvailableError)): producer.send_messages(new_topic, self.msg("one")) - @kafka_versions("all") def test_producer_random_order(self): producer = SimpleProducer(self.client, random_start=True) resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) @@ -184,7 +175,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assertEqual(resp1[0].partition, resp3[0].partition) self.assertNotEqual(resp1[0].partition, resp2[0].partition) - @kafka_versions("all") def test_producer_ordered_start(self): producer = SimpleProducer(self.client, random_start=False) resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) @@ -195,7 +185,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assertEqual(resp2[0].partition, 1) self.assertEqual(resp3[0].partition, 0) - @kafka_versions("all") def test_async_simple_producer(self): partition = self.client.get_partition_ids_for_topic(self.topic)[0] start_offset = self.current_offset(self.topic, partition) @@ -210,7 +199,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ]) - @kafka_versions("all") def test_batched_simple_producer__triggers_by_message(self): partitions = self.client.get_partition_ids_for_topic(self.topic) start_offsets = [self.current_offset(self.topic, p) for p in partitions] @@ -278,7 +266,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() - @kafka_versions("all") def test_batched_simple_producer__triggers_by_time(self): partitions = self.client.get_partition_ids_for_topic(self.topic) start_offsets = [self.current_offset(self.topic, p) for p in partitions] @@ -339,7 +326,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # KeyedProducer Tests # ############################ - @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") + @kafka_versions('>=0.8.1') def test_keyedproducer_null_payload(self): partitions = self.client.get_partition_ids_for_topic(self.topic) start_offsets = [self.current_offset(self.topic, p) for p in partitions] @@ -361,7 +348,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() - @kafka_versions("all") def test_round_robin_partitioner(self): partitions = self.client.get_partition_ids_for_topic(self.topic) start_offsets = [self.current_offset(self.topic, p) for p in partitions] @@ -382,7 +368,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() - @kafka_versions("all") def test_hashed_partitioner(self): partitions = self.client.get_partition_ids_for_topic(self.topic) start_offsets = [self.current_offset(self.topic, p) for p in partitions] @@ -414,7 +399,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() - @kafka_versions("all") def test_async_keyed_producer(self): partition = self.client.get_partition_ids_for_topic(self.topic)[0] start_offset = self.current_offset(self.topic, partition) @@ -436,7 +420,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # Producer ACK Tests # ############################ - @kafka_versions("all") def test_acks_none(self): partition = self.client.get_partition_ids_for_topic(self.topic)[0] start_offset = self.current_offset(self.topic, partition) @@ -454,7 +437,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ]) producer.stop() - @kafka_versions("all") def test_acks_local_write(self): partition = self.client.get_partition_ids_for_topic(self.topic)[0] start_offset = self.current_offset(self.topic, partition) @@ -470,7 +452,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() - @kafka_versions("all") def test_acks_cluster_commit(self): partition = self.client.get_partition_ids_for_topic(self.topic)[0] start_offset = self.current_offset(self.topic, partition) diff --git a/test/testutil.py b/test/testutil.py index 3a1d2ba..fc3ebfa 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -1,5 +1,6 @@ import functools import logging +import operator import os import random import socket @@ -26,15 +27,48 @@ def random_string(l): return "".join(random.choice(string.ascii_letters) for i in xrange(l)) def kafka_versions(*versions): + + def version_str_to_list(s): + return list(map(int, s.split('.'))) # e.g., [0, 8, 1, 1] + + def construct_lambda(s): + if s[0].isdigit(): + op_str = '=' + v_str = s + elif s[1].isdigit(): + op_str = s[0] # ! < > = + v_str = s[1:] + elif s[2].isdigit(): + op_str = s[0:2] # >= <= + v_str = s[2:] + else: + raise ValueError('Unrecognized kafka version / operator: %s' % s) + + op_map = { + '=': operator.eq, + '!': operator.ne, + '>': operator.gt, + '<': operator.lt, + '>=': operator.ge, + '<=': operator.le + } + op = op_map[op_str] + version = version_str_to_list(v_str) + return lambda a: op(version_str_to_list(a), version) + + validators = map(construct_lambda, versions) + def kafka_versions(func): @functools.wraps(func) def wrapper(self): kafka_version = os.environ.get('KAFKA_VERSION') if not kafka_version: - self.skipTest("no kafka version specified") - elif 'all' not in versions and kafka_version not in versions: - self.skipTest("unsupported kafka version") + self.skipTest("no kafka version set in KAFKA_VERSION env var") + + for f in validators: + if not f(kafka_version): + self.skipTest("unsupported kafka version") return func(self) return wrapper @@ -57,7 +91,7 @@ class KafkaIntegrationTestCase(unittest.TestCase): def setUp(self): super(KafkaIntegrationTestCase, self).setUp() if not os.environ.get('KAFKA_VERSION'): - return + self.skipTest('Integration test requires KAFKA_VERSION') if not self.topic: topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10)) |