diff options
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 42 |
1 files changed, 21 insertions, 21 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index fdffd05..cb05242 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -1,19 +1,18 @@ import logging import os import time -from mock import patch -import pytest -import kafka.codec +from mock import patch import pytest -from kafka.vendor.six.moves import range from kafka.vendor import six +from kafka.vendor.six.moves import range from . import unittest from kafka import ( KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, create_gzip_message, KafkaProducer ) +import kafka.codec from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES from kafka.errors import ( ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError, @@ -23,11 +22,11 @@ from kafka.structs import ( ProduceRequestPayload, TopicPartition, OffsetAndTimestamp ) -from test.fixtures import ZookeeperFixture, KafkaFixture, random_string, version -from test.testutil import KafkaIntegrationTestCase, kafka_versions, Timer +from test.fixtures import ZookeeperFixture, KafkaFixture +from test.testutil import KafkaIntegrationTestCase, Timer, env_kafka_version, random_string -@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory): """Test KafkaConsumer""" kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest') @@ -54,7 +53,7 @@ def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory): kafka_consumer.close() -@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_kafka_consumer_unsupported_encoding( topic, kafka_producer_factory, kafka_consumer_factory): # Send a compressed message @@ -211,7 +210,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): with self.assertRaises(OffsetOutOfRangeError): consumer.get_message() - @kafka_versions('>=0.8.1') + @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_simple_consumer_load_initial_offsets(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -388,7 +387,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky') - @kafka_versions('>=0.8.1') + @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_multi_process_consumer_load_initial_offsets(self): self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) @@ -459,7 +458,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): big_consumer.stop() - @kafka_versions('>=0.8.1') + @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_offset_behavior__resuming_behavior(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -491,7 +490,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer2.stop() @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky') - @kafka_versions('>=0.8.1') + @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_multi_process_offset_behavior__resuming_behavior(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -548,6 +547,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): messages = [ message for message in consumer ] self.assertEqual(len(messages), 2) + @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_kafka_consumer__blocking(self): TIMEOUT_MS = 500 consumer = self.kafka_consumer(auto_offset_reset='earliest', @@ -586,7 +586,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 ) consumer.close() - @kafka_versions('>=0.8.1') + @pytest.mark.skipif(env_kafka_version() < (0, 8, 1), reason="Requires KAFKA_VERSION >= 0.8.1") def test_kafka_consumer__offset_commit_resume(self): GROUP_ID = random_string(10) @@ -605,7 +605,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): output_msgs1 = [] for _ in range(180): m = next(consumer1) - output_msgs1.append(m) + output_msgs1.append((m.key, m.value)) self.assert_message_count(output_msgs1, 180) consumer1.close() @@ -621,12 +621,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): output_msgs2 = [] for _ in range(20): m = next(consumer2) - output_msgs2.append(m) + output_msgs2.append((m.key, m.value)) self.assert_message_count(output_msgs2, 20) self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200) consumer2.close() - @kafka_versions('>=0.10.1') + @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") def test_kafka_consumer_max_bytes_simple(self): self.send_messages(0, range(100, 200)) self.send_messages(1, range(200, 300)) @@ -647,7 +647,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)])) consumer.close() - @kafka_versions('>=0.10.1') + @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") def test_kafka_consumer_max_bytes_one_msg(self): # We send to only 1 partition so we don't have parallel requests to 2 # nodes for data. @@ -673,7 +673,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEqual(len(fetched_msgs), 10) consumer.close() - @kafka_versions('>=0.10.1') + @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") def test_kafka_consumer_offsets_for_time(self): late_time = int(time.time()) * 1000 middle_time = late_time - 1000 @@ -727,7 +727,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): }) consumer.close() - @kafka_versions('>=0.10.1') + @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") def test_kafka_consumer_offsets_search_many_partitions(self): tp0 = TopicPartition(self.topic, 0) tp1 = TopicPartition(self.topic, 1) @@ -766,7 +766,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): }) consumer.close() - @kafka_versions('<0.10.1') + @pytest.mark.skipif(env_kafka_version() >= (0, 10, 1), reason="Requires KAFKA_VERSION < 0.10.1") def test_kafka_consumer_offsets_for_time_old(self): consumer = self.kafka_consumer() tp = TopicPartition(self.topic, 0) @@ -774,7 +774,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): with self.assertRaises(UnsupportedVersionError): consumer.offsets_for_times({tp: int(time.time())}) - @kafka_versions('>=0.10.1') + @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") def test_kafka_consumer_offsets_for_times_errors(self): consumer = self.kafka_consumer(fetch_max_wait_ms=200, request_timeout_ms=500) |