path: root/kafka/consumer/group.py
author    Dana Powers <dana.powers@gmail.com>  2016-10-23 11:59:40 -0700
committer Dana Powers <dana.powers@gmail.com>  2016-10-23 11:59:40 -0700
commit    45ab884a5c398fe394e1df1e5c587392e0f3faba (patch)
tree      f0dd38495b63bd01f0e40045fa60b1bbb9181288 /kafka/consumer/group.py
parent    9450a6bfff8517371162a968f4345ffc09380bb8 (diff)
download  kafka-python-consumer_iterator.tar.gz
Revert consumer iterators from max_poll_records (consumer_iterator)
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- kafka/consumer/group.py | 95
1 file changed, 88 insertions(+), 7 deletions(-)
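
For context, this is the consumer-as-iterator usage that the revert moves back onto a dedicated generator path. A minimal sketch; the broker address, topic, and group id are illustrative placeholders, not taken from this commit:

from kafka import KafkaConsumer

# consumer_timeout_ms bounds how long iteration blocks waiting for a
# message; once the deadline passes, the iterator raises StopIteration.
consumer = KafkaConsumer(
    'my-topic',                         # placeholder topic
    bootstrap_servers='localhost:9092', # placeholder broker
    group_id='my-group',                # placeholder group id
    consumer_timeout_ms=1000,
)

# the for loop drives __next__(), which feeds from _message_generator()
for message in consumer:
    print(message.topic, message.partition, message.offset, message.value)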
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index efadde1..3ab68a7 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -298,6 +298,8 @@ class KafkaConsumer(six.Iterator):
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False
+ self._iterator = None
+ self._consumer_timeout = float('inf')
if topics:
self._subscription.subscribe(topics=topics)
@@ -835,17 +837,96 @@ class KafkaConsumer(six.Iterator):
# then do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)
+ def _message_generator(self):
+ assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
+ while time.time() < self._consumer_timeout:
+
+ if self._use_consumer_group():
+ self._coordinator.ensure_coordinator_known()
+ self._coordinator.ensure_active_group()
+
+ # 0.8.2 brokers support kafka-backed offset storage via group coordinator
+ elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
+ self._coordinator.ensure_coordinator_known()
+
+ # fetch offsets for any subscribed partitions that we aren't tracking yet
+ if not self._subscription.has_all_fetch_positions():
+ partitions = self._subscription.missing_fetch_positions()
+ self._update_fetch_positions(partitions)
+
+ poll_ms = 1000 * (self._consumer_timeout - time.time())
+ if not self._fetcher.in_flight_fetches():
+ poll_ms = 0
+ self._client.poll(timeout_ms=poll_ms, sleep=True)
+
+ # We need to make sure we at least keep up with scheduled tasks,
+ # like heartbeats, auto-commits, and metadata refreshes
+ timeout_at = self._next_timeout()
+
+ # Because the consumer client poll does not sleep unless blocking on
+ # network IO, we need to explicitly sleep when we know we are idle
+ # because we haven't been assigned any partitions to fetch / consume
+ if self._use_consumer_group() and not self.assignment():
+ sleep_time = max(timeout_at - time.time(), 0)
+ if sleep_time > 0 and not self._client.in_flight_request_count():
+ log.debug('No partitions assigned; sleeping for %s', sleep_time)
+ time.sleep(sleep_time)
+ continue
+
+ # Short-circuit the fetch iterator if we are already timed out
+ # to avoid any unintentional interaction with fetcher setup
+ if time.time() > timeout_at:
+ continue
+
+ for msg in self._fetcher:
+ yield msg
+ if time.time() > timeout_at:
+ log.debug("internal iterator timeout - breaking for poll")
+ break
+
+ # an else block on a for loop only executes if there was no break
+ # so this should only be called on a StopIteration from the fetcher
+ # and we assume that it is safe to init_fetches when fetcher is done
+ # i.e., there are no more records stored internally
+ else:
+ self._fetcher.send_fetches()
+
+ def _next_timeout(self):
+ timeout = min(self._consumer_timeout,
+ self._client._delayed_tasks.next_at() + time.time(),
+ self._client.cluster.ttl() / 1000.0 + time.time())
+
+ # Although the delayed_tasks timeout above should cover processing
+ # HeartbeatRequests, it is still possible that HeartbeatResponses
+ # are left unprocessed during a long _fetcher iteration without
+ # an intermediate poll(). And because tasks are responsible for
+ # rescheduling themselves, an unprocessed response will prevent
+ # the next heartbeat from being sent. This check should help
+ # avoid that.
+ if self._use_consumer_group():
+ heartbeat = time.time() + self._coordinator.heartbeat.ttl()
+ timeout = min(timeout, heartbeat)
+ return timeout
+
def __iter__(self): # pylint: disable=non-iterator-returned
return self
def __next__(self):
- ret = self.poll(timeout_ms=self.config['consumer_timeout_ms'], max_records=1)
- if not ret:
- raise StopIteration
- assert len(ret) == 1
- (messages,) = ret.values()
- assert len(messages) == 1
- return messages[0]
+ if not self._iterator:
+ self._iterator = self._message_generator()
+
+ self._set_consumer_timeout()
+ try:
+ return next(self._iterator)
+ except StopIteration:
+ self._iterator = None
+ raise
+
+ def _set_consumer_timeout(self):
+ # consumer_timeout_ms can be used to stop iteration early
+ if self.config['consumer_timeout_ms'] >= 0:
+ self._consumer_timeout = time.time() + (
+ self.config['consumer_timeout_ms'] / 1000.0)
# old KafkaConsumer methods are deprecated
def configure(self, **configs):
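
The new __next__ relies on a cached-generator pattern: the generator is created lazily, and dropped on StopIteration so a later next() call can start a fresh one. A minimal standalone sketch of that pattern; the class and its names are illustrative, not part of kafka-python:

import time

class GeneratorBackedIterator(object):
    """Illustrative only: mirrors the shape of KafkaConsumer.__next__,
    which caches a generator and resets it when iteration times out."""

    def __init__(self, timeout_secs=1.0):
        self._iterator = None
        self._timeout_secs = timeout_secs

    def _message_generator(self):
        # stand-in for the real fetch loop: yield until the deadline passes
        deadline = time.time() + self._timeout_secs
        n = 0
        while time.time() < deadline:
            yield n
            n += 1

    def __iter__(self):
        return self

    def __next__(self):
        if not self._iterator:
            self._iterator = self._message_generator()
        try:
            return next(self._iterator)
        except StopIteration:
            # drop the exhausted generator; a later next() starts a new one
            self._iterator = None
            raise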
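
The for/else in _message_generator is doing real work: send_fetches() runs only when the fetcher was fully drained, never after a timeout break. A small self-contained sketch of that idiom, with illustrative names:

# 'else' on a for loop runs only when the loop ends without a break,
# i.e. the iterable was fully exhausted
def drain(records, deadline_at=None):
    for record in records:
        if deadline_at is not None and record >= deadline_at:
            break  # timed out mid-batch; do not fetch more yet
    else:
        return 'exhausted: safe to send more fetch requests'
    return 'broke early: records may remain buffered'

print(drain(range(5)))                 # exhausted: safe to send more fetch requests
print(drain(range(5), deadline_at=3))  # broke early: records may remain buffered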