-rw-r--r--  kafka/consumer/fetcher.py  49
-rw-r--r--  kafka/consumer/group.py    22
2 files changed, 71 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c133a31..8a48575 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -316,6 +316,55 @@ class Fetcher(object):
tp, fetch_offset)
return dict(drained)
+    def __iter__(self):
+        """Iterate over fetched records, yielding one ConsumerRecord at a time."""
+        if self._subscriptions.needs_partition_assignment:
+            raise StopIteration('Subscription needs partition assignment')
+
+        self._raise_if_offset_out_of_range()
+        self._raise_if_unauthorized_topics()
+        self._raise_if_record_too_large()
+
+        while self._records:
+            (fetch_offset, tp, messages) = self._records.popleft()
+
+            if not self._subscriptions.is_assigned(tp):
+                # this can happen when a rebalance happens before
+                # fetched records are returned
+                log.warning("Not returning fetched records for partition %s"
+                            " since it is no longer assigned", tp)
+                continue
+
+            # note that the consumed position should always be available
+            # as long as the partition is still assigned
+            consumed = self._subscriptions.assignment[tp].consumed
+            if not self._subscriptions.is_fetchable(tp):
+                # this can happen when partition consumption is paused before
+                # fetched records are returned
+                log.warning("Not returning fetched records for assigned partition"
+                            " %s since it is no longer fetchable", tp)
+
+                # we also need to reset the fetch positions to pretend we did
+                # not fetch this partition in the previous request at all
+                self._subscriptions.assignment[tp].fetched = consumed
+
+            elif fetch_offset == consumed:
+                # TODO: handle compressed messages
+                for offset, size, msg in messages:
+                    if msg.attributes:
+                        raise Errors.KafkaError('Compressed messages not supported yet')
+                    elif self.config['check_crcs'] and not msg.validate_crc():
+                        raise Errors.InvalidMessageError(msg)
+
+                    self._subscriptions.assignment[tp].consumed = offset + 1
+                    key, value = self._deserialize(msg)
+                    yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+            else:
+                # these records aren't next in line based on the last consumed
+                # position; they must be from an obsolete request, so ignore them
+                log.warning("Ignoring fetched records for %s at offset %s",
+                            tp, fetch_offset)
+
def _deserialize(self, msg):
if self.config['key_deserializer']:
key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable
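Note: the generator above only yields messages whose batch starts at the partition's current consumed position; any other batch is treated as the leftover of an obsolete fetch request and dropped. A minimal, self-contained sketch of that bookkeeping (the names Record and drain are illustrative only and not part of kafka-python):

# Illustrative sketch only -- Record and drain are hypothetical names,
# not kafka-python APIs.
from collections import deque, namedtuple

Record = namedtuple('Record', ['topic', 'partition', 'offset', 'value'])

def drain(batches, consumed):
    """Yield records in offset order, skipping batches whose starting
    offset no longer matches the consumed position (obsolete fetches)."""
    while batches:
        fetch_offset, tp, messages = batches.popleft()
        if fetch_offset != consumed[tp]:
            continue  # stale batch from an earlier request; ignore it
        for offset, value in messages:
            consumed[tp] = offset + 1  # advance the consumed position
            yield Record(tp[0], tp[1], offset, value)

tp = ('my-topic', 0)
batches = deque([
    (0, tp, [(0, b'a'), (1, b'b')]),  # batch starting at the consumed position
    (0, tp, [(0, b'a'), (1, b'b')]),  # duplicate batch from an obsolete request
])
consumed = {tp: 0}
print(list(drain(batches, consumed)))  # yields offsets 0 and 1 exactly once

The duplicate batch in the sketch is skipped because, once the first batch is drained, its starting offset no longer lines up with the consumed position.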
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 90d9d37..bde283c 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -4,6 +4,8 @@ import copy
import logging
import time
+import six
+
import kafka.common as Errors
from kafka.client_async import KafkaClient
@@ -565,3 +567,23 @@ class KafkaConsumer(object):
# then do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)
+
+    def __iter__(self):
+        while True:
+            # records = self._poll_once(self.config['request_timeout_ms'])
+            self._coordinator.ensure_coordinator_known()
+
+            # ensure we have partitions assigned if we expect to
+            if self._subscription.partitions_auto_assigned():
+                self._coordinator.ensure_active_group()
+
+            # fetch positions if we have partitions we're subscribed to that we
+            # don't know the offset for
+            if not self._subscription.has_all_fetch_positions():
+                self._update_fetch_positions(self._subscription.missing_fetch_positions())
+
+            # init any new fetches (won't resend pending fetches)
+            self._fetcher.init_fetches()
+            self._client.poll(self.config['request_timeout_ms'] / 1000.0)
+            for msg in self._fetcher:
+                yield msg
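With both pieces in place the consumer itself becomes iterable: each pass through the loop keeps the coordinator and partition assignment current, resolves any missing fetch positions, sends new fetch requests, polls the client, and then drains the fetcher. A usage sketch, assuming a broker at localhost:9092 and an existing topic named 'my-topic' (both placeholders):

from kafka import KafkaConsumer

# Placeholder broker address, topic, and group id; requires a running broker.
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])

for record in consumer:
    # each record is a ConsumerRecord(topic, partition, offset, key, value)
    print(record.topic, record.partition, record.offset, record.value)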