diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 71 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 16 |
2 files changed, 53 insertions, 34 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 1ec5fe9e1a..0d1bb994dc 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -23,7 +23,7 @@ client_ref/1, close_all_indicated/1, write/3, write_flow/3, read/2, contains/2, remove/2]). --export([set_maximum_since_use/2, has_readers/2, combine_files/3, +-export([set_maximum_since_use/2, combine_files/3, delete_file/2]). %% internal -export([transform_dir/3, force_recovery/2]). %% upgrade @@ -1970,33 +1970,48 @@ cleanup_after_file_deletion(File, %% garbage collection / compaction / aggregation -- external %%---------------------------------------------------------------------------- --spec has_readers(non_neg_integer(), gc_state()) -> boolean(). - -has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) -> - [#file_summary { locked = true, readers = Count }] = - ets:lookup(FileSummaryEts, File), - Count /= 0. - -spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) -> - deletion_thunk(). + {ok, deletion_thunk()} | {defer, non_neg_integer()}. combine_files(Source, Destination, - State = #gc_state { file_summary_ets = FileSummaryEts, - file_handles_ets = FileHandlesEts, - dir = Dir, - msg_store = Server }) -> - [#file_summary { + State = #gc_state { file_summary_ets = FileSummaryEts }) -> + [#file_summary{locked = true} = SourceSummary] = + ets:lookup(FileSummaryEts, Source), + + [#file_summary{locked = true} = DestinationSummary] = + ets:lookup(FileSummaryEts, Destination), + + case {SourceSummary, DestinationSummary} of + {#file_summary{readers = 0}, #file_summary{readers = 0}} -> + {ok, do_combine_files(SourceSummary, DestinationSummary, + Source, Destination, State)}; + _ -> + rabbit_log:error("Asked to combine files ~p and ~p, but they have readers. Deferring.", + [Source, Destination]), + DeferredFiles = [FileSummary#file_summary.file + || FileSummary <- [SourceSummary, DestinationSummary], + FileSummary#file_summary.readers /= 0], + {defer, DeferredFiles} + end. + +do_combine_files(SourceSummary, DestinationSummary, + Source, Destination, + State = #gc_state { file_summary_ets = FileSummaryEts, + file_handles_ets = FileHandlesEts, + dir = Dir, + msg_store = Server }) -> + #file_summary { readers = 0, left = Destination, valid_total_size = SourceValid, file_size = SourceFileSize, - locked = true }] = ets:lookup(FileSummaryEts, Source), - [#file_summary { + locked = true } = SourceSummary, + #file_summary { readers = 0, right = Source, valid_total_size = DestinationValid, file_size = DestinationFileSize, - locked = true }] = ets:lookup(FileSummaryEts, Destination), + locked = true } = DestinationSummary, SourceName = filenum_to_name(Source), DestinationName = filenum_to_name(Destination), @@ -2056,19 +2071,25 @@ combine_files(Source, Destination, gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}), safe_file_delete_fun(Source, Dir, FileHandlesEts). --spec delete_file(non_neg_integer(), gc_state()) -> deletion_thunk(). +-spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, non_neg_integer()}. delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, file_handles_ets = FileHandlesEts, dir = Dir, msg_store = Server }) -> - [#file_summary { valid_total_size = 0, - locked = true, - file_size = FileSize, - readers = 0 }] = ets:lookup(FileSummaryEts, File), - {[], 0} = load_and_vacuum_message_file(File, State), - gen_server2:cast(Server, {delete_file, File, FileSize}), - safe_file_delete_fun(File, Dir, FileHandlesEts). + case ets:lookup(FileSummaryEts, File) of + [#file_summary { valid_total_size = 0, + locked = true, + file_size = FileSize, + readers = 0 }] -> + {[], 0} = load_and_vacuum_message_file(File, State), + gen_server2:cast(Server, {delete_file, File, FileSize}), + {ok, safe_file_delete_fun(File, Dir, FileHandlesEts)}; + [#file_summary{readers = Readers}] when Readers > 0 -> + rabbit_log:error("Asked to delete file ~p, but it has readers. Deferring.", + [File]), + {defer, [File]} + end. load_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) -> %% Messages here will be end-of-file at start-of-list diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index dfadd5586d..60702b5b95 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -119,15 +119,13 @@ attempt_action(Action, Files, State = #state { pending_no_readers = Pending, on_action = Thunks, msg_store_state = MsgStoreState }) -> - case [File || File <- Files, - rabbit_msg_store:has_readers(File, MsgStoreState)] of - [] -> State #state { - on_action = lists:filter( - fun (Thunk) -> not Thunk() end, - [do_action(Action, Files, MsgStoreState) | - Thunks]) }; - [File | _] -> Pending1 = maps:put(File, {Action, Files}, Pending), - State #state { pending_no_readers = Pending1 } + case do_action(Action, Files, MsgStoreState) of + {ok, OkThunk} -> + State#state{on_action = lists:filter(fun (Thunk) -> not Thunk() end, + [OkThunk | Thunks])}; + {defer, [File | _]} -> + Pending1 = maps:put(File, {Action, Files}, Pending), + State #state { pending_no_readers = Pending1 } end. do_action(combine, [Source, Destination], MsgStoreState) -> |
