summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test/test_consumer_integration.py10
1 files changed, 8 insertions, 2 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 045e81e..3c5fbd7 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -352,8 +352,14 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Produce 10 messages that are large (bigger than default fetch size)
large_messages = self.send_messages(0, [ random_string(5000) for x in range(10) ])
- # Consumer should still get all of them
- consumer = self.consumer()
+ # Brokers prior to 0.11 will return the next message
+ # if it is smaller than max_bytes (called buffer_size in SimpleConsumer)
+ # Brokers 0.11 and later that store messages in v2 format
+ # internally will return the next message only if the
+ # full MessageSet is smaller than max_bytes.
+ # For that reason, we set the max buffer size to a little more
+ # than the size of all large messages combined
+ consumer = self.consumer(max_buffer_size=60000)
expected_messages = set(small_messages + large_messages)
actual_messages = set([ x.message.value for x in consumer ])