diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-09 15:45:18 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-09 15:45:18 -0800 |
commit | 172a272c4258ddb76f8e8e246ade1618116610c7 (patch) | |
tree | 57cf767fb8bbc7330820c61d8d62d145199702a0 /kafka/consumer/simple.py | |
parent | a3ec9bd8e8c730c9f6715b536c0c590230fc2e28 (diff) | |
download | kafka-python-172a272c4258ddb76f8e8e246ade1618116610c7.tar.gz |
Handle PartialMessage / ConsumerFetchSizeTooSmall in SimpleConsumer
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r-- | kafka/consumer/simple.py | 45 |
1 files changed, 24 insertions, 21 deletions
```diff
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 1c2aee6..9e76730 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -32,6 +32,7 @@ from ..common import (
     UnknownTopicOrPartitionError, NotLeaderForPartitionError,
     OffsetOutOfRangeError, FailedPayloadsError, check_error
 )
+from kafka.protocol.message import PartialMessage
 
 
 log = logging.getLogger(__name__)
@@ -407,32 +408,34 @@ class SimpleConsumer(Consumer):
 
                 partition = resp.partition
                 buffer_size = partitions[partition]
-                try:
-                    for message in resp.messages:
-                        if message.offset < self.fetch_offsets[partition]:
-                            log.debug('Skipping message %s because its offset is less than the consumer offset',
-                                      message)
-                            continue
-                        # Put the message in our queue
-                        self.queue.put((partition, message))
-                        self.fetch_offsets[partition] = message.offset + 1
-                except ConsumerFetchSizeTooSmall:
+
+                # Check for partial message
+                if resp.messages and isinstance(resp.messages[-1].message, PartialMessage):
+
+                    # If buffer is at max and all we got was a partial message
+                    # raise ConsumerFetchSizeTooSmall
                     if (self.max_buffer_size is not None and
-                            buffer_size == self.max_buffer_size):
-                        log.error('Max fetch size %d too small',
-                                  self.max_buffer_size)
-                        raise
+                            buffer_size == self.max_buffer_size and
+                            len(resp.messages) == 1):
+
+                        log.error('Max fetch size %d too small', self.max_buffer_size)
+                        raise ConsumerFetchSizeTooSmall()
+
                     if self.max_buffer_size is None:
                         buffer_size *= 2
                     else:
-                        buffer_size = min(buffer_size * 2,
-                                          self.max_buffer_size)
+                        buffer_size = min(buffer_size * 2, self.max_buffer_size)
                     log.warning('Fetch size too small, increase to %d (2x) '
                                 'and retry', buffer_size)
                     retry_partitions[partition] = buffer_size
-                except ConsumerNoMoreData as e:
-                    log.debug('Iteration was ended by %r', e)
-                except StopIteration:
-                    # Stop iterating through this partition
-                    log.debug('Done iterating over partition %s', partition)
+                    resp.messages.pop()
+
+                for message in resp.messages:
+                    if message.offset < self.fetch_offsets[partition]:
+                        log.debug('Skipping message %s because its offset is less than the consumer offset',
+                                  message)
+                        continue
+                    # Put the message in our queue
+                    self.queue.put((partition, message))
+                    self.fetch_offsets[partition] = message.offset + 1
             partitions = retry_partitions
```