-rw-r--r--  kafka/consumer.py          11
-rw-r--r--  test/test_integration.py   34
2 files changed, 41 insertions, 4 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 4c64cf2..fbc9f94 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -206,6 +206,8 @@ class SimpleConsumer(Consumer):
    auto_commit_every_t: default 5000. How much time (in milliseconds) to
                         wait before commit
+    fetch_size_bytes: number of bytes to request in a FetchRequest
+
    Auto commit details:
    If both auto_commit_every_n and auto_commit_every_t are set, they will
    reset one another when one is triggered. These triggers simply call the
@@ -214,11 +216,12 @@ class SimpleConsumer(Consumer):
"""
def __init__(self, client, group, topic, auto_commit=True, partitions=None,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
- auto_commit_every_t=AUTO_COMMIT_INTERVAL):
+ auto_commit_every_t=AUTO_COMMIT_INTERVAL,
+ fetch_size_bytes=FETCH_MIN_BYTES):
self.partition_info = False # Do not return partition info in msgs
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
- self.fetch_min_bytes = FETCH_MIN_BYTES
+ self.fetch_min_bytes = fetch_size_bytes
self.fetch_started = defaultdict(bool) # defaults to false
super(SimpleConsumer, self).__init__(client, group, topic,
@@ -243,6 +246,7 @@ class SimpleConsumer(Consumer):
                1 is relative to the current offset
                2 is relative to the latest known offset (tail)
        """
+
        if whence == 1:  # relative to current position
            for partition, _offset in self.offsets.items():
                self.offsets[partition] = _offset + offset
@@ -354,8 +358,7 @@ class SimpleConsumer(Consumer):
            offset += 1

        while True:
-            # TODO: configure fetch size
-            req = FetchRequest(self.topic, partition, offset, 1024)
+            req = FetchRequest(self.topic, partition, offset, self.fetch_min_bytes)

            (resp,) = self.client.send_fetch_request([req],
                                                     max_wait_time=self.fetch_max_wait_time,
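The change above makes the fetch size a constructor argument (defaulting to FETCH_MIN_BYTES) instead of a hard-coded 1024 bytes. A minimal usage sketch, not part of the patch -- the group and topic names are illustrative, and `client` stands for an already-connected KafkaClient:

    from kafka import *  # noqa -- same wildcard import style the test file uses

    # Assumption: `client` is an existing, connected KafkaClient instance.
    # Requesting more bytes per FetchRequest allows messages larger than
    # the old 1024-byte request size to be returned by the broker.
    consumer = SimpleConsumer(client, "my-group", "my-topic",
                              fetch_size_bytes=16384)

    # Iterating the consumer yields offset/message pairs, as the new test
    # below does via consumer.__iter__().
    for offset_and_message in consumer:
        print(offset_and_message.message)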
diff --git a/test/test_integration.py b/test/test_integration.py
index 3c524cf..6384b09 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -2,6 +2,8 @@ import logging
import unittest
import time
from datetime import datetime
+import string
+import random
from kafka import * # noqa
from kafka.common import * # noqa
@@ -738,6 +740,38 @@ class TestConsumer(unittest.TestCase):
        consumer.stop()

+    def test_large_messages(self):
+        # Produce 10 "normal" size messages
+        messages1 = [create_message(random_string(1024)) for i in range(10)]
+        produce1 = ProduceRequest("test_large_messages", 0, messages1)
+
+        for resp in self.client.send_produce_request([produce1]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        # Produce 10 messages that are too large (bigger than default fetch size)
+        messages2 = [create_message(random_string(5000)) for i in range(10)]
+        produce2 = ProduceRequest("test_large_messages", 0, messages2)
+
+        for resp in self.client.send_produce_request([produce2]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 10)
+
+        consumer = SimpleConsumer(self.client, "group1", "test_large_messages")
+        it = consumer.__iter__()
+        for i in range(10):
+            self.assertEquals(messages1[i], it.next().message)
+
+        consumer = SimpleConsumer(self.client, "group2", "test_large_messages", fetch_size_bytes=5120)
+        it = consumer.__iter__()
+        for i in range(10):
+            self.assertEquals(messages1[i], it.next().message)
+        for i in range(10):
+            self.assertEquals(messages2[i], it.next().message)
+
+def random_string(l):
+    s = "".join(random.choice(string.printable) for i in xrange(l))
+    return s

if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
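To reproduce the large-message scenario outside the test harness, a rough sketch in the spirit of test_large_messages (topic and group names are illustrative; `client` is assumed to be a connected KafkaClient and the topic to exist or be auto-created):

    from kafka import *  # noqa
    from kafka.common import *  # noqa

    # Assumption: `client` is an existing, connected KafkaClient instance.
    # Produce ten 5000-byte messages; they exceed the consumer's default
    # fetch size, which is why the "group1" consumer in the test above only
    # reads the ten small messages.
    large_messages = [create_message("x" * 5000) for i in range(10)]
    produce = ProduceRequest("large_messages", 0, large_messages)
    for resp in client.send_produce_request([produce]):
        assert resp.error == 0

    # Raising fetch_size_bytes past the message size (5120 covers 5000 bytes
    # of payload plus a small per-message overhead) makes them consumable.
    consumer = SimpleConsumer(client, "large-group", "large_messages",
                              fetch_size_bytes=5120)
    for offset_and_message in consumer:
        print(offset_and_message.message)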