diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-05-28 23:45:48 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2019-05-28 23:45:48 -0700 |
commit | 0ac7b6c144907df1481d55085fc81b784f20bd27 (patch) | |
tree | c25496928c9d5e3f1a4fe89256ad6505443708a2 | |
parent | f6a8a38937688ea2cc5dc13d3d1039493be5c9b5 (diff) | |
download | kafka-python-no_ifrs_short_poll.tar.gz |
Reduce client poll timeout when no ifrsno_ifrs_short_poll
-rw-r--r-- | kafka/client_async.py | 3 | ||||
-rw-r--r-- | test/test_client_async.py | 12 |
2 files changed, 15 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 77efac8..42ec42b 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -588,6 +588,9 @@ class KafkaClient(object): metadata_timeout_ms, idle_connection_timeout_ms, self.config['request_timeout_ms']) + # if there are no requests in flight, do not block longer than the retry backoff + if self.in_flight_request_count() == 0: + timeout = min(timeout, self.config['retry_backoff_ms']) timeout = max(0, timeout / 1000) # avoid negative timeouts self._poll(timeout) diff --git a/test/test_client_async.py b/test/test_client_async.py index 2132c8e..9a33e62 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -233,6 +233,8 @@ def test_send(cli, conn): def test_poll(mocker): metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata') _poll = mocker.patch.object(KafkaClient, '_poll') + ifrs = mocker.patch.object(KafkaClient, 'in_flight_request_count') + ifrs.return_value = 1 cli = KafkaClient(api_version=(0, 9)) # metadata timeout wins @@ -249,6 +251,11 @@ def test_poll(mocker): cli.poll() _poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0) + # If no in-flight-requests, drop timeout to retry_backoff_ms + ifrs.return_value = 0 + cli.poll() + _poll.assert_called_with(cli.config['retry_backoff_ms'] / 1000.0) + def test__poll(): pass @@ -304,12 +311,14 @@ def client(mocker): def test_maybe_refresh_metadata_ttl(mocker, client): client.cluster.ttl.return_value = 1234 + mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) client.poll(timeout_ms=12345678) client._poll.assert_called_with(1.234) def test_maybe_refresh_metadata_backoff(mocker, client): + mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) now = time.time() t = mocker.patch('time.time') t.return_value = now @@ -320,6 +329,7 @@ def test_maybe_refresh_metadata_backoff(mocker, client): def test_maybe_refresh_metadata_in_progress(mocker, client): client._metadata_refresh_in_progress = True + mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) client.poll(timeout_ms=12345678) client._poll.assert_called_with(9999.999) # request_timeout_ms @@ -328,6 +338,7 @@ def test_maybe_refresh_metadata_in_progress(mocker, client): def test_maybe_refresh_metadata_update(mocker, client): mocker.patch.object(client, 'least_loaded_node', return_value='foobar') mocker.patch.object(client, '_can_send_request', return_value=True) + mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) send = mocker.patch.object(client, 'send') client.poll(timeout_ms=12345678) @@ -342,6 +353,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client): mocker.patch.object(client, '_can_connect', return_value=True) mocker.patch.object(client, '_maybe_connect', return_value=True) mocker.patch.object(client, 'maybe_connect', return_value=True) + mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) now = time.time() t = mocker.patch('time.time') |