author    David Arthur <mumrah@gmail.com>  2013-09-09 00:44:36 -0400
committer David Arthur <mumrah@gmail.com>  2013-09-09 00:47:28 -0400
commit    f67ad27f72aca077f24fa801a9d2d3075d6d5b60 (patch)
tree      b6c76cfe70429b079a147d6e84952b1f63352dc9
parent    40d8e9e550b48755e2f40cfd0877a5b848a3254f (diff)
download  kafka-python-f67ad27f72aca077f24fa801a9d2d3075d6d5b60.tar.gz
Auto-adjusting consumer fetch size
Related to #42. Adds a new ConsumerFetchSizeTooSmall exception that is thrown when `_decode_message_set_iter` gets a BufferUnderflowError but has not yet yielded a message. In this event, SimpleConsumer will increase the fetch size by 1.5x and continue the fetching loop while _not_ increasing the offset (it simply retries the request with a larger fetch size). Once the fetch size has been increased, it remains increased while SimpleConsumer fetches from that partition.
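For illustration, a self-contained toy sketch of this grow-and-retry behavior (not the library code; toy_decode and fetch_with_retry are invented stand-ins for _decode_message_set_iter and the SimpleConsumer fetch loop):

    class ConsumerFetchSizeTooSmall(Exception):
        pass

    def toy_decode(buf, message_size):
        # Mirrors _decode_message_set_iter: if the buffer cannot hold even
        # one whole message, raise instead of yielding anything.
        if len(buf) < message_size:
            raise ConsumerFetchSizeTooSmall()
        yield buf[:message_size]

    def fetch_with_retry(payload, message_size, fetch_size):
        # Retry the same "offset" with a 1.5x larger fetch size until at
        # least one message fits; the enlarged size is kept afterwards.
        while True:
            buf = payload[:fetch_size]  # simulate a size-bounded fetch
            try:
                for msg in toy_decode(buf, message_size):
                    return msg, fetch_size
            except ConsumerFetchSizeTooSmall:
                fetch_size = int(fetch_size * 1.5)

    msg, size = fetch_with_retry(b"x" * 4096, 4096, 1024)
    print(size)  # 5184, after growing 1024 -> 1536 -> 2304 -> 3456 -> 5184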
-rw-r--r--  kafka/consumer.py         40
-rw-r--r--  kafka/protocol.py         16
-rw-r--r--  kafka/util.py              2
-rw-r--r--  test/test_integration.py  18
4 files changed, 43 insertions, 33 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index fbc9f94..6ac13c7 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -12,7 +12,7 @@ from kafka.common import (
 )
 
 from kafka.util import (
-    ReentrantTimer
+    ReentrantTimer, ConsumerFetchSizeTooSmall
 )
 
 log = logging.getLogger("kafka")
@@ -357,29 +357,39 @@ class SimpleConsumer(Consumer):
         if self.fetch_started[partition]:
             offset += 1
 
+        fetch_size = self.fetch_min_bytes
+
         while True:
-            req = FetchRequest(self.topic, partition, offset, self.fetch_min_bytes)
+            req = FetchRequest(self.topic, partition, offset, fetch_size)
 
             (resp,) = self.client.send_fetch_request([req],
                                                      max_wait_time=self.fetch_max_wait_time,
-                                                     min_bytes=self.fetch_min_bytes)
+                                                     min_bytes=fetch_size)
 
             assert resp.topic == self.topic
             assert resp.partition == partition
 
             next_offset = None
-            for message in resp.messages:
-                next_offset = message.offset
-
-                # update the offset before the message is yielded. This is
-                # so that the consumer state is not lost in certain cases.
-                # For eg: the message is yielded and consumed by the caller,
-                # but the caller does not come back into the generator again.
-                # The message will be consumed but the status will not be
-                # updated in the consumer
-                self.fetch_started[partition] = True
-                self.offsets[partition] = message.offset
-                yield message
+            try:
+                for message in resp.messages:
+                    next_offset = message.offset
+
+                    # update the offset before the message is yielded. This is
+                    # so that the consumer state is not lost in certain cases.
+                    # For eg: the message is yielded and consumed by the caller,
+                    # but the caller does not come back into the generator again.
+                    # The message will be consumed but the status will not be
+                    # updated in the consumer
+                    self.fetch_started[partition] = True
+                    self.offsets[partition] = message.offset
+                    yield message
+            except ConsumerFetchSizeTooSmall, e:
+                log.warn("Fetch size is too small, increasing by 1.5x and retrying")
+                fetch_size *= 1.5
+                continue
+            except ConsumerNoMoreData, e:
+                log.debug("Iteration was ended by %r", e)
+
             if next_offset is None:
                 break
             else:
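With this change a caller no longer has to size the fetch buffer for the largest possible message up front. A hedged usage sketch (assumes a locally running broker and an existing topic; the KafkaClient connection details below are illustrative for the API of this era):

    kafka = KafkaClient("localhost", 9092)
    consumer = SimpleConsumer(kafka, "my-group", "my-topic", fetch_size_bytes=4096)
    # Messages larger than 4096 bytes no longer stall the iterator; the
    # consumer grows its fetch size by 1.5x internally and retries.
    for offset_and_message in consumer:
        print(offset_and_message.message.value)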
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 6bd5c73..f985479 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -13,7 +13,7 @@ from kafka.common import (
 from kafka.util import (
     read_short_string, read_int_string, relative_unpack,
     write_short_string, write_int_string, group_by_topic_and_partition,
-    BufferUnderflowError, ChecksumError
+    BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall
 )
 
 log = logging.getLogger("kafka")
@@ -110,17 +110,21 @@ class KafkaProtocol(object):
         recurse easily.
         """
         cur = 0
+        read_message = False
         while cur < len(data):
             try:
                 ((offset, ), cur) = relative_unpack('>q', data, cur)
                 (msg, cur) = read_int_string(data, cur)
-                for (offset, message) in KafkaProtocol._decode_message(msg,
-                                                                       offset):
+                for (offset, message) in KafkaProtocol._decode_message(msg, offset):
+                    read_message = True
                     yield OffsetAndMessage(offset, message)
-
             except BufferUnderflowError:
-                # If we get a partial read of a message, stop
-                raise StopIteration()
+                if read_message is False:
+                    # If we get a partial read of a message, but haven't yielded
+                    # anything, there's a problem
+                    raise ConsumerFetchSizeTooSmall()
+                else:
+                    raise StopIteration()
 
     @classmethod
     def _decode_message(cls, data, offset):
diff --git a/kafka/util.py b/kafka/util.py
index 11178f5..bdda7ed 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -73,6 +73,8 @@ class BufferUnderflowError(Exception):
 class ChecksumError(Exception):
     pass
 
+class ConsumerFetchSizeTooSmall(Exception):
+    pass
 
 class ReentrantTimer(object):
     """
diff --git a/test/test_integration.py b/test/test_integration.py
index 6384b09..bf1acc8 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -8,7 +8,6 @@ import random
 from kafka import *  # noqa
 from kafka.common import *  # noqa
 from kafka.codec import has_gzip, has_snappy
-
 from .fixtures import ZookeeperFixture, KafkaFixture
@@ -757,20 +756,15 @@ class TestConsumer(unittest.TestCase):
         self.assertEquals(resp.error, 0)
         self.assertEquals(resp.offset, 10)
 
+        # Consumer should still get all of them
         consumer = SimpleConsumer(self.client, "group1", "test_large_messages")
-        it = consumer.__iter__()
-        for i in range(10):
-            self.assertEquals(messages1[i], it.next().message)
-
-        consumer = SimpleConsumer(self.client, "group2", "test_large_messages", fetch_size_bytes=5120)
-        it = consumer.__iter__()
-        for i in range(10):
-            self.assertEquals(messages1[i], it.next().message)
-        for i in range(10):
-            self.assertEquals(messages2[i], it.next().message)
+        all_messages = messages1 + messages2
+        for i, message in enumerate(consumer):
+            self.assertEquals(all_messages[i], message.message)
+        self.assertEquals(i, 19)
 
 
 def random_string(l):
-    s = "".join(random.choice(string.printable) for i in xrange(l))
+    s = "".join(random.choice(string.letters) for i in xrange(l))
     return s
 
 
 if __name__ == "__main__":