diff options
Diffstat (limited to 'test/test_integration.py')
-rw-r--r-- | test/test_integration.py | 34 |
1 files changed, 34 insertions, 0 deletions
diff --git a/test/test_integration.py b/test/test_integration.py index 3c524cf..6384b09 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -2,6 +2,8 @@ import logging import unittest import time from datetime import datetime +import string +import random from kafka import * # noqa from kafka.common import * # noqa @@ -738,6 +740,38 @@ class TestConsumer(unittest.TestCase): consumer.stop() + def test_large_messages(self): + # Produce 10 "normal" size messages + messages1 = [create_message(random_string(1024)) for i in range(10)] + produce1 = ProduceRequest("test_large_messages", 0, messages1) + + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # Produce 10 messages that are too large (bigger than default fetch size) + messages2=[create_message(random_string(5000)) for i in range(10)] + produce2 = ProduceRequest("test_large_messages", 0, messages2) + + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 10) + + consumer = SimpleConsumer(self.client, "group1", "test_large_messages") + it = consumer.__iter__() + for i in range(10): + self.assertEquals(messages1[i], it.next().message) + + consumer = SimpleConsumer(self.client, "group2", "test_large_messages", fetch_size_bytes=5120) + it = consumer.__iter__() + for i in range(10): + self.assertEquals(messages1[i], it.next().message) + for i in range(10): + self.assertEquals(messages2[i], it.next().message) + +def random_string(l): + s = "".join(random.choice(string.printable) for i in xrange(l)) + return s if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) |