 kafka/consumer/group.py     | 79
 kafka/coordinator/base.py   | 31
 test/test_consumer_group.py | 27
 test/test_coordinator.py    | 16
 4 files changed, 108 insertions(+), 45 deletions(-)
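
Note: the change below consolidates the scattered group_id / api_version checks into a single KafkaConsumer._use_consumer_group() helper. Broker-coordinated group membership is used only when the broker is 0.9+, a group_id is configured, and partitions are auto-assigned via subscribe(); otherwise a 0.8.2+ coordinator is still located, but only for kafka-backed offset storage. A minimal usage sketch of the two modes the helper distinguishes (topic name, group id, and bootstrap address are placeholders, not part of this patch):

from kafka import KafkaConsumer
from kafka.common import TopicPartition

# Subscribed topic + group_id on a 0.9 broker: _use_consumer_group() is True,
# so poll()/iteration ensure the coordinator is known and the group is active
# (join, sync, heartbeats) before fetching.
group_consumer = KafkaConsumer('example-topic',             # placeholder topic
                               group_id='example-group',    # placeholder group
                               bootstrap_servers='localhost:9092')

# Manual assignment: partitions are not auto-assigned, so _use_consumer_group()
# is False; with a group_id on an 0.8.2+ broker the coordinator is still
# located, but only for kafka-backed offset commits/fetches.
assigned_consumer = KafkaConsumer(group_id='example-group',
                                  bootstrap_servers='localhost:9092')
assigned_consumer.assign([TopicPartition('example-topic', 0)])
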
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index b43b0f4..9db4b5d 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -439,14 +439,14 @@ class KafkaConsumer(six.Iterator):
         Returns:
             dict: map of topic to list of records (may be empty)
         """
-        if self.config['group_id'] is not None:
-            if self.config['api_version'] >= (0, 8, 2):
-                self._coordinator.ensure_coordinator_known()
+        if self._use_consumer_group():
+            self._coordinator.ensure_coordinator_known()
+            self._coordinator.ensure_active_group()
+
+        # 0.8.2 brokers support kafka-backed offset storage via group coordinator
+        elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
+            self._coordinator.ensure_coordinator_known()
 
-            if self.config['api_version'] >= (0, 9):
-                # ensure we have partitions assigned if we expect to
-                if self._subscription.partitions_auto_assigned():
-                    self._coordinator.ensure_active_group()
 
         # fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
@@ -665,6 +665,16 @@ class KafkaConsumer(six.Iterator):
         self._client.set_topics([])
         log.debug("Unsubscribed all topics or patterns and assigned partitions")
 
+    def _use_consumer_group(self):
+        """Return True iff this consumer can/should join a broker-coordinated group."""
+        if self.config['api_version'] < (0, 9):
+            return False
+        elif self.config['group_id'] is None:
+            return False
+        elif not self._subscription.partitions_auto_assigned():
+            return False
+        return True
+
     def _update_fetch_positions(self, partitions):
         """
         Set the fetch position to the committed position (if there is one)
@@ -690,17 +700,16 @@ class KafkaConsumer(six.Iterator):
 
     def _message_generator(self):
         assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
         while time.time() < self._consumer_timeout:
-            if self.config['group_id'] is not None:
-                if self.config['api_version'] >= (0, 8, 2):
-                    self._coordinator.ensure_coordinator_known()
-                if self.config['api_version'] >= (0, 9):
-                    # ensure we have partitions assigned if we expect to
-                    if self._subscription.partitions_auto_assigned():
-                        self._coordinator.ensure_active_group()
+            if self._use_consumer_group():
+                self._coordinator.ensure_coordinator_known()
+                self._coordinator.ensure_active_group()
 
-            # fetch positions if we have partitions we're subscribed to that we
-            # don't know the offset for
+            # 0.8.2 brokers support kafka-backed offset storage via group coordinator
+            elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
+                self._coordinator.ensure_coordinator_known()
+
+            # fetch offsets for any subscribed partitions that we arent tracking yet
             if not self._subscription.has_all_fetch_positions():
                 partitions = self._subscription.missing_fetch_positions()
                 self._update_fetch_positions(partitions)
@@ -714,14 +723,18 @@ class KafkaConsumer(six.Iterator):
 
             # like heartbeats, auto-commits, and metadata refreshes
             timeout_at = self._next_timeout()
 
-            if self.config['api_version'] >= (0, 9):
-                if self.config['group_id'] is not None and not self.assignment():
-                    sleep_time = max(timeout_at - time.time(), 0)
-                    if sleep_time > 0 and not self._client.in_flight_request_count():
-                        log.debug('No partitions assigned; sleeping for %s', sleep_time)
-                        time.sleep(sleep_time)
-                        continue
-
+            # Because the consumer client poll does not sleep unless blocking on
+            # network IO, we need to explicitly sleep when we know we are idle
+            # because we haven't been assigned any partitions to fetch / consume
+            if self._use_consumer_group() and not self.assignment():
+                sleep_time = max(timeout_at - time.time(), 0)
+                if sleep_time > 0 and not self._client.in_flight_request_count():
+                    log.debug('No partitions assigned; sleeping for %s', sleep_time)
+                    time.sleep(sleep_time)
+                    continue
+
+            # Short-circuit the fetch iterator if we are already timed out
+            # to avoid any unintentional interaction with fetcher setup
             if time.time() > timeout_at:
                 continue
@@ -739,9 +752,21 @@ class KafkaConsumer(six.Iterator):
             self._fetcher.init_fetches()
 
     def _next_timeout(self):
-        return min(self._consumer_timeout,
-                   self._client._delayed_tasks.next_at() + time.time(),
-                   self._client.cluster.ttl() / 1000.0 + time.time())
+        timeout = min(self._consumer_timeout,
+                      self._client._delayed_tasks.next_at() + time.time(),
+                      self._client.cluster.ttl() / 1000.0 + time.time())
+
+        # Although the delayed_tasks timeout above should cover processing
+        # HeartbeatRequests, it is still possible that HeartbeatResponses
+        # are left unprocessed during a long _fetcher iteration without
+        # an intermediate poll(). And because tasks are responsible for
+        # rescheduling themselves, an unprocessed response will prevent
+        # the next heartbeat from being sent. This check should help
+        # avoid that.
+        if self._use_consumer_group():
+            heartbeat = time.time() + self._coordinator.heartbeat.ttl()
+            timeout = min(timeout, heartbeat)
+        return timeout
 
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index c49c38b..dca809e 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -536,26 +536,27 @@ class BaseCoordinator(object):
         #self.sensors.heartbeat_latency.record(response.requestLatencyMs())
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            log.debug("Received successful heartbeat response.")
+            log.info("Heartbeat successful")
             future.success(None)
         elif error_type in (Errors.GroupCoordinatorNotAvailableError,
                             Errors.NotCoordinatorForGroupError):
-            log.info("Heartbeat failed: coordinator is either not started or"
-                     " not valid; will refresh metadata and retry")
+            log.warning("Heartbeat failed: coordinator is either not started or"
+                        " not valid; will refresh metadata and retry")
             self.coordinator_dead()
             future.failure(error_type())
         elif error_type is Errors.RebalanceInProgressError:
-            log.info("Heartbeat failed: group is rebalancing; re-joining group")
+            log.warning("Heartbeat: group is rebalancing; this consumer needs to"
+                        " re-join")
             self.rejoin_needed = True
             future.failure(error_type())
         elif error_type is Errors.IllegalGenerationError:
-            log.info("Heartbeat failed: local generation id is not current;"
-                     " re-joining group")
+            log.warning("Heartbeat: generation id is not current; this consumer"
+                        " needs to re-join")
             self.rejoin_needed = True
             future.failure(error_type())
         elif error_type is Errors.UnknownMemberIdError:
-            log.info("Heartbeat failed: local member_id was not recognized;"
-                     " resetting and re-joining group")
+            log.warning("Heartbeat: local member_id was not recognized;"
+                        " this consumer needs to re-join")
             self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
             self.rejoin_needed = True
             future.failure(error_type)
@@ -594,12 +595,16 @@ class HeartbeatTask(object):
 
     def __call__(self):
         if (self._coordinator.generation < 0 or
-            self._coordinator.need_rejoin() or
-            self._coordinator.coordinator_unknown()):
+            self._coordinator.need_rejoin()):
             # no need to send the heartbeat we're not using auto-assignment
             # or if we are awaiting a rebalance
-            log.debug("Skipping heartbeat: no auto-assignment"
-                      " or waiting on rebalance")
+            log.info("Skipping heartbeat: no auto-assignment"
+                     " or waiting on rebalance")
+            return
+
+        if self._coordinator.coordinator_unknown():
+            log.warning("Coordinator unknown during heartbeat -- will retry")
+            self._handle_heartbeat_failure(Errors.GroupCoordinatorNotAvailableError())
             return
 
         if self._heartbeat.session_expired():
@@ -629,7 +634,7 @@ class HeartbeatTask(object):
         self._client.schedule(self, time.time() + ttl)
 
     def _handle_heartbeat_failure(self, e):
-        log.debug("Heartbeat failed; retrying")
+        log.warning("Heartbeat failed; retrying")
         self._request_in_flight = False
         etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0
         self._client.schedule(self, etd)
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 6ef2020..3d10f8f 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -1,16 +1,17 @@
 import collections
 import logging
 import threading
-import os
 import time
 
 import pytest
 import six
 
-from kafka import SimpleClient, SimpleProducer
+from kafka import SimpleClient
 from kafka.common import TopicPartition
-from kafka.conn import BrokerConnection, ConnectionStates
+from kafka.conn import ConnectionStates
 from kafka.consumer.group import KafkaConsumer
+from kafka.future import Future
+from kafka.protocol.metadata import MetadataResponse
 
 from test.conftest import version
 from test.testutil import random_string
@@ -115,3 +116,23 @@ def test_group(kafka_broker, topic):
     finally:
         for c in range(num_consumers):
             stop[c].set()
+
+
+@pytest.fixture
+def conn(mocker):
+    conn = mocker.patch('kafka.client_async.BrokerConnection')
+    conn.return_value = conn
+    conn.state = ConnectionStates.CONNECTED
+    conn.send.return_value = Future().success(
+        MetadataResponse(
+            [(0, 'foo', 12), (1, 'bar', 34)],  # brokers
+            []))  # topics
+    return conn
+
+
+def test_heartbeat_timeout(conn, mocker):
+    mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = '0.9')
+    mocker.patch('time.time', return_value = 1234)
+    consumer = KafkaConsumer('foobar')
+    mocker.patch.object(consumer._coordinator.heartbeat, 'ttl', return_value = 0)
+    assert consumer._next_timeout() == 1234
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 94e0e66..847cbc1 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -380,16 +380,20 @@ def test_maybe_auto_commit_offsets_sync(mocker, coordinator,
 def patched_coord(mocker, coordinator):
     coordinator._subscription.subscribe(topics=['foobar'])
     coordinator._subscription.needs_partition_assignment = False
-    mocker.patch.object(coordinator, 'coordinator_unknown')
-    coordinator.coordinator_unknown.return_value = False
+    mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
     coordinator.coordinator_id = 0
+    coordinator.generation = 0
+    mocker.patch.object(coordinator, 'need_rejoin', return_value=False)
     mocker.patch.object(coordinator._client, 'least_loaded_node',
                         return_value=1)
     mocker.patch.object(coordinator._client, 'ready', return_value=True)
     mocker.patch.object(coordinator._client, 'send')
+    mocker.patch.object(coordinator._client, 'schedule')
     mocker.spy(coordinator, '_failed_request')
     mocker.spy(coordinator, '_handle_offset_commit_response')
     mocker.spy(coordinator, '_handle_offset_fetch_response')
+    mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_success')
+    mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_failure')
     return coordinator
@@ -573,3 +577,11 @@ def test_handle_offset_fetch_response(patched_coord, offsets,
     assert future.value == offsets
     assert patched_coord.coordinator_id is (None if dead else 0)
     assert patched_coord._subscription.needs_partition_assignment is reassign
+
+
+def test_heartbeat(patched_coord):
+    patched_coord.coordinator_unknown.return_value = True
+
+    patched_coord.heartbeat_task()
+    assert patched_coord._client.schedule.call_count == 1
+    assert patched_coord.heartbeat_task._handle_heartbeat_failure.call_count == 1
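
For reference, the _next_timeout() change above can be restated in isolation: the iterator's sleep is clamped to the next heartbeat deadline, so a long fetch iteration without an intermediate poll() cannot postpone the next HeartbeatRequest. A rough sketch under that reading; the next_timeout function and its arguments are illustrative stand-ins for the consumer's internal state, and the numbers mirror test_heartbeat_timeout above:

import time

def next_timeout(now, consumer_timeout, delayed_tasks_next_at, metadata_ttl_ms,
                 heartbeat_ttl, using_consumer_group):
    """Illustrative re-statement of the _next_timeout() logic after this patch.
    All arguments are hypothetical stand-ins, not kafka-python API."""
    timeout = min(consumer_timeout,
                  delayed_tasks_next_at + now,
                  metadata_ttl_ms / 1000.0 + now)
    # New in this patch: never sleep past the next heartbeat deadline, so an
    # unprocessed HeartbeatResponse cannot delay rescheduling the heartbeat task.
    if using_consumer_group:
        timeout = min(timeout, now + heartbeat_ttl)
    return timeout

# Mirrors test_heartbeat_timeout: with time.time() frozen at 1234 and
# heartbeat.ttl() patched to 0, the next timeout collapses to "now" (1234),
# forcing an immediate poll() that can process heartbeat responses.
assert next_timeout(now=1234, consumer_timeout=float('inf'),
                    delayed_tasks_next_at=300, metadata_ttl_ms=300000,
                    heartbeat_ttl=0, using_consumer_group=True) == 1234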