diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-01-07 16:26:24 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-01-07 16:26:24 +0000 |
| commit | 3274299179812bfc5be48886ecee4d29d02d22c4 (patch) | |
| tree | a84478ad1e7287417bc90b62ee8f182b4c1164fe /src | |
| parent | 3e2f565c8a10fc6fdded8dc081e8d3b96f7cb8c6 (diff) | |
| download | rabbitmq-server-git-3274299179812bfc5be48886ecee4d29d02d22c4.tar.gz | |
Toughened up cache accessors and switched to using named tables in prep for concurrent readers of msg_store.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 188 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 20 |
2 files changed, 101 insertions, 107 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 7bf91bb3cb..8acd9149aa 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -76,14 +76,12 @@ {dir, %% store directory index_module, %% the module for index ops index_state, %% where are messages? - file_summary, %% what's in the files? current_file, %% current file name as number current_file_handle, %% current file handle %% since the last fsync? file_handle_cache, %% file handle cache on_sync, %% pending sync requests sync_timer_ref, %% TRef for our interval timer - message_cache, %% ets message cache sum_valid_data, %% sum of valid data in all files sum_file_size, %% sum of file sizes pending_gc_completion, %% things to do once GC completes @@ -92,8 +90,6 @@ -include("rabbit_msg_store.hrl"). --define(FILE_SUMMARY_ETS_NAME, rabbit_msg_store_file_summary). --define(CACHE_ETS_NAME, rabbit_msg_store_cache). %% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION %% It is not recommended to set this to < 0.5 -define(GARBAGE_FRACTION, 0.5). @@ -250,21 +246,19 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> IndexState = IndexModule:init(Dir), InitFile = 0, - FileSummary = ets:new(?FILE_SUMMARY_ETS_NAME, - [ordered_set, public, - {keypos, #file_summary.file}]), - MessageCache = ets:new(?CACHE_ETS_NAME, [set, private]), + ?FILE_SUMMARY_ETS_NAME = ets:new(?FILE_SUMMARY_ETS_NAME, + [ordered_set, public, named_table, + {keypos, #file_summary.file}]), + ?CACHE_ETS_NAME = ets:new(?CACHE_ETS_NAME, [set, public, named_table]), State = #msstate { dir = Dir, index_module = IndexModule, index_state = IndexState, - file_summary = FileSummary, current_file = InitFile, current_file_handle = undefined, file_handle_cache = dict:new(), on_sync = [], sync_timer_ref = undefined, - message_cache = MessageCache, sum_valid_data = 0, sum_file_size = 0, pending_gc_completion = [], @@ -290,8 +284,7 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> {ok, Offset} = file_handle_cache:position(FileHdl, Offset), ok = file_handle_cache:truncate(FileHdl), - {ok, _Pid} = rabbit_msg_store_gc:start_link( - Dir, IndexState, FileSummary, IndexModule), + {ok, _Pid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule), {ok, State1 #msstate { current_file_handle = FileHdl }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -307,7 +300,6 @@ handle_call({contains, MsgId}, From, State) -> handle_cast({write, MsgId, Msg}, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, - file_summary = FileSummary, sum_valid_data = SumValid, sum_file_size = SumFileSize }) -> case index_lookup(MsgId, State) of @@ -324,17 +316,18 @@ handle_cast({write, MsgId, Msg}, right = undefined, locked = false, file_size = FileSize }] = - ets:lookup(FileSummary, CurFile), + ets:lookup(?FILE_SUMMARY_ETS_NAME, CurFile), ValidTotalSize1 = ValidTotalSize + TotalSize, ContiguousTop1 = if CurOffset =:= ContiguousTop -> %% can't be any holes in this file ValidTotalSize1; true -> ContiguousTop end, - true = ets:insert(FileSummary, FSEntry #file_summary { - valid_total_size = ValidTotalSize1, - contiguous_top = ContiguousTop1, - file_size = FileSize + TotalSize }), + true = ets:insert(?FILE_SUMMARY_ETS_NAME, + FSEntry #file_summary { + valid_total_size = ValidTotalSize1, + contiguous_top = ContiguousTop1, + file_size = FileSize + TotalSize }), NextOffset = CurOffset + TotalSize, noreply(maybe_compact(maybe_roll_to_new_file( NextOffset, State #msstate @@ -357,7 +350,7 @@ handle_cast({remove, MsgIds}, State) -> noreply(maybe_compact(State1)); handle_cast({release, MsgIds}, State) -> - lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), + lists:foreach(fun (MsgId) -> decrement_cache(MsgId) end, MsgIds), noreply(State); handle_cast({sync, MsgIds, K}, @@ -380,20 +373,19 @@ handle_cast(sync, State) -> handle_cast({gc_done, Reclaimed, Source, Dest}, State = #msstate { sum_file_size = SumFileSize, - gc_active = {Source, Dest}, - file_summary = FileSummary }) -> + gc_active = {Source, Dest} }) -> %% we always move data left, so Source has gone and was on the %% right, so need to make dest = source.right.left, and also %% dest.right = source.right [#file_summary { left = Dest, right = SourceRight, locked = true }] = - ets:lookup(FileSummary, Source), + ets:lookup(?FILE_SUMMARY_ETS_NAME, Source), %% this could fail if SourceRight == undefined - ets:update_element(FileSummary, SourceRight, + ets:update_element(?FILE_SUMMARY_ETS_NAME, SourceRight, {#file_summary.left, Dest}), - true = ets:update_element(FileSummary, Dest, + true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Dest, [{#file_summary.locked, false}, {#file_summary.right, SourceRight}]), - true = ets:delete(FileSummary, Source), + true = ets:delete(?FILE_SUMMARY_ETS_NAME, Source), noreply(run_pending( State #msstate { sum_file_size = SumFileSize - Reclaimed, gc_active = false })). @@ -410,7 +402,6 @@ handle_info({'EXIT', _Pid, Reason}, State) -> terminate(_Reason, State = #msstate { index_state = IndexState, index_module = IndexModule, - file_summary = FileSummary, current_file_handle = FileHdl }) -> %% stop the gc first, otherwise it could be working and we pull %% out the ets tables from under it. @@ -422,10 +413,9 @@ terminate(_Reason, State = #msstate { index_state = IndexState, State2 end, State3 = close_all_handles(State1), - ets:delete(FileSummary), + ets:delete(?FILE_SUMMARY_ETS_NAME), IndexModule:terminate(IndexState), State3 #msstate { index_state = undefined, - file_summary = undefined, current_file_handle = undefined }. code_change(_OldVsn, State, _Extra) -> @@ -484,8 +474,7 @@ sync(State = #msstate { current_file_handle = CurHdl, read_message(MsgId, From, State = #msstate { current_file = CurFile, - current_file_handle = CurHdl, - file_summary = FileSummary }) -> + current_file_handle = CurHdl }) -> case index_lookup(MsgId, State) of not_found -> gen_server2:reply(From, not_found), State; @@ -493,10 +482,10 @@ read_message(MsgId, From, State = file = File, offset = Offset, total_size = TotalSize } -> - case fetch_and_increment_cache(MsgId, State) of + case fetch_and_increment_cache(MsgId) of not_found -> [#file_summary { locked = Locked }] = - ets:lookup(FileSummary, File), + ets:lookup(?FILE_SUMMARY_ETS_NAME, File), case Locked of true -> add_to_pending_gc_completion({read, MsgId, From}, @@ -525,7 +514,7 @@ read_message(MsgId, From, State = end, ok = case RefCount > 1 of true -> - insert_into_cache(MsgId, Msg, State1); + insert_into_cache(MsgId, Msg); false -> %% it's not in the cache and %% we only have one reference @@ -537,7 +526,7 @@ read_message(MsgId, From, State = gen_server2:reply(From, {ok, Msg}), State1 end; - {Msg, _RefCount} -> + Msg -> gen_server2:reply(From, {ok, Msg}), State end @@ -559,18 +548,17 @@ contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) -> end end. -remove_message(MsgId, State = #msstate { file_summary = FileSummary, - sum_valid_data = SumValid }) -> +remove_message(MsgId, State = #msstate { sum_valid_data = SumValid }) -> #msg_location { ref_count = RefCount, file = File, offset = Offset, total_size = TotalSize } = index_lookup(MsgId, State), case RefCount of 1 -> - ok = remove_cache_entry(MsgId, State), + ok = remove_cache_entry(MsgId), [FSEntry = #file_summary { valid_total_size = ValidTotalSize, contiguous_top = ContiguousTop, locked = Locked }] = - ets:lookup(FileSummary, File), + ets:lookup(?FILE_SUMMARY_ETS_NAME, File), case Locked of true -> add_to_pending_gc_completion({remove, MsgId}, State); @@ -578,15 +566,15 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary, ok = index_delete(MsgId, State), ContiguousTop1 = lists:min([ContiguousTop, Offset]), ValidTotalSize1 = ValidTotalSize - TotalSize, - true = ets:insert( - FileSummary, FSEntry #file_summary { - valid_total_size = ValidTotalSize1, - contiguous_top = ContiguousTop1 }), + true = ets:insert(?FILE_SUMMARY_ETS_NAME, + FSEntry #file_summary { + valid_total_size = ValidTotalSize1, + contiguous_top = ContiguousTop1 }), State1 = delete_file_if_empty(File, State), State1 #msstate { sum_valid_data = SumValid - TotalSize } end; _ when 1 < RefCount -> - ok = decrement_cache(MsgId, State), + ok = decrement_cache(MsgId), %% only update field, otherwise bad interaction with concurrent GC ok = index_update_fields(MsgId, {#msg_location.ref_count, RefCount - 1}, @@ -642,22 +630,27 @@ new_handle(Key, FileName, Mode, State = #msstate { file_handle_cache = FHC, %% message cache helper functions %%---------------------------------------------------------------------------- -remove_cache_entry(MsgId, #msstate { message_cache = Cache }) -> - true = ets:delete(Cache, MsgId), +remove_cache_entry(MsgId) -> + true = ets:delete(?CACHE_ETS_NAME, MsgId), ok. -fetch_and_increment_cache(MsgId, #msstate { message_cache = Cache }) -> - case ets:lookup(Cache, MsgId) of +fetch_and_increment_cache(MsgId) -> + case ets:lookup(?CACHE_ETS_NAME, MsgId) of [] -> not_found; - [{MsgId, Msg, _RefCount}] -> - NewRefCount = ets:update_counter(Cache, MsgId, {3, 1}), - {Msg, NewRefCount} + [{_MsgId, Msg, _RefCount}] -> + try + ets:update_counter(?CACHE_ETS_NAME, MsgId, {3, 1}) + catch error:badarg -> + %% someone has deleted us in the meantime, insert us + ok = insert_into_cache(MsgId, Msg) + end, + Msg end. -decrement_cache(MsgId, #msstate { message_cache = Cache }) -> - true = try case ets:update_counter(Cache, MsgId, {3, -1}) of - N when N =< 0 -> true = ets:delete(Cache, MsgId); +decrement_cache(MsgId) -> + true = try case ets:update_counter(?CACHE_ETS_NAME, MsgId, {3, -1}) of + N when N =< 0 -> true = ets:delete(?CACHE_ETS_NAME, MsgId); _N -> true end catch error:badarg -> @@ -668,9 +661,16 @@ decrement_cache(MsgId, #msstate { message_cache = Cache }) -> end, ok. -insert_into_cache(MsgId, Msg, #msstate { message_cache = Cache }) -> - true = ets:insert_new(Cache, {MsgId, Msg, 1}), - ok. +insert_into_cache(MsgId, Msg) -> + case ets:insert_new(?CACHE_ETS_NAME, {MsgId, Msg, 1}) of + true -> ok; + false -> try + ets:update_counter(?CACHE_ETS_NAME, MsgId, {3, 1}), + ok + catch error:badarg -> + insert_into_cache(MsgId, Msg) + end + end. %%---------------------------------------------------------------------------- %% index @@ -855,16 +855,15 @@ build_index(Files, State) -> {Offset, State1} = build_index(undefined, Files, State), {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)}. -build_index(Left, [], State = #msstate { file_summary = FileSummary }) -> +build_index(Left, [], State) -> ok = index_delete_by_file(undefined, State), - Offset = case ets:lookup(FileSummary, Left) of + Offset = case ets:lookup(?FILE_SUMMARY_ETS_NAME, Left) of [] -> 0; [#file_summary { file_size = FileSize }] -> FileSize end, {Offset, State #msstate { current_file = Left }}; build_index(Left, [File|Files], - State = #msstate { dir = Dir, file_summary = FileSummary, - sum_valid_data = SumValid, + State = #msstate { dir = Dir, sum_valid_data = SumValid, sum_file_size = SumFileSize }) -> {ok, Messages, FileSize} = rabbit_msg_store_misc:scan_file_for_valid_messages( @@ -899,10 +898,10 @@ build_index(Left, [File|Files], [F|_] -> {F, FileSize} end, true = - ets:insert_new(FileSummary, #file_summary { - file = File, valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, locked = false, - left = Left, right = Right, file_size = FileSize1 }), + ets:insert_new(?FILE_SUMMARY_ETS_NAME, #file_summary { + file = File, valid_total_size = ValidTotalSize, + contiguous_top = ContiguousTop, locked = false, + left = Left, right = Right, file_size = FileSize1 }), build_index(File, Files, State #msstate { sum_valid_data = SumValid + ValidTotalSize, sum_file_size = SumFileSize + FileSize1 }). @@ -914,8 +913,7 @@ build_index(Left, [File|Files], maybe_roll_to_new_file(Offset, State = #msstate { dir = Dir, current_file_handle = CurHdl, - current_file = CurFile, - file_summary = FileSummary }) + current_file = CurFile }) when Offset >= ?FILE_SIZE_LIMIT -> State1 = sync(State), ok = file_handle_cache:close(CurHdl), @@ -924,11 +922,11 @@ maybe_roll_to_new_file(Offset, Dir, rabbit_msg_store_misc:filenum_to_name(NextFile), ?WRITE_MODE), true = ets:insert_new( - FileSummary, #file_summary { - file = NextFile, valid_total_size = 0, contiguous_top = 0, - left = CurFile, right = undefined, file_size = 0, - locked = false }), - true = ets:update_element(FileSummary, CurFile, + ?FILE_SUMMARY_ETS_NAME, #file_summary { + file = NextFile, valid_total_size = 0, contiguous_top = 0, + left = CurFile, right = undefined, file_size = 0, + locked = false }), + true = ets:update_element(?FILE_SUMMARY_ETS_NAME, CurFile, {#file_summary.right, NextFile}), State1 #msstate { current_file_handle = NextHdl, current_file = NextFile }; @@ -937,19 +935,18 @@ maybe_roll_to_new_file(_, State) -> maybe_compact(State = #msstate { sum_valid_data = SumValid, sum_file_size = SumFileSize, - file_summary = FileSummary, gc_active = false }) when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION -> - First = ets:first(FileSummary), + First = ets:first(?FILE_SUMMARY_ETS_NAME), N = random_distributions:geometric(?GEOMETRIC_P), - case find_files_to_gc(FileSummary, N, First) of + case find_files_to_gc(N, First) of undefined -> State; {Source, Dest} -> State1 = close_handle(Source, close_handle(Dest, State)), - true = ets:update_element(FileSummary, Source, + true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Source, {#file_summary.locked, true}), - true = ets:update_element(FileSummary, Dest, + true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Dest, {#file_summary.locked, true}), ok = rabbit_msg_store_gc:gc(Source, Dest), State1 #msstate { gc_active = {Source, Dest} } @@ -957,14 +954,13 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, maybe_compact(State) -> State. -find_files_to_gc(_FileSummary, _N, '$end_of_table') -> +find_files_to_gc(_N, '$end_of_table') -> undefined; -find_files_to_gc(FileSummary, N, First) -> +find_files_to_gc(N, First) -> [FirstObj = #file_summary { right = Right }] = - ets:lookup(FileSummary, First), - Pairs = - find_files_to_gc(FileSummary, N, FirstObj, - ets:lookup(FileSummary, Right), []), + ets:lookup(?FILE_SUMMARY_ETS_NAME, First), + Pairs = find_files_to_gc(N, FirstObj, + ets:lookup(?FILE_SUMMARY_ETS_NAME, Right), []), case Pairs of [] -> undefined; [Pair] -> Pair; @@ -972,9 +968,9 @@ find_files_to_gc(FileSummary, N, First) -> lists:nth(M, Pairs) end. -find_files_to_gc(_FileSummary, _N, #file_summary {}, [], Pairs) -> +find_files_to_gc(_N, #file_summary {}, [], Pairs) -> lists:reverse(Pairs); -find_files_to_gc(FileSummary, N, +find_files_to_gc(N, #file_summary { right = Source, file = Dest, valid_total_size = DestValid }, [SourceObj = #file_summary { left = Dest, right = SourceRight, @@ -985,22 +981,22 @@ find_files_to_gc(FileSummary, N, Pair = {Source, Dest}, case N == 1 of true -> [Pair]; - false -> find_files_to_gc(FileSummary, (N - 1), SourceObj, - ets:lookup(FileSummary, SourceRight), + false -> find_files_to_gc((N - 1), SourceObj, + ets:lookup(?FILE_SUMMARY_ETS_NAME, SourceRight), [Pair | Pairs]) end; -find_files_to_gc(FileSummary, N, _Left, +find_files_to_gc(N, _Left, [Right = #file_summary { right = RightRight }], Pairs) -> - find_files_to_gc(FileSummary, N, Right, - ets:lookup(FileSummary, RightRight), Pairs). + find_files_to_gc( + N, Right, ets:lookup(?FILE_SUMMARY_ETS_NAME, RightRight), Pairs). delete_file_if_empty(File, State = #msstate { current_file = File }) -> State; -delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary, - sum_file_size = SumFileSize } = State) -> +delete_file_if_empty(File, State = + #msstate { dir = Dir, sum_file_size = SumFileSize }) -> [#file_summary { valid_total_size = ValidData, file_size = FileSize, left = Left, right = Right, locked = false }] = - ets:lookup(FileSummary, File), + ets:lookup(?FILE_SUMMARY_ETS_NAME, File), case ValidData of %% we should NEVER find the current file in here hence right %% should always be a file, not undefined @@ -1008,15 +1004,15 @@ delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary, {undefined, _} when not is_atom(Right) -> %% the eldest file is empty. true = ets:update_element( - FileSummary, Right, + ?FILE_SUMMARY_ETS_NAME, Right, {#file_summary.left, undefined}); {_, _} when not is_atom(Right) -> - true = ets:update_element(FileSummary, Right, + true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Right, {#file_summary.left, Left}), - true = ets:update_element(FileSummary, Left, + true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Left, {#file_summary.right, Right}) end, - true = ets:delete(FileSummary, File), + true = ets:delete(?FILE_SUMMARY_ETS_NAME, File), State1 = close_handle(File, State), ok = file:delete(rabbit_msg_store_misc:form_filename( Dir, diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 729cd28715..1866e6297a 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -33,7 +33,7 @@ -behaviour(gen_server2). --export([start_link/4, gc/2, stop/0]). +-export([start_link/3, gc/2, stop/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -41,7 +41,6 @@ -record(gcstate, {dir, index_state, - file_summary, index_module }). @@ -51,9 +50,9 @@ %%---------------------------------------------------------------------------- -start_link(Dir, IndexState, FileSummary, IndexModule) -> +start_link(Dir, IndexState, IndexModule) -> gen_server2:start_link({local, ?SERVER}, ?MODULE, - [Dir, IndexState, FileSummary, IndexModule], + [Dir, IndexState, IndexModule], [{timeout, infinity}]). gc(Source, Destination) -> @@ -64,9 +63,9 @@ stop() -> %%---------------------------------------------------------------------------- -init([Dir, IndexState, FileSummary, IndexModule]) -> +init([Dir, IndexState, IndexModule]) -> {ok, #gcstate { dir = Dir, index_state = IndexState, - file_summary = FileSummary, index_module = IndexModule }, + index_module = IndexModule }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -89,23 +88,22 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -adjust_meta_and_combine(SourceFile, DestFile, - State = #gcstate { file_summary = FileSummary }) -> +adjust_meta_and_combine(SourceFile, DestFile, State) -> [SourceObj = #file_summary { valid_total_size = SourceValidData, left = DestFile, file_size = SourceFileSize, locked = true }] = - ets:lookup(FileSummary, SourceFile), + ets:lookup(?FILE_SUMMARY_ETS_NAME, SourceFile), [DestObj = #file_summary { valid_total_size = DestValidData, right = SourceFile, file_size = DestFileSize, locked = true }] = - ets:lookup(FileSummary, DestFile), + ets:lookup(?FILE_SUMMARY_ETS_NAME, DestFile), TotalValidData = DestValidData + SourceValidData, ok = combine_files(SourceObj, DestObj, State), %% don't update dest.right, because it could be changing at the same time true = - ets:update_element(FileSummary, DestFile, + ets:update_element(?FILE_SUMMARY_ETS_NAME, DestFile, [{#file_summary.valid_total_size, TotalValidData}, {#file_summary.contiguous_top, TotalValidData}, {#file_summary.file_size, TotalValidData}]), |
