diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-09 08:48:51 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-09 08:48:51 -0700 |
commit | cda2d59da4ff952adae1a75d906eaa3a99ac7f67 (patch) | |
tree | c1478b5a63c3fdbd5adb9274742717b82608596c /test | |
parent | 810f08b7996a15e65cdd8af6c1a7167c28f94646 (diff) | |
parent | 237bd730fd29a105b6aabdc0262a694fb7c8f510 (diff) | |
download | kafka-python-cda2d59da4ff952adae1a75d906eaa3a99ac7f67.tar.gz |
Merge pull request #640 from dpkp/selectors
Manage non-blocking I/O events with selectors module
Diffstat (limited to 'test')
-rw-r--r-- | test/test_client_async.py | 15 |
1 files changed, 14 insertions, 1 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py index ad76aad..922e43c 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -1,3 +1,10 @@ +# selectors in stdlib as of py3.4 +try: + import selectors # pylint: disable=import-error +except ImportError: + # vendored backport module + import kafka.selectors34 as selectors + import socket import time @@ -99,15 +106,19 @@ def test_maybe_connect(conn): def test_conn_state_change(mocker, conn): cli = KafkaClient() + sel = mocker.patch.object(cli, '_selector') node_id = 0 conn.state = ConnectionStates.CONNECTING cli._conn_state_change(node_id, conn) assert node_id in cli._connecting + sel.register.assert_called_with(conn._sock, selectors.EVENT_WRITE) conn.state = ConnectionStates.CONNECTED cli._conn_state_change(node_id, conn) assert node_id not in cli._connecting + sel.unregister.assert_called_with(conn._sock) + sel.register.assert_called_with(conn._sock, selectors.EVENT_READ, conn) # Failure to connect should trigger metadata update assert cli.cluster._need_update is False @@ -115,6 +126,7 @@ def test_conn_state_change(mocker, conn): cli._conn_state_change(node_id, conn) assert node_id not in cli._connecting assert cli.cluster._need_update is True + sel.unregister.assert_called_with(conn._sock) conn.state = ConnectionStates.CONNECTING cli._conn_state_change(node_id, conn) @@ -167,8 +179,9 @@ def test_is_ready(mocker, conn): assert not cli.is_ready(0) -def test_close(conn): +def test_close(mocker, conn): cli = KafkaClient() + mocker.patch.object(cli, '_selector') # Unknown node - silent cli.close(2) |