diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-04-04 16:31:13 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-04-04 17:57:04 -0700 |
commit | d39fdd7fafc4a409e92ce9dfb11245eb7e3ec277 (patch) | |
tree | e9f4280e115dce4cb12ca9a37b71984242294688 | |
parent | 998147dae95020bf7169441118b51dee1a2a69bd (diff) | |
download | kafka-python-metadata_wait_for_connect.tar.gz |
Avoid multiple connection attempts when refreshing metadata [metadata_wait_for_connect]
-rw-r--r-- | kafka/client_async.py | 92 | ||||
-rw-r--r-- | test/test_client_async.py | 39 |
2 files changed, 73 insertions, 58 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 2d711e4..403c783 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -183,7 +183,6 @@ class KafkaClient(object): self.cluster = ClusterMetadata(**self.config) self._topics = set() # empty set will fetch all topic metadata self._metadata_refresh_in_progress = False - self._last_no_node_available_ms = 0 self._selector = self.config['selector']() self._conns = {} self._connecting = set() @@ -709,50 +708,55 @@ class KafkaClient(object): int: milliseconds until next refresh """ ttl = self.cluster.ttl() - next_reconnect_ms = self._last_no_node_available_ms + self.cluster.refresh_backoff() - next_reconnect_ms = max(next_reconnect_ms - time.time() * 1000, 0) - wait_for_in_progress_ms = 9999999999 if self._metadata_refresh_in_progress else 0 - timeout = max(ttl, next_reconnect_ms, wait_for_in_progress_ms) - - if timeout == 0: - node_id = self.least_loaded_node() - if node_id is None: - log.debug("Give up sending metadata request since no node is available") - # mark the timestamp for no node available to connect - self._last_no_node_available_ms = time.time() * 1000 - return timeout - - if self._can_send_request(node_id): - topics = list(self._topics) - if self.cluster.need_all_topic_metadata or not topics: - topics = [] if self.config['api_version'] < (0, 10) else None - api_version = 0 if self.config['api_version'] < (0, 10) else 1 - request = MetadataRequest[api_version](topics) - log.debug("Sending metadata request %s to node %s", request, node_id) - future = self.send(node_id, request) - future.add_callback(self.cluster.update_metadata) - future.add_errback(self.cluster.failed_update) - - self._metadata_refresh_in_progress = True - def refresh_done(val_or_error): - self._metadata_refresh_in_progress = False - future.add_callback(refresh_done) - future.add_errback(refresh_done) - - elif self._can_connect(node_id): - log.debug("Initializing connection to node %s for metadata request", node_id) 
- self._maybe_connect(node_id) - # If _maybe_connect failed immediately, this node will be put into blackout and we - # should allow immediately retrying in case there is another candidate node. If it - # is still connecting, the worst case is that we end up setting a longer timeout - # on the next round and then wait for the response. - else: - # connected, but can't send more OR connecting - # In either case, we just need to wait for a network event to let us know the selected - # connection might be usable again. - self._last_no_node_available_ms = time.time() * 1000 + wait_for_in_progress_ms = self.config['request_timeout_ms'] if self._metadata_refresh_in_progress else 0 + metadata_timeout = max(ttl, wait_for_in_progress_ms) - return timeout + if metadata_timeout > 0: + return metadata_timeout + + # Beware that the behavior of this method and the computation of + # timeouts for poll() are highly dependent on the behavior of + # least_loaded_node() + node_id = self.least_loaded_node() + if node_id is None: + log.debug("Give up sending metadata request since no node is available"); + return self.config['reconnect_backoff_ms'] + + if self._can_send_request(node_id): + topics = list(self._topics) + if self.cluster.need_all_topic_metadata or not topics: + topics = [] if self.config['api_version'] < (0, 10) else None + api_version = 0 if self.config['api_version'] < (0, 10) else 1 + request = MetadataRequest[api_version](topics) + log.debug("Sending metadata request %s to node %s", request, node_id) + future = self.send(node_id, request) + future.add_callback(self.cluster.update_metadata) + future.add_errback(self.cluster.failed_update) + + self._metadata_refresh_in_progress = True + def refresh_done(val_or_error): + self._metadata_refresh_in_progress = False + future.add_callback(refresh_done) + future.add_errback(refresh_done) + return self.config['request_timeout_ms'] + + # If there's any connection establishment underway, wait until it completes. 
This prevents + # the client from unnecessarily connecting to additional nodes while a previous connection + # attempt has not been completed. + if self._connecting: + # Strictly the timeout we should return here is "connect timeout", but as we don't + # have such application level configuration, using request timeout instead. + return self.config['request_timeout_ms'] + + if self._can_connect(node_id): + log.debug("Initializing connection to node %s for metadata request", node_id) + self._maybe_connect(node_id) + return self.config['reconnect_backoff_ms'] + + # connected but can't send more, OR connecting + # In either case we just need to wait for a network event + # to let us know the selected connection might be usable again. + return float('inf') def schedule(self, task, at): """Schedule a new task to be executed at the given time. diff --git a/test/test_client_async.py b/test/test_client_async.py index 97be827..8f6ac3f 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -319,7 +319,7 @@ def client(mocker): mocker.patch.object(KafkaClient, '_bootstrap') _poll = mocker.patch.object(KafkaClient, '_poll') - cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222, api_version=(0, 9)) + cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9)) tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') tasks.return_value = 9999999 @@ -332,7 +332,7 @@ def client(mocker): def test_maybe_refresh_metadata_ttl(mocker, client): client.cluster.ttl.return_value = 1234 - client.poll(timeout_ms=9999999, sleep=True) + client.poll(timeout_ms=12345678, sleep=True) client._poll.assert_called_with(1.234, sleep=True) @@ -340,17 +340,16 @@ def test_maybe_refresh_metadata_backoff(mocker, client): now = time.time() t = mocker.patch('time.time') t.return_value = now - client._last_no_node_available_ms = now * 1000 - client.poll(timeout_ms=9999999, sleep=True) - client._poll.assert_called_with(2.222, sleep=True) + 
client.poll(timeout_ms=12345678, sleep=True) + client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff def test_maybe_refresh_metadata_in_progress(mocker, client): client._metadata_refresh_in_progress = True - client.poll(timeout_ms=9999999, sleep=True) - client._poll.assert_called_with(9999.999, sleep=True) + client.poll(timeout_ms=12345678, sleep=True) + client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms def test_maybe_refresh_metadata_update(mocker, client): @@ -358,23 +357,35 @@ def test_maybe_refresh_metadata_update(mocker, client): mocker.patch.object(client, '_can_send_request', return_value=True) send = mocker.patch.object(client, 'send') - client.poll(timeout_ms=9999999, sleep=True) - client._poll.assert_called_with(0, sleep=True) + client.poll(timeout_ms=12345678, sleep=True) + client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms assert client._metadata_refresh_in_progress request = MetadataRequest[0]([]) - send.assert_called_with('foobar', request) + send.assert_called_once_with('foobar', request) -def test_maybe_refresh_metadata_failure(mocker, client): +def test_maybe_refresh_metadata_cant_send(mocker, client): mocker.patch.object(client, 'least_loaded_node', return_value='foobar') + mocker.patch.object(client, '_can_connect', return_value=True) + mocker.patch.object(client, '_maybe_connect', return_value=True) now = time.time() t = mocker.patch('time.time') t.return_value = now - client.poll(timeout_ms=9999999, sleep=True) - client._poll.assert_called_with(0, sleep=True) - assert client._last_no_node_available_ms == now * 1000 + # first poll attempts connection + client.poll(timeout_ms=12345678, sleep=True) + client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff + client._can_connect.assert_called_once_with('foobar') + client._maybe_connect.assert_called_once_with('foobar') + + # poll while connecting should not attempt a new connection + client._connecting.add('foobar') + 
client._can_connect.reset_mock() + client.poll(timeout_ms=12345678, sleep=True) + client._poll.assert_called_with(9999.999, sleep=True) # connection timeout (request timeout) + assert not client._can_connect.called + assert not client._metadata_refresh_in_progress |