author     David Arthur <mumrah@gmail.com>  2013-09-09 00:44:36 -0400
committer  David Arthur <mumrah@gmail.com>  2013-09-09 00:47:28 -0400
commit     f67ad27f72aca077f24fa801a9d2d3075d6d5b60 (patch)
tree       b6c76cfe70429b079a147d6e84952b1f63352dc9 /kafka/consumer.py
parent     40d8e9e550b48755e2f40cfd0877a5b848a3254f (diff)
download   kafka-python-f67ad27f72aca077f24fa801a9d2d3075d6d5b60.tar.gz
Auto-adjusting consumer fetch size
Related to #42

Adds a new ConsumerFetchSizeTooSmall exception that is thrown when `_decode_message_set_iter` gets a BufferUnderflowError but has not yet yielded a message. In this event, SimpleConsumer will increase the fetch size by 1.5x and continue the fetching loop while _not_ increasing the offset (basically it just retries the request with a larger fetch size).

Once the consumer fetch size has been increased, it will remain increased while SimpleConsumer fetches from that partition.
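The protocol-side half of this change is not visible in the diff below (which is limited to kafka/consumer.py), but the behaviour described above can be sketched roughly as follows. This is an illustrative stand-in, not the actual kafka.protocol code; the exception classes and the 8-byte offset / 4-byte size framing are assumptions made only to show the control flow:

    # Illustrative sketch only -- NOT the actual kafka.protocol implementation.
    # Exception classes and wire framing are stand-ins for the commit's idea.
    import struct

    class BufferUnderflowError(Exception):
        """Stand-in: the buffer ended in the middle of a message."""

    class ConsumerFetchSizeTooSmall(Exception):
        """Stand-in: not even one complete message fit in the fetch buffer."""

    def decode_message_set_iter(data):
        """Yield (offset, message_bytes) pairs from a raw message-set buffer."""
        cur = 0
        read_message = False
        while cur < len(data):
            try:
                if cur + 12 > len(data):              # 8-byte offset + 4-byte size
                    raise BufferUnderflowError()
                (offset, size) = struct.unpack('>qi', data[cur:cur + 12])
                cur += 12
                if cur + size > len(data):            # trailing partial message
                    raise BufferUnderflowError()
                yield offset, data[cur:cur + size]
                cur += size
                read_message = True
            except BufferUnderflowError:
                if not read_message:
                    # Never yielded a single message: the fetch size is smaller
                    # than one message, so ask the consumer to retry with more.
                    raise ConsumerFetchSizeTooSmall()
                # Partial data after at least one full message is normal; stop.
                return

The important property is that ConsumerFetchSizeTooSmall only escapes when no complete message was produced, which is exactly the case the consumer loop below handles by retrying at the same offset with a 1.5x larger fetch size.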
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--  kafka/consumer.py  40
1 files changed, 25 insertions, 15 deletions
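From the caller's side the change is transparent. A minimal usage sketch follows; the client and consumer constructor arguments are assumptions based on the kafka-python API of this era, not something introduced by this commit:

    # Hypothetical usage -- constructor signatures are assumed, not from this diff.
    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer

    client = KafkaClient("localhost", 9092)
    consumer = SimpleConsumer(client, "my-group", "my-topic")

    # If a message on the topic is larger than the initial fetch size,
    # iteration no longer stalls: the fetch is retried at the same offset
    # with a 1.5x larger size until the message set can be decoded.
    for message in consumer:
        print(message)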
diff --git a/kafka/consumer.py b/kafka/consumer.py
index fbc9f94..6ac13c7 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -12,7 +12,7 @@ from kafka.common import (
)
from kafka.util import (
- ReentrantTimer
+ ReentrantTimer, ConsumerFetchSizeTooSmall
)
log = logging.getLogger("kafka")
@@ -357,29 +357,39 @@ class SimpleConsumer(Consumer):
if self.fetch_started[partition]:
offset += 1
+ fetch_size = self.fetch_min_bytes
+
while True:
- req = FetchRequest(self.topic, partition, offset, self.fetch_min_bytes)
+ req = FetchRequest(self.topic, partition, offset, fetch_size)
(resp,) = self.client.send_fetch_request([req],
max_wait_time=self.fetch_max_wait_time,
- min_bytes=self.fetch_min_bytes)
+ min_bytes=fetch_size)
assert resp.topic == self.topic
assert resp.partition == partition
next_offset = None
- for message in resp.messages:
- next_offset = message.offset
-
- # update the offset before the message is yielded. This is
- # so that the consumer state is not lost in certain cases.
- # For eg: the message is yielded and consumed by the caller,
- # but the caller does not come back into the generator again.
- # The message will be consumed but the status will not be
- # updated in the consumer
- self.fetch_started[partition] = True
- self.offsets[partition] = message.offset
- yield message
+ try:
+ for message in resp.messages:
+ next_offset = message.offset
+
+ # update the offset before the message is yielded. This is
+ # so that the consumer state is not lost in certain cases.
+ # For eg: the message is yielded and consumed by the caller,
+ # but the caller does not come back into the generator again.
+ # The message will be consumed but the status will not be
+ # updated in the consumer
+ self.fetch_started[partition] = True
+ self.offsets[partition] = message.offset
+ yield message
+ except ConsumerFetchSizeTooSmall, e:
+ log.warn("Fetch size is too small, increasing by 1.5x and retrying")
+ fetch_size *= 1.5
+ continue
+ except ConsumerNoMoreData, e:
+ log.debug("Iteration was ended by %r", e)
+
if next_offset is None:
break
else: