summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/test_client_async.py84
1 files changed, 40 insertions, 44 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py
index e0b98c4..884686d 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -41,7 +41,8 @@ def conn(mocker):
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
[])) # topics
conn.blacked_out.return_value = False
- conn.connect.return_value = conn.state
+ conn.connect.side_effect = lambda: conn.state
+ conn.connected = lambda: conn.connect() is ConnectionStates.CONNECTED
return conn
@@ -76,7 +77,7 @@ def test_can_connect(conn):
assert cli._can_connect(0)
# Node is connected, can't reconnect
- cli._initiate_connect(0)
+ assert cli._maybe_connect(0) is True
assert not cli._can_connect(0)
# Node is disconnected, can connect
@@ -87,60 +88,47 @@ def test_can_connect(conn):
conn.blacked_out.return_value = True
assert not cli._can_connect(0)
-def test_initiate_connect(conn):
+def test_maybe_connect(conn):
cli = KafkaClient()
try:
# Node not in metadata, raises AssertionError
- cli._initiate_connect(2)
+ cli._maybe_connect(2)
except AssertionError:
pass
else:
assert False, 'Exception not raised'
assert 0 not in cli._conns
- state = cli._initiate_connect(0)
+ conn.state = ConnectionStates.DISCONNECTED
+ conn.connect.side_effect = lambda: ConnectionStates.CONNECTING
+ assert cli._maybe_connect(0) is False
assert cli._conns[0] is conn
- assert state is conn.state
-
-
-def test_finish_connect(conn):
- cli = KafkaClient()
- try:
- # Node not in metadata, raises AssertionError
- cli._initiate_connect(2)
- except AssertionError:
- pass
- else:
- assert False, 'Exception not raised'
-
- assert 0 not in cli._conns
- cli._initiate_connect(0)
-
- conn.connect.return_value = ConnectionStates.CONNECTING
- state = cli._finish_connect(0)
assert 0 in cli._connecting
- assert state is ConnectionStates.CONNECTING
- conn.connect.return_value = ConnectionStates.CONNECTED
- state = cli._finish_connect(0)
+ conn.state = ConnectionStates.CONNECTING
+ conn.connect.side_effect = lambda: ConnectionStates.CONNECTED
+ assert cli._maybe_connect(0) is True
assert 0 not in cli._connecting
- assert state is ConnectionStates.CONNECTED
# Failure to connect should trigger metadata update
- assert not cli.cluster._need_update
+ assert cli.cluster._need_update is False
cli._connecting.add(0)
- conn.connect.return_value = ConnectionStates.DISCONNECTED
- state = cli._finish_connect(0)
+ conn.state = ConnectionStates.CONNECTING
+ conn.connect.side_effect = lambda: ConnectionStates.DISCONNECTED
+ assert cli._maybe_connect(0) is False
assert 0 not in cli._connecting
- assert state is ConnectionStates.DISCONNECTED
- assert cli.cluster._need_update
+ assert cli.cluster._need_update is True
def test_ready(conn):
cli = KafkaClient()
- # Node not in metadata
- assert not cli.ready(2)
+ # Node not in metadata raises Exception
+ try:
+ cli.ready(2)
+ assert False, 'Exception not raised'
+ except AssertionError:
+ pass
# Node in metadata will connect
assert 0 not in cli._conns
@@ -176,13 +164,13 @@ def test_ready(conn):
# disconnected nodes, not ready
assert cli.ready(0)
assert cli.ready(1)
- conn.connected.return_value = False
+ conn.state = ConnectionStates.DISCONNECTED
assert not cli.ready(0)
- conn.connected.return_value = True
# connecting node connects
cli._connecting.add(0)
- conn.connected.return_value = False
+ conn.state = ConnectionStates.CONNECTING
+ conn.connect.side_effect = lambda: ConnectionStates.CONNECTED
cli.ready(0)
assert 0 not in cli._connecting
assert cli._conns[0].connect.called_with()
@@ -195,13 +183,13 @@ def test_close(conn):
cli.close(2)
# Single node close
- cli._initiate_connect(0)
+ cli._maybe_connect(0)
assert not conn.close.call_count
cli.close(0)
assert conn.close.call_count == 1
# All node close
- cli._initiate_connect(1)
+ cli._maybe_connect(1)
cli.close()
assert conn.close.call_count == 3
@@ -213,7 +201,7 @@ def test_is_disconnected(conn):
conn.state = ConnectionStates.DISCONNECTED
assert not cli.is_disconnected(0)
- cli._initiate_connect(0)
+ cli._maybe_connect(0)
assert cli.is_disconnected(0)
conn.state = ConnectionStates.CONNECTING
@@ -225,14 +213,22 @@ def test_is_disconnected(conn):
def test_send(conn):
cli = KafkaClient()
+
+ # Send to unknown node => raises AssertionError
try:
cli.send(2, None)
- except Errors.NodeNotReadyError:
+ assert False, 'Exception not raised'
+ except AssertionError:
pass
- else:
- assert False, 'NodeNotReadyError not raised'
- cli._initiate_connect(0)
+ # Send to disconnected node => NodeNotReady
+ conn.state = ConnectionStates.DISCONNECTED
+ f = cli.send(0, None)
+ assert f.failed()
+ assert isinstance(f.exception, Errors.NodeNotReadyError)
+
+ conn.state = ConnectionStates.CONNECTED
+ cli._maybe_connect(0)
# ProduceRequest w/ 0 required_acks -> no response
request = ProduceRequest(0, 0, [])
ret = cli.send(0, request)