diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 0d9e562..b6adb77 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -517,7 +517,7 @@ class KafkaClient(object): Future: resolves to Response struct or Error """ if not self._can_send_request(node_id): - self.maybe_connect(node_id) + self.maybe_connect(node_id, wakeup=wakeup) return Future().failure(Errors.NodeNotReadyError(node_id)) # conn.send will queue the request internally @@ -761,7 +761,7 @@ class KafkaClient(object): return self.cluster.request_update() # This method should be locked when running multi-threaded - def _maybe_refresh_metadata(self): + def _maybe_refresh_metadata(self, wakeup=False): """Send a metadata request if needed. Returns: @@ -792,7 +792,7 @@ class KafkaClient(object): api_version = 0 if self.config['api_version'] < (0, 10) else 1 request = MetadataRequest[api_version](topics) log.debug("Sending metadata request %s to node %s", request, node_id) - future = self.send(node_id, request) + future = self.send(node_id, request, wakeup=wakeup) future.add_callback(self.cluster.update_metadata) future.add_errback(self.cluster.failed_update) @@ -809,7 +809,7 @@ class KafkaClient(object): if self._connecting: return self.config['reconnect_backoff_ms'] - if self.maybe_connect(node_id): + if self.maybe_connect(node_id, wakeup=wakeup): log.debug("Initializing connection to node %s for metadata request", node_id) return self.config['reconnect_backoff_ms'] |