summaryrefslogtreecommitdiff
path: root/test/test_client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_client_async.py')
-rw-r--r--test/test_client_async.py15
1 files changed, 14 insertions, 1 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py
index ad76aad..922e43c 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -1,3 +1,10 @@
+# selectors in stdlib as of py3.4
+try:
+ import selectors # pylint: disable=import-error
+except ImportError:
+ # vendored backport module
+ import kafka.selectors34 as selectors
+
import socket
import time
@@ -99,15 +106,19 @@ def test_maybe_connect(conn):
def test_conn_state_change(mocker, conn):
cli = KafkaClient()
+ sel = mocker.patch.object(cli, '_selector')
node_id = 0
conn.state = ConnectionStates.CONNECTING
cli._conn_state_change(node_id, conn)
assert node_id in cli._connecting
+ sel.register.assert_called_with(conn._sock, selectors.EVENT_WRITE)
conn.state = ConnectionStates.CONNECTED
cli._conn_state_change(node_id, conn)
assert node_id not in cli._connecting
+ sel.unregister.assert_called_with(conn._sock)
+ sel.register.assert_called_with(conn._sock, selectors.EVENT_READ, conn)
# Failure to connect should trigger metadata update
assert cli.cluster._need_update is False
@@ -115,6 +126,7 @@ def test_conn_state_change(mocker, conn):
cli._conn_state_change(node_id, conn)
assert node_id not in cli._connecting
assert cli.cluster._need_update is True
+ sel.unregister.assert_called_with(conn._sock)
conn.state = ConnectionStates.CONNECTING
cli._conn_state_change(node_id, conn)
@@ -167,8 +179,9 @@ def test_is_ready(mocker, conn):
assert not cli.is_ready(0)
-def test_close(conn):
+def test_close(mocker, conn):
cli = KafkaClient()
+ mocker.patch.object(cli, '_selector')
# Unknown node - silent
cli.close(2)