-rw-r--r--  kafka/consumer.py        | 11
-rw-r--r--  test/test_integration.py | 34
2 files changed, 41 insertions, 4 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 4c64cf2..fbc9f94 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -206,6 +206,8 @@ class SimpleConsumer(Consumer):
     auto_commit_every_t: default 5000. How much time (in milliseconds) to
                          wait before commit
 
+    fetch_size_bytes: number of bytes to request in a FetchRequest
+
     Auto commit details:
     If both auto_commit_every_n and auto_commit_every_t are set, they will
     reset one another when one is triggered. These triggers simply call the
@@ -214,11 +216,12 @@ class SimpleConsumer(Consumer):
     """
     def __init__(self, client, group, topic, auto_commit=True, partitions=None,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
-                 auto_commit_every_t=AUTO_COMMIT_INTERVAL):
+                 auto_commit_every_t=AUTO_COMMIT_INTERVAL,
+                 fetch_size_bytes=FETCH_MIN_BYTES):
 
         self.partition_info = False     # Do not return partition info in msgs
         self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
-        self.fetch_min_bytes = FETCH_MIN_BYTES
+        self.fetch_min_bytes = fetch_size_bytes
         self.fetch_started = defaultdict(bool)  # defaults to false
 
         super(SimpleConsumer, self).__init__(client, group, topic,
@@ -243,6 +246,7 @@ class SimpleConsumer(Consumer):
         1 is relative to the current offset
         2 is relative to the latest known offset (tail)
         """
+
         if whence == 1:  # relative to current position
             for partition, _offset in self.offsets.items():
                 self.offsets[partition] = _offset + offset
@@ -354,8 +358,7 @@ class SimpleConsumer(Consumer):
             offset += 1
 
         while True:
-            # TODO: configure fetch size
-            req = FetchRequest(self.topic, partition, offset, 1024)
+            req = FetchRequest(self.topic, partition, offset, self.fetch_min_bytes)
 
             (resp,) = self.client.send_fetch_request([req],
                                                      max_wait_time=self.fetch_max_wait_time,
diff --git a/test/test_integration.py b/test/test_integration.py
index 3c524cf..6384b09 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -2,6 +2,8 @@ import logging
 import unittest
 import time
 from datetime import datetime
+import string
+import random
 
 from kafka import *  # noqa
 from kafka.common import *  # noqa
@@ -738,6 +740,38 @@ class TestConsumer(unittest.TestCase):
 
         consumer.stop()
 
+    def test_large_messages(self):
+        # Produce 10 "normal" size messages
+        messages1 = [create_message(random_string(1024)) for i in range(10)]
+        produce1 = ProduceRequest("test_large_messages", 0, messages1)
+
+        for resp in self.client.send_produce_request([produce1]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        # Produce 10 messages that are too large (bigger than default fetch size)
+        messages2 = [create_message(random_string(5000)) for i in range(10)]
+        produce2 = ProduceRequest("test_large_messages", 0, messages2)
+
+        for resp in self.client.send_produce_request([produce2]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 10)
+
+        consumer = SimpleConsumer(self.client, "group1", "test_large_messages")
+        it = consumer.__iter__()
+        for i in range(10):
+            self.assertEquals(messages1[i], it.next().message)
+
+        consumer = SimpleConsumer(self.client, "group2", "test_large_messages", fetch_size_bytes=5120)
+        it = consumer.__iter__()
+        for i in range(10):
+            self.assertEquals(messages1[i], it.next().message)
+        for i in range(10):
+            self.assertEquals(messages2[i], it.next().message)
+
+def random_string(l):
+    s = "".join(random.choice(string.printable) for i in xrange(l))
+    return s
 
 if __name__ == "__main__":
     logging.basicConfig(level=logging.DEBUG)
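
A minimal usage sketch of the new parameter, assuming the top-level KafkaClient
and SimpleConsumer exports used elsewhere in this repository, plus a hypothetical
local broker, consumer group, and topic name. The value is stored as
fetch_min_bytes and passed straight through to each FetchRequest, replacing the
previously hard-coded 1024-byte fetch size:

    from kafka import KafkaClient, SimpleConsumer

    # Hypothetical broker host/port and topic name; adjust for your setup.
    client = KafkaClient("localhost", 9092)

    # Request up to 5120 bytes per FetchRequest (the same value used by the
    # new integration test) so messages larger than 1024 bytes can be fetched.
    consumer = SimpleConsumer(client, "my_group", "my_topic",
                              fetch_size_bytes=5120)

    for message in consumer:
        print(message)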