diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 40 |
1 files changed, 17 insertions, 23 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 62b0095..8ce436e 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -524,24 +524,21 @@ class KafkaClient(object): Returns: node_id or None if no suitable node was found """ - nodes = list(self._conns.keys()) + nodes = [broker.nodeId for broker in self.cluster.brokers()] random.shuffle(nodes) - # If there's a lingering bootstrap node, always try it last - # really we should just kill this connection - if 'bootstrap' in nodes: - nodes.remove('bootstrap') - nodes.append('bootstrap') - inflight = float('inf') found = None for node_id in nodes: - conn = self._conns[node_id] - curr_inflight = len(conn.in_flight_requests) - if curr_inflight == 0 and conn.connected(): - # if we find an established connection with no in-flight requests we can stop right away + conn = self._conns.get(node_id) + connected = conn is not None and conn.connected() + blacked_out = conn is not None and conn.blacked_out() + curr_inflight = len(conn.in_flight_requests) if conn else 0 + if connected and curr_inflight == 0: + # if we find an established connection + # with no in-flight requests, we can stop right away return node_id - elif not conn.blacked_out() and curr_inflight < inflight: + elif not blacked_out and curr_inflight < inflight: # otherwise if this is the best we have found so far, record that inflight = curr_inflight found = node_id @@ -549,19 +546,16 @@ class KafkaClient(object): if found is not None: return found - # if we found no connected node, return a disconnected one - log.debug("No connected nodes found. Trying disconnected nodes.") - for node_id in nodes: - if not self._conns[node_id].blacked_out(): - return node_id - - # if still no luck, look for a node not in self._conns yet - log.debug("No luck. Trying all broker metadata") - for broker in self.cluster.brokers(): - if broker.nodeId not in self._conns: - return broker.nodeId + # some broker versions return an empty list of broker metadata + # if there are no topics created yet. the bootstrap process + # should detect this and keep a 'bootstrap' node alive until + # a non-bootstrap node is connected and non-empty broker + # metadata is available + elif 'bootstrap' in self._conns: + return 'bootstrap' # Last option: try to bootstrap again + # this should only happen if no prior bootstrap has been successful log.error('No nodes found in metadata -- retrying bootstrap') self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) return None |