| field     | value | date |
|-----------|-------|------|
| author    | Jeff Widman <jeff@jeffwidman.com> | 2019-08-22 21:27:57 -0700 |
| committer | Jeff Widman <jeff@jeffwidman.com> | 2019-08-23 00:23:06 -0700 |
| commit    | e5683ed290e8ca72a37856a8af80c9950790eb77 (patch) | |
| tree      | 59e7300be297e2faa1f607d43774516a67a7c6cb /test | |
| parent    | 61fa0b27685c2d4e67d1b6575ca6797f36eb1bfa (diff) | |
| download  | kafka-python-cleanup-max-bytes-tests.tar.gz | |
Merge two tests that are very similar (branch: cleanup-max-bytes-tests)
Previously, `test_kafka_consumer_max_bytes_simple()` saw occasional test
failures because it was doing only 10 iterations. Much of its purpose was
also gutted when Kafka 0.11 came out and changed the behavior. So this
merges the two tests into one, which should be relatively straightforward.
Further discussion in https://github.com/dpkp/kafka-python/pull/1886/files#r316860737
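For reference, here is a reconstruction of the merged test as it reads after this change, assembled from the diff below. It is a sketch, not a standalone script: it relies on the test module's existing imports and fixtures (`pytest`, `TopicPartition`, `env_kafka_version`, `assert_message_count`, `kafka_consumer_factory`, `topic`, `send_messages`), all of which appear in the diff.

```python
# Reconstructed post-commit version of the merged test (see the diff below).
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, topic, send_messages):
    """Check that messages larger than fetch_max_bytes are still received.

    We are checking for both partition starvation and messages simply not being
    received. The broker should reply with them, just making sure the consumer
    isn't doing anything unexpected client-side that blocks them.
    """
    send_messages(range(0, 100), partition=0)
    send_messages(range(100, 200), partition=1)

    # Start a consumer. FetchResponse_v3 should always include at least 1
    # full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time
    # But 0.11.0.0 returns 1 MessageSet at a time when the messages are
    # stored in the new v2 format by the broker.
    consumer = kafka_consumer_factory(auto_offset_reset='earliest', fetch_max_bytes=1)

    messages = [next(consumer) for i in range(10)]
    assert_message_count(messages, 10)

    # Check that we fetched at least 1 message from both partitions
    seen_partitions = {(m.topic, m.partition) for m in messages}
    assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)}
```

The partition-coverage assertion from the old `test_kafka_consumer_max_bytes_simple()` survives as the final `seen_partitions` check, so the merged test still exercises both behaviors: one full message per fetch despite `fetch_max_bytes=1`, and messages being served from both partitions.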
Diffstat (limited to 'test')
-rw-r--r-- | test/test_consumer_integration.py | 50 |
1 file changed, 15 insertions, 35 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index c7e2ebf..c9bca2b 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -623,48 +623,28 @@ def test_kafka_consumer__offset_commit_resume(kafka_consumer_factory, send_messa


 @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
-def test_kafka_consumer_max_bytes_simple(kafka_consumer_factory, topic, send_messages):
-    send_messages(range(100, 200), partition=0)
-    send_messages(range(200, 300), partition=1)
-
-    # Start a consumer
-    consumer = kafka_consumer_factory(
-        auto_offset_reset='earliest', fetch_max_bytes=300)
-    seen_partitions = set()
-    for i in range(90):
-        poll_res = consumer.poll(timeout_ms=100)
-        for partition, msgs in poll_res.items():
-            for msg in msgs:
-                seen_partitions.add(partition)
+def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, topic, send_messages):
+    """Check that messages larger than fetch_max_bytes are still received.

-    # Check that we fetched at least 1 message from both partitions
-    assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)}
-
-
-@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
-def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, send_messages):
-    # We send to only 1 partition so we don't have parallel requests to 2
-    # nodes for data.
-    send_messages(range(100, 200))
+    We are checking for both partition starvation and messages simply not being
+    received. The broker should reply with them, just making sure the consumer
+    isn't doing anything unexpected client-side that blocks them.
+    """
+    send_messages(range(0, 100), partition=0)
+    send_messages(range(100, 200), partition=1)

     # Start a consumer. FetchResponse_v3 should always include at least 1
     # full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time
     # But 0.11.0.0 returns 1 MessageSet at a time when the messages are
     # stored in the new v2 format by the broker.
-    #
-    # DP Note: This is a strange test. The consumer shouldn't care
-    # how many messages are included in a FetchResponse, as long as it is
-    # non-zero. I would not mind if we deleted this test. It caused
-    # a minor headache when testing 0.11.0.0.
-    group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5)
-    consumer = kafka_consumer_factory(
-        group_id=group,
-        auto_offset_reset='earliest',
-        consumer_timeout_ms=5000,
-        fetch_max_bytes=1)
+    consumer = kafka_consumer_factory(auto_offset_reset='earliest', fetch_max_bytes=1)
+
+    messages = [next(consumer) for i in range(10)]
+    assert_message_count(messages, 10)

-    fetched_msgs = [next(consumer) for i in range(10)]
-    assert_message_count(fetched_msgs, 10)
+    # Check that we fetched at least 1 message from both partitions
+    seen_partitions = {(m.topic, m.partition) for m in messages}
+    assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)}


 @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
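A hypothetical way to run only the merged test locally (not part of this commit): the file path and test name come from the diff above, while the `KAFKA_VERSION` value is just an example; the repository's integration fixtures still need a local Kafka test setup to be available.

```python
# Hypothetical invocation sketch: run just the merged integration test.
import os
import pytest

# Example version >= 0.10.1; the skipif guard above skips the test otherwise.
os.environ.setdefault("KAFKA_VERSION", "0.11.0.0")

raise SystemExit(pytest.main([
    "test/test_consumer_integration.py",
    "-k", "test_kafka_consumer_max_bytes_one_msg",
]))
```

If `KAFKA_VERSION` is unset or points at an older broker, the `@pytest.mark.skipif` guard simply skips the test rather than failing it.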