author     Dana Powers <dana.powers@gmail.com>    2017-12-21 14:46:10 -0800
committer  GitHub <noreply@github.com>            2017-12-21 14:46:10 -0800
commit     ad024d1e897dbf16bd629fa63895bd7af4a8d959 (patch)
tree       f1993351b2c6487e8e623cefabf42ddf7477f666 /test
parent     995664c7d407009a0a1030c7541848eb5ad51c97 (diff)
KAFKA-3888 Use background thread to process consumer heartbeats (#1266)
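
With this change, consumer group heartbeats are sent from a dedicated background
thread instead of being scheduled as delayed tasks on the client poll loop, so a
member stays alive in its group even while the application thread is busy
processing records between poll() calls. The sketch below is not part of this
patch; it is modeled on the new test_heartbeat_thread test in this diff. The
broker address, topic, and group id are placeholders, and
consumer._coordinator.heartbeat is internal API used only for illustration:

    import time
    from kafka import KafkaConsumer

    consumer = KafkaConsumer('my-topic',                 # placeholder topic
                             bootstrap_servers='localhost:9092',
                             group_id='my-group',        # placeholder group
                             heartbeat_interval_ms=500)

    # Poll until the member has joined the group and holds an assignment.
    while not consumer.assignment():
        consumer.poll(timeout_ms=100)

    # With the background heartbeat thread, last_send keeps advancing even
    # though the application thread never calls poll() during the sleep.
    last_send = consumer._coordinator.heartbeat.last_send
    time.sleep(2)
    assert consumer._coordinator.heartbeat.last_send > last_send

This is also why the tests below drop the cli._delayed_tasks / heartbeat_task
plumbing: heartbeating no longer competes with metadata and request timeouts
inside KafkaClient.poll().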
Diffstat (limited to 'test')
-rw-r--r--   test/test_client_async.py          11
-rw-r--r--   test/test_consumer.py               4
-rw-r--r--   test/test_consumer_group.py        45
-rw-r--r--   test/test_consumer_integration.py   3
-rw-r--r--   test/test_coordinator.py          100
5 files changed, 107 insertions, 56 deletions
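
Several of the updated tests assert against the coordinator's reworked state
model in kafka.coordinator.base: a Generation record carrying the member's
generation id and member id (with a Generation.NO_GENERATION sentinel) and a
MemberState with, at least, UNJOINED and STABLE values. A rough sketch of the
semantics these tests rely on, assuming the constructor signature used by the
fixtures below:

    from kafka.coordinator.base import Generation, MemberState

    # A joined, stable member holds a concrete generation:
    gen = Generation(1, 'foobar', b'')  # (generation_id, member_id, protocol)
    assert gen.generation_id == 1
    assert gen.member_id == 'foobar'

    # Errors such as UnknownMemberIdError reset the coordinator to the
    # sentinel generation and the un-joined state, forcing a rejoin:
    reset_state = (Generation.NO_GENERATION, MemberState.UNJOINED)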
diff --git a/test/test_client_async.py b/test/test_client_async.py
index ec45543..eece139 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -253,11 +253,9 @@ def test_poll(mocker):
     metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata')
     _poll = mocker.patch.object(KafkaClient, '_poll')
     cli = KafkaClient(api_version=(0, 9))
-    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
 
     # metadata timeout wins
     metadata.return_value = 1000
-    tasks.return_value = 2
     cli.poll()
     _poll.assert_called_with(1.0)
 
@@ -265,14 +263,8 @@ def test_poll(mocker):
     cli.poll(250)
     _poll.assert_called_with(0.25)
 
-    # tasks timeout wins
-    tasks.return_value = 0
-    cli.poll(250)
-    _poll.assert_called_with(0)
-
     # default is request_timeout_ms
     metadata.return_value = 1000000
-    tasks.return_value = 10000
     cli.poll()
     _poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0)
 
@@ -325,9 +317,6 @@ def client(mocker):
                       connections_max_idle_ms=float('inf'),
                       api_version=(0, 9))
 
-    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
-    tasks.return_value = 9999999
-
     ttl = mocker.patch.object(cli.cluster, 'ttl')
     ttl.return_value = 0
     return cli
diff --git a/test/test_consumer.py b/test/test_consumer.py
index e5dd946..013529f 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -14,11 +14,11 @@ from kafka.structs import (
 class TestKafkaConsumer(unittest.TestCase):
     def test_non_integer_partitions(self):
         with self.assertRaises(AssertionError):
-            SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
+            SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0'])
 
     def test_session_timeout_larger_than_request_timeout_raises(self):
         with self.assertRaises(KafkaConfigurationError):
-            KafkaConsumer(bootstrap_servers='localhost:9092', session_timeout_ms=60000, request_timeout_ms=40000)
+            KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0,9), group_id='foo', session_timeout_ms=60000, request_timeout_ms=40000)
 
     def test_fetch_max_wait_larger_than_request_timeout_raises(self):
         with self.assertRaises(KafkaConfigurationError):
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 8f25e9f..690d45a 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -9,6 +9,7 @@ import six
 from kafka import SimpleClient
 from kafka.conn import ConnectionStates
 from kafka.consumer.group import KafkaConsumer
+from kafka.coordinator.base import MemberState, Generation
 from kafka.structs import TopicPartition
 
 from test.conftest import version
@@ -92,9 +93,10 @@ def test_group(kafka_broker, topic):
 
             # If all consumers exist and have an assignment
             else:
+                logging.info('All consumers have assignment... checking for stable group')
                 # Verify all consumers are in the same generation
                 # then log state and break while loop
-                generations = set([consumer._coordinator.generation
+                generations = set([consumer._coordinator._generation.generation_id
                                    for consumer in list(consumers.values())])
 
                 # New generation assignment is not complete until
@@ -105,12 +107,16 @@ def test_group(kafka_broker, topic):
                 if not rejoining and len(generations) == 1:
                     for c, consumer in list(consumers.items()):
                         logging.info("[%s] %s %s: %s", c,
-                                     consumer._coordinator.generation,
-                                     consumer._coordinator.member_id,
+                                     consumer._coordinator._generation.generation_id,
+                                     consumer._coordinator._generation.member_id,
                                      consumer.assignment())
                     break
+                else:
+                    logging.info('Rejoining: %s, generations: %s', rejoining, generations)
+                    time.sleep(1)
             assert time.time() < timeout, "timeout waiting for assignments"
 
+        logging.info('Group stabilized; verifying assignment')
         group_assignment = set()
         for c in range(num_consumers):
             assert len(consumers[c].assignment()) != 0
@@ -120,9 +126,12 @@ def test_group(kafka_broker, topic):
         assert group_assignment == set([
             TopicPartition(topic, partition)
             for partition in range(num_partitions)])
+        logging.info('Assignment looks good!')
 
     finally:
+        logging.info('Shutting down %s consumers', num_consumers)
         for c in range(num_consumers):
+            logging.info('Stopping consumer %s', c)
             stop[c].set()
             threads[c].join()
 
@@ -143,3 +152,33 @@ def test_paused(kafka_broker, topic):
 
     consumer.unsubscribe()
     assert set() == consumer.paused()
+
+
+@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version')
+@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+def test_heartbeat_thread(kafka_broker, topic):
+    group_id = 'test-group-' + random_string(6)
+    consumer = KafkaConsumer(topic,
+                             bootstrap_servers=get_connect_str(kafka_broker),
+                             group_id=group_id,
+                             heartbeat_interval_ms=500)
+
+    # poll until we have joined group / have assignment
+    while not consumer.assignment():
+        consumer.poll(timeout_ms=100)
+
+    assert consumer._coordinator.state is MemberState.STABLE
+    last_poll = consumer._coordinator.heartbeat.last_poll
+    last_beat = consumer._coordinator.heartbeat.last_send
+
+    timeout = time.time() + 30
+    while True:
+        if time.time() > timeout:
+            raise RuntimeError('timeout waiting for heartbeat')
+        if consumer._coordinator.heartbeat.last_send > last_beat:
+            break
+        time.sleep(0.5)
+
+    assert consumer._coordinator.heartbeat.last_poll == last_poll
+    consumer.poll(timeout_ms=100)
+    assert consumer._coordinator.heartbeat.last_poll > last_poll
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index d1843b3..ded2314 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -739,7 +739,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
 
     @kafka_versions('>=0.10.1')
     def test_kafka_consumer_offsets_for_times_errors(self):
-        consumer = self.kafka_consumer()
+        consumer = self.kafka_consumer(fetch_max_wait_ms=200,
+                                       request_timeout_ms=500)
         tp = TopicPartition(self.topic, 0)
         bad_tp = TopicPartition(self.topic, 100)
 
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 0e96110..7dc0e04 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -10,6 +10,7 @@ from kafka.consumer.subscription_state import (
     SubscriptionState, ConsumerRebalanceListener)
 from kafka.coordinator.assignors.range import RangePartitionAssignor
 from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.coordinator.base import Generation, MemberState, HeartbeatThread
 from kafka.coordinator.consumer import ConsumerCoordinator
 from kafka.coordinator.protocol import (
     ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
@@ -43,13 +44,13 @@ def test_autocommit_enable_api_version(client, api_version):
     coordinator = ConsumerCoordinator(client, SubscriptionState(),
                                       Metrics(),
                                       enable_auto_commit=True,
+                                      session_timeout_ms=30000,   # session_timeout_ms and max_poll_interval_ms
+                                      max_poll_interval_ms=30000, # should be the same to avoid KafkaConfigurationError
                                       group_id='foobar',
                                       api_version=api_version)
     if api_version < (0, 8, 1):
-        assert coordinator._auto_commit_task is None
         assert coordinator.config['enable_auto_commit'] is False
     else:
-        assert coordinator._auto_commit_task is not None
         assert coordinator.config['enable_auto_commit'] is True
@@ -269,19 +270,19 @@ def test_close(mocker, coordinator):
     mocker.patch.object(coordinator, '_handle_leave_group_response')
     mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
     coordinator.coordinator_id = 0
-    coordinator.generation = 1
+    coordinator._generation = Generation(1, 'foobar', b'')
+    coordinator.state = MemberState.STABLE
 
     cli = coordinator._client
-    mocker.patch.object(cli, 'unschedule')
     mocker.patch.object(cli, 'send', return_value=Future().success('foobar'))
     mocker.patch.object(cli, 'poll')
 
     coordinator.close()
     assert coordinator._maybe_auto_commit_offsets_sync.call_count == 1
-    cli.unschedule.assert_called_with(coordinator.heartbeat_task)
     coordinator._handle_leave_group_response.assert_called_with('foobar')
 
-    assert coordinator.generation == -1
-    assert coordinator.member_id == ''
+    assert coordinator.generation() is None
+    assert coordinator._generation is Generation.NO_GENERATION
+    assert coordinator.state is MemberState.UNJOINED
     assert coordinator.rejoin_needed is True
@@ -296,6 +297,7 @@ def offsets():
 def test_commit_offsets_async(mocker, coordinator, offsets):
     mocker.patch.object(coordinator._client, 'poll')
     mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
+    mocker.patch.object(coordinator, 'ensure_coordinator_ready')
     mocker.patch.object(coordinator, '_send_offset_commit_request',
                         return_value=Future().success('fizzbuzz'))
     coordinator.commit_offsets_async(offsets)
@@ -362,19 +364,21 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
     coordinator = ConsumerCoordinator(client, SubscriptionState(),
                                       Metrics(),
                                       api_version=api_version,
+                                      session_timeout_ms=30000,
+                                      max_poll_interval_ms=30000,
                                       enable_auto_commit=enable,
                                       group_id=group_id)
     commit_sync = mocker.patch.object(coordinator, 'commit_offsets_sync',
                                       side_effect=error)
     if has_auto_commit:
-        assert coordinator._auto_commit_task is not None
+        assert coordinator.next_auto_commit_deadline is not None
     else:
-        assert coordinator._auto_commit_task is None
+        assert coordinator.next_auto_commit_deadline is None
 
     assert coordinator._maybe_auto_commit_offsets_sync() is None
 
     if has_auto_commit:
-        assert coordinator._auto_commit_task is not None
+        assert coordinator.next_auto_commit_deadline is not None
 
     assert commit_sync.call_count == (1 if commit_offsets else 0)
     assert mock_warn.call_count == (1 if warn else 0)
@@ -387,24 +391,25 @@ def patched_coord(mocker, coordinator):
     coordinator._subscription.needs_partition_assignment = False
     mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
     coordinator.coordinator_id = 0
-    coordinator.generation = 0
+    mocker.patch.object(coordinator, 'coordinator', return_value=0)
+    coordinator._generation = Generation(0, 'foobar', b'')
+    coordinator.state = MemberState.STABLE
+    coordinator.rejoin_needed = False
     mocker.patch.object(coordinator, 'need_rejoin', return_value=False)
     mocker.patch.object(coordinator._client, 'least_loaded_node',
                         return_value=1)
     mocker.patch.object(coordinator._client, 'ready', return_value=True)
     mocker.patch.object(coordinator._client, 'send')
-    mocker.patch.object(coordinator._client, 'schedule')
     mocker.spy(coordinator, '_failed_request')
     mocker.spy(coordinator, '_handle_offset_commit_response')
     mocker.spy(coordinator, '_handle_offset_fetch_response')
-    mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_success')
-    mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_failure')
     return coordinator
 
 
-def test_send_offset_commit_request_fail(patched_coord, offsets):
+def test_send_offset_commit_request_fail(mocker, patched_coord, offsets):
     patched_coord.coordinator_unknown.return_value = True
     patched_coord.coordinator_id = None
+    patched_coord.coordinator.return_value = None
 
     # No offsets
     ret = patched_coord._send_offset_commit_request({})
@@ -488,7 +493,14 @@ def test_handle_offset_commit_response(mocker, patched_coord, offsets,
                                                 response)
     assert isinstance(future.exception, error)
     assert patched_coord.coordinator_id is (None if dead else 0)
-    assert patched_coord._subscription.needs_partition_assignment is reassign
+    if reassign:
+        assert patched_coord._generation is Generation.NO_GENERATION
+        assert patched_coord.rejoin_needed is True
+        assert patched_coord.state is MemberState.UNJOINED
+    else:
+        assert patched_coord._generation is not Generation.NO_GENERATION
+        assert patched_coord.rejoin_needed is False
+        assert patched_coord.state is MemberState.STABLE
 
 
 @pytest.fixture
@@ -496,9 +508,10 @@ def partitions():
     return [TopicPartition('foobar', 0), TopicPartition('foobar', 1)]
 
 
-def test_send_offset_fetch_request_fail(patched_coord, partitions):
+def test_send_offset_fetch_request_fail(mocker, patched_coord, partitions):
     patched_coord.coordinator_unknown.return_value = True
     patched_coord.coordinator_id = None
+    patched_coord.coordinator.return_value = None
 
     # No partitions
     ret = patched_coord._send_offset_fetch_request([])
@@ -551,28 +564,18 @@ def test_send_offset_fetch_request_success(patched_coord, partitions):
                                       future, response)
 
 
-@pytest.mark.parametrize('response,error,dead,reassign', [
-    #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 30), (1, 234, b'', 30)])]),
-    # Errors.GroupAuthorizationFailedError, False, False),
-    #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]),
-    # Errors.RequestTimedOutError, True, False),
-    #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]),
-    # Errors.RebalanceInProgressError, False, True),
+@pytest.mark.parametrize('response,error,dead', [
     (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]),
-     Errors.GroupLoadInProgressError, False, False),
+     Errors.GroupLoadInProgressError, False),
     (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]),
-     Errors.NotCoordinatorForGroupError, True, False),
-    (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]),
-     Errors.UnknownMemberIdError, False, True),
-    (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]),
-     Errors.IllegalGenerationError, False, True),
+     Errors.NotCoordinatorForGroupError, True),
     (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]),
-     Errors.TopicAuthorizationFailedError, False, False),
+     Errors.TopicAuthorizationFailedError, False),
    (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]),
-     None, False, False),
+     None, False),
 ])
 def test_handle_offset_fetch_response(patched_coord, offsets,
-                                      response, error, dead, reassign):
+                                      response, error, dead):
     future = Future()
     patched_coord._handle_offset_fetch_response(future, response)
     if error is not None:
@@ -581,15 +584,34 @@ def test_handle_offset_fetch_response(patched_coord, offsets,
         assert future.succeeded()
         assert future.value == offsets
     assert patched_coord.coordinator_id is (None if dead else 0)
-    assert patched_coord._subscription.needs_partition_assignment is reassign
 
 
-def test_heartbeat(patched_coord):
-    patched_coord.coordinator_unknown.return_value = True
+def test_heartbeat(mocker, patched_coord):
+    heartbeat = HeartbeatThread(patched_coord)
+
+    assert not heartbeat.enabled and not heartbeat.closed
+
+    heartbeat.enable()
+    assert heartbeat.enabled
+
+    heartbeat.disable()
+    assert not heartbeat.enabled
+
+    # heartbeat disables when un-joined
+    heartbeat.enable()
+    patched_coord.state = MemberState.UNJOINED
+    heartbeat._run_once()
+    assert not heartbeat.enabled
+
+    heartbeat.enable()
+    patched_coord.state = MemberState.STABLE
+    mocker.spy(patched_coord, '_send_heartbeat_request')
+    mocker.patch.object(patched_coord.heartbeat, 'should_heartbeat', return_value=True)
+    heartbeat._run_once()
+    assert patched_coord._send_heartbeat_request.call_count == 1
 
-    patched_coord.heartbeat_task()
-    assert patched_coord._client.schedule.call_count == 1
-    assert patched_coord.heartbeat_task._handle_heartbeat_failure.call_count == 1
+    heartbeat.close()
+    assert heartbeat.closed
 
 
 def test_lookup_coordinator_failure(mocker, coordinator):