summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py20
1 files changed, 20 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 577229a..c99057c 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -223,6 +223,26 @@ class KafkaClient(object):
return False
return self._conns[node_id].state is ConnectionStates.DISCONNECTED
+ def connection_delay(self, node_id):
+ """
+ Returns the number of milliseconds to wait, based on the connection
+ state, before attempting to send data. When disconnected, this respects
+ the reconnect backoff time. When connecting or connected, this handles
+ slow/stalled connections.
+
+ @param node_id The id of the node to check
+ @return The number of milliseconds to wait.
+ """
+ if node_id not in self._conns:
+ return 0
+
+ conn = self._conns[node_id]
+ time_waited_ms = time.time() - (conn.last_attempt or 0)
+ if conn.state is ConnectionStates.DISCONNECTED:
+ return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0)
+ else:
+ return 999999999
+
def is_ready(self, node_id):
"""Check whether a node is ready to send more requests.