diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-19 12:28:30 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-19 12:28:30 +0100 |
| commit | 77619924582b4d62872f9ef9568feabe4f4ead2d (patch) | |
| tree | f08d9f8a009ee133006c0118beec1bd465eb3e0b /src | |
| parent | c6a1dcda3fe84a4ec40b09b42e818f42196773f7 (diff) | |
| download | rabbitmq-server-git-77619924582b4d62872f9ef9568feabe4f4ead2d.tar.gz | |
Substantial refactoring of gc_done, and drop gc_active in favour of the guarantee that files involved somehow in gc have a key in pending_gc_deletion (which is now an orddict, thus allowing us to run exactly the correct actions when gc_actions complete)
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 149 |
1 files changed, 73 insertions, 76 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index cb3013e417..980780c0dd 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -75,7 +75,6 @@ sum_valid_data, %% sum of valid data in all files sum_file_size, %% sum of file sizes pending_gc_completion, %% things to do once GC completes - gc_active, %% is the GC currently working? gc_pid, %% pid of our GC file_handles_ets, %% tid of the shared file handles table file_summary_ets, %% tid of the file summary table @@ -564,8 +563,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> sync_timer_ref = undefined, sum_valid_data = 0, sum_file_size = 0, - pending_gc_completion = [], - gc_active = false, + pending_gc_completion = orddict:new(), gc_pid = undefined, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, @@ -707,63 +705,50 @@ handle_cast({sync, Guids, K}, handle_cast(sync, State) -> noreply(internal_sync(State)); -handle_cast({gc_done, Reclaimed, Casualty, undefined}, +handle_cast({gc_done, Reclaimed, Casualty, Survivor}, State = #msstate { sum_file_size = SumFileSize, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts }) -> + %% GC done, so now ensure that any clients that have open fhs to + %% those files close them before using them again. This has to be + %% done here (given it's done in the msg_store, and not the gc), + %% and not when starting up the GC, because if done when starting + %% up the GC, the client could find the close, and close and + %% reopen the fh, whilst the GC is waiting for readers to + %% disappear, before it's actually done the GC. + true = mark_handle_to_close(FileHandlesEts, Casualty), [#file_summary { left = Left, right = Right, locked = true, readers = 0 }] = ets:lookup(FileSummaryEts, Casualty), - %% we should NEVER find the current file in here hence right - %% should always be a file, not undefined + %% We'll never do any GC on the current file, so right is never undefined true = Right =/= undefined, %% ASSERTION - true = ets:update_element(FileSummaryEts, Right, - {#file_summary.left, Left}), - case Left of - undefined -> ok; - _ -> true = ets:update_element(FileSummaryEts, Left, - {#file_summary.right, Right}) - end, - true = mark_handle_to_close(FileHandlesEts, Casualty), + true = ets:update_element(FileSummaryEts, Right, {#file_summary.left, Left}), + %% Regardless of whether Survivor is undefined, we need to ensure + %% the double linked list is maintained + true = case Left of + undefined -> true; %% Casualty is the eldest file (left-most) + _ -> ets:update_element(FileSummaryEts, Left, + {#file_summary.right, Right}) + end, + %% If there is a Survivor, it must be the left of the Casualty. + SurvivingFiles = + case Survivor of + undefined -> + []; + Left -> %% ASSERTION + true = mark_handle_to_close(FileHandlesEts, Survivor), + true = ets:update_element(FileSummaryEts, Survivor, + {#file_summary.locked, false}), + [Survivor] + end, true = ets:delete(FileSummaryEts, Casualty), noreply( maybe_compact( run_pending( + [Casualty | SurvivingFiles], State #msstate { sum_file_size = SumFileSize - Reclaimed }))); -handle_cast({gc_done, Reclaimed, Src, Dst}, - State = #msstate { sum_file_size = SumFileSize, - gc_active = {Src, Dst}, - file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts }) -> - %% GC done, so now ensure that any clients that have open fhs to - %% those files close them before using them again. This has to be - %% done here (given it's done in the msg_store, and not the gc), - %% and not when starting up the GC, because if done when starting - %% up the GC, the client could find the close, and close and - %% reopen the fh, whilst the GC is waiting for readers to - %% disappear, before it's actually done the GC. - true = mark_handle_to_close(FileHandlesEts, Src), - true = mark_handle_to_close(FileHandlesEts, Dst), - %% we always move data left, so Src has gone and was on the - %% right, so need to make dest = source.right.left, and also - %% dest.right = source.right - [#file_summary { left = Dst, - right = SrcRight, - locked = true, - readers = 0 }] = ets:lookup(FileSummaryEts, Src), - %% this could fail if SrcRight =:= undefined - ets:update_element(FileSummaryEts, SrcRight, {#file_summary.left, Dst}), - true = ets:update_element(FileSummaryEts, Dst, - [{#file_summary.locked, false}, - {#file_summary.right, SrcRight}]), - true = ets:delete(FileSummaryEts, Src), - noreply( - maybe_compact(run_pending( - State #msstate { sum_file_size = SumFileSize - Reclaimed, - gc_active = false }))); - handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State). @@ -913,7 +898,7 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, ets:lookup(FileSummaryEts, File), case Locked of true -> add_to_pending_gc_completion({read, Guid, From}, - State); + File, State); false -> {Msg, State1} = read_from_disk(MsgLoc, State, DedupCacheEts), gen_server2:reply(From, {ok, Msg}), @@ -943,19 +928,18 @@ read_from_disk(#msg_location { guid = Guid, ref_count = RefCount, ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg), {Msg, State1}. -contains_message(Guid, From, State = #msstate { gc_active = GCActive }) -> +contains_message(Guid, From, + State = #msstate { pending_gc_completion = Pending }) -> case index_lookup_positive_ref_count(Guid, State) of not_found -> gen_server2:reply(From, false), State; #msg_location { file = File } -> - case GCActive of - {A, B} when File =:= A orelse File =:= B -> - add_to_pending_gc_completion( - {contains, Guid, From}, State); - _ -> - gen_server2:reply(From, true), - State + case orddict:is_key(File, Pending) of + true -> add_to_pending_gc_completion( + {contains, Guid, From}, File, State); + false -> gen_server2:reply(From, true), + State end end. @@ -974,7 +958,7 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), case ets:lookup(FileSummaryEts, File) of [#file_summary { locked = true } ] -> - add_to_pending_gc_completion({remove, Guid}, State); + add_to_pending_gc_completion({remove, Guid}, File, State); [#file_summary {}] -> ok = Dec(), [_] = ets:update_counter( @@ -990,20 +974,25 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, end. add_to_pending_gc_completion( - Op, State = #msstate { pending_gc_completion = Pending }) -> - State #msstate { pending_gc_completion = [Op | Pending] }. + Op, File, State = #msstate { pending_gc_completion = Pending }) -> + State #msstate { pending_gc_completion = + rabbit_misc:orddict_cons(File, Op, Pending) }. -run_pending(State = #msstate { pending_gc_completion = [] }) -> - State; -run_pending(State = #msstate { pending_gc_completion = Pending }) -> - State1 = State #msstate { pending_gc_completion = [] }, - lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)). +run_pending(Files, State) -> + lists:foldl(fun run_pending_for_file/2, State, Files). + +run_pending_for_file(File, + State = #msstate { pending_gc_completion = Pending }) -> + lists:foldl( + fun run_pending_action/2, + State #msstate { pending_gc_completion = orddict:erase(File, Pending) }, + lists:reverse(orddict:fetch(File, Pending))). -run_pending({read, Guid, From}, State) -> +run_pending_action({read, Guid, From}, State) -> read_message(Guid, From, State); -run_pending({contains, Guid, From}, State) -> +run_pending_action({contains, Guid, From}, State) -> contains_message(Guid, From, State); -run_pending({remove, Guid}, State) -> +run_pending_action({remove, Guid}, State) -> remove_message(Guid, State). safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> @@ -1475,12 +1464,12 @@ maybe_roll_to_new_file( maybe_roll_to_new_file(_, State) -> State. -maybe_compact(State = #msstate { sum_valid_data = SumValid, - sum_file_size = SumFileSize, - gc_active = false, - gc_pid = GCPid, - file_summary_ets = FileSummaryEts, - file_size_limit = FileSizeLimit }) +maybe_compact(State = #msstate { sum_valid_data = SumValid, + sum_file_size = SumFileSize, + gc_pid = GCPid, + pending_gc_completion = Pending, + file_summary_ets = FileSummaryEts, + file_size_limit = FileSizeLimit }) when (SumFileSize > 2 * FileSizeLimit andalso (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) -> %% TODO: the algorithm here is sub-optimal - it may result in a @@ -1494,13 +1483,17 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, not_found -> State; {Src, Dst} -> + false = orddict:is_key(Src, Pending) orelse + orddict:is_key(Dst, Pending), %% ASSERTION + Pending1 = orddict:store(Dst, [], + orddict:store(Src, [], Pending)), State1 = close_handle(Src, close_handle(Dst, State)), true = ets:update_element(FileSummaryEts, Src, {#file_summary.locked, true}), true = ets:update_element(FileSummaryEts, Dst, {#file_summary.locked, true}), ok = rabbit_msg_store_gc:combine(GCPid, Src, Dst), - State1 #msstate { gc_active = {Src, Dst} } + State1 #msstate { pending_gc_completion = Pending1 } end end; maybe_compact(State) -> @@ -1536,8 +1529,9 @@ find_files_to_combine(FileSummaryEts, FileSizeLimit, delete_file_if_empty(File, State = #msstate { current_file = File }) -> State; delete_file_if_empty(File, State = #msstate { - gc_pid = GCPid, - file_summary_ets = FileSummaryEts }) -> + gc_pid = GCPid, + file_summary_ets = FileSummaryEts, + pending_gc_completion = Pending }) -> [#file_summary { valid_total_size = ValidData, locked = false }] = ets:lookup(FileSummaryEts, File), @@ -1548,7 +1542,10 @@ delete_file_if_empty(File, State = #msstate { true = ets:update_element(FileSummaryEts, File, {#file_summary.locked, true}), ok = rabbit_msg_store_gc:delete(GCPid, File), - close_handle(File, State); + false = orddict:is_key(File, Pending), %% ASSERTION + Pending1 = orddict:store(File, [], Pending), + close_handle(File, + State #msstate { pending_gc_completion = Pending1 }); _ -> State end. |
