diff options
-rw-r--r-- | kafka/client_async.py | 8 | ||||
-rw-r--r-- | test/test_client_async.py | 4 |
2 files changed, 6 insertions, 6 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 0d9e562..b6adb77 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -517,7 +517,7 @@ class KafkaClient(object): Future: resolves to Response struct or Error """ if not self._can_send_request(node_id): - self.maybe_connect(node_id) + self.maybe_connect(node_id, wakeup=wakeup) return Future().failure(Errors.NodeNotReadyError(node_id)) # conn.send will queue the request internally @@ -761,7 +761,7 @@ class KafkaClient(object): return self.cluster.request_update() # This method should be locked when running multi-threaded - def _maybe_refresh_metadata(self): + def _maybe_refresh_metadata(self, wakeup=False): """Send a metadata request if needed. Returns: @@ -792,7 +792,7 @@ class KafkaClient(object): api_version = 0 if self.config['api_version'] < (0, 10) else 1 request = MetadataRequest[api_version](topics) log.debug("Sending metadata request %s to node %s", request, node_id) - future = self.send(node_id, request) + future = self.send(node_id, request, wakeup=wakeup) future.add_callback(self.cluster.update_metadata) future.add_errback(self.cluster.failed_update) @@ -809,7 +809,7 @@ class KafkaClient(object): if self._connecting: return self.config['reconnect_backoff_ms'] - if self.maybe_connect(node_id): + if self.maybe_connect(node_id, wakeup=wakeup): log.debug("Initializing connection to node %s for metadata request", node_id) return self.config['reconnect_backoff_ms'] diff --git a/test/test_client_async.py b/test/test_client_async.py index 3588423..246e36c 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -332,7 +332,7 @@ def test_maybe_refresh_metadata_update(mocker, client): client._poll.assert_called_with(9999.999) # request_timeout_ms assert client._metadata_refresh_in_progress request = MetadataRequest[0]([]) - send.assert_called_once_with('foobar', request) + send.assert_called_once_with('foobar', request, wakeup=False) def test_maybe_refresh_metadata_cant_send(mocker, client): @@ -348,7 +348,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client): # first poll attempts connection client.poll(timeout_ms=12345678) client._poll.assert_called_with(2.222) # reconnect backoff - client.maybe_connect.assert_called_once_with('foobar') + client.maybe_connect.assert_called_once_with('foobar', wakeup=False) # poll while connecting should not attempt a new connection client._connecting.add('foobar') |