Diffstat (limited to 'test/test_consumer_integration.py')
 test/test_consumer_integration.py | 42 +++++++++++++++++++++---------------------
 1 file changed, 21 insertions(+), 21 deletions(-)
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index fdffd05..cb05242 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,19 +1,18 @@
 import logging
 import os
 import time
-from mock import patch
-import pytest
-import kafka.codec
+from mock import patch
 import pytest
-from kafka.vendor.six.moves import range
 from kafka.vendor import six
+from kafka.vendor.six.moves import range
 
 from . import unittest
 from kafka import (
     KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message,
     create_gzip_message, KafkaProducer
 )
+import kafka.codec
 from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
 from kafka.errors import (
     ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
@@ -23,11 +22,11 @@ from kafka.structs import (
     ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
 )
 
-from test.fixtures import ZookeeperFixture, KafkaFixture, random_string, version
-from test.testutil import KafkaIntegrationTestCase, kafka_versions, Timer
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import KafkaIntegrationTestCase, Timer, env_kafka_version, random_string
 
 
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
 def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory):
     """Test KafkaConsumer"""
     kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')
@@ -54,7 +53,7 @@ def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory):
     kafka_consumer.close()
 
 
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
 def test_kafka_consumer_unsupported_encoding(
         topic, kafka_producer_factory, kafka_consumer_factory):
     # Send a compressed message
@@ -211,7 +210,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         with self.assertRaises(OffsetOutOfRangeError):
             consumer.get_message()
 
-    @kafka_versions('>=0.8.1')
+    @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
     def test_simple_consumer_load_initial_offsets(self):
         self.send_messages(0, range(0, 100))
         self.send_messages(1, range(100, 200))
@@ -388,7 +387,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         consumer.stop()
 
     @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
-    @kafka_versions('>=0.8.1')
+    @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
     def test_multi_process_consumer_load_initial_offsets(self):
         self.send_messages(0, range(0, 10))
         self.send_messages(1, range(10, 20))
@@ -459,7 +458,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         big_consumer.stop()
 
-    @kafka_versions('>=0.8.1')
+    @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
     def test_offset_behavior__resuming_behavior(self):
         self.send_messages(0, range(0, 100))
         self.send_messages(1, range(100, 200))
@@ -491,7 +490,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         consumer2.stop()
 
     @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
-    @kafka_versions('>=0.8.1')
+    @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
     def test_multi_process_offset_behavior__resuming_behavior(self):
         self.send_messages(0, range(0, 100))
         self.send_messages(1, range(100, 200))
@@ -548,6 +547,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         messages = [ message for message in consumer ]
         self.assertEqual(len(messages), 2)
 
+    @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
     def test_kafka_consumer__blocking(self):
         TIMEOUT_MS = 500
         consumer = self.kafka_consumer(auto_offset_reset='earliest',
@@ -586,7 +586,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
         consumer.close()
 
-    @kafka_versions('>=0.8.1')
+    @pytest.mark.skipif(env_kafka_version() < (0, 8, 1), reason="Requires KAFKA_VERSION >= 0.8.1")
     def test_kafka_consumer__offset_commit_resume(self):
         GROUP_ID = random_string(10)
@@ -605,7 +605,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         output_msgs1 = []
         for _ in range(180):
             m = next(consumer1)
-            output_msgs1.append(m)
+            output_msgs1.append((m.key, m.value))
         self.assert_message_count(output_msgs1, 180)
         consumer1.close()
@@ -621,12 +621,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         output_msgs2 = []
         for _ in range(20):
             m = next(consumer2)
-            output_msgs2.append(m)
+            output_msgs2.append((m.key, m.value))
         self.assert_message_count(output_msgs2, 20)
         self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200)
         consumer2.close()
 
-    @kafka_versions('>=0.10.1')
+    @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
     def test_kafka_consumer_max_bytes_simple(self):
         self.send_messages(0, range(100, 200))
         self.send_messages(1, range(200, 300))
@@ -647,7 +647,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
             TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)]))
         consumer.close()
 
-    @kafka_versions('>=0.10.1')
+    @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
     def test_kafka_consumer_max_bytes_one_msg(self):
         # We send to only 1 partition so we don't have parallel requests to 2
         # nodes for data.
@@ -673,7 +673,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         self.assertEqual(len(fetched_msgs), 10)
         consumer.close()
 
-    @kafka_versions('>=0.10.1')
+    @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
     def test_kafka_consumer_offsets_for_time(self):
         late_time = int(time.time()) * 1000
         middle_time = late_time - 1000
@@ -727,7 +727,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         })
         consumer.close()
 
-    @kafka_versions('>=0.10.1')
+    @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
     def test_kafka_consumer_offsets_search_many_partitions(self):
         tp0 = TopicPartition(self.topic, 0)
         tp1 = TopicPartition(self.topic, 1)
@@ -766,7 +766,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         })
         consumer.close()
 
-    @kafka_versions('<0.10.1')
+    @pytest.mark.skipif(env_kafka_version() >= (0, 10, 1), reason="Requires KAFKA_VERSION < 0.10.1")
     def test_kafka_consumer_offsets_for_time_old(self):
         consumer = self.kafka_consumer()
         tp = TopicPartition(self.topic, 0)
@@ -774,7 +774,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         with self.assertRaises(UnsupportedVersionError):
             consumer.offsets_for_times({tp: int(time.time())})
 
-    @kafka_versions('>=0.10.1')
+    @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
     def test_kafka_consumer_offsets_for_times_errors(self):
         consumer = self.kafka_consumer(fetch_max_wait_ms=200,
                                        request_timeout_ms=500)
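
The recurring change in this diff replaces the removed @kafka_versions(...) decorator with explicit pytest.mark.skipif guards built on env_kafka_version(), imported from test.testutil. For reference, a minimal sketch of such a helper; the body below is an assumption inferred from how the markers use it (falsy when KAFKA_VERSION is unset, an integer tuple otherwise), not a copy of the project's implementation:

    import os

    def env_kafka_version():
        # Sketch only: parse KAFKA_VERSION, e.g. '0.10.1.1' -> (0, 10, 1, 1).
        # Returns an empty (falsy) tuple when the variable is unset, so both
        # `not env_kafka_version()` and `env_kafka_version() < (0, 10, 1)`
        # behave the way the skipif markers above expect.
        if 'KAFKA_VERSION' not in os.environ:
            return ()
        return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))

Compared with the old decorator, this keeps the version gate visible at each test and lets pytest report a precise skip reason.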