diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-12-17 10:39:06 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-01-19 11:10:04 -0800 |
commit | 0a8dabdf22e14396d3edcfeab4791065840bcc8c (patch) | |
tree | 47545a49eb93964b6fb11f4e20a276655b9adeb8 | |
parent | d0f4abe05d02458ad8a4a19e75d1ec86fb67ab3e (diff) | |
download | kafka-python-0a8dabdf22e14396d3edcfeab4791065840bcc8c.tar.gz |
Use connection state functions where possible
-rw-r--r-- | kafka/client_async.py | 6 | ||||
-rw-r--r-- | kafka/conn.py | 1 |
2 files changed, 4 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 6179eba..2c6413a 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -229,7 +229,7 @@ class KafkaClient(object): bootstrap.connect() while bootstrap.connecting(): bootstrap.connect() - if bootstrap.state is not ConnectionStates.CONNECTED: + if not bootstrap.connected(): bootstrap.close() continue future = bootstrap.send(metadata_request) @@ -261,7 +261,7 @@ class KafkaClient(object): return True return False conn = self._conns[node_id] - return conn.state is ConnectionStates.DISCONNECTED and not conn.blacked_out() + return conn.disconnected() and not conn.blacked_out() def _conn_state_change(self, node_id, conn): if conn.connecting(): @@ -398,7 +398,7 @@ class KafkaClient(object): conn = self._conns[node_id] time_waited_ms = time.time() - (conn.last_attempt or 0) - if conn.state is ConnectionStates.DISCONNECTED: + if conn.disconnected(): return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0) elif conn.connecting(): return 0 diff --git a/kafka/conn.py b/kafka/conn.py index 50dc4d9..ba88ca6 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -523,6 +523,7 @@ class BrokerConnection(object): return self._send(request, expect_response=expect_response) def _send(self, request, expect_response=True): + assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED) future = Future() correlation_id = self._next_correlation_id() header = RequestHeader(request, |