summaryrefslogtreecommitdiff
path: root/test/test_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_integration.py')
-rw-r--r--test/test_integration.py26
1 files changed, 25 insertions, 1 deletions
diff --git a/test/test_integration.py b/test/test_integration.py
index d141c36..f638956 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -8,6 +8,7 @@ import random
from kafka import * # noqa
from kafka.common import * # noqa
from kafka.codec import has_gzip, has_snappy
+from kafka.consumer import FETCH_BUFFER_SIZE_BYTES, MAX_FETCH_BUFFER_SIZE_BYTES
from .fixtures import ZookeeperFixture, KafkaFixture
@@ -760,7 +761,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- # Produce 10 messages that are too large (bigger than default fetch size)
+ # Produce 10 messages that are large (bigger than default fetch size)
messages2 = [create_message(random_string(5000)) for i in range(10)]
produce2 = ProduceRequest("test_large_messages", 0, messages2)
@@ -776,6 +777,29 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(all_messages[i], message.message)
self.assertEquals(i, 19)
+ # Produce 1 message that is too large (bigger than max fetch size)
+ big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10
+ big_message = create_message(random_string(big_message_size))
+ produce3 = ProduceRequest("test_large_messages", 0, [big_message])
+ for resp in self.client.send_produce_request([produce3]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 20)
+
+ self.assertRaises(ConsumerFetchSizeTooSmall, consumer.get_message, False, 0.1)
+
+ # Create a consumer with no fetch size limit
+ big_consumer = SimpleConsumer(self.client, "group1", "test_large_messages",
+ max_buffer_size=None, partitions=[0],
+ auto_commit=False, iter_timeout=0)
+
+ # Seek to the last message
+ big_consumer.seek(-1, 2)
+
+ # Consume giant message successfully
+ message = big_consumer.get_message(block=False, timeout=10)
+ self.assertIsNotNone(message)
+ self.assertEquals(message.message.value, big_message.value)
+
class TestFailover(unittest.TestCase):
@classmethod