summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py33
1 files changed, 14 insertions, 19 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index d8c2389..5308c1f 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -67,6 +67,14 @@ class KafkaClient(object):
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
+ reconnect_backoff_max_ms (int): The maximum amount of time in
+ milliseconds to wait when reconnecting to a broker that has
+ repeatedly failed to connect. If provided, the backoff per host
+ will increase exponentially for each consecutive connection
+ failure, up to this maximum. To avoid connection storms, a
+ randomization factor of 0.2 will be applied to the backoff
+ resulting in a random range between 20% below and 20% above
+ the computed value. Default: 1000.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 40000.
retry_backoff_ms (int): Milliseconds to backoff when retrying on
@@ -137,6 +145,7 @@ class KafkaClient(object):
'request_timeout_ms': 40000,
'connections_max_idle_ms': 9 * 60 * 1000,
'reconnect_backoff_ms': 50,
+ 'reconnect_backoff_max_ms': 1000,
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
@@ -432,15 +441,7 @@ class KafkaClient(object):
"""
if node_id not in self._conns:
return 0
-
- conn = self._conns[node_id]
- time_waited_ms = time.time() - (conn.last_attempt or 0)
- if conn.disconnected():
- return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0)
- elif conn.connecting():
- return 0
- else:
- return 999999999
+ return self._conns[node_id].connection_delay()
def is_ready(self, node_id, metadata_priority=True):
"""Check whether a node is ready to send more requests.
@@ -655,12 +656,10 @@ class KafkaClient(object):
def least_loaded_node(self):
"""Choose the node with fewest outstanding requests, with fallbacks.
- This method will prefer a node with an existing connection, but will
- potentially choose a node for which we don't yet have a connection if
- all existing connections are in use. This method will never choose a
- node that was disconnected within the reconnect backoff period.
- If all else fails, the method will attempt to bootstrap again using the
- bootstrap_servers list.
+ This method will prefer a node with an existing connection and no
+ in-flight-requests. If no such node is found, a node will be chosen
+ randomly from disconnected nodes that are not "blacked out" (i.e.,
+ are not subject to a reconnect backoff).
Returns:
node_id or None if no suitable node was found
@@ -695,10 +694,6 @@ class KafkaClient(object):
elif 'bootstrap' in self._conns:
return 'bootstrap'
- # Last option: try to bootstrap again
- # this should only happen if no prior bootstrap has been successful
- log.error('No nodes found in metadata -- retrying bootstrap')
- self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
return None
def set_topics(self, topics):