diff options
Diffstat (limited to 'test/test_conn.py')
-rw-r--r-- | test/test_conn.py | 81 |
1 files changed, 62 insertions, 19 deletions
diff --git a/test/test_conn.py b/test/test_conn.py index f0ca2cf..6a3b154 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from errno import EALREADY, EINPROGRESS, EISCONN, ECONNRESET +import socket import time import pytest @@ -14,7 +15,7 @@ import kafka.common as Errors @pytest.fixture -def socket(mocker): +def _socket(mocker): socket = mocker.MagicMock() socket.connect_ex.return_value = 0 mocker.patch('socket.socket', return_value=socket) @@ -22,9 +23,8 @@ def socket(mocker): @pytest.fixture -def conn(socket): - from socket import AF_INET - conn = BrokerConnection('localhost', 9092, AF_INET) +def conn(_socket): + conn = BrokerConnection('localhost', 9092, socket.AF_INET) return conn @@ -38,23 +38,23 @@ def conn(socket): ([EALREADY], ConnectionStates.CONNECTING), ([EISCONN], ConnectionStates.CONNECTED)), ]) -def test_connect(socket, conn, states): +def test_connect(_socket, conn, states): assert conn.state is ConnectionStates.DISCONNECTED for errno, state in states: - socket.connect_ex.side_effect = errno + _socket.connect_ex.side_effect = errno conn.connect() assert conn.state is state -def test_connect_timeout(socket, conn): +def test_connect_timeout(_socket, conn): assert conn.state is ConnectionStates.DISCONNECTED # Initial connect returns EINPROGRESS # immediate inline connect returns EALREADY # second explicit connect returns EALREADY # third explicit connect returns EALREADY and times out via last_attempt - socket.connect_ex.side_effect = [EINPROGRESS, EALREADY, EALREADY, EALREADY] + _socket.connect_ex.side_effect = [EINPROGRESS, EALREADY, EALREADY, EALREADY] conn.connect() assert conn.state is ConnectionStates.CONNECTING conn.connect() @@ -108,7 +108,7 @@ def test_send_max_ifr(conn): assert isinstance(f.exception, Errors.TooManyInFlightRequests) -def test_send_no_response(socket, conn): +def test_send_no_response(_socket, conn): conn.connect() assert conn.state is ConnectionStates.CONNECTED req = MetadataRequest[0]([]) @@ -116,7 +116,7 @@ def test_send_no_response(socket, conn): payload_bytes = len(header.encode()) + len(req.encode()) third = payload_bytes // 3 remainder = payload_bytes % 3 - socket.send.side_effect = [4, third, third, third, remainder] + _socket.send.side_effect = [4, third, third, third, remainder] assert len(conn.in_flight_requests) == 0 f = conn.send(req, expect_response=False) @@ -125,7 +125,7 @@ def test_send_no_response(socket, conn): assert len(conn.in_flight_requests) == 0 -def test_send_response(socket, conn): +def test_send_response(_socket, conn): conn.connect() assert conn.state is ConnectionStates.CONNECTED req = MetadataRequest[0]([]) @@ -133,7 +133,7 @@ def test_send_response(socket, conn): payload_bytes = len(header.encode()) + len(req.encode()) third = payload_bytes // 3 remainder = payload_bytes % 3 - socket.send.side_effect = [4, third, third, third, remainder] + _socket.send.side_effect = [4, third, third, third, remainder] assert len(conn.in_flight_requests) == 0 f = conn.send(req) @@ -141,20 +141,18 @@ def test_send_response(socket, conn): assert len(conn.in_flight_requests) == 1 -def test_send_error(socket, conn): +def test_send_error(_socket, conn): conn.connect() assert conn.state is ConnectionStates.CONNECTED req = MetadataRequest[0]([]) - header = RequestHeader(req, client_id=conn.config['client_id']) try: - error = ConnectionError + _socket.send.side_effect = ConnectionError except NameError: - from socket import error - socket.send.side_effect = error + _socket.send.side_effect = socket.error f = conn.send(req) assert f.failed() is True assert isinstance(f.exception, Errors.ConnectionError) - assert socket.close.call_count == 1 + assert _socket.close.call_count == 1 assert conn.state is ConnectionStates.DISCONNECTED @@ -167,7 +165,52 @@ def test_can_send_more(conn): assert conn.can_send_more() is False -def test_recv(socket, conn): +def test_recv_disconnected(): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(('127.0.0.1', 0)) + port = sock.getsockname()[1] + sock.listen(5) + + conn = BrokerConnection('127.0.0.1', port, socket.AF_INET) + timeout = time.time() + 1 + while time.time() < timeout: + conn.connect() + if conn.connected(): + break + else: + assert False, 'Connection attempt to local socket timed-out ?' + + conn.send(MetadataRequest[0]([])) + + # Disconnect server socket + sock.close() + + # Attempt to receive should mark connection as disconnected + assert conn.connected() + conn.recv() + assert conn.disconnected() + + +def test_recv_disconnected_too(_socket, conn): + conn.connect() + assert conn.connected() + + req = MetadataRequest[0]([]) + header = RequestHeader(req, client_id=conn.config['client_id']) + payload_bytes = len(header.encode()) + len(req.encode()) + _socket.send.side_effect = [4, payload_bytes] + conn.send(req) + + # Empty data on recv means the socket is disconnected + _socket.recv.return_value = b'' + + # Attempt to receive should mark connection as disconnected + assert conn.connected() + conn.recv() + assert conn.disconnected() + + +def test_recv(_socket, conn): pass # TODO |