diff options
-rw-r--r--  kafka/client_async.py      |  73
-rw-r--r--  kafka/cluster.py           |   4
-rw-r--r--  test/test_client_async.py  | 102
3 files changed, 147 insertions(+), 32 deletions(-)
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 8916a3e..25952be 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -126,6 +126,7 @@ class KafkaClient(object):
         self.cluster = ClusterMetadata(**self.config)
         self._topics = set()  # empty set will fetch all topic metadata
         self._metadata_refresh_in_progress = False
+        self._last_no_node_available_ms = 0
         self._selector = selectors.DefaultSelector()
         self._conns = {}
         self._connecting = set()
@@ -600,38 +601,50 @@ class KafkaClient(object):
             int: milliseconds until next refresh
         """
         ttl = self.cluster.ttl()
-        if ttl > 0:
-            return ttl
+        next_reconnect_ms = self._last_no_node_available_ms + self.cluster.refresh_backoff()
+        next_reconnect_ms = max(next_reconnect_ms - time.time() * 1000, 0)
+        wait_for_in_progress_ms = 9999999999 if self._metadata_refresh_in_progress else 0
+        timeout = max(ttl, next_reconnect_ms, wait_for_in_progress_ms)
+
+        if timeout == 0:
+            node_id = self.least_loaded_node()
+            if node_id is None:
+                log.debug("Give up sending metadata request since no node is available")
+                # mark the timestamp for no node available to connect
+                self._last_no_node_available_ms = time.time() * 1000
+                return timeout
+
+            topics = list(self._topics)
+            if self.cluster.need_all_topic_metadata:
+                topics = []

-        if self._metadata_refresh_in_progress:
-            return 9999999999
-
-        node_id = self.least_loaded_node()
-        if node_id is None:
-            return 0
-
-        topics = list(self._topics)
-        if self.cluster.need_all_topic_metadata:
-            topics = []
-
-        if self._can_send_request(node_id):
-            request = MetadataRequest[0](topics)
-            log.debug("Sending metadata request %s to node %s", request, node_id)
-            future = self.send(node_id, request)
-            future.add_callback(self.cluster.update_metadata)
-            future.add_errback(self.cluster.failed_update)
-
-            self._metadata_refresh_in_progress = True
-            def refresh_done(val_or_error):
-                self._metadata_refresh_in_progress = False
-            future.add_callback(refresh_done)
-            future.add_errback(refresh_done)
-
-        elif self._can_connect(node_id):
-            log.debug("Initializing connection to node %s for metadata request", node_id)
-            self._maybe_connect(node_id)
+            if self._can_send_request(node_id):
+                request = MetadataRequest[0](topics)
+                log.debug("Sending metadata request %s to node %s", request, node_id)
+                future = self.send(node_id, request)
+                future.add_callback(self.cluster.update_metadata)
+                future.add_errback(self.cluster.failed_update)
+
+                self._metadata_refresh_in_progress = True
+                def refresh_done(val_or_error):
+                    self._metadata_refresh_in_progress = False
+                future.add_callback(refresh_done)
+                future.add_errback(refresh_done)
+
+            elif self._can_connect(node_id):
+                log.debug("Initializing connection to node %s for metadata request", node_id)
+                self._maybe_connect(node_id)
+                # If initiateConnect failed immediately, this node will be put into blackout and we
+                # should allow immediately retrying in case there is another candidate node. If it
+                # is still connecting, the worst case is that we end up setting a longer timeout
+                # on the next round and then wait for the response.
+            else:
+                # connected, but can't send more OR connecting
+                # In either case, we just need to wait for a network event to let us know the selected
+                # connection might be usable again.
+                self._last_no_node_available_ms = time.time() * 1000

-        return 0
+        return timeout

     def schedule(self, task, at):
         """Schedule a new task to be executed at the given time.
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 3309d1f..9aabec1 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -131,6 +131,10 @@ class ClusterMetadata(object):

         return max(ttl, next_retry, 0)

+    def refresh_backoff(self):
+        """Return milliseconds to wait before attempting to retry after failure"""
+        return self.config['retry_backoff_ms']
+
     def request_update(self):
         """Flags metadata for update, return Future()

diff --git a/test/test_client_async.py b/test/test_client_async.py
index 5870501..06c2bf5 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -293,8 +293,106 @@ def test_set_topics():
     pass


-def test_maybe_refresh_metadata():
-    pass
+def test_maybe_refresh_metadata_ttl(mocker):
+    mocker.patch.object(KafkaClient, '_bootstrap')
+    _poll = mocker.patch.object(KafkaClient, '_poll')
+
+    cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+    tasks.return_value = 9999999
+
+    ttl = mocker.patch.object(cli.cluster, 'ttl')
+    ttl.return_value = 1234
+
+    cli.poll(timeout_ms=9999999, sleep=True)
+    _poll.assert_called_with(1.234, sleep=True)
+
+
+def test_maybe_refresh_metadata_backoff(mocker):
+    mocker.patch.object(KafkaClient, '_bootstrap')
+    _poll = mocker.patch.object(KafkaClient, '_poll')
+
+    cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+    tasks.return_value = 9999999
+
+    ttl = mocker.patch.object(cli.cluster, 'ttl')
+    ttl.return_value = 0
+
+    now = time.time()
+    t = mocker.patch('time.time')
+    t.return_value = now
+    cli._last_no_node_available_ms = now * 1000
+
+    cli.poll(timeout_ms=9999999, sleep=True)
+    _poll.assert_called_with(2.222, sleep=True)
+
+
+def test_maybe_refresh_metadata_in_progress(mocker):
+    mocker.patch.object(KafkaClient, '_bootstrap')
+    _poll = mocker.patch.object(KafkaClient, '_poll')
+
+    cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+    tasks.return_value = 9999999
+
+    ttl = mocker.patch.object(cli.cluster, 'ttl')
+    ttl.return_value = 0
+
+    cli._metadata_refresh_in_progress = True
+
+    cli.poll(timeout_ms=9999999, sleep=True)
+    _poll.assert_called_with(9999.999, sleep=True)
+
+
+def test_maybe_refresh_metadata_update(mocker):
+    mocker.patch.object(KafkaClient, '_bootstrap')
+    _poll = mocker.patch.object(KafkaClient, '_poll')
+
+    cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+    tasks.return_value = 9999999
+
+    ttl = mocker.patch.object(cli.cluster, 'ttl')
+    ttl.return_value = 0
+
+    mocker.patch.object(cli, 'least_loaded_node', return_value='foobar')
+    mocker.patch.object(cli, '_can_send_request', return_value=True)
+    send = mocker.patch.object(cli, 'send')
+
+    cli.poll(timeout_ms=9999999, sleep=True)
+    _poll.assert_called_with(0, sleep=True)
+    assert cli._metadata_refresh_in_progress
+    request = MetadataRequest[0]([])
+    send.assert_called_with('foobar', request)
+
+
+def test_maybe_refresh_metadata_failure(mocker):
+    mocker.patch.object(KafkaClient, '_bootstrap')
+    _poll = mocker.patch.object(KafkaClient, '_poll')
+
+    cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222)
+
+    tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
+    tasks.return_value = 9999999
+
+    ttl = mocker.patch.object(cli.cluster, 'ttl')
+    ttl.return_value = 0
+
+    mocker.patch.object(cli, 'least_loaded_node', return_value='foobar')
+
+    now = time.time()
+    t = mocker.patch('time.time')
+    t.return_value = now
+
+    cli.poll(timeout_ms=9999999, sleep=True)
+    _poll.assert_called_with(0, sleep=True)
+    assert cli._last_no_node_available_ms == now * 1000
+    assert not cli._metadata_refresh_in_progress


 def test_schedule():