summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-18 22:15:58 -0800
committerDana Powers <dana.powers@rd.io>2016-01-18 22:15:58 -0800
commit4c2ad1278013a9e04e718b411f938d7c7ff050ad (patch)
treefc34986990d58af6544c357ee91aff1b011928dc /kafka/client_async.py
parent1f6be11b0948cfaf27e88c2914b04a680a48f926 (diff)
downloadkafka-python-4c2ad1278013a9e04e718b411f938d7c7ff050ad.tar.gz
Add back connection_delay method to KafkaClient - used by KafkaProducer
This reverts commit 88cf1b5e4551cd96322aa812fa482bf0f978060a.
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py20
1 files changed, 20 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 577229a..c99057c 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -223,6 +223,26 @@ class KafkaClient(object):
return False
return self._conns[node_id].state is ConnectionStates.DISCONNECTED
+ def connection_delay(self, node_id):
+ """
+ Returns the number of milliseconds to wait, based on the connection
+ state, before attempting to send data. When disconnected, this respects
+ the reconnect backoff time. When connecting or connected, this handles
+ slow/stalled connections.
+
+ @param node_id The id of the node to check
+ @return The number of milliseconds to wait.
+ """
+ if node_id not in self._conns:
+ return 0
+
+ conn = self._conns[node_id]
+ time_waited_ms = time.time() - (conn.last_attempt or 0)
+ if conn.state is ConnectionStates.DISCONNECTED:
+ return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0)
+ else:
+ return 999999999
+
def is_ready(self, node_id):
"""Check whether a node is ready to send more requests.