diff options
author | Omar Ghishan <omar.ghishan@rd.io> | 2014-01-03 17:49:23 -0800 |
---|---|---|
committer | Omar Ghishan <omar.ghishan@rd.io> | 2014-01-06 15:14:51 -0800 |
commit | d736d0b0746dc05046550f077698da45ec620157 (patch) | |
tree | 9c110521d101169dc5b6dc4a21041e6e7b619151 /test/test_integration.py | |
parent | 59f884c5aafc647114382f7bed69a484db62b5b8 (diff) | |
download | kafka-python-d736d0b0746dc05046550f077698da45ec620157.tar.gz |
Add tests for limited and unlimited consumer max_buffer_size
Diffstat (limited to 'test/test_integration.py')
-rw-r--r-- | test/test_integration.py | 26 |
1 files changed, 25 insertions, 1 deletions
diff --git a/test/test_integration.py b/test/test_integration.py index d141c36..f638956 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -8,6 +8,7 @@ import random from kafka import * # noqa from kafka.common import * # noqa from kafka.codec import has_gzip, has_snappy +from kafka.consumer import FETCH_BUFFER_SIZE_BYTES, MAX_FETCH_BUFFER_SIZE_BYTES from .fixtures import ZookeeperFixture, KafkaFixture @@ -760,7 +761,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - # Produce 10 messages that are too large (bigger than default fetch size) + # Produce 10 messages that are large (bigger than default fetch size) messages2 = [create_message(random_string(5000)) for i in range(10)] produce2 = ProduceRequest("test_large_messages", 0, messages2) @@ -776,6 +777,29 @@ class TestConsumer(unittest.TestCase): self.assertEquals(all_messages[i], message.message) self.assertEquals(i, 19) + # Produce 1 message that is too large (bigger than max fetch size) + big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10 + big_message = create_message(random_string(big_message_size)) + produce3 = ProduceRequest("test_large_messages", 0, [big_message]) + for resp in self.client.send_produce_request([produce3]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 20) + + self.assertRaises(ConsumerFetchSizeTooSmall, consumer.get_message, False, 0.1) + + # Create a consumer with no fetch size limit + big_consumer = SimpleConsumer(self.client, "group1", "test_large_messages", + max_buffer_size=None, partitions=[0], + auto_commit=False, iter_timeout=0) + + # Seek to the last message + big_consumer.seek(-1, 2) + + # Consume giant message successfully + message = big_consumer.get_message(block=False, timeout=10) + self.assertIsNotNone(message) + self.assertEquals(message.message.value, big_message.value) + class TestFailover(unittest.TestCase): @classmethod |