 kafka/consumer/group.py     | 79
 kafka/coordinator/base.py   | 31
 test/test_consumer_group.py | 27
 test/test_coordinator.py    | 16
 4 files changed, 108 insertions(+), 45 deletions(-)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index b43b0f4..9db4b5d 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -439,14 +439,14 @@ class KafkaConsumer(six.Iterator):
         Returns:
             dict: map of topic to list of records (may be empty)
         """
-        if self.config['group_id'] is not None:
-            if self.config['api_version'] >= (0, 8, 2):
-                self._coordinator.ensure_coordinator_known()
+        if self._use_consumer_group():
+            self._coordinator.ensure_coordinator_known()
+            self._coordinator.ensure_active_group()
+
+        # 0.8.2 brokers support kafka-backed offset storage via group coordinator
+        elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
+            self._coordinator.ensure_coordinator_known()

-            if self.config['api_version'] >= (0, 9):
-                # ensure we have partitions assigned if we expect to
-                if self._subscription.partitions_auto_assigned():
-                    self._coordinator.ensure_active_group()

         # fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
@@ -665,6 +665,16 @@ class KafkaConsumer(six.Iterator):
         self._client.set_topics([])
         log.debug("Unsubscribed all topics or patterns and assigned partitions")

+    def _use_consumer_group(self):
+        """Return True iff this consumer can/should join a broker-coordinated group."""
+        if self.config['api_version'] < (0, 9):
+            return False
+        elif self.config['group_id'] is None:
+            return False
+        elif not self._subscription.partitions_auto_assigned():
+            return False
+        return True
+
     def _update_fetch_positions(self, partitions):
         """
         Set the fetch position to the committed position (if there is one)
@@ -690,17 +700,16 @@ class KafkaConsumer(six.Iterator):

     def _message_generator(self):
         assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
         while time.time() < self._consumer_timeout:
-            if self.config['group_id'] is not None:
-                if self.config['api_version'] >= (0, 8, 2):
-                    self._coordinator.ensure_coordinator_known()
-                if self.config['api_version'] >= (0, 9):
-                    # ensure we have partitions assigned if we expect to
-                    if self._subscription.partitions_auto_assigned():
-                        self._coordinator.ensure_active_group()
+            if self._use_consumer_group():
+                self._coordinator.ensure_coordinator_known()
+                self._coordinator.ensure_active_group()

-            # fetch positions if we have partitions we're subscribed to that we
-            # don't know the offset for
+            # 0.8.2 brokers support kafka-backed offset storage via group coordinator
+            elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
+                self._coordinator.ensure_coordinator_known()
+
+            # fetch offsets for any subscribed partitions that we aren't tracking yet
             if not self._subscription.has_all_fetch_positions():
                 partitions = self._subscription.missing_fetch_positions()
                 self._update_fetch_positions(partitions)
@@ -714,14 +723,18 @@ class KafkaConsumer(six.Iterator):
             # like heartbeats, auto-commits, and metadata refreshes
             timeout_at = self._next_timeout()

-            if self.config['api_version'] >= (0, 9):
-                if self.config['group_id'] is not None and not self.assignment():
-                    sleep_time = max(timeout_at - time.time(), 0)
-                    if sleep_time > 0 and not self._client.in_flight_request_count():
-                        log.debug('No partitions assigned; sleeping for %s', sleep_time)
-                        time.sleep(sleep_time)
-                        continue
-
+            # Because the consumer client poll does not sleep unless blocking on
+            # network IO, we need to explicitly sleep when we know we are idle
+            # because we haven't been assigned any partitions to fetch / consume
+            if self._use_consumer_group() and not self.assignment():
+                sleep_time = max(timeout_at - time.time(), 0)
+                if sleep_time > 0 and not self._client.in_flight_request_count():
+                    log.debug('No partitions assigned; sleeping for %s', sleep_time)
+                    time.sleep(sleep_time)
+                    continue
+
+            # Short-circuit the fetch iterator if we are already timed out
+            # to avoid any unintentional interaction with fetcher setup
             if time.time() > timeout_at:
                 continue
@@ -739,9 +752,21 @@ class KafkaConsumer(six.Iterator):
                 self._fetcher.init_fetches()

     def _next_timeout(self):
-        return min(self._consumer_timeout,
-                   self._client._delayed_tasks.next_at() + time.time(),
-                   self._client.cluster.ttl() / 1000.0 + time.time())
+        timeout = min(self._consumer_timeout,
+                      self._client._delayed_tasks.next_at() + time.time(),
+                      self._client.cluster.ttl() / 1000.0 + time.time())
+
+        # Although the delayed_tasks timeout above should cover processing
+        # HeartbeatRequests, it is still possible that HeartbeatResponses
+        # are left unprocessed during a long _fetcher iteration without
+        # an intermediate poll(). And because tasks are responsible for
+        # rescheduling themselves, an unprocessed response will prevent
+        # the next heartbeat from being sent. This check should help
+        # avoid that.
+        if self._use_consumer_group():
+            heartbeat = time.time() + self._coordinator.heartbeat.ttl()
+            timeout = min(timeout, heartbeat)
+        return timeout

     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self
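
The control-flow change above in poll() and _message_generator() boils down to the new _use_consumer_group() gate. Here is a small, self-contained sketch of that decision; it is illustrative only: plain arguments stand in for KafkaConsumer's config dict and SubscriptionState, and the returned names are just labels for the coordinator calls the consumer would make.

def use_consumer_group(api_version, group_id, partitions_auto_assigned):
    # Broker-coordinated group membership needs 0.9+ brokers, a group_id,
    # and automatic partition assignment (subscribe(), not assign()).
    if api_version < (0, 9):
        return False
    if group_id is None:
        return False
    if not partitions_auto_assigned:
        return False
    return True


def coordinator_calls(api_version, group_id, partitions_auto_assigned):
    # Names of the coordinator methods the consumer would invoke before fetching.
    if use_consumer_group(api_version, group_id, partitions_auto_assigned):
        return ['ensure_coordinator_known', 'ensure_active_group']
    # 0.8.2 brokers still support kafka-backed offset storage via the group
    # coordinator, even without broker-side partition assignment.
    elif group_id is not None and api_version >= (0, 8, 2):
        return ['ensure_coordinator_known']
    return []


assert coordinator_calls((0, 9), 'my-group', True) == ['ensure_coordinator_known',
                                                       'ensure_active_group']
assert coordinator_calls((0, 8, 2), 'my-group', True) == ['ensure_coordinator_known']
assert coordinator_calls((0, 9), 'my-group', False) == ['ensure_coordinator_known']
assert coordinator_calls((0, 9), None, True) == []

The last two cases show why the elif branch still matters: a consumer with a group_id but manual assignment, or one talking to a 0.8.2 broker, keeps kafka-backed offset storage without joining a broker-coordinated group.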
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index c49c38b..dca809e 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -536,26 +536,27 @@ class BaseCoordinator(object):
         #self.sensors.heartbeat_latency.record(response.requestLatencyMs())
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            log.debug("Received successful heartbeat response.")
+            log.info("Heartbeat successful")
             future.success(None)
         elif error_type in (Errors.GroupCoordinatorNotAvailableError,
                             Errors.NotCoordinatorForGroupError):
-            log.info("Heartbeat failed: coordinator is either not started or"
-                     " not valid; will refresh metadata and retry")
+            log.warning("Heartbeat failed: coordinator is either not started or"
+                        " not valid; will refresh metadata and retry")
             self.coordinator_dead()
             future.failure(error_type())
         elif error_type is Errors.RebalanceInProgressError:
-            log.info("Heartbeat failed: group is rebalancing; re-joining group")
+            log.warning("Heartbeat: group is rebalancing; this consumer needs to"
+                        " re-join")
             self.rejoin_needed = True
             future.failure(error_type())
         elif error_type is Errors.IllegalGenerationError:
-            log.info("Heartbeat failed: local generation id is not current;"
-                     " re-joining group")
+            log.warning("Heartbeat: generation id is not current; this consumer"
+                        " needs to re-join")
            self.rejoin_needed = True
             future.failure(error_type())
         elif error_type is Errors.UnknownMemberIdError:
-            log.info("Heartbeat failed: local member_id was not recognized;"
-                     " resetting and re-joining group")
+            log.warning("Heartbeat: local member_id was not recognized;"
+                        " this consumer needs to re-join")
             self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
             self.rejoin_needed = True
             future.failure(error_type)
@@ -594,12 +595,16 @@ class HeartbeatTask(object):

     def __call__(self):
         if (self._coordinator.generation < 0 or
-            self._coordinator.need_rejoin() or
-            self._coordinator.coordinator_unknown()):
+            self._coordinator.need_rejoin()):
             # no need to send the heartbeat we're not using auto-assignment
             # or if we are awaiting a rebalance
-            log.debug("Skipping heartbeat: no auto-assignment"
-                      " or waiting on rebalance")
+            log.info("Skipping heartbeat: no auto-assignment"
+                     " or waiting on rebalance")
+            return
+
+        if self._coordinator.coordinator_unknown():
+            log.warning("Coordinator unknown during heartbeat -- will retry")
+            self._handle_heartbeat_failure(Errors.GroupCoordinatorNotAvailableError())
             return

         if self._heartbeat.session_expired():
@@ -629,7 +634,7 @@ class HeartbeatTask(object):
         self._client.schedule(self, time.time() + ttl)

     def _handle_heartbeat_failure(self, e):
-        log.debug("Heartbeat failed; retrying")
+        log.warning("Heartbeat failed; retrying")
         self._request_in_flight = False
         etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0
         self._client.schedule(self, etd)
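
The key behavioral change in HeartbeatTask.__call__ above is that an unknown coordinator is no longer lumped in with the "skip and return" cases; it is routed through _handle_heartbeat_failure, which re-schedules the task after retry_backoff_ms so heartbeats resume once a coordinator is found again. A minimal sketch of that retry loop follows, using a hypothetical scheduler stub and a plain callable in place of the real KafkaClient and coordinator; it is not the library code itself.

import time


class FakeScheduler(object):
    """Stand-in for KafkaClient.schedule(task, at); it only records calls."""
    def __init__(self):
        self.scheduled = []

    def schedule(self, task, at):
        self.scheduled.append((task, at))


class HeartbeatTaskSketch(object):
    """Sketch of the patched failure path when the coordinator is unknown."""
    def __init__(self, client, coordinator_unknown, retry_backoff_ms=100):
        self._client = client
        self._coordinator_unknown = coordinator_unknown
        self._retry_backoff_ms = retry_backoff_ms

    def __call__(self):
        if self._coordinator_unknown():
            # Patched behavior: an unknown coordinator counts as a failed
            # heartbeat, so the task reschedules itself instead of going quiet.
            self._handle_heartbeat_failure(RuntimeError('coordinator unknown'))
            return
        # ... otherwise a HeartbeatRequest would be sent here ...

    def _handle_heartbeat_failure(self, exc):
        retry_at = time.time() + self._retry_backoff_ms / 1000.0
        self._client.schedule(self, retry_at)


client = FakeScheduler()
task = HeartbeatTaskSketch(client, coordinator_unknown=lambda: True)
task()
assert len(client.scheduled) == 1  # a retry was scheduled rather than dropped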
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 6ef2020..3d10f8f 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -1,16 +1,17 @@
import collections
import logging
import threading
-import os
import time
import pytest
import six
-from kafka import SimpleClient, SimpleProducer
+from kafka import SimpleClient
from kafka.common import TopicPartition
-from kafka.conn import BrokerConnection, ConnectionStates
+from kafka.conn import ConnectionStates
from kafka.consumer.group import KafkaConsumer
+from kafka.future import Future
+from kafka.protocol.metadata import MetadataResponse
from test.conftest import version
from test.testutil import random_string
@@ -115,3 +116,23 @@ def test_group(kafka_broker, topic):
     finally:
         for c in range(num_consumers):
             stop[c].set()
+
+
+@pytest.fixture
+def conn(mocker):
+    conn = mocker.patch('kafka.client_async.BrokerConnection')
+    conn.return_value = conn
+    conn.state = ConnectionStates.CONNECTED
+    conn.send.return_value = Future().success(
+        MetadataResponse(
+            [(0, 'foo', 12), (1, 'bar', 34)],  # brokers
+            []))  # topics
+    return conn
+
+
+def test_heartbeat_timeout(conn, mocker):
+    mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = '0.9')
+    mocker.patch('time.time', return_value = 1234)
+    consumer = KafkaConsumer('foobar')
+    mocker.patch.object(consumer._coordinator.heartbeat, 'ttl', return_value = 0)
+    assert consumer._next_timeout() == 1234
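
The assertion in test_heartbeat_timeout follows from the heartbeat clamp added to _next_timeout(): with time.time() frozen at 1234 and heartbeat.ttl() mocked to 0, the heartbeat deadline (now + 0) is the smallest candidate, so the iterator timeout collapses to 1234. A rough model of that arithmetic, with made-up values standing in for the consumer timeout, delayed-task, and metadata deadlines:

now = 1234                      # time.time() is mocked to a constant
candidates = [
    float('inf'),               # consumer timeout (unbounded in this sketch)
    now + 2.5,                  # hypothetical next delayed task
    now + 300.0,                # hypothetical metadata ttl expiry
]
heartbeat_deadline = now + 0    # heartbeat.ttl() is mocked to return 0
assert min(min(candidates), heartbeat_deadline) == 1234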
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 94e0e66..847cbc1 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -380,16 +380,20 @@ def test_maybe_auto_commit_offsets_sync(mocker, coordinator,
 def patched_coord(mocker, coordinator):
     coordinator._subscription.subscribe(topics=['foobar'])
     coordinator._subscription.needs_partition_assignment = False
-    mocker.patch.object(coordinator, 'coordinator_unknown')
-    coordinator.coordinator_unknown.return_value = False
+    mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
     coordinator.coordinator_id = 0
+    coordinator.generation = 0
+    mocker.patch.object(coordinator, 'need_rejoin', return_value=False)
     mocker.patch.object(coordinator._client, 'least_loaded_node',
                         return_value=1)
     mocker.patch.object(coordinator._client, 'ready', return_value=True)
     mocker.patch.object(coordinator._client, 'send')
+    mocker.patch.object(coordinator._client, 'schedule')
     mocker.spy(coordinator, '_failed_request')
     mocker.spy(coordinator, '_handle_offset_commit_response')
     mocker.spy(coordinator, '_handle_offset_fetch_response')
+    mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_success')
+    mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_failure')
     return coordinator
@@ -573,3 +577,11 @@ def test_handle_offset_fetch_response(patched_coord, offsets,
     assert future.value == offsets
     assert patched_coord.coordinator_id is (None if dead else 0)
     assert patched_coord._subscription.needs_partition_assignment is reassign
+
+
+def test_heartbeat(patched_coord):
+    patched_coord.coordinator_unknown.return_value = True
+
+    patched_coord.heartbeat_task()
+    assert patched_coord._client.schedule.call_count == 1
+    assert patched_coord.heartbeat_task._handle_heartbeat_failure.call_count == 1