summaryrefslogtreecommitdiff
path: root/test/test_integration.py
diff options
context:
space:
mode:
author David Arthur <mumrah@gmail.com> 2013-09-08 20:20:12 -0400
committer David Arthur <mumrah@gmail.com> 2013-09-08 20:20:12 -0400
commit 40d8e9e550b48755e2f40cfd0877a5b848a3254f (patch)
tree e634245b6b0a52e40c9e03359413512feac252ed /test/test_integration.py
parent c3bce13b845999483f7601836a2e7681dcd8ff10 (diff)
download kafka-python-40d8e9e550b48755e2f40cfd0877a5b848a3254f.tar.gz
Fixed #42, make fetch size configurable
The fetch size was hard-coded to 1024 bytes, which meant that larger messages were unconsumable: they would always get split, causing the consumer to stop. It would probably be best to automatically retry truncated messages with a larger request size, so you don't have to know your maximum message size ahead of time.
Diffstat (limited to 'test/test_integration.py')
-rw-r--r--  test/test_integration.py  34
1 file changed, 34 insertions, 0 deletions
diff --git a/test/test_integration.py b/test/test_integration.py
index 3c524cf..6384b09 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -2,6 +2,8 @@ import logging
import unittest
import time
from datetime import datetime
+import string
+import random
from kafka import * # noqa
from kafka.common import * # noqa
@@ -738,6 +740,38 @@ class TestConsumer(unittest.TestCase):
consumer.stop()
+ def test_large_messages(self):
+ # Produce 10 "normal" size messages
+ messages1 = [create_message(random_string(1024)) for i in range(10)]
+ produce1 = ProduceRequest("test_large_messages", 0, messages1)
+
+ for resp in self.client.send_produce_request([produce1]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ # Produce 10 messages that are too large (bigger than default fetch size)
+ messages2=[create_message(random_string(5000)) for i in range(10)]
+ produce2 = ProduceRequest("test_large_messages", 0, messages2)
+
+ for resp in self.client.send_produce_request([produce2]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 10)
+
+ consumer = SimpleConsumer(self.client, "group1", "test_large_messages")
+ it = consumer.__iter__()
+ for i in range(10):
+ self.assertEquals(messages1[i], it.next().message)
+
+ consumer = SimpleConsumer(self.client, "group2", "test_large_messages", fetch_size_bytes=5120)
+ it = consumer.__iter__()
+ for i in range(10):
+ self.assertEquals(messages1[i], it.next().message)
+ for i in range(10):
+ self.assertEquals(messages2[i], it.next().message)
+
+def random_string(l):
+ s = "".join(random.choice(string.printable) for i in xrange(l))
+ return s
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)