summaryrefslogtreecommitdiff
path: root/test/test_client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-04-04 18:07:44 -0700
committerDana Powers <dana.powers@gmail.com>2017-04-05 23:14:23 -0700
commite5117a2b61ceea86dbb46f6510fc4a878f669487 (patch)
tree4df566314f7cb072b9d87034d3615db4ad17540d /test/test_client_async.py
parent29e699d940df5fa3ae3ee77cc57e9f90da1396c7 (diff)
downloadkafka-python-timeout_idle_connections.tar.gz
Add support for connections_max_idle_mstimeout_idle_connections
Diffstat (limited to 'test/test_client_async.py')
-rw-r--r--test/test_client_async.py38
1 files changed, 36 insertions, 2 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 8f6ac3f..d4e6d37 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -1,3 +1,5 @@
+from __future__ import absolute_import, division
+
# selectors in stdlib as of py3.4
try:
import selectors # pylint: disable=import-error
@@ -10,7 +12,7 @@ import time
import pytest
-from kafka.client_async import KafkaClient
+from kafka.client_async import KafkaClient, IdleConnectionManager
from kafka.conn import ConnectionStates
import kafka.errors as Errors
from kafka.future import Future
@@ -319,7 +321,10 @@ def client(mocker):
mocker.patch.object(KafkaClient, '_bootstrap')
_poll = mocker.patch.object(KafkaClient, '_poll')
- cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9))
+ cli = KafkaClient(request_timeout_ms=9999999,
+ reconnect_backoff_ms=2222,
+ connections_max_idle_ms=float('inf'),
+ api_version=(0, 9))
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
tasks.return_value = 9999999
@@ -395,3 +400,32 @@ def test_schedule():
def test_unschedule():
pass
+
+
+def test_idle_connection_manager(mocker):
+ t = mocker.patch.object(time, 'time')
+ t.return_value = 0
+
+ idle = IdleConnectionManager(100)
+ assert idle.next_check_ms() == float('inf')
+
+ idle.update('foo')
+ assert not idle.is_expired('foo')
+ assert idle.poll_expired_connection() is None
+ assert idle.next_check_ms() == 100
+
+ t.return_value = 90 / 1000
+ assert not idle.is_expired('foo')
+ assert idle.poll_expired_connection() is None
+ assert idle.next_check_ms() == 10
+
+ t.return_value = 100 / 1000
+ assert idle.is_expired('foo')
+ assert idle.next_check_ms() == 0
+
+ conn_id, conn_ts = idle.poll_expired_connection()
+ assert conn_id == 'foo'
+ assert conn_ts == 0
+
+ idle.remove('foo')
+ assert idle.next_check_ms() == float('inf')