summary refs log tree commit diff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- kafka/consumer/group.py 45
1 files changed, 35 insertions, 10 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 704c994..333ef64 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -611,6 +611,7 @@ class KafkaConsumer(six.Iterator):
self._fetcher.update_fetch_positions(partitions)
def _message_generator(self):
+ assert self.assignment() or self.subscription() is not None
while time.time() < self._consumer_timeout:
if self.config['api_version'] >= (0, 8, 2):
self._coordinator.ensure_coordinator_known()
@@ -626,21 +627,43 @@ class KafkaConsumer(six.Iterator):
partitions = self._subscription.missing_fetch_positions()
self._update_fetch_positions(partitions)
- # init any new fetches (won't resend pending fetches)
- self._fetcher.init_fetches()
- self._client.poll(
- max(0, self._consumer_timeout - time.time()) * 1000)
-
+ # We need to make sure we at least keep up with scheduled tasks,
+ # like heartbeats, auto-commits, and metadata refreshes
timeout_at = min(self._consumer_timeout,
self._client._delayed_tasks.next_at() + time.time(),
self._client.cluster.ttl() / 1000.0 + time.time())
+
+ if self.config['api_version'] >= (0, 9):
+ if not self.assignment():
+ sleep_time = time.time() - timeout_at
+ log.debug('No partitions assigned; sleeping for %s', sleep_time)
+ time.sleep(sleep_time)
+ continue
+
+ poll_ms = 1000 * (time.time() - self._consumer_timeout)
+
+ # Don't bother blocking if there are no fetches in flight
+ if not self._fetcher.in_flight_fetches():
+ poll_ms = 0
+
+ self._client.poll(poll_ms)
+
if time.time() > timeout_at:
continue
+
for msg in self._fetcher:
yield msg
if time.time() > timeout_at:
+ log.debug("internal iterator timeout - breaking for poll")
break
+ # An else block on a for loop only executes if there was no break,
+ # so this is only reached on a StopIteration from the fetcher.
+ # We assume it is safe to call init_fetches() once the fetcher is done,
+ # i.e., there are no more records stored internally.
+ else:
+ self._fetcher.init_fetches()
+
def __iter__(self): # pylint: disable=non-iterator-returned
return self
@@ -648,17 +671,19 @@ class KafkaConsumer(six.Iterator):
if not self._iterator:
self._iterator = self._message_generator()
- # consumer_timeout_ms can be used to stop iteration early
- if self.config['consumer_timeout_ms'] >= 0:
- self._consumer_timeout = time.time() + (
- self.config['consumer_timeout_ms'] / 1000.0)
-
+ self._set_consumer_timeout()
try:
return next(self._iterator)
except StopIteration:
self._iterator = None
raise
+ def _set_consumer_timeout(self):
+ # consumer_timeout_ms can be used to stop iteration early
+ if self.config['consumer_timeout_ms'] >= 0:
+ self._consumer_timeout = time.time() + (
+ self.config['consumer_timeout_ms'] / 1000.0)
+
# old KafkaConsumer methods are deprecated
def configure(self, **configs):
raise NotImplementedError(