summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-10 12:21:37 -0800
committerDana Powers <dana.powers@rd.io>2015-12-10 13:31:59 -0800
commit4bd20aad2b7a0710458545009328f8729c89fee1 (patch)
treeeab1c86b1e63418b8d7f7fe13e66d354c3178082
parent1856063f4e0c36a8ec6266358d82432adf879170 (diff)
downloadkafka-python-kafka_version_tests.tar.gz
Refactor kafka_versions to support arbitrary operators (> >= < <= ! =)kafka_version_tests
-rw-r--r--test/test_client_integration.py5
-rw-r--r--test/test_consumer_integration.py25
-rw-r--r--test/test_failover_integration.py10
-rw-r--r--test/test_producer_integration.py21
-rw-r--r--test/testutil.py42
5 files changed, 47 insertions, 56 deletions
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index 8853350..6872dbf 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -27,7 +27,6 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
cls.server.close()
cls.zk.close()
- @kafka_versions("all")
def test_consume_none(self):
fetch = FetchRequest(self.bytes_topic, 0, 0, 1024)
@@ -39,7 +38,6 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
messages = list(fetch_resp.messages)
self.assertEqual(len(messages), 0)
- @kafka_versions("all")
def test_ensure_topic_exists(self):
# assume that self.topic was created by setUp
@@ -50,7 +48,6 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
with self.assertRaises(KafkaTimeoutError):
self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0)
- @kafka_versions('all')
def test_send_produce_request_maintains_request_response_order(self):
self.client.ensure_topic_exists(b'foo')
@@ -83,7 +80,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
# Offset Tests #
####################
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+ @kafka_versions('>=0.8.1')
def test_commit_fetch_offsets(self):
req = OffsetCommitRequest(self.bytes_topic, 0, 42, b"metadata")
(resp,) = self.client.send_offset_commit_request(b"group", [req])
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index fee53f5..ef9a886 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -78,7 +78,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
**configs)
return consumer
- @kafka_versions("all")
def test_simple_consumer(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -90,7 +89,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
- @kafka_versions('all')
def test_simple_consumer_smallest_offset_reset(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -102,7 +100,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# messages from beginning.
self.assert_message_count([message for message in consumer], 200)
- @kafka_versions('all')
def test_simple_consumer_largest_offset_reset(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -120,7 +117,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Since the offset is set to largest we should read all the new messages.
self.assert_message_count([message for message in consumer], 200)
- @kafka_versions('all')
def test_simple_consumer_no_reset(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -132,7 +128,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
with self.assertRaises(OffsetOutOfRangeError):
consumer.get_message()
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+ @kafka_versions('>=0.8.1')
def test_simple_consumer_load_initial_offsets(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -149,7 +145,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer = self.consumer(auto_commit=False)
self.assertEqual(consumer.offsets, {0: 51, 1: 101})
- @kafka_versions("all")
def test_simple_consumer__seek(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -180,7 +175,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
- @kafka_versions("all")
def test_simple_consumer_blocking(self):
consumer = self.consumer()
@@ -214,7 +208,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
- @kafka_versions("all")
def test_simple_consumer_pending(self):
# make sure that we start with no pending messages
consumer = self.consumer()
@@ -242,7 +235,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assertEquals(set([0, 1]), set([pending_part1, pending_part2]))
consumer.stop()
- @kafka_versions("all")
def test_multi_process_consumer(self):
# Produce 100 messages to partitions 0 and 1
self.send_messages(0, range(0, 100))
@@ -254,7 +246,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
- @kafka_versions("all")
def test_multi_process_consumer_blocking(self):
consumer = self.consumer(consumer = MultiProcessConsumer)
@@ -292,7 +283,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
- @kafka_versions("all")
def test_multi_proc_pending(self):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
@@ -308,7 +298,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+ @kafka_versions('>=0.8.1')
def test_multi_process_consumer_load_initial_offsets(self):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
@@ -326,7 +316,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
auto_commit=False)
self.assertEqual(consumer.offsets, {0: 5, 1: 15})
- @kafka_versions("all")
def test_large_messages(self):
# Produce 10 "normal" size messages
small_messages = self.send_messages(0, [ str(x) for x in range(10) ])
@@ -343,7 +332,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
- @kafka_versions("all")
def test_huge_messages(self):
huge_message, = self.send_messages(0, [
create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)),
@@ -374,7 +362,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
big_consumer.stop()
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+ @kafka_versions('>=0.8.1')
def test_offset_behavior__resuming_behavior(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -401,7 +389,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer1.stop()
consumer2.stop()
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+ @kafka_versions('>=0.8.1')
def test_multi_process_offset_behavior__resuming_behavior(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -437,7 +425,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer2.stop()
# TODO: Make this a unit test -- should not require integration
- @kafka_versions("all")
def test_fetch_buffer_size(self):
# Test parameters (see issue 135 / PR 136)
@@ -455,7 +442,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages = [ message for message in consumer ]
self.assertEqual(len(messages), 2)
- @kafka_versions("all")
def test_kafka_consumer(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -476,7 +462,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assertEqual(len(messages[0]), 100)
self.assertEqual(len(messages[1]), 100)
- @kafka_versions("all")
def test_kafka_consumer__blocking(self):
TIMEOUT_MS = 500
consumer = self.kafka_consumer(auto_offset_reset='smallest',
@@ -509,7 +494,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assertEqual(len(messages), 5)
self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+ @kafka_versions('>=0.8.1')
def test_kafka_consumer__offset_commit_resume(self):
GROUP_ID = random_string(10).encode('utf-8')
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 91779d7..28d671a 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -8,9 +8,7 @@ from kafka.producer.base import Producer
from kafka.util import kafka_bytestring
from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import (
- KafkaIntegrationTestCase, kafka_versions, random_string
-)
+from test.testutil import KafkaIntegrationTestCase, random_string
log = logging.getLogger(__name__)
@@ -21,7 +19,7 @@ class TestFailover(KafkaIntegrationTestCase):
def setUp(self):
if not os.environ.get('KAFKA_VERSION'):
- return
+ self.skipTest('integration test requires KAFKA_VERSION')
zk_chroot = random_string(10)
replicas = 3
@@ -46,7 +44,6 @@ class TestFailover(KafkaIntegrationTestCase):
broker.close()
self.zk.close()
- @kafka_versions("all")
def test_switch_leader(self):
topic = self.topic
partition = 0
@@ -94,7 +91,6 @@ class TestFailover(KafkaIntegrationTestCase):
self.assert_message_count(topic, 201, partitions=(partition,),
at_least=True)
- @kafka_versions("all")
def test_switch_leader_async(self):
topic = self.topic
partition = 0
@@ -142,7 +138,6 @@ class TestFailover(KafkaIntegrationTestCase):
self.assert_message_count(topic, 21, partitions=(partition + 1,),
at_least=True)
- @kafka_versions("all")
def test_switch_leader_keyed_producer(self):
topic = self.topic
@@ -180,7 +175,6 @@ class TestFailover(KafkaIntegrationTestCase):
msg = random_string(10).encode('utf-8')
producer.send_messages(topic, key, msg)
- @kafka_versions("all")
def test_switch_leader_simple_consumer(self):
producer = Producer(self.client, async=False)
consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10)
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index c99ed63..34963d3 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -38,7 +38,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
cls.server.close()
cls.zk.close()
- @kafka_versions("all")
def test_produce_many_simple(self):
start_offset = self.current_offset(self.topic, 0)
@@ -56,7 +55,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
100,
)
- @kafka_versions("all")
def test_produce_10k_simple(self):
start_offset = self.current_offset(self.topic, 0)
@@ -67,7 +65,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
10000,
)
- @kafka_versions("all")
def test_produce_many_gzip(self):
start_offset = self.current_offset(self.topic, 0)
@@ -82,7 +79,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
200,
)
- @kafka_versions("all")
def test_produce_many_snappy(self):
self.skipTest("All snappy integration tests fail with nosnappyjava")
start_offset = self.current_offset(self.topic, 0)
@@ -95,7 +91,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
200,
)
- @kafka_versions("all")
def test_produce_mixed(self):
start_offset = self.current_offset(self.topic, 0)
@@ -113,7 +108,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assert_produce_request(messages, start_offset, msg_count)
- @kafka_versions("all")
def test_produce_100k_gzipped(self):
start_offset = self.current_offset(self.topic, 0)
@@ -139,7 +133,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# SimpleProducer Tests #
############################
- @kafka_versions("all")
def test_simple_producer(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -164,7 +157,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
- @kafka_versions("all")
def test_produce__new_topic_fails_with_reasonable_error(self):
new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8')
producer = SimpleProducer(self.client, random_start=False)
@@ -174,7 +166,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
LeaderNotAvailableError)):
producer.send_messages(new_topic, self.msg("one"))
- @kafka_versions("all")
def test_producer_random_order(self):
producer = SimpleProducer(self.client, random_start=True)
resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
@@ -184,7 +175,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assertEqual(resp1[0].partition, resp3[0].partition)
self.assertNotEqual(resp1[0].partition, resp2[0].partition)
- @kafka_versions("all")
def test_producer_ordered_start(self):
producer = SimpleProducer(self.client, random_start=False)
resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
@@ -195,7 +185,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assertEqual(resp2[0].partition, 1)
self.assertEqual(resp3[0].partition, 0)
- @kafka_versions("all")
def test_async_simple_producer(self):
partition = self.client.get_partition_ids_for_topic(self.topic)[0]
start_offset = self.current_offset(self.topic, partition)
@@ -210,7 +199,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
- @kafka_versions("all")
def test_batched_simple_producer__triggers_by_message(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -278,7 +266,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
- @kafka_versions("all")
def test_batched_simple_producer__triggers_by_time(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -339,7 +326,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# KeyedProducer Tests #
############################
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ @kafka_versions('>=0.8.1')
def test_keyedproducer_null_payload(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -361,7 +348,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
- @kafka_versions("all")
def test_round_robin_partitioner(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -382,7 +368,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
- @kafka_versions("all")
def test_hashed_partitioner(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -414,7 +399,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
- @kafka_versions("all")
def test_async_keyed_producer(self):
partition = self.client.get_partition_ids_for_topic(self.topic)[0]
start_offset = self.current_offset(self.topic, partition)
@@ -436,7 +420,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# Producer ACK Tests #
############################
- @kafka_versions("all")
def test_acks_none(self):
partition = self.client.get_partition_ids_for_topic(self.topic)[0]
start_offset = self.current_offset(self.topic, partition)
@@ -454,7 +437,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
producer.stop()
- @kafka_versions("all")
def test_acks_local_write(self):
partition = self.client.get_partition_ids_for_topic(self.topic)[0]
start_offset = self.current_offset(self.topic, partition)
@@ -470,7 +452,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
- @kafka_versions("all")
def test_acks_cluster_commit(self):
partition = self.client.get_partition_ids_for_topic(self.topic)[0]
start_offset = self.current_offset(self.topic, partition)
diff --git a/test/testutil.py b/test/testutil.py
index 3a1d2ba..fc3ebfa 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -1,5 +1,6 @@
import functools
import logging
+import operator
import os
import random
import socket
@@ -26,15 +27,48 @@ def random_string(l):
return "".join(random.choice(string.ascii_letters) for i in xrange(l))
def kafka_versions(*versions):
+
+ def version_str_to_list(s):
+ return list(map(int, s.split('.'))) # e.g., [0, 8, 1, 1]
+
+ def construct_lambda(s):
+ if s[0].isdigit():
+ op_str = '='
+ v_str = s
+ elif s[1].isdigit():
+ op_str = s[0] # ! < > =
+ v_str = s[1:]
+ elif s[2].isdigit():
+ op_str = s[0:2] # >= <=
+ v_str = s[2:]
+ else:
+ raise ValueError('Unrecognized kafka version / operator: %s' % s)
+
+ op_map = {
+ '=': operator.eq,
+ '!': operator.ne,
+ '>': operator.gt,
+ '<': operator.lt,
+ '>=': operator.ge,
+ '<=': operator.le
+ }
+ op = op_map[op_str]
+ version = version_str_to_list(v_str)
+ return lambda a: op(version_str_to_list(a), version)
+
+ validators = map(construct_lambda, versions)
+
def kafka_versions(func):
@functools.wraps(func)
def wrapper(self):
kafka_version = os.environ.get('KAFKA_VERSION')
if not kafka_version:
- self.skipTest("no kafka version specified")
- elif 'all' not in versions and kafka_version not in versions:
- self.skipTest("unsupported kafka version")
+ self.skipTest("no kafka version set in KAFKA_VERSION env var")
+
+ for f in validators:
+ if not f(kafka_version):
+ self.skipTest("unsupported kafka version")
return func(self)
return wrapper
@@ -57,7 +91,7 @@ class KafkaIntegrationTestCase(unittest.TestCase):
def setUp(self):
super(KafkaIntegrationTestCase, self).setUp()
if not os.environ.get('KAFKA_VERSION'):
- return
+ self.skipTest('Integration test requires KAFKA_VERSION')
if not self.topic:
topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))