summaryrefslogtreecommitdiff
path: root/kafka/consumer/simple.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r--kafka/consumer/simple.py84
1 files changed, 39 insertions, 45 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 7c63246..29eb480 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -1,18 +1,15 @@
from __future__ import absolute_import
try:
- from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611
+ from itertools import zip_longest as izip_longest, repeat # pylint: disable=E0611
except ImportError:
- from itertools import izip_longest as izip_longest, repeat # python 2
+ from itertools import izip_longest as izip_longest, repeat # pylint: disable=E0611
import logging
-try:
- import queue # python 3
-except ImportError:
- import Queue as queue # python 2
import sys
import time
import six
+from six.moves import queue
from .base import (
Consumer,
@@ -27,11 +24,12 @@ from .base import (
NO_MESSAGES_WAIT_TIME_SECONDS
)
from ..common import (
- FetchRequest, KafkaError, OffsetRequest,
+ FetchRequestPayload, KafkaError, OffsetRequestPayload,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
)
+from kafka.protocol.message import PartialMessage
log = logging.getLogger(__name__)
@@ -72,7 +70,7 @@ class SimpleConsumer(Consumer):
for a topic
Arguments:
- client: a connected KafkaClient
+ client: a connected SimpleClient
group: a name for this consumer, used for offset storage and must be unique
If you are connecting to a server that does not support offset
commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
@@ -153,9 +151,9 @@ class SimpleConsumer(Consumer):
LATEST = -1
EARLIEST = -2
if self.auto_offset_reset == 'largest':
- reqs = [OffsetRequest(self.topic, partition, LATEST, 1)]
+ reqs = [OffsetRequestPayload(self.topic, partition, LATEST, 1)]
elif self.auto_offset_reset == 'smallest':
- reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)]
+ reqs = [OffsetRequestPayload(self.topic, partition, EARLIEST, 1)]
else:
# Let's raise an reasonable exception type if user calls
# outside of an exception context
@@ -166,7 +164,7 @@ class SimpleConsumer(Consumer):
# Otherwise we should re-raise the upstream exception
# b/c it typically includes additional data about
# the request that triggered it, and we do not want to drop that
- raise # pylint: disable-msg=E0704
+ raise # pylint: disable=E0704
# send_offset_request
log.info('Resetting topic-partition offset to %s for %s:%d',
@@ -224,23 +222,17 @@ class SimpleConsumer(Consumer):
for tmp_partition in self.offsets.keys():
if whence == 0:
- reqs.append(OffsetRequest(self.topic,
- tmp_partition,
- -2,
- 1))
+ reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -2, 1))
elif whence == 2:
- reqs.append(OffsetRequest(self.topic,
- tmp_partition,
- -1,
- 1))
+ reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -1, 1))
else:
pass
else:
deltas[partition] = offset
if whence == 0:
- reqs.append(OffsetRequest(self.topic, partition, -2, 1))
+ reqs.append(OffsetRequestPayload(self.topic, partition, -2, 1))
elif whence == 2:
- reqs.append(OffsetRequest(self.topic, partition, -1, 1))
+ reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1))
else:
pass
@@ -370,9 +362,9 @@ class SimpleConsumer(Consumer):
while partitions:
requests = []
for partition, buffer_size in six.iteritems(partitions):
- requests.append(FetchRequest(self.topic, partition,
- self.fetch_offsets[partition],
- buffer_size))
+ requests.append(FetchRequestPayload(self.topic, partition,
+ self.fetch_offsets[partition],
+ buffer_size))
# Send request
responses = self.client.send_fetch_request(
requests,
@@ -413,32 +405,34 @@ class SimpleConsumer(Consumer):
partition = resp.partition
buffer_size = partitions[partition]
- try:
- for message in resp.messages:
- if message.offset < self.fetch_offsets[partition]:
- log.debug('Skipping message %s because its offset is less than the consumer offset',
- message)
- continue
- # Put the message in our queue
- self.queue.put((partition, message))
- self.fetch_offsets[partition] = message.offset + 1
- except ConsumerFetchSizeTooSmall:
+
+ # Check for partial message
+ if resp.messages and isinstance(resp.messages[-1].message, PartialMessage):
+
+ # If buffer is at max and all we got was a partial message
+ # raise ConsumerFetchSizeTooSmall
if (self.max_buffer_size is not None and
- buffer_size == self.max_buffer_size):
- log.error('Max fetch size %d too small',
- self.max_buffer_size)
- raise
+ buffer_size == self.max_buffer_size and
+ len(resp.messages) == 1):
+
+ log.error('Max fetch size %d too small', self.max_buffer_size)
+ raise ConsumerFetchSizeTooSmall()
+
if self.max_buffer_size is None:
buffer_size *= 2
else:
- buffer_size = min(buffer_size * 2,
- self.max_buffer_size)
+ buffer_size = min(buffer_size * 2, self.max_buffer_size)
log.warning('Fetch size too small, increase to %d (2x) '
'and retry', buffer_size)
retry_partitions[partition] = buffer_size
- except ConsumerNoMoreData as e:
- log.debug('Iteration was ended by %r', e)
- except StopIteration:
- # Stop iterating through this partition
- log.debug('Done iterating over partition %s', partition)
+ resp.messages.pop()
+
+ for message in resp.messages:
+ if message.offset < self.fetch_offsets[partition]:
+ log.debug('Skipping message %s because its offset is less than the consumer offset',
+ message)
+ continue
+ # Put the message in our queue
+ self.queue.put((partition, message))
+ self.fetch_offsets[partition] = message.offset + 1
partitions = retry_partitions