summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl149
1 files changed, 73 insertions, 76 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index cb3013e417..980780c0dd 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -75,7 +75,6 @@
sum_valid_data, %% sum of valid data in all files
sum_file_size, %% sum of file sizes
pending_gc_completion, %% things to do once GC completes
- gc_active, %% is the GC currently working?
gc_pid, %% pid of our GC
file_handles_ets, %% tid of the shared file handles table
file_summary_ets, %% tid of the file summary table
@@ -564,8 +563,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
sync_timer_ref = undefined,
sum_valid_data = 0,
sum_file_size = 0,
- pending_gc_completion = [],
- gc_active = false,
+ pending_gc_completion = orddict:new(),
gc_pid = undefined,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
@@ -707,63 +705,50 @@ handle_cast({sync, Guids, K},
handle_cast(sync, State) ->
noreply(internal_sync(State));
-handle_cast({gc_done, Reclaimed, Casualty, undefined},
+handle_cast({gc_done, Reclaimed, Casualty, Survivor},
State = #msstate { sum_file_size = SumFileSize,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts }) ->
+ %% GC done, so now ensure that any clients that have open fhs to
+ %% those files close them before using them again. This has to be
+ %% done here (given it's done in the msg_store, and not the gc),
+ %% and not when starting up the GC, because if done when starting
+ %% up the GC, the client could find the close, and close and
+ %% reopen the fh, whilst the GC is waiting for readers to
+ %% disappear, before it's actually done the GC.
+ true = mark_handle_to_close(FileHandlesEts, Casualty),
[#file_summary { left = Left,
right = Right,
locked = true,
readers = 0 }] = ets:lookup(FileSummaryEts, Casualty),
- %% we should NEVER find the current file in here hence right
- %% should always be a file, not undefined
+ %% We'll never do any GC on the current file, so right is never undefined
true = Right =/= undefined, %% ASSERTION
- true = ets:update_element(FileSummaryEts, Right,
- {#file_summary.left, Left}),
- case Left of
- undefined -> ok;
- _ -> true = ets:update_element(FileSummaryEts, Left,
- {#file_summary.right, Right})
- end,
- true = mark_handle_to_close(FileHandlesEts, Casualty),
+ true = ets:update_element(FileSummaryEts, Right, {#file_summary.left, Left}),
+ %% Regardless of whether Survivor is undefined, we need to ensure
+ %% the double linked list is maintained
+ true = case Left of
+ undefined -> true; %% Casualty is the eldest file (left-most)
+ _ -> ets:update_element(FileSummaryEts, Left,
+ {#file_summary.right, Right})
+ end,
+ %% If there is a Survivor, it must be the left of the Casualty.
+ SurvivingFiles =
+ case Survivor of
+ undefined ->
+ [];
+ Left -> %% ASSERTION
+ true = mark_handle_to_close(FileHandlesEts, Survivor),
+ true = ets:update_element(FileSummaryEts, Survivor,
+ {#file_summary.locked, false}),
+ [Survivor]
+ end,
true = ets:delete(FileSummaryEts, Casualty),
noreply(
maybe_compact(
run_pending(
+ [Casualty | SurvivingFiles],
State #msstate { sum_file_size = SumFileSize - Reclaimed })));
-handle_cast({gc_done, Reclaimed, Src, Dst},
- State = #msstate { sum_file_size = SumFileSize,
- gc_active = {Src, Dst},
- file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts }) ->
- %% GC done, so now ensure that any clients that have open fhs to
- %% those files close them before using them again. This has to be
- %% done here (given it's done in the msg_store, and not the gc),
- %% and not when starting up the GC, because if done when starting
- %% up the GC, the client could find the close, and close and
- %% reopen the fh, whilst the GC is waiting for readers to
- %% disappear, before it's actually done the GC.
- true = mark_handle_to_close(FileHandlesEts, Src),
- true = mark_handle_to_close(FileHandlesEts, Dst),
- %% we always move data left, so Src has gone and was on the
- %% right, so need to make dest = source.right.left, and also
- %% dest.right = source.right
- [#file_summary { left = Dst,
- right = SrcRight,
- locked = true,
- readers = 0 }] = ets:lookup(FileSummaryEts, Src),
- %% this could fail if SrcRight =:= undefined
- ets:update_element(FileSummaryEts, SrcRight, {#file_summary.left, Dst}),
- true = ets:update_element(FileSummaryEts, Dst,
- [{#file_summary.locked, false},
- {#file_summary.right, SrcRight}]),
- true = ets:delete(FileSummaryEts, Src),
- noreply(
- maybe_compact(run_pending(
- State #msstate { sum_file_size = SumFileSize - Reclaimed,
- gc_active = false })));
-
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State).
@@ -913,7 +898,7 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
ets:lookup(FileSummaryEts, File),
case Locked of
true -> add_to_pending_gc_completion({read, Guid, From},
- State);
+ File, State);
false -> {Msg, State1} =
read_from_disk(MsgLoc, State, DedupCacheEts),
gen_server2:reply(From, {ok, Msg}),
@@ -943,19 +928,18 @@ read_from_disk(#msg_location { guid = Guid, ref_count = RefCount,
ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg),
{Msg, State1}.
-contains_message(Guid, From, State = #msstate { gc_active = GCActive }) ->
+contains_message(Guid, From,
+ State = #msstate { pending_gc_completion = Pending }) ->
case index_lookup_positive_ref_count(Guid, State) of
not_found ->
gen_server2:reply(From, false),
State;
#msg_location { file = File } ->
- case GCActive of
- {A, B} when File =:= A orelse File =:= B ->
- add_to_pending_gc_completion(
- {contains, Guid, From}, State);
- _ ->
- gen_server2:reply(From, true),
- State
+ case orddict:is_key(File, Pending) of
+ true -> add_to_pending_gc_completion(
+ {contains, Guid, From}, File, State);
+ false -> gen_server2:reply(From, true),
+ State
end
end.
@@ -974,7 +958,7 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
case ets:lookup(FileSummaryEts, File) of
[#file_summary { locked = true } ] ->
- add_to_pending_gc_completion({remove, Guid}, State);
+ add_to_pending_gc_completion({remove, Guid}, File, State);
[#file_summary {}] ->
ok = Dec(),
[_] = ets:update_counter(
@@ -990,20 +974,25 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
end.
add_to_pending_gc_completion(
- Op, State = #msstate { pending_gc_completion = Pending }) ->
- State #msstate { pending_gc_completion = [Op | Pending] }.
+ Op, File, State = #msstate { pending_gc_completion = Pending }) ->
+ State #msstate { pending_gc_completion =
+ rabbit_misc:orddict_cons(File, Op, Pending) }.
-run_pending(State = #msstate { pending_gc_completion = [] }) ->
- State;
-run_pending(State = #msstate { pending_gc_completion = Pending }) ->
- State1 = State #msstate { pending_gc_completion = [] },
- lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)).
+run_pending(Files, State) ->
+ lists:foldl(fun run_pending_for_file/2, State, Files).
+
+run_pending_for_file(File,
+ State = #msstate { pending_gc_completion = Pending }) ->
+ lists:foldl(
+ fun run_pending_action/2,
+ State #msstate { pending_gc_completion = orddict:erase(File, Pending) },
+ lists:reverse(orddict:fetch(File, Pending))).
-run_pending({read, Guid, From}, State) ->
+run_pending_action({read, Guid, From}, State) ->
read_message(Guid, From, State);
-run_pending({contains, Guid, From}, State) ->
+run_pending_action({contains, Guid, From}, State) ->
contains_message(Guid, From, State);
-run_pending({remove, Guid}, State) ->
+run_pending_action({remove, Guid}, State) ->
remove_message(Guid, State).
safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
@@ -1475,12 +1464,12 @@ maybe_roll_to_new_file(
maybe_roll_to_new_file(_, State) ->
State.
-maybe_compact(State = #msstate { sum_valid_data = SumValid,
- sum_file_size = SumFileSize,
- gc_active = false,
- gc_pid = GCPid,
- file_summary_ets = FileSummaryEts,
- file_size_limit = FileSizeLimit })
+maybe_compact(State = #msstate { sum_valid_data = SumValid,
+ sum_file_size = SumFileSize,
+ gc_pid = GCPid,
+ pending_gc_completion = Pending,
+ file_summary_ets = FileSummaryEts,
+ file_size_limit = FileSizeLimit })
when (SumFileSize > 2 * FileSizeLimit andalso
(SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) ->
%% TODO: the algorithm here is sub-optimal - it may result in a
@@ -1494,13 +1483,17 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
not_found ->
State;
{Src, Dst} ->
+ false = orddict:is_key(Src, Pending) orelse
+ orddict:is_key(Dst, Pending), %% ASSERTION
+ Pending1 = orddict:store(Dst, [],
+ orddict:store(Src, [], Pending)),
State1 = close_handle(Src, close_handle(Dst, State)),
true = ets:update_element(FileSummaryEts, Src,
{#file_summary.locked, true}),
true = ets:update_element(FileSummaryEts, Dst,
{#file_summary.locked, true}),
ok = rabbit_msg_store_gc:combine(GCPid, Src, Dst),
- State1 #msstate { gc_active = {Src, Dst} }
+ State1 #msstate { pending_gc_completion = Pending1 }
end
end;
maybe_compact(State) ->
@@ -1536,8 +1529,9 @@ find_files_to_combine(FileSummaryEts, FileSizeLimit,
delete_file_if_empty(File, State = #msstate { current_file = File }) ->
State;
delete_file_if_empty(File, State = #msstate {
- gc_pid = GCPid,
- file_summary_ets = FileSummaryEts }) ->
+ gc_pid = GCPid,
+ file_summary_ets = FileSummaryEts,
+ pending_gc_completion = Pending }) ->
[#file_summary { valid_total_size = ValidData,
locked = false }] =
ets:lookup(FileSummaryEts, File),
@@ -1548,7 +1542,10 @@ delete_file_if_empty(File, State = #msstate {
true = ets:update_element(FileSummaryEts, File,
{#file_summary.locked, true}),
ok = rabbit_msg_store_gc:delete(GCPid, File),
- close_handle(File, State);
+ false = orddict:is_key(File, Pending), %% ASSERTION
+ Pending1 = orddict:store(File, [], Pending),
+ close_handle(File,
+ State #msstate { pending_gc_completion = Pending1 });
_ -> State
end.