diff options
| author | Daniil Fedotov <hairyhum@gmail.com> | 2019-05-03 14:45:04 -0400 |
|---|---|---|
| committer | Daniil Fedotov <hairyhum@gmail.com> | 2019-05-03 14:45:04 -0400 |
| commit | 4b4e8a7f4b0101170835df87622606d22104da1f (patch) | |
| tree | 7dad218c2971477487cc62fed02378140d4dcb67 | |
| parent | a4033d826ed4f7546019753cb98b1cdca1939210 (diff) | |
| download | rabbitmq-server-git-4b4e8a7f4b0101170835df87622606d22104da1f.tar.gz | |
Move check for reader to action function for message store GC.
Message store GC postpones processing of file, which have readers.
When performing an action, it asserts that there are no readers.
Check for readers may race with readers update by a queue, crashing
the message store.
Make check and assert work with the same lookup to reduce failure rate.
In case of races the queue process should handle exception instead.
Addresses #2000
[#165755203]
| -rw-r--r-- | src/rabbit_msg_store.erl | 71 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 16 |
2 files changed, 53 insertions, 34 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 1ec5fe9e1a..0d1bb994dc 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -23,7 +23,7 @@ client_ref/1, close_all_indicated/1, write/3, write_flow/3, read/2, contains/2, remove/2]). --export([set_maximum_since_use/2, has_readers/2, combine_files/3, +-export([set_maximum_since_use/2, combine_files/3, delete_file/2]). %% internal -export([transform_dir/3, force_recovery/2]). %% upgrade @@ -1970,33 +1970,48 @@ cleanup_after_file_deletion(File, %% garbage collection / compaction / aggregation -- external %%---------------------------------------------------------------------------- --spec has_readers(non_neg_integer(), gc_state()) -> boolean(). - -has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) -> - [#file_summary { locked = true, readers = Count }] = - ets:lookup(FileSummaryEts, File), - Count /= 0. - -spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) -> - deletion_thunk(). + {ok, deletion_thunk()} | {defer, non_neg_integer()}. combine_files(Source, Destination, - State = #gc_state { file_summary_ets = FileSummaryEts, - file_handles_ets = FileHandlesEts, - dir = Dir, - msg_store = Server }) -> - [#file_summary { + State = #gc_state { file_summary_ets = FileSummaryEts }) -> + [#file_summary{locked = true} = SourceSummary] = + ets:lookup(FileSummaryEts, Source), + + [#file_summary{locked = true} = DestinationSummary] = + ets:lookup(FileSummaryEts, Destination), + + case {SourceSummary, DestinationSummary} of + {#file_summary{readers = 0}, #file_summary{readers = 0}} -> + {ok, do_combine_files(SourceSummary, DestinationSummary, + Source, Destination, State)}; + _ -> + rabbit_log:error("Asked to combine files ~p and ~p, but they have readers. Deferring.", + [Source, Destination]), + DeferredFiles = [FileSummary#file_summary.file + || FileSummary <- [SourceSummary, DestinationSummary], + FileSummary#file_summary.readers /= 0], + {defer, DeferredFiles} + end. + +do_combine_files(SourceSummary, DestinationSummary, + Source, Destination, + State = #gc_state { file_summary_ets = FileSummaryEts, + file_handles_ets = FileHandlesEts, + dir = Dir, + msg_store = Server }) -> + #file_summary { readers = 0, left = Destination, valid_total_size = SourceValid, file_size = SourceFileSize, - locked = true }] = ets:lookup(FileSummaryEts, Source), - [#file_summary { + locked = true } = SourceSummary, + #file_summary { readers = 0, right = Source, valid_total_size = DestinationValid, file_size = DestinationFileSize, - locked = true }] = ets:lookup(FileSummaryEts, Destination), + locked = true } = DestinationSummary, SourceName = filenum_to_name(Source), DestinationName = filenum_to_name(Destination), @@ -2056,19 +2071,25 @@ combine_files(Source, Destination, gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}), safe_file_delete_fun(Source, Dir, FileHandlesEts). --spec delete_file(non_neg_integer(), gc_state()) -> deletion_thunk(). +-spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, non_neg_integer()}. delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, file_handles_ets = FileHandlesEts, dir = Dir, msg_store = Server }) -> - [#file_summary { valid_total_size = 0, - locked = true, - file_size = FileSize, - readers = 0 }] = ets:lookup(FileSummaryEts, File), - {[], 0} = load_and_vacuum_message_file(File, State), - gen_server2:cast(Server, {delete_file, File, FileSize}), - safe_file_delete_fun(File, Dir, FileHandlesEts). + case ets:lookup(FileSummaryEts, File) of + [#file_summary { valid_total_size = 0, + locked = true, + file_size = FileSize, + readers = 0 }] -> + {[], 0} = load_and_vacuum_message_file(File, State), + gen_server2:cast(Server, {delete_file, File, FileSize}), + {ok, safe_file_delete_fun(File, Dir, FileHandlesEts)}; + [#file_summary{readers = Readers}] when Readers > 0 -> + rabbit_log:error("Asked to delete file ~p, but it has readers. Deferring.", + [File]), + {defer, [File]} + end. load_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) -> %% Messages here will be end-of-file at start-of-list diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index dfadd5586d..60702b5b95 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -119,15 +119,13 @@ attempt_action(Action, Files, State = #state { pending_no_readers = Pending, on_action = Thunks, msg_store_state = MsgStoreState }) -> - case [File || File <- Files, - rabbit_msg_store:has_readers(File, MsgStoreState)] of - [] -> State #state { - on_action = lists:filter( - fun (Thunk) -> not Thunk() end, - [do_action(Action, Files, MsgStoreState) | - Thunks]) }; - [File | _] -> Pending1 = maps:put(File, {Action, Files}, Pending), - State #state { pending_no_readers = Pending1 } + case do_action(Action, Files, MsgStoreState) of + {ok, OkThunk} -> + State#state{on_action = lists:filter(fun (Thunk) -> not Thunk() end, + [OkThunk | Thunks])}; + {defer, [File | _]} -> + Pending1 = maps:put(File, {Action, Files}, Pending), + State #state { pending_no_readers = Pending1 } end. do_action(combine, [Source, Destination], MsgStoreState) -> |
