summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <hairyhum@gmail.com>2019-05-03 14:45:04 -0400
committerDaniil Fedotov <hairyhum@gmail.com>2019-05-03 14:45:04 -0400
commit4b4e8a7f4b0101170835df87622606d22104da1f (patch)
tree7dad218c2971477487cc62fed02378140d4dcb67
parenta4033d826ed4f7546019753cb98b1cdca1939210 (diff)
downloadrabbitmq-server-git-4b4e8a7f4b0101170835df87622606d22104da1f.tar.gz
Move check for reader to action function for message store GC.
Message store GC postpones processing of files that have readers. When performing an action, it asserts that there are no readers. The check for readers may race with an update of the reader count by a queue, crashing the message store. Make the check and the assert work with the same lookup to reduce the failure rate. In case of races, the queue process should handle the exception instead. Addresses #2000 [#165755203]
-rw-r--r--src/rabbit_msg_store.erl71
-rw-r--r--src/rabbit_msg_store_gc.erl16
2 files changed, 53 insertions, 34 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 1ec5fe9e1a..0d1bb994dc 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -23,7 +23,7 @@
client_ref/1, close_all_indicated/1,
write/3, write_flow/3, read/2, contains/2, remove/2]).
--export([set_maximum_since_use/2, has_readers/2, combine_files/3,
+-export([set_maximum_since_use/2, combine_files/3,
delete_file/2]). %% internal
-export([transform_dir/3, force_recovery/2]). %% upgrade
@@ -1970,33 +1970,48 @@ cleanup_after_file_deletion(File,
%% garbage collection / compaction / aggregation -- external
%%----------------------------------------------------------------------------
--spec has_readers(non_neg_integer(), gc_state()) -> boolean().
-
-has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) ->
- [#file_summary { locked = true, readers = Count }] =
- ets:lookup(FileSummaryEts, File),
- Count /= 0.
-
-spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) ->
- deletion_thunk().
+ {ok, deletion_thunk()} | {defer, non_neg_integer()}.
combine_files(Source, Destination,
- State = #gc_state { file_summary_ets = FileSummaryEts,
- file_handles_ets = FileHandlesEts,
- dir = Dir,
- msg_store = Server }) ->
- [#file_summary {
+ State = #gc_state { file_summary_ets = FileSummaryEts }) ->
+ [#file_summary{locked = true} = SourceSummary] =
+ ets:lookup(FileSummaryEts, Source),
+
+ [#file_summary{locked = true} = DestinationSummary] =
+ ets:lookup(FileSummaryEts, Destination),
+
+ case {SourceSummary, DestinationSummary} of
+ {#file_summary{readers = 0}, #file_summary{readers = 0}} ->
+ {ok, do_combine_files(SourceSummary, DestinationSummary,
+ Source, Destination, State)};
+ _ ->
+ rabbit_log:error("Asked to combine files ~p and ~p, but they have readers. Deferring.",
+ [Source, Destination]),
+ DeferredFiles = [FileSummary#file_summary.file
+ || FileSummary <- [SourceSummary, DestinationSummary],
+ FileSummary#file_summary.readers /= 0],
+ {defer, DeferredFiles}
+ end.
+
+do_combine_files(SourceSummary, DestinationSummary,
+ Source, Destination,
+ State = #gc_state { file_summary_ets = FileSummaryEts,
+ file_handles_ets = FileHandlesEts,
+ dir = Dir,
+ msg_store = Server }) ->
+ #file_summary {
readers = 0,
left = Destination,
valid_total_size = SourceValid,
file_size = SourceFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, Source),
- [#file_summary {
+ locked = true } = SourceSummary,
+ #file_summary {
readers = 0,
right = Source,
valid_total_size = DestinationValid,
file_size = DestinationFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, Destination),
+ locked = true } = DestinationSummary,
SourceName = filenum_to_name(Source),
DestinationName = filenum_to_name(Destination),
@@ -2056,19 +2071,25 @@ combine_files(Source, Destination,
gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
safe_file_delete_fun(Source, Dir, FileHandlesEts).
--spec delete_file(non_neg_integer(), gc_state()) -> deletion_thunk().
+-spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, non_neg_integer()}.
delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
file_handles_ets = FileHandlesEts,
dir = Dir,
msg_store = Server }) ->
- [#file_summary { valid_total_size = 0,
- locked = true,
- file_size = FileSize,
- readers = 0 }] = ets:lookup(FileSummaryEts, File),
- {[], 0} = load_and_vacuum_message_file(File, State),
- gen_server2:cast(Server, {delete_file, File, FileSize}),
- safe_file_delete_fun(File, Dir, FileHandlesEts).
+ case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { valid_total_size = 0,
+ locked = true,
+ file_size = FileSize,
+ readers = 0 }] ->
+ {[], 0} = load_and_vacuum_message_file(File, State),
+ gen_server2:cast(Server, {delete_file, File, FileSize}),
+ {ok, safe_file_delete_fun(File, Dir, FileHandlesEts)};
+ [#file_summary{readers = Readers}] when Readers > 0 ->
+ rabbit_log:error("Asked to delete file ~p, but it has readers. Deferring.",
+ [File]),
+ {defer, [File]}
+ end.
load_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) ->
%% Messages here will be end-of-file at start-of-list
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index dfadd5586d..60702b5b95 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -119,15 +119,13 @@ attempt_action(Action, Files,
State = #state { pending_no_readers = Pending,
on_action = Thunks,
msg_store_state = MsgStoreState }) ->
- case [File || File <- Files,
- rabbit_msg_store:has_readers(File, MsgStoreState)] of
- [] -> State #state {
- on_action = lists:filter(
- fun (Thunk) -> not Thunk() end,
- [do_action(Action, Files, MsgStoreState) |
- Thunks]) };
- [File | _] -> Pending1 = maps:put(File, {Action, Files}, Pending),
- State #state { pending_no_readers = Pending1 }
+ case do_action(Action, Files, MsgStoreState) of
+ {ok, OkThunk} ->
+ State#state{on_action = lists:filter(fun (Thunk) -> not Thunk() end,
+ [OkThunk | Thunks])};
+ {defer, [File | _]} ->
+ Pending1 = maps:put(File, {Action, Files}, Pending),
+ State #state { pending_no_readers = Pending1 }
end.
do_action(combine, [Source, Destination], MsgStoreState) ->