diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 33 |
1 files changed, 14 insertions, 19 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index d8c2389..5308c1f 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -67,6 +67,14 @@ class KafkaClient(object): reconnect_backoff_ms (int): The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50. + reconnect_backoff_max_ms (int): The maximum amount of time in + milliseconds to wait when reconnecting to a broker that has + repeatedly failed to connect. If provided, the backoff per host + will increase exponentially for each consecutive connection + failure, up to this maximum. To avoid connection storms, a + randomization factor of 0.2 will be applied to the backoff + resulting in a random range between 20% below and 20% above + the computed value. Default: 1000. request_timeout_ms (int): Client request timeout in milliseconds. Default: 40000. retry_backoff_ms (int): Milliseconds to backoff when retrying on @@ -137,6 +145,7 @@ class KafkaClient(object): 'request_timeout_ms': 40000, 'connections_max_idle_ms': 9 * 60 * 1000, 'reconnect_backoff_ms': 50, + 'reconnect_backoff_max_ms': 1000, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, @@ -432,15 +441,7 @@ class KafkaClient(object): """ if node_id not in self._conns: return 0 - - conn = self._conns[node_id] - time_waited_ms = time.time() - (conn.last_attempt or 0) - if conn.disconnected(): - return max(self.config['reconnect_backoff_ms'] - time_waited_ms, 0) - elif conn.connecting(): - return 0 - else: - return 999999999 + return self._conns[node_id].connection_delay() def is_ready(self, node_id, metadata_priority=True): """Check whether a node is ready to send more requests. @@ -655,12 +656,10 @@ class KafkaClient(object): def least_loaded_node(self): """Choose the node with fewest outstanding requests, with fallbacks. - This method will prefer a node with an existing connection, but will - potentially choose a node for which we don't yet have a connection if - all existing connections are in use. This method will never choose a - node that was disconnected within the reconnect backoff period. - If all else fails, the method will attempt to bootstrap again using the - bootstrap_servers list. + This method will prefer a node with an existing connection and no + in-flight-requests. If no such node is found, a node will be chosen + randomly from disconnected nodes that are not "blacked out" (i.e., + are not subject to a reconnect backoff). Returns: node_id or None if no suitable node was found @@ -695,10 +694,6 @@ class KafkaClient(object): elif 'bootstrap' in self._conns: return 'bootstrap' - # Last option: try to bootstrap again - # this should only happen if no prior bootstrap has been successful - log.error('No nodes found in metadata -- retrying bootstrap') - self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) return None def set_topics(self, topics): |