-rw-r--r--  kafka/consumer.py        | 40
-rw-r--r--  kafka/protocol.py        | 16
-rw-r--r--  kafka/util.py            |  2
-rw-r--r--  test/test_integration.py | 18

4 files changed, 43 insertions(+), 33 deletions(-)
diff --git a/kafka/consumer.py b/kafka/consumer.py
index fbc9f94..6ac13c7 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -12,7 +12,7 @@ from kafka.common import (
 )
 
 from kafka.util import (
-    ReentrantTimer
+    ReentrantTimer, ConsumerFetchSizeTooSmall
 )
 
 log = logging.getLogger("kafka")
@@ -357,29 +357,39 @@ class SimpleConsumer(Consumer):
             if self.fetch_started[partition]:
                 offset += 1
 
+        fetch_size = self.fetch_min_bytes
+
         while True:
-            req = FetchRequest(self.topic, partition, offset, self.fetch_min_bytes)
+            req = FetchRequest(self.topic, partition, offset, fetch_size)
 
             (resp,) = self.client.send_fetch_request([req],
                                                      max_wait_time=self.fetch_max_wait_time,
-                                                     min_bytes=self.fetch_min_bytes)
+                                                     min_bytes=fetch_size)
 
             assert resp.topic == self.topic
             assert resp.partition == partition
 
             next_offset = None
-            for message in resp.messages:
-                next_offset = message.offset
-
-                # update the offset before the message is yielded. This is
-                # so that the consumer state is not lost in certain cases.
-                # For eg: the message is yielded and consumed by the caller,
-                # but the caller does not come back into the generator again.
-                # The message will be consumed but the status will not be
-                # updated in the consumer
-                self.fetch_started[partition] = True
-                self.offsets[partition] = message.offset
-                yield message
+            try:
+                for message in resp.messages:
+                    next_offset = message.offset
+
+                    # update the offset before the message is yielded. This is
+                    # so that the consumer state is not lost in certain cases.
+                    # For eg: the message is yielded and consumed by the caller,
+                    # but the caller does not come back into the generator again.
+                    # The message will be consumed but the status will not be
+                    # updated in the consumer
+                    self.fetch_started[partition] = True
+                    self.offsets[partition] = message.offset
+                    yield message
+            except ConsumerFetchSizeTooSmall, e:
+                log.warn("Fetch size is too small, increasing by 1.5x and retrying")
+                fetch_size *= 1.5
+                continue
+            except ConsumerNoMoreData, e:
+                log.debug("Iteration was ended by %r", e)
+
             if next_offset is None:
                 break
             else:
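The consumer-side change above is a grow-and-retry loop: when a fetch returns a buffer too small to hold even one complete message, the iterator enlarges the fetch size by 1.5x and re-issues the request from the same offset, since nothing was yielded yet. A minimal sketch of that pattern in isolation follows; fetch_once, FetchSizeTooSmall, and the 4000-byte threshold are hypothetical stand-ins, not part of the kafka-python API.

    # Sketch only: the grow-and-retry pattern, decoupled from the Kafka wire protocol.
    class FetchSizeTooSmall(Exception):
        pass

    def fetch_once(offset, fetch_size):
        # Pretend the first complete message at this offset needs 4000 bytes;
        # anything smaller is a partial read that decodes to nothing.
        if fetch_size < 4000:
            raise FetchSizeTooSmall()
        return ["message@%d" % offset]

    def fetch_with_growth(offset, initial_size=1024):
        fetch_size = initial_size
        while True:
            try:
                return fetch_size, fetch_once(offset, fetch_size)
            except FetchSizeTooSmall:
                # Same escalation as the patch: grow by 1.5x and retry at the
                # same offset, because no message was consumed.
                fetch_size *= 1.5

    print(fetch_with_growth(0))   # tries 1024, 1536, 2304, 3456, then succeeds at 5184

Growing multiplicatively keeps the number of retries logarithmic in the ratio of message size to initial fetch size, rather than linear.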
""" cur = 0 + read_message = False while cur < len(data): try: ((offset, ), cur) = relative_unpack('>q', data, cur) (msg, cur) = read_int_string(data, cur) - for (offset, message) in KafkaProtocol._decode_message(msg, - offset): + for (offset, message) in KafkaProtocol._decode_message(msg, offset): + read_message = True yield OffsetAndMessage(offset, message) - except BufferUnderflowError: - # If we get a partial read of a message, stop - raise StopIteration() + if read_message is False: + # If we get a partial read of a message, but haven't yielded anyhting + # there's a problem + raise ConsumerFetchSizeTooSmall() + else: + raise StopIteration() @classmethod def _decode_message(cls, data, offset): diff --git a/kafka/util.py b/kafka/util.py index 11178f5..bdda7ed 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -73,6 +73,8 @@ class BufferUnderflowError(Exception): class ChecksumError(Exception): pass +class ConsumerFetchSizeTooSmall(Exception): + pass class ReentrantTimer(object): """ diff --git a/test/test_integration.py b/test/test_integration.py index 6384b09..bf1acc8 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -8,7 +8,6 @@ import random from kafka import * # noqa from kafka.common import * # noqa from kafka.codec import has_gzip, has_snappy - from .fixtures import ZookeeperFixture, KafkaFixture @@ -757,20 +756,15 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 10) + # Consumer should still get all of them consumer = SimpleConsumer(self.client, "group1", "test_large_messages") - it = consumer.__iter__() - for i in range(10): - self.assertEquals(messages1[i], it.next().message) - - consumer = SimpleConsumer(self.client, "group2", "test_large_messages", fetch_size_bytes=5120) - it = consumer.__iter__() - for i in range(10): - self.assertEquals(messages1[i], it.next().message) - for i in range(10): - self.assertEquals(messages2[i], it.next().message) + all_messages = messages1 + messages2 + for i, message in enumerate(consumer): + self.assertEquals(all_messages[i], message.message) + self.assertEquals(i, 19) def random_string(l): - s = "".join(random.choice(string.printable) for i in xrange(l)) + s = "".join(random.choice(string.letters) for i in xrange(l)) return s if __name__ == "__main__": |