summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-06-20 09:23:07 -0700
committerDana Powers <dana.powers@gmail.com>2016-06-20 09:56:07 -0700
commit06abc63522a32ddf15a9de9c168132d85326cc0d (patch)
tree85e21b8e2796d61f9b1c62ae829aad1926706c88 /test
parent461ccbd9ecf06722c9ff73f6ed439be4b8391672 (diff)
downloadkafka-python-metadata_refresh_backoff.tar.gz
Avoid busy poll during metadata refresh failure with retry_backoff_msmetadata_refresh_backoff
Diffstat (limited to 'test')
-rw-r--r--test/test_client_async.py102
1 files changed, 100 insertions, 2 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 5870501..06c2bf5 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -293,8 +293,106 @@ def test_set_topics():
pass
-def test_maybe_refresh_metadata():
- pass
+def test_maybe_refresh_metadata_ttl(mocker):
+ mocker.patch.object(KafkaClient, '_bootstrap')
+ _poll = mocker.patch.object(KafkaClient, '_poll')
+
+ cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+ tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+ tasks.return_value = 9999999
+
+ ttl = mocker.patch.object(cli.cluster, 'ttl')
+ ttl.return_value = 1234
+
+ cli.poll(timeout_ms=9999999, sleep=True)
+ _poll.assert_called_with(1.234, sleep=True)
+
+
+def test_maybe_refresh_metadata_backoff(mocker):
+ mocker.patch.object(KafkaClient, '_bootstrap')
+ _poll = mocker.patch.object(KafkaClient, '_poll')
+
+ cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+ tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+ tasks.return_value = 9999999
+
+ ttl = mocker.patch.object(cli.cluster, 'ttl')
+ ttl.return_value = 0
+
+ now = time.time()
+ t = mocker.patch('time.time')
+ t.return_value = now
+ cli._last_no_node_available_ms = now * 1000
+
+ cli.poll(timeout_ms=9999999, sleep=True)
+ _poll.assert_called_with(2.222, sleep=True)
+
+
+def test_maybe_refresh_metadata_in_progress(mocker):
+ mocker.patch.object(KafkaClient, '_bootstrap')
+ _poll = mocker.patch.object(KafkaClient, '_poll')
+
+ cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+ tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+ tasks.return_value = 9999999
+
+ ttl = mocker.patch.object(cli.cluster, 'ttl')
+ ttl.return_value = 0
+
+ cli._metadata_refresh_in_progress = True
+
+ cli.poll(timeout_ms=9999999, sleep=True)
+ _poll.assert_called_with(9999.999, sleep=True)
+
+
+def test_maybe_refresh_metadata_update(mocker):
+ mocker.patch.object(KafkaClient, '_bootstrap')
+ _poll = mocker.patch.object(KafkaClient, '_poll')
+
+ cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+ tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+ tasks.return_value = 9999999
+
+ ttl = mocker.patch.object(cli.cluster, 'ttl')
+ ttl.return_value = 0
+
+ mocker.patch.object(cli, 'least_loaded_node', return_value='foobar')
+ mocker.patch.object(cli, '_can_send_request', return_value=True)
+ send = mocker.patch.object(cli, 'send')
+
+ cli.poll(timeout_ms=9999999, sleep=True)
+ _poll.assert_called_with(0, sleep=True)
+ assert cli._metadata_refresh_in_progress
+ request = MetadataRequest[0]([])
+ send.assert_called_with('foobar', request)
+
+
+def test_maybe_refresh_metadata_failure(mocker):
+ mocker.patch.object(KafkaClient, '_bootstrap')
+ _poll = mocker.patch.object(KafkaClient, '_poll')
+
+ cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+ tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+ tasks.return_value = 9999999
+
+ ttl = mocker.patch.object(cli.cluster, 'ttl')
+ ttl.return_value = 0
+
+ mocker.patch.object(cli, 'least_loaded_node', return_value='foobar')
+
+ now = time.time()
+ t = mocker.patch('time.time')
+ t.return_value = now
+
+ cli.poll(timeout_ms=9999999, sleep=True)
+ _poll.assert_called_with(0, sleep=True)
+ assert cli._last_no_node_available_ms == now * 1000
+ assert not cli._metadata_refresh_in_progress
def test_schedule():