diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-04-04 16:31:13 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-04-04 17:57:04 -0700 |
commit | d39fdd7fafc4a409e92ce9dfb11245eb7e3ec277 (patch) | |
tree | e9f4280e115dce4cb12ca9a37b71984242294688 /test/test_client_async.py | |
parent | 998147dae95020bf7169441118b51dee1a2a69bd (diff) | |
download | kafka-python-metadata_wait_for_connect.tar.gz |
Avoid multiple connection attempts when refreshing metadatametadata_wait_for_connect
Diffstat (limited to 'test/test_client_async.py')
-rw-r--r-- | test/test_client_async.py | 39 |
1 files changed, 25 insertions, 14 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py index 97be827..8f6ac3f 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -319,7 +319,7 @@ def client(mocker): mocker.patch.object(KafkaClient, '_bootstrap') _poll = mocker.patch.object(KafkaClient, '_poll') - cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222, api_version=(0, 9)) + cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9)) tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') tasks.return_value = 9999999 @@ -332,7 +332,7 @@ def client(mocker): def test_maybe_refresh_metadata_ttl(mocker, client): client.cluster.ttl.return_value = 1234 - client.poll(timeout_ms=9999999, sleep=True) + client.poll(timeout_ms=12345678, sleep=True) client._poll.assert_called_with(1.234, sleep=True) @@ -340,17 +340,16 @@ def test_maybe_refresh_metadata_backoff(mocker, client): now = time.time() t = mocker.patch('time.time') t.return_value = now - client._last_no_node_available_ms = now * 1000 - client.poll(timeout_ms=9999999, sleep=True) - client._poll.assert_called_with(2.222, sleep=True) + client.poll(timeout_ms=12345678, sleep=True) + client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff def test_maybe_refresh_metadata_in_progress(mocker, client): client._metadata_refresh_in_progress = True - client.poll(timeout_ms=9999999, sleep=True) - client._poll.assert_called_with(9999.999, sleep=True) + client.poll(timeout_ms=12345678, sleep=True) + client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms def test_maybe_refresh_metadata_update(mocker, client): @@ -358,23 +357,35 @@ def test_maybe_refresh_metadata_update(mocker, client): mocker.patch.object(client, '_can_send_request', return_value=True) send = mocker.patch.object(client, 'send') - client.poll(timeout_ms=9999999, sleep=True) - client._poll.assert_called_with(0, sleep=True) + client.poll(timeout_ms=12345678, sleep=True) + client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms assert client._metadata_refresh_in_progress request = MetadataRequest[0]([]) - send.assert_called_with('foobar', request) + send.assert_called_once_with('foobar', request) -def test_maybe_refresh_metadata_failure(mocker, client): +def test_maybe_refresh_metadata_cant_send(mocker, client): mocker.patch.object(client, 'least_loaded_node', return_value='foobar') + mocker.patch.object(client, '_can_connect', return_value=True) + mocker.patch.object(client, '_maybe_connect', return_value=True) now = time.time() t = mocker.patch('time.time') t.return_value = now - client.poll(timeout_ms=9999999, sleep=True) - client._poll.assert_called_with(0, sleep=True) - assert client._last_no_node_available_ms == now * 1000 + # first poll attempts connection + client.poll(timeout_ms=12345678, sleep=True) + client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff + client._can_connect.assert_called_once_with('foobar') + client._maybe_connect.assert_called_once_with('foobar') + + # poll while connecting should not attempt a new connection + client._connecting.add('foobar') + client._can_connect.reset_mock() + client.poll(timeout_ms=12345678, sleep=True) + client._poll.assert_called_with(9999.999, sleep=True) # connection timeout (request timeout) + assert not client._can_connect.called + assert not client._metadata_refresh_in_progress |