diff options
Diffstat (limited to 'test/test_client_async.py')
| -rw-r--r-- | test/test_client_async.py | 84 | 
1 files changed, 40 insertions, 44 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py index e0b98c4..884686d 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -41,7 +41,8 @@ def conn(mocker):              [(0, 'foo', 12), (1, 'bar', 34)],  # brokers              []))  # topics      conn.blacked_out.return_value = False -    conn.connect.return_value = conn.state +    conn.connect.side_effect = lambda: conn.state +    conn.connected = lambda: conn.connect() is ConnectionStates.CONNECTED      return conn @@ -76,7 +77,7 @@ def test_can_connect(conn):      assert cli._can_connect(0)      # Node is connected, can't reconnect -    cli._initiate_connect(0) +    assert cli._maybe_connect(0) is True      assert not cli._can_connect(0)      # Node is disconnected, can connect @@ -87,60 +88,47 @@ def test_can_connect(conn):      conn.blacked_out.return_value = True      assert not cli._can_connect(0) -def test_initiate_connect(conn): +def test_maybe_connect(conn):      cli = KafkaClient()      try:          # Node not in metadata, raises AssertionError -        cli._initiate_connect(2) +        cli._maybe_connect(2)      except AssertionError:          pass      else:          assert False, 'Exception not raised'      assert 0 not in cli._conns -    state = cli._initiate_connect(0) +    conn.state = ConnectionStates.DISCONNECTED +    conn.connect.side_effect = lambda: ConnectionStates.CONNECTING +    assert cli._maybe_connect(0) is False      assert cli._conns[0] is conn -    assert state is conn.state - - -def test_finish_connect(conn): -    cli = KafkaClient() -    try: -        # Node not in metadata, raises AssertionError -        cli._initiate_connect(2) -    except AssertionError: -        pass -    else: -        assert False, 'Exception not raised' - -    assert 0 not in cli._conns -    cli._initiate_connect(0) - -    conn.connect.return_value = ConnectionStates.CONNECTING -    state = cli._finish_connect(0)      assert 0 in cli._connecting -    assert state is ConnectionStates.CONNECTING -    conn.connect.return_value = ConnectionStates.CONNECTED -    state = cli._finish_connect(0) +    conn.state = ConnectionStates.CONNECTING +    conn.connect.side_effect = lambda: ConnectionStates.CONNECTED +    assert cli._maybe_connect(0) is True      assert 0 not in cli._connecting -    assert state is ConnectionStates.CONNECTED      # Failure to connect should trigger metadata update -    assert not cli.cluster._need_update +    assert cli.cluster._need_update is False      cli._connecting.add(0) -    conn.connect.return_value = ConnectionStates.DISCONNECTED -    state = cli._finish_connect(0) +    conn.state = ConnectionStates.CONNECTING +    conn.connect.side_effect = lambda: ConnectionStates.DISCONNECTED +    assert cli._maybe_connect(0) is False      assert 0 not in cli._connecting -    assert state is ConnectionStates.DISCONNECTED -    assert cli.cluster._need_update +    assert cli.cluster._need_update is True  def test_ready(conn):      cli = KafkaClient() -    # Node not in metadata -    assert not cli.ready(2) +    # Node not in metadata raises Exception +    try: +        cli.ready(2) +        assert False, 'Exception not raised' +    except AssertionError: +        pass      # Node in metadata will connect      assert 0 not in cli._conns @@ -176,13 +164,13 @@ def test_ready(conn):      # disconnected nodes, not ready      assert cli.ready(0)      assert cli.ready(1) -    conn.connected.return_value = False +    conn.state = ConnectionStates.DISCONNECTED      assert not cli.ready(0) -    conn.connected.return_value = True      # connecting node connects      cli._connecting.add(0) -    conn.connected.return_value = False +    conn.state = ConnectionStates.CONNECTING +    conn.connect.side_effect = lambda: ConnectionStates.CONNECTED      cli.ready(0)      assert 0 not in cli._connecting      assert cli._conns[0].connect.called_with() @@ -195,13 +183,13 @@ def test_close(conn):      cli.close(2)      # Single node close -    cli._initiate_connect(0) +    cli._maybe_connect(0)      assert not conn.close.call_count      cli.close(0)      assert conn.close.call_count == 1      # All node close -    cli._initiate_connect(1) +    cli._maybe_connect(1)      cli.close()      assert conn.close.call_count == 3 @@ -213,7 +201,7 @@ def test_is_disconnected(conn):      conn.state = ConnectionStates.DISCONNECTED      assert not cli.is_disconnected(0) -    cli._initiate_connect(0) +    cli._maybe_connect(0)      assert cli.is_disconnected(0)      conn.state = ConnectionStates.CONNECTING @@ -225,14 +213,22 @@ def test_is_disconnected(conn):  def test_send(conn):      cli = KafkaClient() + +    # Send to unknown node => raises AssertionError      try:          cli.send(2, None) -    except Errors.NodeNotReadyError: +        assert False, 'Exception not raised' +    except AssertionError:          pass -    else: -        assert False, 'NodeNotReadyError not raised' -    cli._initiate_connect(0) +    # Send to disconnected node => NodeNotReady +    conn.state = ConnectionStates.DISCONNECTED +    f = cli.send(0, None) +    assert f.failed() +    assert isinstance(f.exception, Errors.NodeNotReadyError) + +    conn.state = ConnectionStates.CONNECTED +    cli._maybe_connect(0)      # ProduceRequest w/ 0 required_acks -> no response      request = ProduceRequest(0, 0, [])      ret = cli.send(0, request)  | 
