summaryrefslogtreecommitdiff
path: root/kafka/consumer/simple.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-09 15:45:18 -0800
committerDana Powers <dana.powers@rd.io>2015-12-09 15:45:18 -0800
commit172a272c4258ddb76f8e8e246ade1618116610c7 (patch)
tree57cf767fb8bbc7330820c61d8d62d145199702a0 /kafka/consumer/simple.py
parenta3ec9bd8e8c730c9f6715b536c0c590230fc2e28 (diff)
downloadkafka-python-172a272c4258ddb76f8e8e246ade1618116610c7.tar.gz
Handle PartialMessage / ConsumerFetchSizeTooSmall in SimpleConsumer
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r--kafka/consumer/simple.py45
1 files changed, 24 insertions, 21 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 1c2aee6..9e76730 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -32,6 +32,7 @@ from ..common import (
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
)
+from kafka.protocol.message import PartialMessage
log = logging.getLogger(__name__)
@@ -407,32 +408,34 @@ class SimpleConsumer(Consumer):
partition = resp.partition
buffer_size = partitions[partition]
- try:
- for message in resp.messages:
- if message.offset < self.fetch_offsets[partition]:
- log.debug('Skipping message %s because its offset is less than the consumer offset',
- message)
- continue
- # Put the message in our queue
- self.queue.put((partition, message))
- self.fetch_offsets[partition] = message.offset + 1
- except ConsumerFetchSizeTooSmall:
+
+ # Check for partial message
+ if resp.messages and isinstance(resp.messages[-1].message, PartialMessage):
+
+ # If buffer is at max and all we got was a partial message
+ # raise ConsumerFetchSizeTooSmall
if (self.max_buffer_size is not None and
- buffer_size == self.max_buffer_size):
- log.error('Max fetch size %d too small',
- self.max_buffer_size)
- raise
+ buffer_size == self.max_buffer_size and
+ len(resp.messages) == 1):
+
+ log.error('Max fetch size %d too small', self.max_buffer_size)
+ raise ConsumerFetchSizeTooSmall()
+
if self.max_buffer_size is None:
buffer_size *= 2
else:
- buffer_size = min(buffer_size * 2,
- self.max_buffer_size)
+ buffer_size = min(buffer_size * 2, self.max_buffer_size)
log.warning('Fetch size too small, increase to %d (2x) '
'and retry', buffer_size)
retry_partitions[partition] = buffer_size
- except ConsumerNoMoreData as e:
- log.debug('Iteration was ended by %r', e)
- except StopIteration:
- # Stop iterating through this partition
- log.debug('Done iterating over partition %s', partition)
+ resp.messages.pop()
+
+ for message in resp.messages:
+ if message.offset < self.fetch_offsets[partition]:
+ log.debug('Skipping message %s because its offset is less than the consumer offset',
+ message)
+ continue
+ # Put the message in our queue
+ self.queue.put((partition, message))
+ self.fetch_offsets[partition] = message.offset + 1
partitions = retry_partitions