summaryrefslogtreecommitdiff
path: root/test/test_client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_client_async.py')
-rw-r--r--test/test_client_async.py12
1 files changed, 12 insertions, 0 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 2132c8e..9a33e62 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -233,6 +233,8 @@ def test_send(cli, conn):
def test_poll(mocker):
metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata')
_poll = mocker.patch.object(KafkaClient, '_poll')
+ ifrs = mocker.patch.object(KafkaClient, 'in_flight_request_count')
+ ifrs.return_value = 1
cli = KafkaClient(api_version=(0, 9))
# metadata timeout wins
@@ -249,6 +251,11 @@ def test_poll(mocker):
cli.poll()
_poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0)
+ # If no in-flight-requests, drop timeout to retry_backoff_ms
+ ifrs.return_value = 0
+ cli.poll()
+ _poll.assert_called_with(cli.config['retry_backoff_ms'] / 1000.0)
+
def test__poll():
pass
@@ -304,12 +311,14 @@ def client(mocker):
def test_maybe_refresh_metadata_ttl(mocker, client):
client.cluster.ttl.return_value = 1234
+ mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(1.234)
def test_maybe_refresh_metadata_backoff(mocker, client):
+ mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
now = time.time()
t = mocker.patch('time.time')
t.return_value = now
@@ -320,6 +329,7 @@ def test_maybe_refresh_metadata_backoff(mocker, client):
def test_maybe_refresh_metadata_in_progress(mocker, client):
client._metadata_refresh_in_progress = True
+ mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(9999.999) # request_timeout_ms
@@ -328,6 +338,7 @@ def test_maybe_refresh_metadata_in_progress(mocker, client):
def test_maybe_refresh_metadata_update(mocker, client):
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
mocker.patch.object(client, '_can_send_request', return_value=True)
+ mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
send = mocker.patch.object(client, 'send')
client.poll(timeout_ms=12345678)
@@ -342,6 +353,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
mocker.patch.object(client, '_can_connect', return_value=True)
mocker.patch.object(client, '_maybe_connect', return_value=True)
mocker.patch.object(client, 'maybe_connect', return_value=True)
+ mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
now = time.time()
t = mocker.patch('time.time')