diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 20 |
1 files changed, 20 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 577229a..c99057c 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -223,6 +223,26 @@ class KafkaClient(object): return False return self._conns[node_id].state is ConnectionStates.DISCONNECTED + def connection_delay(self, node_id): + """ + Returns the number of milliseconds to wait, based on the connection + state, before attempting to send data. When disconnected, this respects + the reconnect backoff time. When connecting or connected, this handles + slow/stalled connections. + + @param node_id The id of the node to check + @return The number of milliseconds to wait. + """ + if node_id not in self._conns: + return 0 + + conn = self._conns[node_id] + time_waited_ms = time.time() - (conn.last_attempt or 0) + if conn.state is ConnectionStates.DISCONNECTED: + return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0) + else: + return 999999999 + def is_ready(self, node_id): """Check whether a node is ready to send more requests. |