summaryrefslogtreecommitdiff
path: root/test/test_client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-04-10 10:06:46 -0700
committerGitHub <noreply@github.com>2017-04-10 10:06:46 -0700
commit04296994defcbf1c6dd9d1bae802af94bc11d74f (patch)
tree4df566314f7cb072b9d87034d3615db4ad17540d /test/test_client_async.py
parent7c24135eaf1db95c50c5d340cd15cbfc2674c927 (diff)
downloadkafka-python-04296994defcbf1c6dd9d1bae802af94bc11d74f.tar.gz
Timeout idle connections via connections_max_idle_ms (#1068)
Diffstat (limited to 'test/test_client_async.py')
-rw-r--r--test/test_client_async.py38
1 files changed, 36 insertions, 2 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 8f6ac3f..d4e6d37 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -1,3 +1,5 @@
+from __future__ import absolute_import, division
+
# selectors in stdlib as of py3.4
try:
import selectors # pylint: disable=import-error
@@ -10,7 +12,7 @@ import time
import pytest
-from kafka.client_async import KafkaClient
+from kafka.client_async import KafkaClient, IdleConnectionManager
from kafka.conn import ConnectionStates
import kafka.errors as Errors
from kafka.future import Future
@@ -319,7 +321,10 @@ def client(mocker):
mocker.patch.object(KafkaClient, '_bootstrap')
_poll = mocker.patch.object(KafkaClient, '_poll')
- cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9))
+ cli = KafkaClient(request_timeout_ms=9999999,
+ reconnect_backoff_ms=2222,
+ connections_max_idle_ms=float('inf'),
+ api_version=(0, 9))
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
tasks.return_value = 9999999
@@ -395,3 +400,32 @@ def test_schedule():
def test_unschedule():
pass
+
+
+def test_idle_connection_manager(mocker):
+ t = mocker.patch.object(time, 'time')
+ t.return_value = 0
+
+ idle = IdleConnectionManager(100)
+ assert idle.next_check_ms() == float('inf')
+
+ idle.update('foo')
+ assert not idle.is_expired('foo')
+ assert idle.poll_expired_connection() is None
+ assert idle.next_check_ms() == 100
+
+ t.return_value = 90 / 1000
+ assert not idle.is_expired('foo')
+ assert idle.poll_expired_connection() is None
+ assert idle.next_check_ms() == 10
+
+ t.return_value = 100 / 1000
+ assert idle.is_expired('foo')
+ assert idle.next_check_ms() == 0
+
+ conn_id, conn_ts = idle.poll_expired_connection()
+ assert conn_id == 'foo'
+ assert conn_ts == 0
+
+ idle.remove('foo')
+ assert idle.next_check_ms() == float('inf')