summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl71
-rw-r--r--src/rabbit_msg_store_gc.erl16
2 files changed, 53 insertions, 34 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 1ec5fe9e1a..0d1bb994dc 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -23,7 +23,7 @@
client_ref/1, close_all_indicated/1,
write/3, write_flow/3, read/2, contains/2, remove/2]).
--export([set_maximum_since_use/2, has_readers/2, combine_files/3,
+-export([set_maximum_since_use/2, combine_files/3,
delete_file/2]). %% internal
-export([transform_dir/3, force_recovery/2]). %% upgrade
@@ -1970,33 +1970,48 @@ cleanup_after_file_deletion(File,
%% garbage collection / compaction / aggregation -- external
%%----------------------------------------------------------------------------
--spec has_readers(non_neg_integer(), gc_state()) -> boolean().
-
-has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) ->
- [#file_summary { locked = true, readers = Count }] =
- ets:lookup(FileSummaryEts, File),
- Count /= 0.
-
-spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) ->
- deletion_thunk().
+ {ok, deletion_thunk()} | {defer, non_neg_integer()}.
combine_files(Source, Destination,
- State = #gc_state { file_summary_ets = FileSummaryEts,
- file_handles_ets = FileHandlesEts,
- dir = Dir,
- msg_store = Server }) ->
- [#file_summary {
+ State = #gc_state { file_summary_ets = FileSummaryEts }) ->
+ [#file_summary{locked = true} = SourceSummary] =
+ ets:lookup(FileSummaryEts, Source),
+
+ [#file_summary{locked = true} = DestinationSummary] =
+ ets:lookup(FileSummaryEts, Destination),
+
+ case {SourceSummary, DestinationSummary} of
+ {#file_summary{readers = 0}, #file_summary{readers = 0}} ->
+ {ok, do_combine_files(SourceSummary, DestinationSummary,
+ Source, Destination, State)};
+ _ ->
+ rabbit_log:error("Asked to combine files ~p and ~p, but they have readers. Deferring.",
+ [Source, Destination]),
+ DeferredFiles = [FileSummary#file_summary.file
+ || FileSummary <- [SourceSummary, DestinationSummary],
+ FileSummary#file_summary.readers /= 0],
+ {defer, DeferredFiles}
+ end.
+
+do_combine_files(SourceSummary, DestinationSummary,
+ Source, Destination,
+ State = #gc_state { file_summary_ets = FileSummaryEts,
+ file_handles_ets = FileHandlesEts,
+ dir = Dir,
+ msg_store = Server }) ->
+ #file_summary {
readers = 0,
left = Destination,
valid_total_size = SourceValid,
file_size = SourceFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, Source),
- [#file_summary {
+ locked = true } = SourceSummary,
+ #file_summary {
readers = 0,
right = Source,
valid_total_size = DestinationValid,
file_size = DestinationFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, Destination),
+ locked = true } = DestinationSummary,
SourceName = filenum_to_name(Source),
DestinationName = filenum_to_name(Destination),
@@ -2056,19 +2071,25 @@ combine_files(Source, Destination,
gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
safe_file_delete_fun(Source, Dir, FileHandlesEts).
--spec delete_file(non_neg_integer(), gc_state()) -> deletion_thunk().
+-spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, non_neg_integer()}.
delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
file_handles_ets = FileHandlesEts,
dir = Dir,
msg_store = Server }) ->
- [#file_summary { valid_total_size = 0,
- locked = true,
- file_size = FileSize,
- readers = 0 }] = ets:lookup(FileSummaryEts, File),
- {[], 0} = load_and_vacuum_message_file(File, State),
- gen_server2:cast(Server, {delete_file, File, FileSize}),
- safe_file_delete_fun(File, Dir, FileHandlesEts).
+ case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { valid_total_size = 0,
+ locked = true,
+ file_size = FileSize,
+ readers = 0 }] ->
+ {[], 0} = load_and_vacuum_message_file(File, State),
+ gen_server2:cast(Server, {delete_file, File, FileSize}),
+ {ok, safe_file_delete_fun(File, Dir, FileHandlesEts)};
+ [#file_summary{readers = Readers}] when Readers > 0 ->
+ rabbit_log:error("Asked to delete file ~p, but it has readers. Deferring.",
+ [File]),
+ {defer, [File]}
+ end.
load_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) ->
%% Messages here will be end-of-file at start-of-list
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index dfadd5586d..60702b5b95 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -119,15 +119,13 @@ attempt_action(Action, Files,
State = #state { pending_no_readers = Pending,
on_action = Thunks,
msg_store_state = MsgStoreState }) ->
- case [File || File <- Files,
- rabbit_msg_store:has_readers(File, MsgStoreState)] of
- [] -> State #state {
- on_action = lists:filter(
- fun (Thunk) -> not Thunk() end,
- [do_action(Action, Files, MsgStoreState) |
- Thunks]) };
- [File | _] -> Pending1 = maps:put(File, {Action, Files}, Pending),
- State #state { pending_no_readers = Pending1 }
+ case do_action(Action, Files, MsgStoreState) of
+ {ok, OkThunk} ->
+ State#state{on_action = lists:filter(fun (Thunk) -> not Thunk() end,
+ [OkThunk | Thunks])};
+ {defer, [File | _]} ->
+ Pending1 = maps:put(File, {Action, Files}, Pending),
+ State #state { pending_no_readers = Pending1 }
end.
do_action(combine, [Source, Destination], MsgStoreState) ->