diff options
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 50 |
1 files changed, 15 insertions, 35 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index c7e2ebf..c9bca2b 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -623,48 +623,28 @@ def test_kafka_consumer__offset_commit_resume(kafka_consumer_factory, send_messa @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") -def test_kafka_consumer_max_bytes_simple(kafka_consumer_factory, topic, send_messages): - send_messages(range(100, 200), partition=0) - send_messages(range(200, 300), partition=1) - - # Start a consumer - consumer = kafka_consumer_factory( - auto_offset_reset='earliest', fetch_max_bytes=300) - seen_partitions = set() - for i in range(90): - poll_res = consumer.poll(timeout_ms=100) - for partition, msgs in poll_res.items(): - for msg in msgs: - seen_partitions.add(partition) +def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, topic, send_messages): + """Check that messages larger than fetch_max_bytes are still received. - # Check that we fetched at least 1 message from both partitions - assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)} - - -@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") -def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, send_messages): - # We send to only 1 partition so we don't have parallel requests to 2 - # nodes for data. - send_messages(range(100, 200)) + We are checking for both partition starvation and messages simply not being + received. The broker should reply with them, just making sure the consumer + isn't doing anything unexpected client-side that blocks them. + """ + send_messages(range(0, 100), partition=0) + send_messages(range(100, 200), partition=1) # Start a consumer. FetchResponse_v3 should always include at least 1 # full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time # But 0.11.0.0 returns 1 MessageSet at a time when the messages are # stored in the new v2 format by the broker. - # - # DP Note: This is a strange test. The consumer shouldn't care - # how many messages are included in a FetchResponse, as long as it is - # non-zero. I would not mind if we deleted this test. It caused - # a minor headache when testing 0.11.0.0. - group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5) - consumer = kafka_consumer_factory( - group_id=group, - auto_offset_reset='earliest', - consumer_timeout_ms=5000, - fetch_max_bytes=1) + consumer = kafka_consumer_factory(auto_offset_reset='earliest', fetch_max_bytes=1) + + messages = [next(consumer) for i in range(10)] + assert_message_count(messages, 10) - fetched_msgs = [next(consumer) for i in range(10)] - assert_message_count(fetched_msgs, 10) + # Check that we fetched at least 1 message from both partitions + seen_partitions = {(m.topic, m.partition) for m in messages} + assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)} @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") |