diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 92 |
1 files changed, 48 insertions, 44 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 2d711e4..403c783 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -183,7 +183,6 @@ class KafkaClient(object): self.cluster = ClusterMetadata(**self.config) self._topics = set() # empty set will fetch all topic metadata self._metadata_refresh_in_progress = False - self._last_no_node_available_ms = 0 self._selector = self.config['selector']() self._conns = {} self._connecting = set() @@ -709,50 +708,55 @@ class KafkaClient(object): int: milliseconds until next refresh """ ttl = self.cluster.ttl() - next_reconnect_ms = self._last_no_node_available_ms + self.cluster.refresh_backoff() - next_reconnect_ms = max(next_reconnect_ms - time.time() * 1000, 0) - wait_for_in_progress_ms = 9999999999 if self._metadata_refresh_in_progress else 0 - timeout = max(ttl, next_reconnect_ms, wait_for_in_progress_ms) - - if timeout == 0: - node_id = self.least_loaded_node() - if node_id is None: - log.debug("Give up sending metadata request since no node is available") - # mark the timestamp for no node available to connect - self._last_no_node_available_ms = time.time() * 1000 - return timeout - - if self._can_send_request(node_id): - topics = list(self._topics) - if self.cluster.need_all_topic_metadata or not topics: - topics = [] if self.config['api_version'] < (0, 10) else None - api_version = 0 if self.config['api_version'] < (0, 10) else 1 - request = MetadataRequest[api_version](topics) - log.debug("Sending metadata request %s to node %s", request, node_id) - future = self.send(node_id, request) - future.add_callback(self.cluster.update_metadata) - future.add_errback(self.cluster.failed_update) - - self._metadata_refresh_in_progress = True - def refresh_done(val_or_error): - self._metadata_refresh_in_progress = False - future.add_callback(refresh_done) - future.add_errback(refresh_done) - - elif self._can_connect(node_id): - log.debug("Initializing connection to node %s for metadata request", node_id) - self._maybe_connect(node_id) - # If _maybe_connect failed immediately, this node will be put into blackout and we - # should allow immediately retrying in case there is another candidate node. If it - # is still connecting, the worst case is that we end up setting a longer timeout - # on the next round and then wait for the response. - else: - # connected, but can't send more OR connecting - # In either case, we just need to wait for a network event to let us know the selected - # connection might be usable again. - self._last_no_node_available_ms = time.time() * 1000 + wait_for_in_progress_ms = self.config['request_timeout_ms'] if self._metadata_refresh_in_progress else 0 + metadata_timeout = max(ttl, wait_for_in_progress_ms) - return timeout + if metadata_timeout > 0: + return metadata_timeout + + # Beware that the behavior of this method and the computation of + # timeouts for poll() are highly dependent on the behavior of + # least_loaded_node() + node_id = self.least_loaded_node() + if node_id is None: + log.debug("Give up sending metadata request since no node is available"); + return self.config['reconnect_backoff_ms'] + + if self._can_send_request(node_id): + topics = list(self._topics) + if self.cluster.need_all_topic_metadata or not topics: + topics = [] if self.config['api_version'] < (0, 10) else None + api_version = 0 if self.config['api_version'] < (0, 10) else 1 + request = MetadataRequest[api_version](topics) + log.debug("Sending metadata request %s to node %s", request, node_id) + future = self.send(node_id, request) + future.add_callback(self.cluster.update_metadata) + future.add_errback(self.cluster.failed_update) + + self._metadata_refresh_in_progress = True + def refresh_done(val_or_error): + self._metadata_refresh_in_progress = False + future.add_callback(refresh_done) + future.add_errback(refresh_done) + return self.config['request_timeout_ms'] + + # If there's any connection establishment underway, wait until it completes. This prevents + # the client from unnecessarily connecting to additional nodes while a previous connection + # attempt has not been completed. + if self._connecting: + # Strictly the timeout we should return here is "connect timeout", but as we don't + # have such application level configuration, using request timeout instead. + return self.config['request_timeout_ms'] + + if self._can_connect(node_id): + log.debug("Initializing connection to node %s for metadata request", node_id) + self._maybe_connect(node_id) + return self.config['reconnect_backoff_ms'] + + # connected but can't send more, OR connecting + # In either case we just need to wait for a network event + # to let us know the selected connection might be usable again. + return float('inf') def schedule(self, task, at): """Schedule a new task to be executed at the given time. |