diff options
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 22 |
1 files changed, 22 insertions, 0 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 90d9d37..bde283c 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -4,6 +4,8 @@ import copy import logging import time +import six + import kafka.common as Errors from kafka.client_async import KafkaClient @@ -565,3 +567,23 @@ class KafkaConsumer(object): # then do any offset lookups in case some positions are not known self._fetcher.update_fetch_positions(partitions) + + def __iter__(self): + while True: + # records = self._poll_once(self.config['request_timeout_ms']) + self._coordinator.ensure_coordinator_known() + + # ensure we have partitions assigned if we expect to + if self._subscription.partitions_auto_assigned(): + self._coordinator.ensure_active_group() + + # fetch positions if we have partitions we're subscribed to that we + # don't know the offset for + if not self._subscription.has_all_fetch_positions(): + self._update_fetch_positions(self._subscription.missing_fetch_positions()) + + # init any new fetches (won't resend pending fetches) + self._fetcher.init_fetches() + self._client.poll(self.config['request_timeout_ms'] / 1000.0) + for msg in self._fetcher: + yield msg |