summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-30 16:26:46 -0800
committerDana Powers <dana.powers@rd.io>2015-12-30 16:26:46 -0800
commit14de82535a66e2bfadddb76e7cb2b842be63b0fe (patch)
tree487d6116638a63bcd74d8923db290cf1967937d7 /kafka/consumer/fetcher.py
parent93b8afed014f354dd6d348d97dfa2b159c17c5da (diff)
downloadkafka-python-14de82535a66e2bfadddb76e7cb2b842be63b0fe.tar.gz
Support simple message iteration in Fetcher and new KafkaConsumer
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--kafka/consumer/fetcher.py49
1 files changed, 49 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c133a31..8a48575 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -316,6 +316,55 @@ class Fetcher(object):
tp, fetch_offset)
return dict(drained)
+ def __iter__(self):
+ """Iterate over fetched_records"""
+ if self._subscriptions.needs_partition_assignment:
+ raise StopIteration('Subscription needs partition assignment')
+
+ self._raise_if_offset_out_of_range()
+ self._raise_if_unauthorized_topics()
+ self._raise_if_record_too_large()
+
+ while self._records:
+ (fetch_offset, tp, messages) = self._records.popleft()
+
+ if not self._subscriptions.is_assigned(tp):
+ # this can happen when a rebalance happened before
+ # fetched records are returned
+ log.warning("Not returning fetched records for partition %s"
+ " since it is no longer assigned", tp)
+ continue
+
+ # note that the consumed position should always be available
+ # as long as the partition is still assigned
+ consumed = self._subscriptions.assignment[tp].consumed
+ if not self._subscriptions.is_fetchable(tp):
+ # this can happen when a partition consumption paused before
+ # fetched records are returned
+ log.warning("Not returning fetched records for assigned partition"
+ " %s since it is no longer fetchable", tp)
+
+ # we also need to reset the fetch positions to pretend we did
+ # not fetch this partition in the previous request at all
+ self._subscriptions.assignment[tp].fetched = consumed
+
+ elif fetch_offset == consumed:
+ # TODO: handle compressed messages
+ for offset, size, msg in messages:
+ if msg.attributes:
+ raise Errors.KafkaError('Compressed messages not supported yet')
+ elif self.config['check_crcs'] and not msg.validate_crc():
+ raise Errors.InvalidMessageError(msg)
+
+ self._subscriptions.assignment[tp].consumed = offset + 1
+ key, value = self._deserialize(msg)
+ yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+ else:
+ # these records aren't next in line based on the last consumed
+ # position, ignore them they must be from an obsolete request
+ log.warning("Ignoring fetched records for %s at offset %s",
+ tp, fetch_offset)
+
def _deserialize(self, msg):
if self.config['key_deserializer']:
key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable