author     Dana Powers <dana.powers@rd.io>  2015-12-10 12:21:37 -0800
committer  Dana Powers <dana.powers@rd.io>  2015-12-10 13:31:59 -0800
commit     4bd20aad2b7a0710458545009328f8729c89fee1 (patch)
tree       eab1c86b1e63418b8d7f7fe13e66d354c3178082  /test/test_consumer_integration.py
parent     1856063f4e0c36a8ec6266358d82432adf879170 (diff)
download   kafka-python-kafka_version_tests.tar.gz

Refactor kafka_versions to support arbitrary operators (> >= < <= ! =)  (branch: kafka_version_tests)
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--  test/test_consumer_integration.py  25
1 file changed, 5 insertions(+), 20 deletions(-)
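The refactor lets a single spec such as '>=0.8.1' replace an explicit list of versions like "0.8.1", "0.8.1.1", "0.8.2.1", and makes the blanket @kafka_versions("all") marker unnecessary. As a rough sketch only (not the project's actual implementation), a decorator along these lines could strip an operator prefix, compare version tuples against the broker version taken from a KAFKA_VERSION environment variable, and skip the test otherwise; treating '!' as "not equal" is an assumption here:

import operator
import os
from functools import wraps
from unittest import SkipTest

_OPERATORS = {
    '>=': operator.ge,
    '<=': operator.le,
    '>': operator.gt,
    '<': operator.lt,
    '!': operator.ne,   # assumption: '!' means "not equal"
    '=': operator.eq,
}

def _version_tuple(version_str):
    # '0.8.1.1' -> (0, 8, 1, 1), so versions compare component-wise
    return tuple(int(part) for part in version_str.split('.'))

def kafka_versions(*specs):
    """Skip the decorated test unless the running broker satisfies every spec."""
    def _satisfies(running, spec):
        # Try two-character operators before one-character ones
        for prefix in sorted(_OPERATORS, key=len, reverse=True):
            if spec.startswith(prefix):
                op = _OPERATORS[prefix]
                return op(_version_tuple(running), _version_tuple(spec[len(prefix):]))
        # A bare version string means an exact match
        return _version_tuple(running) == _version_tuple(spec)

    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            running = os.environ.get('KAFKA_VERSION')
            if not running:
                raise SkipTest('KAFKA_VERSION is not set')
            if not all(_satisfies(running, spec) for spec in specs):
                raise SkipTest('Kafka %s does not satisfy %s' % (running, specs))
            return func(self, *args, **kwargs)
        return wrapper
    return decorator

With something like this in place, @kafka_versions('>=0.8.1') covers 0.8.1, 0.8.1.1, 0.8.2.1 and any later broker, which is exactly the substitution the diff below makes.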
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index fee53f5..ef9a886 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -78,7 +78,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
**configs)
return consumer
- @kafka_versions("all")
def test_simple_consumer(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -90,7 +89,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
- @kafka_versions('all')
def test_simple_consumer_smallest_offset_reset(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -102,7 +100,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# messages from beginning.
self.assert_message_count([message for message in consumer], 200)
- @kafka_versions('all')
def test_simple_consumer_largest_offset_reset(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -120,7 +117,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Since the offset is set to largest we should read all the new messages.
self.assert_message_count([message for message in consumer], 200)
- @kafka_versions('all')
def test_simple_consumer_no_reset(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -132,7 +128,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
with self.assertRaises(OffsetOutOfRangeError):
consumer.get_message()
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+ @kafka_versions('>=0.8.1')
def test_simple_consumer_load_initial_offsets(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -149,7 +145,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer = self.consumer(auto_commit=False)
self.assertEqual(consumer.offsets, {0: 51, 1: 101})
- @kafka_versions("all")
def test_simple_consumer__seek(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -180,7 +175,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
- @kafka_versions("all")
def test_simple_consumer_blocking(self):
consumer = self.consumer()
@@ -214,7 +208,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
- @kafka_versions("all")
def test_simple_consumer_pending(self):
# make sure that we start with no pending messages
consumer = self.consumer()
@@ -242,7 +235,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assertEquals(set([0, 1]), set([pending_part1, pending_part2]))
consumer.stop()
- @kafka_versions("all")
def test_multi_process_consumer(self):
# Produce 100 messages to partitions 0 and 1
self.send_messages(0, range(0, 100))
@@ -254,7 +246,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
- @kafka_versions("all")
def test_multi_process_consumer_blocking(self):
consumer = self.consumer(consumer = MultiProcessConsumer)
@@ -292,7 +283,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
- @kafka_versions("all")
def test_multi_proc_pending(self):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
@@ -308,7 +298,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+ @kafka_versions('>=0.8.1')
def test_multi_process_consumer_load_initial_offsets(self):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
@@ -326,7 +316,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
auto_commit=False)
self.assertEqual(consumer.offsets, {0: 5, 1: 15})
- @kafka_versions("all")
def test_large_messages(self):
# Produce 10 "normal" size messages
small_messages = self.send_messages(0, [ str(x) for x in range(10) ])
@@ -343,7 +332,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
- @kafka_versions("all")
def test_huge_messages(self):
huge_message, = self.send_messages(0, [
create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)),
@@ -374,7 +362,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
big_consumer.stop()
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+ @kafka_versions('>=0.8.1')
def test_offset_behavior__resuming_behavior(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -401,7 +389,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer1.stop()
consumer2.stop()
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+ @kafka_versions('>=0.8.1')
def test_multi_process_offset_behavior__resuming_behavior(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -437,7 +425,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer2.stop()
# TODO: Make this a unit test -- should not require integration
- @kafka_versions("all")
def test_fetch_buffer_size(self):
# Test parameters (see issue 135 / PR 136)
@@ -455,7 +442,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages = [ message for message in consumer ]
self.assertEqual(len(messages), 2)
- @kafka_versions("all")
def test_kafka_consumer(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))
@@ -476,7 +462,6 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assertEqual(len(messages[0]), 100)
self.assertEqual(len(messages[1]), 100)
- @kafka_versions("all")
def test_kafka_consumer__blocking(self):
TIMEOUT_MS = 500
consumer = self.kafka_consumer(auto_offset_reset='smallest',
@@ -509,7 +494,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assertEqual(len(messages), 5)
self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
- @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
+ @kafka_versions('>=0.8.1')
def test_kafka_consumer__offset_commit_resume(self):
GROUP_ID = random_string(10).encode('utf-8')