summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-05-28 23:45:48 -0700
committerDana Powers <dana.powers@gmail.com>2019-05-28 23:45:48 -0700
commit0ac7b6c144907df1481d55085fc81b784f20bd27 (patch)
treec25496928c9d5e3f1a4fe89256ad6505443708a2
parentf6a8a38937688ea2cc5dc13d3d1039493be5c9b5 (diff)
downloadkafka-python-no_ifrs_short_poll.tar.gz
Reduce client poll timeout when no ifrsno_ifrs_short_poll
-rw-r--r--kafka/client_async.py3
-rw-r--r--test/test_client_async.py12
2 files changed, 15 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 77efac8..42ec42b 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -588,6 +588,9 @@ class KafkaClient(object):
metadata_timeout_ms,
idle_connection_timeout_ms,
self.config['request_timeout_ms'])
+ # if there are no requests in flight, do not block longer than the retry backoff
+ if self.in_flight_request_count() == 0:
+ timeout = min(timeout, self.config['retry_backoff_ms'])
timeout = max(0, timeout / 1000) # avoid negative timeouts
self._poll(timeout)
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 2132c8e..9a33e62 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -233,6 +233,8 @@ def test_send(cli, conn):
def test_poll(mocker):
metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata')
_poll = mocker.patch.object(KafkaClient, '_poll')
+ ifrs = mocker.patch.object(KafkaClient, 'in_flight_request_count')
+ ifrs.return_value = 1
cli = KafkaClient(api_version=(0, 9))
# metadata timeout wins
@@ -249,6 +251,11 @@ def test_poll(mocker):
cli.poll()
_poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0)
+ # If no in-flight-requests, drop timeout to retry_backoff_ms
+ ifrs.return_value = 0
+ cli.poll()
+ _poll.assert_called_with(cli.config['retry_backoff_ms'] / 1000.0)
+
def test__poll():
pass
@@ -304,12 +311,14 @@ def client(mocker):
def test_maybe_refresh_metadata_ttl(mocker, client):
client.cluster.ttl.return_value = 1234
+ mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(1.234)
def test_maybe_refresh_metadata_backoff(mocker, client):
+ mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
now = time.time()
t = mocker.patch('time.time')
t.return_value = now
@@ -320,6 +329,7 @@ def test_maybe_refresh_metadata_backoff(mocker, client):
def test_maybe_refresh_metadata_in_progress(mocker, client):
client._metadata_refresh_in_progress = True
+ mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(9999.999) # request_timeout_ms
@@ -328,6 +338,7 @@ def test_maybe_refresh_metadata_in_progress(mocker, client):
def test_maybe_refresh_metadata_update(mocker, client):
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
mocker.patch.object(client, '_can_send_request', return_value=True)
+ mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
send = mocker.patch.object(client, 'send')
client.poll(timeout_ms=12345678)
@@ -342,6 +353,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
mocker.patch.object(client, '_can_connect', return_value=True)
mocker.patch.object(client, '_maybe_connect', return_value=True)
mocker.patch.object(client, 'maybe_connect', return_value=True)
+ mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
now = time.time()
t = mocker.patch('time.time')