Diffstat (limited to 'test')
-rw-r--r-- | test/test_coordinator.py | 565 |
1 file changed, 565 insertions, 0 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
new file mode 100644
index 0000000..43f0c22
--- /dev/null
+++ b/test/test_coordinator.py
@@ -0,0 +1,565 @@
+# pylint: skip-file
+from __future__ import absolute_import
+
+import pytest
+
+from kafka.client_async import KafkaClient
+from kafka.common import TopicPartition, OffsetAndMetadata
+from kafka.consumer.subscription_state import (
+    SubscriptionState, ConsumerRebalanceListener)
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.coordinator.consumer import ConsumerCoordinator
+from kafka.coordinator.protocol import (
+    ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
+from kafka.conn import ConnectionStates
+from kafka.future import Future
+from kafka.protocol.commit import (
+    OffsetCommitRequest_v0, OffsetCommitRequest_v1, OffsetCommitRequest_v2,
+    OffsetCommitResponse, OffsetFetchRequest_v0, OffsetFetchRequest_v1,
+    OffsetFetchResponse)
+from kafka.protocol.metadata import MetadataResponse
+
+import kafka.common as Errors
+
+
+@pytest.fixture
+def conn(mocker):
+    conn = mocker.patch('kafka.client_async.BrokerConnection')
+    conn.return_value = conn
+    conn.state = ConnectionStates.CONNECTED
+    conn.send.return_value = Future().success(
+        MetadataResponse(
+            [(0, 'foo', 12), (1, 'bar', 34)],  # brokers
+            []))  # topics
+    return conn
+
+
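+# Coordinator under test; its KafkaClient picks up the patched
+# BrokerConnection from the conn fixture, so no network IO occurs.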
+@pytest.fixture
+def coordinator(conn):
+    return ConsumerCoordinator(KafkaClient(), SubscriptionState())
+
+
+def test_init(conn):
+    cli = KafkaClient()
+    coordinator = ConsumerCoordinator(cli, SubscriptionState())
+
+    # metadata update on init
+    assert cli.cluster._need_update is True
+    assert coordinator._handle_metadata_update in cli.cluster._listeners
+
+
+@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])
+def test_autocommit_enable_api_version(conn, api_version):
+    coordinator = ConsumerCoordinator(
+        KafkaClient(), SubscriptionState(), api_version=api_version)
+    if api_version < (0, 8, 1):
+        assert coordinator._auto_commit_task is None
+    else:
+        assert coordinator._auto_commit_task is not None
+
+
+def test_protocol_type(coordinator):
+    assert coordinator.protocol_type() == 'consumer'
+
+
+def test_group_protocols(coordinator):
+    # Requires a subscription
+    with pytest.raises(AssertionError):
+        coordinator.group_protocols()
+
+    coordinator._subscription.subscribe(topics=['foobar'])
+    assert coordinator.group_protocols() == [(
+        'roundrobin',
+        ConsumerProtocolMemberMetadata(
+            RoundRobinPartitionAssignor.version,
+            ['foobar'],
+            b'')
+    )]
+
+
+@pytest.mark.parametrize('api_version', [(0, 8), (0, 8, 1), (0, 8, 2), (0, 9)])
+def test_pattern_subscription(coordinator, api_version):
+    coordinator.config['api_version'] = api_version
+    coordinator._subscription.subscribe(pattern='foo')
+    assert coordinator._subscription.subscription == set([])
+    assert coordinator._subscription_metadata_changed() is False
+    assert coordinator._subscription.needs_partition_assignment is False
+
+    cluster = coordinator._client.cluster
+    cluster.update_metadata(MetadataResponse(
+        # brokers
+        [(0, 'foo', 12), (1, 'bar', 34)],
+        # topics
+        [(0, 'fizz', []),
+         (0, 'foo1', [(0, 0, 0, [], [])]),
+         (0, 'foo2', [(0, 0, 1, [], [])])]))
+    assert coordinator._subscription.subscription == set(['foo1', 'foo2'])
+
+    # 0.9 consumers should trigger dynamic partition assignment
+    if api_version >= (0, 9):
+        assert coordinator._subscription.needs_partition_assignment is True
+        assert coordinator._subscription.assignment == {}
+
+    # earlier consumers get all partitions assigned locally
+    else:
+        assert coordinator._subscription.needs_partition_assignment is False
+        assert set(coordinator._subscription.assignment.keys()) == set([
+            TopicPartition('foo1', 0),
+            TopicPartition('foo2', 0)])
+
+
+def test_lookup_assignor(coordinator):
+    assignor = coordinator._lookup_assignor('roundrobin')
+    assert assignor is RoundRobinPartitionAssignor
+    assert coordinator._lookup_assignor('foobar') is None
+
+
+def test_join_complete(mocker, coordinator):
+    coordinator._subscription.subscribe(topics=['foobar'])
+    mocker.spy(RoundRobinPartitionAssignor, 'on_assignment')
+    assert RoundRobinPartitionAssignor.on_assignment.call_count == 0
+    assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'')
+    coordinator._on_join_complete(
+        0, 'member-foo', 'roundrobin', assignment.encode())
+    assert RoundRobinPartitionAssignor.on_assignment.call_count == 1
+    RoundRobinPartitionAssignor.on_assignment.assert_called_with(assignment)
+
+
+def test_subscription_listener(mocker, coordinator):
+    listener = mocker.MagicMock(spec=ConsumerRebalanceListener)
+    coordinator._subscription.subscribe(
+        topics=['foobar'],
+        listener=listener)
+
+    coordinator._on_join_prepare(0, 'member-foo')
+    assert listener.on_partitions_revoked.call_count == 1
+    listener.on_partitions_revoked.assert_called_with(set([]))
+
+    assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'')
+    coordinator._on_join_complete(
+        0, 'member-foo', 'roundrobin', assignment.encode())
+    assert listener.on_partitions_assigned.call_count == 1
+    listener.on_partitions_assigned.assert_called_with(set([
+        TopicPartition('foobar', 0),
+        TopicPartition('foobar', 1)]))
+
+    listener.on_partitions_assigned.side_effect = Exception('crash')
+    coordinator._on_join_prepare(0, 'member-foo')
+    assert listener.on_partitions_revoked.call_count == 2
+    coordinator._on_join_complete(
+        0, 'member-foo', 'roundrobin', assignment.encode())
+    assert listener.on_partitions_assigned.call_count == 2
+
+
+def test_perform_assignment(mocker, coordinator):
+    member_metadata = {
+        'member-foo': ConsumerProtocolMemberMetadata(0, ['foo1'], b''),
+        'member-bar': ConsumerProtocolMemberMetadata(0, ['foo1'], b'')
+    }
+    assignments = {
+        'member-foo': ConsumerProtocolMemberAssignment(
+            0, [('foo1', [0])], b''),
+        'member-bar': ConsumerProtocolMemberAssignment(
+            0, [('foo1', [1])], b'')
+    }
+
+    mocker.patch.object(RoundRobinPartitionAssignor, 'assign')
+    RoundRobinPartitionAssignor.assign.return_value = assignments
+
+    ret = coordinator._perform_assignment(
+        'member-foo', 'roundrobin',
+        [(member, metadata.encode())
+         for member, metadata in member_metadata.items()])
+
+    assert RoundRobinPartitionAssignor.assign.call_count == 1
+    RoundRobinPartitionAssignor.assign.assert_called_with(
+        coordinator._client.cluster, member_metadata)
+    assert ret == assignments
+
+
+def test_on_join_prepare(coordinator):
+    coordinator._subscription.subscribe(topics=['foobar'])
+    coordinator._on_join_prepare(0, 'member-foo')
+    assert coordinator._subscription.needs_partition_assignment is True
+
+
+def test_need_rejoin(coordinator):
+    # No subscription - no rejoin
+    assert coordinator.need_rejoin() is False
+
+    coordinator._subscription.subscribe(topics=['foobar'])
+    assert coordinator.need_rejoin() is True
+
+    coordinator._subscription.needs_partition_assignment = False
+    coordinator.rejoin_needed = False
+    assert coordinator.need_rejoin() is False
+
+    coordinator._subscription.needs_partition_assignment = True
+    assert coordinator.need_rejoin() is True
+
+
+def test_refresh_committed_offsets_if_needed(mocker, coordinator):
+    mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets',
+                        return_value={
+                            TopicPartition('foobar', 0): OffsetAndMetadata(123, b''),
+                            TopicPartition('foobar', 1): OffsetAndMetadata(234, b'')})
+    coordinator._subscription.assign_from_user([TopicPartition('foobar', 0)])
+    assert coordinator._subscription.needs_fetch_committed_offsets is True
+    coordinator.refresh_committed_offsets_if_needed()
+    assignment = coordinator._subscription.assignment
+    assert assignment[TopicPartition('foobar', 0)].committed == 123
+    assert TopicPartition('foobar', 1) not in assignment
+    assert coordinator._subscription.needs_fetch_committed_offsets is False
+
+
+def test_fetch_committed_offsets(mocker, coordinator):
+
+    # No partitions, no IO polling
+    mocker.patch.object(coordinator._client, 'poll')
+    assert coordinator.fetch_committed_offsets([]) == {}
+    assert coordinator._client.poll.call_count == 0
+
+    # general case -- send offset fetch request, get successful future
+    mocker.patch.object(coordinator, 'ensure_coordinator_known')
+    mocker.patch.object(coordinator, '_send_offset_fetch_request',
+                        return_value=Future().success('foobar'))
+    partitions = [TopicPartition('foobar', 0)]
+    ret = coordinator.fetch_committed_offsets(partitions)
+    assert ret == 'foobar'
+    coordinator._send_offset_fetch_request.assert_called_with(partitions)
+    assert coordinator._client.poll.call_count == 1
+
+    # Failed future is raised if not retriable
+    coordinator._send_offset_fetch_request.return_value = Future().failure(AssertionError)
+    with pytest.raises(AssertionError):
+        coordinator.fetch_committed_offsets(partitions)
+    assert coordinator._client.poll.call_count == 2
+
+    coordinator._send_offset_fetch_request.side_effect = [
+        Future().failure(Errors.RequestTimedOutError),
+        Future().success('fizzbuzz')]
+
+    ret = coordinator.fetch_committed_offsets(partitions)
+    assert ret == 'fizzbuzz'
+    assert coordinator._client.poll.call_count == 4  # call + retry
+
+
+def test_close(mocker, coordinator):
+    mocker.patch.object(coordinator, '_maybe_auto_commit_offsets_sync')
+    mocker.patch.object(coordinator, '_handle_leave_group_response')
+    coordinator.coordinator_id = 0
+    coordinator.generation = 1
+    cli = coordinator._client
+    mocker.patch.object(cli, 'unschedule')
+    mocker.patch.object(cli, 'send', return_value=Future().success('foobar'))
+    mocker.patch.object(cli, 'poll')
+
+    coordinator.close()
+    assert coordinator._maybe_auto_commit_offsets_sync.call_count == 1
+    cli.unschedule.assert_called_with(coordinator.heartbeat_task)
+    coordinator._handle_leave_group_response.assert_called_with('foobar')
+
+    assert coordinator.generation == -1
+    assert coordinator.member_id == ''
+    assert coordinator.rejoin_needed is True
+
+
+@pytest.fixture
+def offsets():
+    return {
+        TopicPartition('foobar', 0): OffsetAndMetadata(123, b''),
+        TopicPartition('foobar', 1): OffsetAndMetadata(234, b''),
+    }
+
+
+def test_commit_offsets_async(mocker, coordinator, offsets):
+    mocker.patch.object(coordinator._client, 'poll')
+    mocker.patch.object(coordinator, 'ensure_coordinator_known')
+    mocker.patch.object(coordinator, '_send_offset_commit_request',
+                        return_value=Future().success('fizzbuzz'))
+    ret = coordinator.commit_offsets_async(offsets)
+    assert isinstance(ret, Future)
+    assert coordinator._send_offset_commit_request.call_count == 1
+
+
+def test_commit_offsets_sync(mocker, coordinator, offsets):
+    mocker.patch.object(coordinator, 'ensure_coordinator_known')
+    mocker.patch.object(coordinator, '_send_offset_commit_request',
+                        return_value=Future().success('fizzbuzz'))
+    cli = coordinator._client
+    mocker.patch.object(cli, 'poll')
+
+    # No offsets, no calls
+    assert coordinator.commit_offsets_sync({}) is None
+    assert coordinator._send_offset_commit_request.call_count == 0
+    assert cli.poll.call_count == 0
+
+    ret = coordinator.commit_offsets_sync(offsets)
+    assert coordinator._send_offset_commit_request.call_count == 1
+    assert cli.poll.call_count == 1
+    assert ret == 'fizzbuzz'
+
+    # Failed future is raised if not retriable
+    coordinator._send_offset_commit_request.return_value = Future().failure(AssertionError)
+    with pytest.raises(AssertionError):
+        coordinator.commit_offsets_sync(offsets)
+    assert coordinator._client.poll.call_count == 2
+
+    coordinator._send_offset_commit_request.side_effect = [
+        Future().failure(Errors.RequestTimedOutError),
+        Future().success('fizzbuzz')]
+
+    ret = coordinator.commit_offsets_sync(offsets)
+    assert ret == 'fizzbuzz'
+    assert coordinator._client.poll.call_count == 4  # call + retry
+
+
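+# Auto-commit failures during rebalance/close must never propagate:
+# known rebalance errors only log a warning, unexpected errors log an
+# exception, and the method returns None either way.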
+def test_maybe_auto_commit_offsets_sync(mocker, coordinator):
+    mocker.patch.object(coordinator, '_auto_commit_task')
+    mocker.patch.object(coordinator, 'commit_offsets_sync',
+                        side_effect=[
+                            Errors.UnknownMemberIdError(),
+                            Errors.IllegalGenerationError(),
+                            Errors.RebalanceInProgressError(),
+                            Exception(),
+                            None,
+                        ])
+    mock_warn = mocker.patch('kafka.coordinator.consumer.log.warning')
+    mock_exc = mocker.patch('kafka.coordinator.consumer.log.exception')
+
+    # 0.8 brokers don't support commits, so this should be a noop
+    coordinator.config['api_version'] = (0, 8)
+    assert coordinator._maybe_auto_commit_offsets_sync() is None
+    assert coordinator._auto_commit_task.disable.call_count == 0
+    assert coordinator.commit_offsets_sync.call_count == 0
+
+    # if auto-commit is disabled, noop
+    coordinator.config['api_version'] = (0, 9)
+    coordinator.config['enable_auto_commit'] = False
+    assert coordinator._maybe_auto_commit_offsets_sync() is None
+    assert coordinator._auto_commit_task.disable.call_count == 0
+    assert coordinator.commit_offsets_sync.call_count == 0
+
+    # UnknownMemberIdError
+    coordinator.config['enable_auto_commit'] = True
+    assert coordinator._maybe_auto_commit_offsets_sync() is None
+    assert coordinator._auto_commit_task.disable.call_count == 1
+    assert coordinator.commit_offsets_sync.call_count == 1
+    assert mock_warn.call_count == 1
+    assert mock_exc.call_count == 0
+
+    # IllegalGenerationError
+    assert coordinator._maybe_auto_commit_offsets_sync() is None
+    assert mock_warn.call_count == 2
+    assert mock_exc.call_count == 0
+
+    # RebalanceInProgressError
+    assert coordinator._maybe_auto_commit_offsets_sync() is None
+    assert mock_warn.call_count == 3
+    assert mock_exc.call_count == 0
+
+    # Exception (unhandled)
+    assert coordinator._maybe_auto_commit_offsets_sync() is None
+    assert mock_warn.call_count == 3
+    assert mock_exc.call_count == 1
+
+    # Success
+    assert coordinator._maybe_auto_commit_offsets_sync() is None
+    assert mock_warn.call_count == 3
+    assert mock_exc.call_count == 1
+
+
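+# Coordinator with a known group coordinator (node 0), a mocked client
+# send(), and spies on the request/response handlers exercised below.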
+@pytest.fixture
+def patched_coord(mocker, coordinator):
+    coordinator._subscription.subscribe(topics=['foobar'])
+    coordinator._subscription.needs_partition_assignment = False
+    mocker.patch.object(coordinator, 'coordinator_unknown')
+    coordinator.coordinator_unknown.return_value = False
+    coordinator.coordinator_id = 0
+    mocker.patch.object(coordinator._client, 'least_loaded_node',
+                        return_value=1)
+    mocker.patch.object(coordinator._client, 'send')
+    mocker.spy(coordinator, '_failed_request')
+    mocker.spy(coordinator, '_handle_offset_commit_response')
+    mocker.spy(coordinator, '_handle_offset_fetch_response')
+    return coordinator
+
+
+def test_send_offset_commit_request_fail(patched_coord, offsets):
+    patched_coord.coordinator_unknown.return_value = True
+    patched_coord.coordinator_id = None
+
+    # No offsets
+    ret = patched_coord._send_offset_commit_request({})
+    assert isinstance(ret, Future)
+    assert ret.succeeded()
+
+    # No coordinator
+    ret = patched_coord._send_offset_commit_request(offsets)
+    assert ret.failed()
+    assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError)
+
+
+@pytest.mark.parametrize('api_version,req_type', [
+    ((0, 8, 1), OffsetCommitRequest_v0),
+    ((0, 8, 2), OffsetCommitRequest_v1),
+    ((0, 9), OffsetCommitRequest_v2)])
+def test_send_offset_commit_request_versions(patched_coord, offsets,
+                                             api_version, req_type):
+    # assuming fixture sets coordinator=0, least_loaded_node=1
+    expect_node = 0 if api_version >= (0, 8, 2) else 1
+    patched_coord.config['api_version'] = api_version
+
+    patched_coord._send_offset_commit_request(offsets)
+    (node, request), _ = patched_coord._client.send.call_args
+    assert node == expect_node, 'Unexpected coordinator node'
+    assert isinstance(request, req_type)
+
+
+def test_send_offset_commit_request_failure(patched_coord, offsets):
+    _f = Future()
+    patched_coord._client.send.return_value = _f
+    future = patched_coord._send_offset_commit_request(offsets)
+    (node, request), _ = patched_coord._client.send.call_args
+    error = Exception()
+    _f.failure(error)
+    patched_coord._failed_request.assert_called_with(0, request, future, error)
+    assert future.failed()
+    assert future.exception is error
+
+
+def test_send_offset_commit_request_success(patched_coord, offsets):
+    _f = Future()
+    patched_coord._client.send.return_value = _f
+    future = patched_coord._send_offset_commit_request(offsets)
+    (node, request), _ = patched_coord._client.send.call_args
+    response = OffsetCommitResponse([('foobar', [(0, 0), (1, 0)])])
+    _f.success(response)
+    patched_coord._handle_offset_commit_response.assert_called_with(
+        offsets, future, response)
+
+
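+# Each case embeds a Kafka error code per partition in the commit response
+# and asserts the mapped exception, whether the coordinator is marked dead,
+# and whether a partition reassignment is triggered.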
+@pytest.mark.parametrize('response,error,dead,reassign', [
+    (OffsetCommitResponse([('foobar', [(0, 30), (1, 30)])]),
+     Errors.GroupAuthorizationFailedError, False, False),
+    (OffsetCommitResponse([('foobar', [(0, 12), (1, 12)])]),
+     Errors.OffsetMetadataTooLargeError, False, False),
+    (OffsetCommitResponse([('foobar', [(0, 28), (1, 28)])]),
+     Errors.InvalidCommitOffsetSizeError, False, False),
+    (OffsetCommitResponse([('foobar', [(0, 14), (1, 14)])]),
+     Errors.GroupLoadInProgressError, False, False),
+    (OffsetCommitResponse([('foobar', [(0, 15), (1, 15)])]),
+     Errors.GroupCoordinatorNotAvailableError, True, False),
+    (OffsetCommitResponse([('foobar', [(0, 16), (1, 16)])]),
+     Errors.NotCoordinatorForGroupError, True, False),
+    (OffsetCommitResponse([('foobar', [(0, 7), (1, 7)])]),
+     Errors.RequestTimedOutError, True, False),
+    (OffsetCommitResponse([('foobar', [(0, 25), (1, 25)])]),
+     Errors.UnknownMemberIdError, False, True),
+    (OffsetCommitResponse([('foobar', [(0, 22), (1, 22)])]),
+     Errors.IllegalGenerationError, False, True),
+    (OffsetCommitResponse([('foobar', [(0, 27), (1, 27)])]),
+     Errors.RebalanceInProgressError, False, True)])
+def test_handle_offset_commit_response(patched_coord, offsets,
+                                       response, error, dead, reassign):
+    future = Future()
+    patched_coord._handle_offset_commit_response(offsets, future, response)
+    assert isinstance(future.exception, error)
+    assert patched_coord.coordinator_id == (None if dead else 0)
+    assert patched_coord._subscription.needs_partition_assignment is reassign
+
+
+@pytest.fixture
+def partitions():
+    return [TopicPartition('foobar', 0), TopicPartition('foobar', 1)]
+
+
+def test_send_offset_fetch_request_fail(patched_coord, partitions):
+    patched_coord.coordinator_unknown.return_value = True
+    patched_coord.coordinator_id = None
+
+    # No partitions
+    ret = patched_coord._send_offset_fetch_request([])
+    assert isinstance(ret, Future)
+    assert ret.succeeded()
+    assert ret.value == {}
+
+    # No coordinator
+    ret = patched_coord._send_offset_fetch_request(partitions)
+    assert ret.failed()
+    assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError)
+
+
+@pytest.mark.parametrize('api_version,req_type', [
+    ((0, 8, 1), OffsetFetchRequest_v0),
+    ((0, 8, 2), OffsetFetchRequest_v1),
+    ((0, 9), OffsetFetchRequest_v1)])
+def test_send_offset_fetch_request_versions(patched_coord, partitions,
+                                            api_version, req_type):
+    # assuming fixture sets coordinator=0, least_loaded_node=1
+    expect_node = 0 if api_version >= (0, 8, 2) else 1
+    patched_coord.config['api_version'] = api_version
+
+    patched_coord._send_offset_fetch_request(partitions)
+    (node, request), _ = patched_coord._client.send.call_args
+    assert node == expect_node, 'Unexpected coordinator node'
+    assert isinstance(request, req_type)
+
+
+def test_send_offset_fetch_request_failure(patched_coord, partitions):
+    _f = Future()
+    patched_coord._client.send.return_value = _f
+    future = patched_coord._send_offset_fetch_request(partitions)
+    (node, request), _ = patched_coord._client.send.call_args
+    error = Exception()
+    _f.failure(error)
+    patched_coord._failed_request.assert_called_with(0, request, future, error)
+    assert future.failed()
+    assert future.exception is error
+
+
+def test_send_offset_fetch_request_success(patched_coord, partitions):
+    _f = Future()
+    patched_coord._client.send.return_value = _f
+    future = patched_coord._send_offset_fetch_request(partitions)
+    (node, request), _ = patched_coord._client.send.call_args
+    response = OffsetFetchResponse([('foobar', [(0, 0), (1, 0)])])
+    _f.success(response)
+    patched_coord._handle_offset_fetch_response.assert_called_with(
+        future, response)
+
+
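+# As above, the per-partition error code in the fetch response (fourth
+# field of each partition tuple) drives the expected exception and
+# coordinator state.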
+@pytest.mark.parametrize('response,error,dead,reassign', [
+    #(OffsetFetchResponse([('foobar', [(0, 30), (1, 30)])]),
+    # Errors.GroupAuthorizationFailedError, False, False),
+    #(OffsetFetchResponse([('foobar', [(0, 0, b'', 7), (1, 0, b'', 7)])]),
+    # Errors.RequestTimedOutError, True, False),
+    #(OffsetFetchResponse([('foobar', [(0, 0, b'', 27), (1, 0, b'', 27)])]),
+    # Errors.RebalanceInProgressError, False, True),
+    (OffsetFetchResponse([('foobar', [(0, 0, b'', 14), (1, 0, b'', 14)])]),
+     Errors.GroupLoadInProgressError, False, False),
+    (OffsetFetchResponse([('foobar', [(0, 0, b'', 16), (1, 0, b'', 16)])]),
+     Errors.NotCoordinatorForGroupError, True, False),
+    (OffsetFetchResponse([('foobar', [(0, 0, b'', 25), (1, 0, b'', 25)])]),
+     Errors.UnknownMemberIdError, False, True),
+    (OffsetFetchResponse([('foobar', [(0, 0, b'', 22), (1, 0, b'', 22)])]),
+     Errors.IllegalGenerationError, False, True)])
+def test_handle_offset_fetch_response(patched_coord, offsets,
+                                      response, error, dead, reassign):
+    future = Future()
+    patched_coord._handle_offset_fetch_response(future, response)
+    assert isinstance(future.exception, error)
+    assert patched_coord.coordinator_id == (None if dead else 0)
+    assert patched_coord._subscription.needs_partition_assignment is reassign