summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/conn.py4
-rw-r--r--test/conftest.py9
-rw-r--r--test/test_client_async.py11
3 files changed, 16 insertions, 8 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index cdf7a5e..92b2fd3 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -139,6 +139,10 @@ class BrokerConnection(object):
"""Return True iff socket is in intermediate connecting state."""
return self.state is ConnectionStates.CONNECTING
+ def disconnected(self):
+ """Return True iff socket is closed"""
+ return self.state is ConnectionStates.DISCONNECTED
+
def close(self, error=None):
"""Close socket and fail all in-flight-requests.
diff --git a/test/conftest.py b/test/conftest.py
index a389480..1f37960 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -46,7 +46,12 @@ def conn(mocker):
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
[])) # topics
conn.blacked_out.return_value = False
+ def _set_conn_state(state):
+ conn.state = state
+ return state
+ conn._set_conn_state = _set_conn_state
conn.connect.side_effect = lambda: conn.state
- conn.connecting = lambda: conn.connect() is ConnectionStates.CONNECTING
- conn.connected = lambda: conn.connect() is ConnectionStates.CONNECTED
+ conn.connecting = lambda: conn.state is ConnectionStates.CONNECTING
+ conn.connected = lambda: conn.state is ConnectionStates.CONNECTED
+ conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED
return conn
diff --git a/test/test_client_async.py b/test/test_client_async.py
index c326d55..88f0fc7 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -85,21 +85,20 @@ def test_maybe_connect(conn):
assert 0 not in cli._conns
conn.state = ConnectionStates.DISCONNECTED
- conn.connect.side_effect = lambda: ConnectionStates.CONNECTING
+ conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING)
assert cli._maybe_connect(0) is False
assert cli._conns[0] is conn
assert 0 in cli._connecting
- conn.state = ConnectionStates.CONNECTING
- conn.connect.side_effect = lambda: ConnectionStates.CONNECTED
+ conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTED)
assert cli._maybe_connect(0) is True
assert 0 not in cli._connecting
# Failure to connect should trigger metadata update
assert cli.cluster._need_update is False
- cli._connecting.add(0)
conn.state = ConnectionStates.CONNECTING
- conn.connect.side_effect = lambda: ConnectionStates.DISCONNECTED
+ cli._connecting.add(0)
+ conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.DISCONNECTED)
assert cli._maybe_connect(0) is False
assert 0 not in cli._connecting
assert cli.cluster._need_update is True
@@ -155,7 +154,7 @@ def test_ready(conn):
# connecting node connects
cli._connecting.add(0)
conn.state = ConnectionStates.CONNECTING
- conn.connect.side_effect = lambda: ConnectionStates.CONNECTED
+ conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTED)
cli.ready(0)
assert 0 not in cli._connecting
assert cli._conns[0].connect.called_with()