summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py29
1 files changed, 10 insertions, 19 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index d8c2389..44bc9af 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -67,6 +67,10 @@ class KafkaClient(object):
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
+ reconnect_backoff_max (int): If higher than reconnect_backoff_ms,
+ node reconnect backoff will increase on each consecutive failure
+ up to this maximum. The actual backoff is chosen randomly from
+ an exponentially increasing range. Default: 60000.
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 40000.
retry_backoff_ms (int): Milliseconds to backoff when retrying on
@@ -137,6 +141,7 @@ class KafkaClient(object):
'request_timeout_ms': 40000,
'connections_max_idle_ms': 9 * 60 * 1000,
'reconnect_backoff_ms': 50,
+ 'reconnect_backoff_max': 60000,
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
@@ -432,15 +437,7 @@ class KafkaClient(object):
"""
if node_id not in self._conns:
return 0
-
- conn = self._conns[node_id]
- time_waited_ms = time.time() - (conn.last_attempt or 0)
- if conn.disconnected():
- return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0)
- elif conn.connecting():
- return 0
- else:
- return 999999999
+ return self._conns[node_id].connection_delay()
def is_ready(self, node_id, metadata_priority=True):
"""Check whether a node is ready to send more requests.
@@ -655,12 +652,10 @@ class KafkaClient(object):
def least_loaded_node(self):
"""Choose the node with fewest outstanding requests, with fallbacks.
- This method will prefer a node with an existing connection, but will
- potentially choose a node for which we don't yet have a connection if
- all existing connections are in use. This method will never choose a
- node that was disconnected within the reconnect backoff period.
- If all else fails, the method will attempt to bootstrap again using the
- bootstrap_servers list.
+ This method will prefer a node with an existing connection and no
+ in-flight-requests. If no such node is found, a node will be chosen
+ randomly from disconnected nodes that are not "blacked out" (i.e.,
+ are not subject to a reconnect backoff).
Returns:
node_id or None if no suitable node was found
@@ -695,10 +690,6 @@ class KafkaClient(object):
elif 'bootstrap' in self._conns:
return 'bootstrap'
- # Last option: try to bootstrap again
- # this should only happen if no prior bootstrap has been successful
- log.error('No nodes found in metadata -- retrying bootstrap')
- self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
return None
def set_topics(self, topics):