diff options
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r-- | kafka/consumer/simple.py | 84 |
1 files changed, 39 insertions, 45 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 7c63246..29eb480 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -1,18 +1,15 @@ from __future__ import absolute_import try: - from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611 + from itertools import zip_longest as izip_longest, repeat # pylint: disable=E0611 except ImportError: - from itertools import izip_longest as izip_longest, repeat # python 2 + from itertools import izip_longest as izip_longest, repeat # pylint: disable=E0611 import logging -try: - import queue # python 3 -except ImportError: - import Queue as queue # python 2 import sys import time import six +from six.moves import queue from .base import ( Consumer, @@ -27,11 +24,12 @@ from .base import ( NO_MESSAGES_WAIT_TIME_SECONDS ) from ..common import ( - FetchRequest, KafkaError, OffsetRequest, + FetchRequestPayload, KafkaError, OffsetRequestPayload, ConsumerFetchSizeTooSmall, ConsumerNoMoreData, UnknownTopicOrPartitionError, NotLeaderForPartitionError, OffsetOutOfRangeError, FailedPayloadsError, check_error ) +from kafka.protocol.message import PartialMessage log = logging.getLogger(__name__) @@ -72,7 +70,7 @@ class SimpleConsumer(Consumer): for a topic Arguments: - client: a connected KafkaClient + client: a connected SimpleClient group: a name for this consumer, used for offset storage and must be unique If you are connecting to a server that does not support offset commit/fetch (any prior to 0.8.1.1), then you *must* set this to None @@ -153,9 +151,9 @@ class SimpleConsumer(Consumer): LATEST = -1 EARLIEST = -2 if self.auto_offset_reset == 'largest': - reqs = [OffsetRequest(self.topic, partition, LATEST, 1)] + reqs = [OffsetRequestPayload(self.topic, partition, LATEST, 1)] elif self.auto_offset_reset == 'smallest': - reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)] + reqs = [OffsetRequestPayload(self.topic, partition, EARLIEST, 1)] else: # Let's raise an reasonable exception type if user calls # outside of an exception context @@ -166,7 +164,7 @@ class SimpleConsumer(Consumer): # Otherwise we should re-raise the upstream exception # b/c it typically includes additional data about # the request that triggered it, and we do not want to drop that - raise # pylint: disable-msg=E0704 + raise # pylint: disable=E0704 # send_offset_request log.info('Resetting topic-partition offset to %s for %s:%d', @@ -224,23 +222,17 @@ class SimpleConsumer(Consumer): for tmp_partition in self.offsets.keys(): if whence == 0: - reqs.append(OffsetRequest(self.topic, - tmp_partition, - -2, - 1)) + reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -2, 1)) elif whence == 2: - reqs.append(OffsetRequest(self.topic, - tmp_partition, - -1, - 1)) + reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -1, 1)) else: pass else: deltas[partition] = offset if whence == 0: - reqs.append(OffsetRequest(self.topic, partition, -2, 1)) + reqs.append(OffsetRequestPayload(self.topic, partition, -2, 1)) elif whence == 2: - reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1)) else: pass @@ -370,9 +362,9 @@ class SimpleConsumer(Consumer): while partitions: requests = [] for partition, buffer_size in six.iteritems(partitions): - requests.append(FetchRequest(self.topic, partition, - self.fetch_offsets[partition], - buffer_size)) + requests.append(FetchRequestPayload(self.topic, partition, + self.fetch_offsets[partition], + buffer_size)) # Send request responses = self.client.send_fetch_request( requests, @@ -413,32 +405,34 @@ class SimpleConsumer(Consumer): partition = resp.partition buffer_size = partitions[partition] - try: - for message in resp.messages: - if message.offset < self.fetch_offsets[partition]: - log.debug('Skipping message %s because its offset is less than the consumer offset', - message) - continue - # Put the message in our queue - self.queue.put((partition, message)) - self.fetch_offsets[partition] = message.offset + 1 - except ConsumerFetchSizeTooSmall: + + # Check for partial message + if resp.messages and isinstance(resp.messages[-1].message, PartialMessage): + + # If buffer is at max and all we got was a partial message + # raise ConsumerFetchSizeTooSmall if (self.max_buffer_size is not None and - buffer_size == self.max_buffer_size): - log.error('Max fetch size %d too small', - self.max_buffer_size) - raise + buffer_size == self.max_buffer_size and + len(resp.messages) == 1): + + log.error('Max fetch size %d too small', self.max_buffer_size) + raise ConsumerFetchSizeTooSmall() + if self.max_buffer_size is None: buffer_size *= 2 else: - buffer_size = min(buffer_size * 2, - self.max_buffer_size) + buffer_size = min(buffer_size * 2, self.max_buffer_size) log.warning('Fetch size too small, increase to %d (2x) ' 'and retry', buffer_size) retry_partitions[partition] = buffer_size - except ConsumerNoMoreData as e: - log.debug('Iteration was ended by %r', e) - except StopIteration: - # Stop iterating through this partition - log.debug('Done iterating over partition %s', partition) + resp.messages.pop() + + for message in resp.messages: + if message.offset < self.fetch_offsets[partition]: + log.debug('Skipping message %s because its offset is less than the consumer offset', + message) + continue + # Put the message in our queue + self.queue.put((partition, message)) + self.fetch_offsets[partition] = message.offset + 1 partitions = retry_partitions |