summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-30 16:26:46 -0800
committerDana Powers <dana.powers@rd.io>2015-12-30 16:26:46 -0800
commit14de82535a66e2bfadddb76e7cb2b842be63b0fe (patch)
tree487d6116638a63bcd74d8923db290cf1967937d7 /kafka/consumer/group.py
parent93b8afed014f354dd6d348d97dfa2b159c17c5da (diff)
downloadkafka-python-14de82535a66e2bfadddb76e7cb2b842be63b0fe.tar.gz
Support simple message iteration in Fetcher and new KafkaConsumer
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py22
1 files changed, 22 insertions, 0 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 90d9d37..bde283c 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -4,6 +4,8 @@ import copy
import logging
import time
+import six
+
import kafka.common as Errors
from kafka.client_async import KafkaClient
@@ -565,3 +567,23 @@ class KafkaConsumer(object):
# then do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)
+
+ def __iter__(self):
+ while True:
+ # records = self._poll_once(self.config['request_timeout_ms'])
+ self._coordinator.ensure_coordinator_known()
+
+ # ensure we have partitions assigned if we expect to
+ if self._subscription.partitions_auto_assigned():
+ self._coordinator.ensure_active_group()
+
+ # fetch positions if we have partitions we're subscribed to that we
+ # don't know the offset for
+ if not self._subscription.has_all_fetch_positions():
+ self._update_fetch_positions(self._subscription.missing_fetch_positions())
+
+ # init any new fetches (won't resend pending fetches)
+ self._fetcher.init_fetches()
+ self._client.poll(self.config['request_timeout_ms'] / 1000.0)
+ for msg in self._fetcher:
+ yield msg