Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--  kafka/consumer/group.py  79
1 file changed, 52 insertions(+), 27 deletions(-)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index b43b0f4..9db4b5d 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -439,14 +439,14 @@ class KafkaConsumer(six.Iterator):
Returns:
dict: map of topic to list of records (may be empty)
"""
- if self.config['group_id'] is not None:
- if self.config['api_version'] >= (0, 8, 2):
- self._coordinator.ensure_coordinator_known()
+ if self._use_consumer_group():
+ self._coordinator.ensure_coordinator_known()
+ self._coordinator.ensure_active_group()
+
+ # 0.8.2 brokers support kafka-backed offset storage via group coordinator
+ elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
+ self._coordinator.ensure_coordinator_known()
- if self.config['api_version'] >= (0, 9):
- # ensure we have partitions assigned if we expect to
- if self._subscription.partitions_auto_assigned():
- self._coordinator.ensure_active_group()
# fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
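The refactor above splits the old nested checks into two explicit cases: full group coordination (0.9+ broker, a group_id, partitions auto-assigned) and kafka-backed offset storage only (0.8.2+ broker with a group_id). Illustrative sketch, not part of the patch, of consumer configurations that exercise each branch; the topic, group, and broker address are placeholders:

from kafka import KafkaConsumer

# Exercises the _use_consumer_group() branch: 0.9+ broker, a group_id,
# and partitions auto-assigned via a topic subscription.
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers='localhost:9092',
                         api_version=(0, 9))

# Exercises the elif branch: a 0.8.2 broker supports kafka-backed offset
# storage via the group coordinator, but no broker-side group membership.
legacy = KafkaConsumer('my-topic',
                       group_id='my-group',
                       bootstrap_servers='localhost:9092',
                       api_version=(0, 8, 2))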
@@ -665,6 +665,16 @@ class KafkaConsumer(six.Iterator):
self._client.set_topics([])
log.debug("Unsubscribed all topics or patterns and assigned partitions")
+ def _use_consumer_group(self):
+ """Return True iff this consumer can/should join a broker-coordinated group."""
+ if self.config['api_version'] < (0, 9):
+ return False
+ elif self.config['group_id'] is None:
+ return False
+ elif not self._subscription.partitions_auto_assigned():
+ return False
+ return True
+
def _update_fetch_positions(self, partitions):
"""
Set the fetch position to the committed position (if there is one)
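The new helper centralizes the three checks that were previously repeated inline. Illustrative sketch, not part of the patch, of how the subscription style flips the result; names and addresses are placeholders:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(group_id='my-group',
                         bootstrap_servers='localhost:9092',
                         api_version=(0, 9))

# subscribe(): partitions are auto-assigned by the group coordinator,
# so _use_consumer_group() returns True and the consumer joins the group.
consumer.subscribe(['my-topic'])

# assign() instead (on a separate consumer): partitions_auto_assigned()
# is False, so _use_consumer_group() returns False; the coordinator is
# then only used for kafka-backed offset storage, per the elif branch.
# other_consumer.assign([TopicPartition('my-topic', 0)])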
@@ -690,17 +700,16 @@ class KafkaConsumer(six.Iterator):
def _message_generator(self):
assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
while time.time() < self._consumer_timeout:
- if self.config['group_id'] is not None:
- if self.config['api_version'] >= (0, 8, 2):
- self._coordinator.ensure_coordinator_known()
- if self.config['api_version'] >= (0, 9):
- # ensure we have partitions assigned if we expect to
- if self._subscription.partitions_auto_assigned():
- self._coordinator.ensure_active_group()
+ if self._use_consumer_group():
+ self._coordinator.ensure_coordinator_known()
+ self._coordinator.ensure_active_group()
- # fetch positions if we have partitions we're subscribed to that we
- # don't know the offset for
+ # 0.8.2 brokers support kafka-backed offset storage via group coordinator
+ elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
+ self._coordinator.ensure_coordinator_known()
+
+ # fetch offsets for any subscribed partitions that we aren't tracking yet
if not self._subscription.has_all_fetch_positions():
partitions = self._subscription.missing_fetch_positions()
self._update_fetch_positions(partitions)
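_message_generator() is what backs the consumer's iterator protocol, looping until self._consumer_timeout (derived from the consumer_timeout_ms config). A hedged usage sketch of that iteration path; topic, group, and broker address are placeholders:

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers='localhost:9092',
                         consumer_timeout_ms=10000)

# Iteration is driven by _message_generator(); it stops if no record
# arrives within consumer_timeout_ms.
for record in consumer:
    print(record.topic, record.partition, record.offset, record.value)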
@@ -714,14 +723,18 @@ class KafkaConsumer(six.Iterator):
# like heartbeats, auto-commits, and metadata refreshes
timeout_at = self._next_timeout()
- if self.config['api_version'] >= (0, 9):
- if self.config['group_id'] is not None and not self.assignment():
- sleep_time = max(timeout_at - time.time(), 0)
- if sleep_time > 0 and not self._client.in_flight_request_count():
- log.debug('No partitions assigned; sleeping for %s', sleep_time)
- time.sleep(sleep_time)
- continue
-
+ # Because the consumer client poll does not sleep unless blocking on
+ # network IO, we need to explicitly sleep when we know we are idle
+ # because we haven't been assigned any partitions to fetch / consume
+ if self._use_consumer_group() and not self.assignment():
+ sleep_time = max(timeout_at - time.time(), 0)
+ if sleep_time > 0 and not self._client.in_flight_request_count():
+ log.debug('No partitions assigned; sleeping for %s', sleep_time)
+ time.sleep(sleep_time)
+ continue
+
+ # Short-circuit the fetch iterator if we are already timed out
+ # to avoid any unintentional interaction with fetcher setup
if time.time() > timeout_at:
continue
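The explicit sleep above prevents a busy loop while the group has no partitions assigned. Illustrative sketch, not part of the patch, of the same idle-wait pattern in isolation; the helper name and arguments are invented for illustration:

import time

def idle_sleep(timeout_at, in_flight_request_count):
    # Sleep until the next scheduled wakeup, but never while a network
    # request is still outstanding, and never past the deadline.
    sleep_time = max(timeout_at - time.time(), 0)
    if sleep_time > 0 and not in_flight_request_count:
        time.sleep(sleep_time)
        return True   # caller should 'continue' its poll loop
    return False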
@@ -739,9 +752,21 @@ class KafkaConsumer(six.Iterator):
self._fetcher.init_fetches()
def _next_timeout(self):
- return min(self._consumer_timeout,
- self._client._delayed_tasks.next_at() + time.time(),
- self._client.cluster.ttl() / 1000.0 + time.time())
+ timeout = min(self._consumer_timeout,
+ self._client._delayed_tasks.next_at() + time.time(),
+ self._client.cluster.ttl() / 1000.0 + time.time())
+
+ # Although the delayed_tasks timeout above should cover processing
+ # HeartbeatRequests, it is still possible that HeartbeatResponses
+ # are left unprocessed during a long _fetcher iteration without
+ # an intermediate poll(). And because tasks are responsible for
+ # rescheduling themselves, an unprocessed response will prevent
+ # the next heartbeat from being sent. This check should help
+ # avoid that.
+ if self._use_consumer_group():
+ heartbeat = time.time() + self._coordinator.heartbeat.ttl()
+ timeout = min(timeout, heartbeat)
+ return timeout
def __iter__(self): # pylint: disable=non-iterator-returned
return self
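The heartbeat bound added to _next_timeout() keeps a long fetcher iteration from delaying the next HeartbeatRequest. A minimal sketch of the bounding arithmetic, assuming heartbeat.ttl() returns the seconds remaining until the next heartbeat is due; the function and parameter names here are invented for illustration:

import time

def bounded_timeout(consumer_timeout_at, next_task_in, metadata_ttl_ms, heartbeat_ttl=None):
    # Return an absolute wakeup time: the earliest of the consumer deadline,
    # the next delayed task, and the metadata refresh...
    now = time.time()
    timeout = min(consumer_timeout_at,
                  now + next_task_in,
                  now + metadata_ttl_ms / 1000.0)
    # ...and, when group-coordinated, no later than the next heartbeat.
    if heartbeat_ttl is not None:
        timeout = min(timeout, now + heartbeat_ttl)
    return timeout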