summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test/test_consumer_integration.py50
1 files changed, 15 insertions, 35 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index c7e2ebf..c9bca2b 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -623,48 +623,28 @@ def test_kafka_consumer__offset_commit_resume(kafka_consumer_factory, send_messa
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
-def test_kafka_consumer_max_bytes_simple(kafka_consumer_factory, topic, send_messages):
- send_messages(range(100, 200), partition=0)
- send_messages(range(200, 300), partition=1)
-
- # Start a consumer
- consumer = kafka_consumer_factory(
- auto_offset_reset='earliest', fetch_max_bytes=300)
- seen_partitions = set()
- for i in range(90):
- poll_res = consumer.poll(timeout_ms=100)
- for partition, msgs in poll_res.items():
- for msg in msgs:
- seen_partitions.add(partition)
+def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, topic, send_messages):
+ """Check that messages larger than fetch_max_bytes are still received.
- # Check that we fetched at least 1 message from both partitions
- assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)}
-
-
-@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
-def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, send_messages):
- # We send to only 1 partition so we don't have parallel requests to 2
- # nodes for data.
- send_messages(range(100, 200))
+ We are checking for both partition starvation and messages simply not being
+ received. The broker should reply with them, just making sure the consumer
+ isn't doing anything unexpected client-side that blocks them.
+ """
+ send_messages(range(0, 100), partition=0)
+ send_messages(range(100, 200), partition=1)
# Start a consumer. FetchResponse_v3 should always include at least 1
# full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time
# But 0.11.0.0 returns 1 MessageSet at a time when the messages are
# stored in the new v2 format by the broker.
- #
- # DP Note: This is a strange test. The consumer shouldn't care
- # how many messages are included in a FetchResponse, as long as it is
- # non-zero. I would not mind if we deleted this test. It caused
- # a minor headache when testing 0.11.0.0.
- group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5)
- consumer = kafka_consumer_factory(
- group_id=group,
- auto_offset_reset='earliest',
- consumer_timeout_ms=5000,
- fetch_max_bytes=1)
+ consumer = kafka_consumer_factory(auto_offset_reset='earliest', fetch_max_bytes=1)
+
+ messages = [next(consumer) for i in range(10)]
+ assert_message_count(messages, 10)
- fetched_msgs = [next(consumer) for i in range(10)]
- assert_message_count(fetched_msgs, 10)
+ # Check that we fetched at least 1 message from both partitions
+ seen_partitions = {(m.topic, m.partition) for m in messages}
+ assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)}
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")