summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichal Kuratczyk <mkuratczyk@vmware.com>2023-03-22 09:18:41 +0000
committerMichal Kuratczyk <mkuratczyk@vmware.com>2023-03-23 16:17:58 +0000
commit9d93d0581ba0adf03664bfca5576b5d8bfac1e6f (patch)
tree2f566294504070629a9302a7723550b223080452
parent2b1a80f1e977a652f526377bbe8e3256fcaaca34 (diff)
downloadrabbitmq-server-git-per-queue-recovery-terms.tar.gz
Recovery terms in per-queue files instead of DETS [per-queue-recovery-terms]
The per-vhost DETS file with recovery terms for all queues is a bottleneck when stopping RabbitMQ - all queues try to save their state, leading to a very long file server mailbox and a very unpredictable time to stop RabbitMQ (on my machine it can vary from 20 seconds to 5 minutes with 100k classic queues). In this PR we can still read the recovery terms from DETS but we only save them in per-queue files. This way each queue can quickly store its state. Under the same conditions, my machine can consistently stop RabbitMQ in 15 seconds or so. The tradeoff is a slower startup time: on my machine, it goes up from 29 seconds to 38 seconds, but that's still better than what we had until https://github.com/rabbitmq/rabbitmq-server/pull/7676 was merged a few days ago. More importantly, the total of stop+start is lower and more predictable. This PR also improves shutdown with many classic queues v1. Startup time with 100k CQv1s is so long and unpredictable that it's hard to even tell if this PR affects it (it varies from 4 to 8 minutes for me). Unfortunately this PR makes startup on macOS slower (~55s instead of 30s for me), but we don't have to optimise for that. In most cases (with far fewer queues), it won't be noticeable anyway.
-rw-r--r--deps/rabbit/src/rabbit_classic_queue_index_v2.erl20
-rw-r--r--deps/rabbit/src/rabbit_file.erl16
-rw-r--r--deps/rabbit/src/rabbit_queue_index.erl42
-rw-r--r--deps/rabbit/src/rabbit_recovery_terms.erl17
-rw-r--r--deps/rabbit/src/rabbit_vhost.erl2
-rw-r--r--deps/rabbit/test/backing_queue_SUITE.erl2
6 files changed, 59 insertions, 40 deletions
diff --git a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl
index 51937558d7..196c08f164 100644
--- a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl
+++ b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl
@@ -205,9 +205,10 @@ init1(Name, Dir, OnSyncFun, OnSyncMsgFun) ->
ensure_queue_name_stub_file(#resource{virtual_host = VHost, name = QName}, Dir) ->
QueueNameFile = filename:join(Dir, ?QUEUE_NAME_STUB_FILE),
- ok = write_file_and_ensure_dir(QueueNameFile, <<"VHOST: ", VHost/binary, "\n",
- "QUEUE: ", QName/binary, "\n",
- "INDEX: v2\n">>).
+ ok = rabbit_file:write_file_and_ensure_dir(QueueNameFile,
+ <<"VHOST: ", VHost/binary, "\n",
+ "QUEUE: ", QName/binary, "\n",
+ "INDEX: v2\n">>).
-spec reset_state(State) -> State when State::state().
@@ -542,7 +543,7 @@ terminate(VHost, Terms, State0 = #qi { dir = Dir,
file_handle_cache:release_reservation(),
%% Write recovery terms for faster recovery.
_ = rabbit_recovery_terms:store(VHost,
- filename:basename(rabbit_file:binary_to_filename(Dir)),
+ rabbit_file:binary_to_filename(Dir),
[{v2_index_state, {?VERSION, Segments}} | Terms]),
State#qi{ segments = #{},
fds = #{} }.
@@ -1291,14 +1292,3 @@ highest_continuous_seq_id([SeqId1, SeqId2|Tail], EndSeqId)
highest_continuous_seq_id([SeqId2|Tail], EndSeqId);
highest_continuous_seq_id([SeqId|Tail], _) ->
{SeqId, Tail}.
-
-write_file_and_ensure_dir(Name, IOData) ->
- case file:write_file(Name, IOData, [raw]) of
- ok -> ok;
- {error, enoent} ->
- case filelib:ensure_dir(Name) of
- ok -> file:write_file(Name, IOData, [raw]);
- Err -> Err
- end;
- Err -> Err
- end.
diff --git a/deps/rabbit/src/rabbit_file.erl b/deps/rabbit/src/rabbit_file.erl
index 8115be6923..f9c963c511 100644
--- a/deps/rabbit/src/rabbit_file.erl
+++ b/deps/rabbit/src/rabbit_file.erl
@@ -17,6 +17,7 @@
-export([read_file_info/1]).
-export([filename_as_a_directory/1]).
-export([filename_to_binary/1, binary_to_filename/1]).
+-export([write_file_and_ensure_dir/2]).
-import(file_handle_cache, [with_handle/1, with_handle/2]).
@@ -349,3 +350,18 @@ binary_to_filename(Bin) when is_binary(Bin) ->
Other ->
erlang:error(Other)
end.
+
+%% Try to write a file and, if that fails with enoent, ensure_dir and
+%% try again. This is an optimisation since ensuring dir takes time and
+%% often we can assume the folder exists already.
+-spec write_file_and_ensure_dir(file:filename(), iodata()) -> ok_or_error().
+write_file_and_ensure_dir(Name, IOData) ->
+ case file:write_file(Name, IOData, [raw]) of
+ ok -> ok;
+ {error, enoent} ->
+ case filelib:ensure_dir(Name) of
+ ok -> file:write_file(Name, IOData, [raw]);
+ Err -> Err
+ end;
+ Err -> Err
+ end.
diff --git a/deps/rabbit/src/rabbit_queue_index.erl b/deps/rabbit/src/rabbit_queue_index.erl
index 37a05263fd..2dd4173a32 100644
--- a/deps/rabbit/src/rabbit_queue_index.erl
+++ b/deps/rabbit/src/rabbit_queue_index.erl
@@ -333,8 +333,7 @@ recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
terminate(VHost, Terms, State = #qistate { dir = Dir }) ->
{SegmentCounts, State1} = terminate(State),
- _ = rabbit_recovery_terms:store(VHost, filename:basename(Dir),
- [{segments, SegmentCounts} | Terms]),
+ _ = rabbit_recovery_terms:store(VHost, Dir, [{segments, SegmentCounts} | Terms]),
State1.
-spec delete_and_terminate(qistate()) -> qistate().
@@ -537,25 +536,28 @@ bounds(State = #qistate { segments = Segments }) ->
start(VHost, DurableQueueNames) ->
ok = rabbit_recovery_terms:start(VHost),
+ QueuesFolder = filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues"]),
{DurableTerms, DurableDirectories} =
- lists:foldl(
- fun(QName, {RecoveryTerms, ValidDirectories}) ->
- DirName = queue_name_to_dir_name(QName),
- RecoveryInfo = case rabbit_recovery_terms:read(VHost, DirName) of
- {error, _} -> non_clean_shutdown;
- {ok, Terms} -> Terms
- end,
- {[RecoveryInfo | RecoveryTerms],
- sets:add_element(DirName, ValidDirectories)}
- end, {[], sets:new()}, DurableQueueNames),
+ lists:foldl(
+ fun(QName, {RecoveryTerms, ValidDirectories}) ->
+ DirName = queue_name_to_dir_name(QName),
+ QueueDir = filename:join([QueuesFolder, DirName]),
+ RecoveryInfo = case rabbit_recovery_terms:read(VHost, QueueDir) of
+ {ok, Terms} -> Terms;
+ {error, _} -> non_clean_shutdown
+ end,
+ {[RecoveryInfo | RecoveryTerms],
+ sets:add_element(DirName, ValidDirectories)}
+ end, {[], sets:new()}, DurableQueueNames),
%% Any queue directory we've not been asked to recover is considered garbage
- ToDelete = [filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues", Dir])
- || Dir <- lists:subtract(all_queue_directory_names(VHost),
- sets:to_list(DurableDirectories))],
- rabbit_log:debug("Deleting unknown files/folders: ~p", [ToDelete]),
- _ = rabbit_file:recursive_delete(ToDelete),
-
- rabbit_recovery_terms:clear(VHost),
+ _ = case [filename:join([QueuesFolder, Dir])
+ || Dir <- lists:subtract(all_queue_directory_names(VHost),
+ sets:to_list(DurableDirectories))] of
+ [] -> ok;
+ ToDelete ->
+ rabbit_log:debug("Deleting unknown files/folders: ~p", [ToDelete]),
+ _ = rabbit_file:recursive_delete(ToDelete)
+ end,
%% The backing queue interface requires that the queue recovery terms
%% which come back from start/1 are in the same order as DurableQueueNames
@@ -796,7 +798,7 @@ recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount, _MaxJournal
SegmentAndDirtyCount;
recover_message( true, false, no_del, RelSeq, {Segment, _DirtyCount}, MaxJournal) ->
%% force to flush the segment
- {add_to_journal(RelSeq, del, Segment), MaxJournal + 1};
+ {add_to_journal(RelSeq, del, Segment), MaxJournal + 1};
recover_message(false, _, del, RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
{add_to_journal(RelSeq, ack, Segment), DirtyCount + 1};
recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
diff --git a/deps/rabbit/src/rabbit_recovery_terms.erl b/deps/rabbit/src/rabbit_recovery_terms.erl
index cae1a161e1..66283785d1 100644
--- a/deps/rabbit/src/rabbit_recovery_terms.erl
+++ b/deps/rabbit/src/rabbit_recovery_terms.erl
@@ -61,12 +61,23 @@ stop(VHost) ->
-spec store(rabbit_types:vhost(), file:filename(), term()) -> rabbit_types:ok_or_error(term()).
-store(VHost, DirBaseName, Terms) ->
- dets:insert(VHost, {DirBaseName, Terms}).
+store(_VHost, QueueDir, Terms) ->
+ RecoveryFile = filename:join([QueueDir, ".recovery"]),
+ _ = rabbit_file:write_file_and_ensure_dir(RecoveryFile, term_to_iovec(Terms)).
-spec read(rabbit_types:vhost(), file:filename()) -> rabbit_types:ok_or_error2(term(), not_found).
-read(VHost, DirBaseName) ->
+read(VHost, QueueDir) ->
+ RecoveryFile = filename:join([rabbit_vhost:msg_store_dir_path(VHost), QueueDir, ".recovery"]),
+ case file:read_file(RecoveryFile) of
+ {ok, TermsBin} ->
+ _ = prim_file:delete(RecoveryFile),
+ {ok, binary_to_term(TermsBin)};
+ {error, _} ->
+ read_legacy(VHost, QueueDir)
+ end.
+
+read_legacy(VHost, DirBaseName) ->
case dets:lookup(VHost, DirBaseName) of
[{_, Terms}] -> {ok, Terms};
_ -> {error, not_found}
diff --git a/deps/rabbit/src/rabbit_vhost.erl b/deps/rabbit/src/rabbit_vhost.erl
index 24bca6f808..ebb500824a 100644
--- a/deps/rabbit/src/rabbit_vhost.erl
+++ b/deps/rabbit/src/rabbit_vhost.erl
@@ -56,7 +56,7 @@ recover(VHost) ->
rabbit_log:info("Making sure data directory '~ts' for vhost '~ts' exists",
[VHostDir, VHost]),
VHostStubFile = filename:join(VHostDir, ".vhost"),
- ok = rabbit_file:ensure_dir(VHostStubFile),
+ ok = filelib:ensure_dir(VHostStubFile),
ok = file:write_file(VHostStubFile, VHost),
ok = ensure_config_file(VHost),
{Recovered, Failed} = rabbit_amqqueue:recover(VHost),
diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl
index 4901f27fd7..791a0495ef 100644
--- a/deps/rabbit/test/backing_queue_SUITE.erl
+++ b/deps/rabbit/test/backing_queue_SUITE.erl
@@ -1589,7 +1589,7 @@ variable_queue_read_terms(QName) ->
virtual_host = VHost,
name = Name } = QName,
<<Num:128>> = erlang:md5(<<"queue", VHost/binary, Name/binary>>),
- DirName = rabbit_misc:format("~.36B", [Num]),
+ DirName = filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues", rabbit_misc:format("~.36B", [Num])]),
{ok, Terms} = rabbit_recovery_terms:read(VHost, DirName),
Terms.