diff options
| -rw-r--r-- | src/rabbit_msg_store.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 20 |
2 files changed, 27 insertions, 26 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e3f7cc8f53..ff6f732d34 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -150,7 +150,7 @@ A}). -type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())). -type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')). --type(deletion_thunk() :: fun (() -> 'ok' | deletion_thunk())). +-type(deletion_thunk() :: fun (() -> boolean())). -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', @@ -577,9 +577,14 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, case index_lookup(Guid, CState) of #msg_location { file = File } = MsgLocation -> %% Still the same file. - mark_handle_open(FileHandlesEts, File, Ref), - CState1 = close_all_indicated(CState), + %% We are now guaranteed that the mark_handle_open + %% call will either insert_new correctly, or will + %% fail, but find the value is open, not close. + mark_handle_open(FileHandlesEts, File, Ref), + %% Could the msg_store now mark the file to be + %% closed? No: marks for closing are issued only + %% when the msg_store has locked the file. {Msg, CState2} = %% This will never be the current file read_from_disk(MsgLocation, CState1, DedupCacheEts), Release(), %% this MUST NOT fail with badarg @@ -844,7 +849,7 @@ handle_cast({combine_files, Source, Destination, Reclaimed}, file_summary_ets = FileSummaryEts, client_refs = ClientRefs }) -> ok = cleanup_after_file_deletion(Source, State), - %% see comment in cleanup_after_file_deletion + %% see comment in cleanup_after_file_deletion, and client_read3 true = mark_handle_to_close(ClientRefs, FileHandlesEts, Destination, false), true = ets:update_element(FileSummaryEts, Destination, {#file_summary.locked, false}), @@ -1232,14 +1237,16 @@ close_handle(Key, FHC) -> end. mark_handle_open(FileHandlesEts, File, Ref) -> - %% This is fine to fail (already exists) + %% This is fine to fail (already exists). Note it could fail with + %% the value being close, and not have it updated to open. ets:insert_new(FileHandlesEts, {{Ref, File}, open}), true. +%% See comment in client_read3 - only call this when the file is locked mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) -> [ begin - ets:update_element(FileHandlesEts, Key, {2, close}), - case Invoke of + case ets:update_element(FileHandlesEts, Key, {2, close}) + andalso Invoke of true -> case dict:fetch(Ref, ClientRefs) of undefined -> ok; CloseFDsFun -> ok = CloseFDsFun() @@ -1251,12 +1258,16 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) -> ets:match_object(FileHandlesEts, {{'_', File}, open}) ], true. +safe_file_delete_fun(FileHandlesEts, File, Dir) -> + fun () -> safe_file_delete(FileHandlesEts, File, Dir) end. + safe_file_delete(FileHandlesEts, File, Dir) -> case ets:match_object(FileHandlesEts, {{'_', File}, open}, 1) of {[_|_], _Cont} -> - fun () -> safe_file_delete(FileHandlesEts, File, Dir) end; + false; _ -> - ok = file:delete(form_filename(Dir, filenum_to_name(File))) + ok = file:delete(form_filename(Dir, filenum_to_name(File))), + true end. close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts, @@ -1884,7 +1895,7 @@ combine_files(Source, Destination, Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData, gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}), - safe_file_delete(FileHandlesEts, Source, Dir). + safe_file_delete_fun(FileHandlesEts, Source, Dir). delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, file_handles_ets = FileHandlesEts, @@ -1896,7 +1907,7 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, readers = 0 }] = ets:lookup(FileSummaryEts, File), {[], 0} = load_and_vacuum_message_file(File, State), gen_server2:cast(Server, {delete_file, File, FileSize}), - safe_file_delete(FileHandlesEts, File, Dir). + safe_file_delete_fun(FileHandlesEts, File, Dir). load_and_vacuum_message_file(File, #gc_state { dir = Dir, index_module = Index, diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 0baf992b9d..7cf5c1f917 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -135,17 +135,13 @@ attempt_action(Action, Files, State = #state { pending_no_readers = Pending, on_action = Thunks, msg_store_state = MsgStoreState }) -> - Thunks1 = run_thunks(Thunks), case [File || File <- Files, rabbit_msg_store:has_readers(File, MsgStoreState)] of - [] -> Thunks2 = case do_action(Action, Files, MsgStoreState) of - ok -> Thunks1; - Thunk -> [Thunk | Thunks1] - end, - State #state { on_action = Thunks2 }; + [] -> Thunks1 = + [do_action(Action, Files, MsgStoreState) | Thunks], + State #state { on_action = run_thunks(Thunks1) }; [File | _] -> Pending1 = dict:store(File, {Action, Files}, Pending), - State #state { pending_no_readers = Pending1, - on_action = Thunks1 } + State #state { pending_no_readers = Pending1 } end. do_action(combine, [Source, Destination], MsgStoreState) -> @@ -153,10 +149,4 @@ do_action(combine, [Source, Destination], MsgStoreState) -> do_action(delete, [File], MsgStoreState) -> rabbit_msg_store:delete_file(File, MsgStoreState). -run_thunks(Thunks) -> - lists:foldl(fun (Thunk, Thunks1) -> - case Thunk() of - ok -> Thunks1; - Thunk1 -> [Thunk1 | Thunks1] - end - end, [], Thunks). +run_thunks(Thunks) -> lists:filter(fun (Thunk) -> not Thunk() end, Thunks). |
