diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-06-20 09:23:07 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-06-20 09:56:07 -0700 |
commit | 06abc63522a32ddf15a9de9c168132d85326cc0d (patch) | |
tree | 85e21b8e2796d61f9b1c62ae829aad1926706c88 /test | |
parent | 461ccbd9ecf06722c9ff73f6ed439be4b8391672 (diff) | |
download | kafka-python-metadata_refresh_backoff.tar.gz |
Avoid busy poll during metadata refresh failure with retry_backoff_msmetadata_refresh_backoff
Diffstat (limited to 'test')
-rw-r--r-- | test/test_client_async.py | 102 |
1 files changed, 100 insertions, 2 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py index 5870501..06c2bf5 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -293,8 +293,106 @@ def test_set_topics(): pass -def test_maybe_refresh_metadata(): - pass +def test_maybe_refresh_metadata_ttl(mocker): + mocker.patch.object(KafkaClient, '_bootstrap') + _poll = mocker.patch.object(KafkaClient, '_poll') + + cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222) + + tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') + tasks.return_value = 9999999 + + ttl = mocker.patch.object(cli.cluster, 'ttl') + ttl.return_value = 1234 + + cli.poll(timeout_ms=9999999, sleep=True) + _poll.assert_called_with(1.234, sleep=True) + + +def test_maybe_refresh_metadata_backoff(mocker): + mocker.patch.object(KafkaClient, '_bootstrap') + _poll = mocker.patch.object(KafkaClient, '_poll') + + cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222) + + tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') + tasks.return_value = 9999999 + + ttl = mocker.patch.object(cli.cluster, 'ttl') + ttl.return_value = 0 + + now = time.time() + t = mocker.patch('time.time') + t.return_value = now + cli._last_no_node_available_ms = now * 1000 + + cli.poll(timeout_ms=9999999, sleep=True) + _poll.assert_called_with(2.222, sleep=True) + + +def test_maybe_refresh_metadata_in_progress(mocker): + mocker.patch.object(KafkaClient, '_bootstrap') + _poll = mocker.patch.object(KafkaClient, '_poll') + + cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222) + + tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') + tasks.return_value = 9999999 + + ttl = mocker.patch.object(cli.cluster, 'ttl') + ttl.return_value = 0 + + cli._metadata_refresh_in_progress = True + + cli.poll(timeout_ms=9999999, sleep=True) + _poll.assert_called_with(9999.999, sleep=True) + + +def test_maybe_refresh_metadata_update(mocker): + mocker.patch.object(KafkaClient, '_bootstrap') + _poll = mocker.patch.object(KafkaClient, '_poll') + + cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222) + + tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') + tasks.return_value = 9999999 + + ttl = mocker.patch.object(cli.cluster, 'ttl') + ttl.return_value = 0 + + mocker.patch.object(cli, 'least_loaded_node', return_value='foobar') + mocker.patch.object(cli, '_can_send_request', return_value=True) + send = mocker.patch.object(cli, 'send') + + cli.poll(timeout_ms=9999999, sleep=True) + _poll.assert_called_with(0, sleep=True) + assert cli._metadata_refresh_in_progress + request = MetadataRequest[0]([]) + send.assert_called_with('foobar', request) + + +def test_maybe_refresh_metadata_failure(mocker): + mocker.patch.object(KafkaClient, '_bootstrap') + _poll = mocker.patch.object(KafkaClient, '_poll') + + cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222) + + tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') + tasks.return_value = 9999999 + + ttl = mocker.patch.object(cli.cluster, 'ttl') + ttl.return_value = 0 + + mocker.patch.object(cli, 'least_loaded_node', return_value='foobar') + + now = time.time() + t = mocker.patch('time.time') + t.return_value = now + + cli.poll(timeout_ms=9999999, sleep=True) + _poll.assert_called_with(0, sleep=True) + assert cli._last_no_node_available_ms == now * 1000 + assert not cli._metadata_refresh_in_progress def test_schedule(): |