diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-30 16:26:46 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-30 16:26:46 -0800 |
commit | 14de82535a66e2bfadddb76e7cb2b842be63b0fe (patch) | |
tree | 487d6116638a63bcd74d8923db290cf1967937d7 | |
parent | 93b8afed014f354dd6d348d97dfa2b159c17c5da (diff) | |
download | kafka-python-14de82535a66e2bfadddb76e7cb2b842be63b0fe.tar.gz |
Support simple message iteration in Fetcher and new KafkaConsumer
-rw-r--r-- | kafka/consumer/fetcher.py | 49 | ||||
-rw-r--r-- | kafka/consumer/group.py | 22 |
2 files changed, 71 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index c133a31..8a48575 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -316,6 +316,55 @@ class Fetcher(object): tp, fetch_offset) return dict(drained) + def __iter__(self): + """Iterate over fetched_records""" + if self._subscriptions.needs_partition_assignment: + raise StopIteration('Subscription needs partition assignment') + + self._raise_if_offset_out_of_range() + self._raise_if_unauthorized_topics() + self._raise_if_record_too_large() + + while self._records: + (fetch_offset, tp, messages) = self._records.popleft() + + if not self._subscriptions.is_assigned(tp): + # this can happen when a rebalance happened before + # fetched records are returned + log.warning("Not returning fetched records for partition %s" + " since it is no longer assigned", tp) + continue + + # note that the consumed position should always be available + # as long as the partition is still assigned + consumed = self._subscriptions.assignment[tp].consumed + if not self._subscriptions.is_fetchable(tp): + # this can happen when a partition consumption paused before + # fetched records are returned + log.warning("Not returning fetched records for assigned partition" + " %s since it is no longer fetchable", tp) + + # we also need to reset the fetch positions to pretend we did + # not fetch this partition in the previous request at all + self._subscriptions.assignment[tp].fetched = consumed + + elif fetch_offset == consumed: + # TODO: handle compressed messages + for offset, size, msg in messages: + if msg.attributes: + raise Errors.KafkaError('Compressed messages not supported yet') + elif self.config['check_crcs'] and not msg.validate_crc(): + raise Errors.InvalidMessageError(msg) + + self._subscriptions.assignment[tp].consumed = offset + 1 + key, value = self._deserialize(msg) + yield ConsumerRecord(tp.topic, tp.partition, offset, key, value) + else: + # these records aren't next in line based on the last consumed + # position, ignore them they must be from an obsolete request + log.warning("Ignoring fetched records for %s at offset %s", + tp, fetch_offset) + def _deserialize(self, msg): if self.config['key_deserializer']: key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 90d9d37..bde283c 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -4,6 +4,8 @@ import copy import logging import time +import six + import kafka.common as Errors from kafka.client_async import KafkaClient @@ -565,3 +567,23 @@ class KafkaConsumer(object): # then do any offset lookups in case some positions are not known self._fetcher.update_fetch_positions(partitions) + + def __iter__(self): + while True: + # records = self._poll_once(self.config['request_timeout_ms']) + self._coordinator.ensure_coordinator_known() + + # ensure we have partitions assigned if we expect to + if self._subscription.partitions_auto_assigned(): + self._coordinator.ensure_active_group() + + # fetch positions if we have partitions we're subscribed to that we + # don't know the offset for + if not self._subscription.has_all_fetch_positions(): + self._update_fetch_positions(self._subscription.missing_fetch_positions()) + + # init any new fetches (won't resend pending fetches) + self._fetcher.init_fetches() + self._client.poll(self.config['request_timeout_ms'] / 1000.0) + for msg in self._fetcher: + yield msg |