summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl33
-rw-r--r--src/rabbit_msg_store_gc.erl20
2 files changed, 27 insertions, 26 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e3f7cc8f53..ff6f732d34 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -150,7 +150,7 @@
A}).
-type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())).
-type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')).
--type(deletion_thunk() :: fun (() -> 'ok' | deletion_thunk())).
+-type(deletion_thunk() :: fun (() -> boolean())).
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
@@ -577,9 +577,14 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
case index_lookup(Guid, CState) of
#msg_location { file = File } = MsgLocation ->
%% Still the same file.
- mark_handle_open(FileHandlesEts, File, Ref),
-
CState1 = close_all_indicated(CState),
+ %% We are now guaranteed that the mark_handle_open
+ %% call will either insert_new correctly, or will
+ %% fail, but find the value is open, not close.
+ mark_handle_open(FileHandlesEts, File, Ref),
+ %% Could the msg_store now mark the file to be
+ %% closed? No: marks for closing are issued only
+ %% when the msg_store has locked the file.
{Msg, CState2} = %% This will never be the current file
read_from_disk(MsgLocation, CState1, DedupCacheEts),
Release(), %% this MUST NOT fail with badarg
@@ -844,7 +849,7 @@ handle_cast({combine_files, Source, Destination, Reclaimed},
file_summary_ets = FileSummaryEts,
client_refs = ClientRefs }) ->
ok = cleanup_after_file_deletion(Source, State),
- %% see comment in cleanup_after_file_deletion
+ %% see comment in cleanup_after_file_deletion, and client_read3
true = mark_handle_to_close(ClientRefs, FileHandlesEts, Destination, false),
true = ets:update_element(FileSummaryEts, Destination,
{#file_summary.locked, false}),
@@ -1232,14 +1237,16 @@ close_handle(Key, FHC) ->
end.
mark_handle_open(FileHandlesEts, File, Ref) ->
- %% This is fine to fail (already exists)
+ %% This is fine to fail (already exists). Note it could fail with
+ %% the value being close, and not have it updated to open.
ets:insert_new(FileHandlesEts, {{Ref, File}, open}),
true.
+%% See comment in client_read3 - only call this when the file is locked
mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
[ begin
- ets:update_element(FileHandlesEts, Key, {2, close}),
- case Invoke of
+ case ets:update_element(FileHandlesEts, Key, {2, close})
+ andalso Invoke of
true -> case dict:fetch(Ref, ClientRefs) of
undefined -> ok;
CloseFDsFun -> ok = CloseFDsFun()
@@ -1251,12 +1258,16 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
ets:match_object(FileHandlesEts, {{'_', File}, open}) ],
true.
+safe_file_delete_fun(FileHandlesEts, File, Dir) ->
+ fun () -> safe_file_delete(FileHandlesEts, File, Dir) end.
+
safe_file_delete(FileHandlesEts, File, Dir) ->
case ets:match_object(FileHandlesEts, {{'_', File}, open}, 1) of
{[_|_], _Cont} ->
- fun () -> safe_file_delete(FileHandlesEts, File, Dir) end;
+ false;
_ ->
- ok = file:delete(form_filename(Dir, filenum_to_name(File)))
+ ok = file:delete(form_filename(Dir, filenum_to_name(File))),
+ true
end.
close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts,
@@ -1884,7 +1895,7 @@ combine_files(Source, Destination,
Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData,
gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
- safe_file_delete(FileHandlesEts, Source, Dir).
+ safe_file_delete_fun(FileHandlesEts, Source, Dir).
delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
file_handles_ets = FileHandlesEts,
@@ -1896,7 +1907,7 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
readers = 0 }] = ets:lookup(FileSummaryEts, File),
{[], 0} = load_and_vacuum_message_file(File, State),
gen_server2:cast(Server, {delete_file, File, FileSize}),
- safe_file_delete(FileHandlesEts, File, Dir).
+ safe_file_delete_fun(FileHandlesEts, File, Dir).
load_and_vacuum_message_file(File, #gc_state { dir = Dir,
index_module = Index,
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 0baf992b9d..7cf5c1f917 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -135,17 +135,13 @@ attempt_action(Action, Files,
State = #state { pending_no_readers = Pending,
on_action = Thunks,
msg_store_state = MsgStoreState }) ->
- Thunks1 = run_thunks(Thunks),
case [File || File <- Files,
rabbit_msg_store:has_readers(File, MsgStoreState)] of
- [] -> Thunks2 = case do_action(Action, Files, MsgStoreState) of
- ok -> Thunks1;
- Thunk -> [Thunk | Thunks1]
- end,
- State #state { on_action = Thunks2 };
+ [] -> Thunks1 =
+ [do_action(Action, Files, MsgStoreState) | Thunks],
+ State #state { on_action = run_thunks(Thunks1) };
[File | _] -> Pending1 = dict:store(File, {Action, Files}, Pending),
- State #state { pending_no_readers = Pending1,
- on_action = Thunks1 }
+ State #state { pending_no_readers = Pending1 }
end.
do_action(combine, [Source, Destination], MsgStoreState) ->
@@ -153,10 +149,4 @@ do_action(combine, [Source, Destination], MsgStoreState) ->
do_action(delete, [File], MsgStoreState) ->
rabbit_msg_store:delete_file(File, MsgStoreState).
-run_thunks(Thunks) ->
- lists:foldl(fun (Thunk, Thunks1) ->
- case Thunk() of
- ok -> Thunks1;
- Thunk1 -> [Thunk1 | Thunks1]
- end
- end, [], Thunks).
+run_thunks(Thunks) -> lists:filter(fun (Thunk) -> not Thunk() end, Thunks).