summaryrefslogtreecommitdiff
path: root/test/test_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_integration.py')
-rw-r--r--test/test_integration.py34
1 files changed, 34 insertions, 0 deletions
diff --git a/test/test_integration.py b/test/test_integration.py
index 3c524cf..6384b09 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -2,6 +2,8 @@ import logging
import unittest
import time
from datetime import datetime
+import string
+import random
from kafka import * # noqa
from kafka.common import * # noqa
@@ -738,6 +740,38 @@ class TestConsumer(unittest.TestCase):
consumer.stop()
+ def test_large_messages(self):
+ # Produce 10 "normal" size messages
+ messages1 = [create_message(random_string(1024)) for i in range(10)]
+ produce1 = ProduceRequest("test_large_messages", 0, messages1)
+
+ for resp in self.client.send_produce_request([produce1]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ # Produce 10 messages that are too large (bigger than default fetch size)
+ messages2=[create_message(random_string(5000)) for i in range(10)]
+ produce2 = ProduceRequest("test_large_messages", 0, messages2)
+
+ for resp in self.client.send_produce_request([produce2]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 10)
+
+ consumer = SimpleConsumer(self.client, "group1", "test_large_messages")
+ it = consumer.__iter__()
+ for i in range(10):
+ self.assertEquals(messages1[i], it.next().message)
+
+ consumer = SimpleConsumer(self.client, "group2", "test_large_messages", fetch_size_bytes=5120)
+ it = consumer.__iter__()
+ for i in range(10):
+ self.assertEquals(messages1[i], it.next().message)
+ for i in range(10):
+ self.assertEquals(messages2[i], it.next().message)
+
+def random_string(l):
+ s = "".join(random.choice(string.printable) for i in xrange(l))
+ return s
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)