diff options
| -rw-r--r-- | include/rabbit_msg_store.hrl | 7 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 281 |
2 files changed, 144 insertions, 144 deletions
diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl index a094454a78..4dff4a01b2 100644 --- a/include/rabbit_msg_store.hrl +++ b/include/rabbit_msg_store.hrl @@ -50,6 +50,7 @@ -define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB --define(FILE_SUMMARY_ETS_NAME, rabbit_msg_store_file_summary). --define(CACHE_ETS_NAME, rabbit_msg_store_cache). --define(FILE_HANDLES_ETS_NAME, rabbit_msg_store_file_handles). +-define(FILE_SUMMARY_ETS_NAME, rabbit_msg_store_file_summary). +-define(CACHE_ETS_NAME, rabbit_msg_store_cache). +-define(FILE_HANDLES_ETS_NAME, rabbit_msg_store_file_handles). +-define(CUR_FILE_CACHE_ETS_NAME, rabbit_msg_store_cur_file). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index f362d15d07..272db825eb 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -247,101 +247,13 @@ read(MsgId, CState) -> case index_lookup(MsgId, CState) of not_found -> Defer(); - #msg_location { ref_count = RefCount, - file = File, - offset = Offset, - total_size = TotalSize } -> + MsgLocation -> case fetch_and_increment_cache(MsgId) of - not_found -> - [#file_summary { locked = Locked, right = Right }] = - ets:lookup(?FILE_SUMMARY_ETS_NAME, File), - case Right =:= undefined orelse Locked =:= true of - true -> - Defer(); - false -> - ets:update_counter(?FILE_SUMMARY_ETS_NAME, File, - {#file_summary.readers, 1}), - Release = fun() -> - ets:update_counter( - ?FILE_SUMMARY_ETS_NAME, File, - {#file_summary.readers, -1}) - end, - %% If a GC hasn't already started, it - %% won't start now. Need to check again to - %% see if we've been locked in the - %% meantime, between lookup and - %% update_counter (thus GC actually in - %% progress). - [#file_summary { locked = Locked2 }] = - ets:lookup(?FILE_SUMMARY_ETS_NAME, File), - case Locked2 of - true -> - Release(), - Defer(); - false -> - %% Ok, we're definitely safe to - %% continue - a GC can't start up - %% now, and isn't running, so - %% nothing will tell us from now - %% on to close the handle if it's - %% already open. (Well, a GC could - %% start, and could put close - %% entries into the ets table, but - %% the GC will wait until we're - %% done here before doing any real - %% work.) - - %% This is fine to fail (already - %% exists) - ets:insert_new(?FILE_HANDLES_ETS_NAME, - {{self(), File}, open}), - CState1 = close_all_indicated(CState), - {Hdl, CState3} = - get_read_handle(File, CState1), - {ok, Offset} = - file_handle_cache:position(Hdl, Offset), - {ok, {MsgId, Msg}} = - case rabbit_msg_file:read(Hdl, TotalSize) of - {ok, {MsgId, _}} = Obj -> Obj; - Rest -> - throw({error, - {misread, - [{old_cstate, CState1}, - {file_num, File}, - {offset, Offset}, - {read, Rest}, - {proc_dict, get()} - ]}}) - end, - Release(), - ok = case RefCount > 1 of - true -> - insert_into_cache(MsgId, Msg); - false -> - %% It's not in the - %% cache and we only - %% have one reference - %% to the message. So - %% don't bother - %% putting it in the - %% cache. - ok - end, - {{ok, Msg}, CState3} - end - end; - Msg -> - {{ok, Msg}, CState} + not_found -> client_read1(MsgLocation, Defer, CState); + Msg -> {{ok, Msg}, CState} end end. -close_all_indicated(CState) -> - Objs = ets:match_object(?FILE_HANDLES_ETS_NAME, {{self(), '_'}, close}), - lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) -> - true = ets:delete(?FILE_HANDLES_ETS_NAME, Key), - close_handle(File, CStateM) - end, CState, Objs). - contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). @@ -364,6 +276,69 @@ client_terminate(CState) -> ok. %%---------------------------------------------------------------------------- +%% Client-side-only helpers +%%---------------------------------------------------------------------------- + +client_read1(#msg_location { msg_id = MsgId, ref_count = RefCount, file = File } + = MsgLocation, Defer, CState) -> + [#file_summary { locked = Locked, right = Right }] = + ets:lookup(?FILE_SUMMARY_ETS_NAME, File), + case {Right, Locked} of + {undefined, false} -> + case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of + [] -> + Defer(); %% may have rolled over + [{MsgId, Msg}] -> + ok = maybe_insert_into_cache(RefCount, MsgId, Msg), + {{ok, Msg}, CState} + end; + {_, true} -> + Defer(); + _ -> + ets:update_counter(?FILE_SUMMARY_ETS_NAME, File, + {#file_summary.readers, 1}), + Release = fun() -> + ets:update_counter(?FILE_SUMMARY_ETS_NAME, File, + {#file_summary.readers, -1}) + end, + %% If a GC hasn't already started, it won't start + %% now. Need to check again to see if we've been locked in + %% the meantime, between lookup and update_counter (thus + %% GC actually in progress). + [#file_summary { locked = Locked2 }] = + ets:lookup(?FILE_SUMMARY_ETS_NAME, File), + case Locked2 of + true -> + Release(), + Defer(); + false -> + %% Ok, we're definitely safe to continue - a GC + %% can't start up now, and isn't running, so + %% nothing will tell us from now on to close the + %% handle if it's already open. (Well, a GC could + %% start, and could put close entries into the ets + %% table, but the GC will wait until we're done + %% here before doing any real work.) + + %% This is fine to fail (already exists) + ets:insert_new(?FILE_HANDLES_ETS_NAME, + {{self(), File}, open}), + CState1 = close_all_indicated(CState), + {Msg, CState2} = read_from_disk(MsgLocation, CState1), + ok = maybe_insert_into_cache(RefCount, MsgId, Msg), + Release(), + {{ok, Msg}, CState2} + end + end. + +close_all_indicated(CState) -> + Objs = ets:match_object(?FILE_HANDLES_ETS_NAME, {{self(), '_'}, close}), + lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) -> + true = ets:delete(?FILE_HANDLES_ETS_NAME, Key), + close_handle(File, CStateM) + end, CState, Objs). + +%%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -384,6 +359,8 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> ?CACHE_ETS_NAME = ets:new(?CACHE_ETS_NAME, [set, public, named_table]), ?FILE_HANDLES_ETS_NAME = ets:new(?FILE_HANDLES_ETS_NAME, [ordered_set, public, named_table]), + ?CUR_FILE_CACHE_ETS_NAME = ets:new(?CUR_FILE_CACHE_ETS_NAME, + [set, public, named_table]), State = #msstate { dir = Dir, index_module = IndexModule, @@ -444,6 +421,7 @@ handle_cast({write, MsgId, Msg}, case index_lookup(MsgId, State) of not_found -> %% New message, lots to do + true = ets:insert_new(?CUR_FILE_CACHE_ETS_NAME, {MsgId, Msg}), {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg), ok = index_insert(#msg_location { @@ -563,6 +541,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, ets:delete(?FILE_SUMMARY_ETS_NAME), ets:delete(?CACHE_ETS_NAME), ets:delete(?FILE_HANDLES_ETS_NAME), + ets:delete(?CUR_FILE_CACHE_ETS_NAME), IndexModule:terminate(IndexState), State3 #msstate { index_state = undefined, current_file_handle = undefined }. @@ -621,66 +600,80 @@ sync(State = #msstate { current_file_handle = CurHdl, State1 #msstate { on_sync = [] } end. -read_message(MsgId, From, State = - #msstate { current_file = CurFile, - current_file_handle = CurHdl }) -> +read_message(MsgId, From, State) -> case index_lookup(MsgId, State) of not_found -> gen_server2:reply(From, not_found), State; - #msg_location { ref_count = RefCount, - file = File, - offset = Offset, - total_size = TotalSize } -> + MsgLocation -> case fetch_and_increment_cache(MsgId) of not_found -> - [#file_summary { locked = Locked }] = - ets:lookup(?FILE_SUMMARY_ETS_NAME, File), - case Locked of - true -> - add_to_pending_gc_completion({read, MsgId, From}, - State); - false -> - ok = case CurFile =:= File andalso {ok, Offset} >= - file_handle_cache:current_raw_offset( - CurHdl) of - true -> file_handle_cache:flush(CurHdl); - false -> ok - end, - {Hdl, State1} = get_read_handle(File, State), - {ok, Offset} = - file_handle_cache:position(Hdl, Offset), - {ok, {MsgId, Msg}} = - case rabbit_msg_file:read(Hdl, TotalSize) of - {ok, {MsgId, _}} = Obj -> Obj; - Rest -> - throw({error, {misread, - [{old_state, State}, - {file_num, File}, - {offset, Offset}, - {read, Rest}, - {proc_dict, get()} - ]}}) - end, - ok = case RefCount > 1 of - true -> - insert_into_cache(MsgId, Msg); - false -> - %% it's not in the cache and - %% we only have one reference - %% to the message. So don't - %% bother putting it in the - %% cache. - ok - end, - gen_server2:reply(From, {ok, Msg}), - State1 - end; + read_message1(From, MsgLocation, State); Msg -> gen_server2:reply(From, {ok, Msg}), State end end. +read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount, + file = File, offset = Offset } = MsgLoc, + State = #msstate { current_file = CurFile, + current_file_handle = CurHdl }) -> + case File =:= CurFile of + true -> + {Msg, State1} = + %% can return [] if msg in file existed on startup + case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of + [] -> + ok = case {ok, Offset} >= + file_handle_cache:current_raw_offset(CurHdl) of + true -> file_handle_cache:flush(CurHdl); + false -> ok + end, + read_from_disk(MsgLoc, State); + [{MsgId, Msg1}] -> + {Msg1, State} + end, + ok = maybe_insert_into_cache(RefCount, MsgId, Msg), + gen_server2:reply(From, {ok, Msg}), + State1; + false -> + [#file_summary { locked = Locked }] = + ets:lookup(?FILE_SUMMARY_ETS_NAME, File), + case Locked of + true -> + add_to_pending_gc_completion({read, MsgId, From}, State); + false -> + {Msg, State1} = read_from_disk(MsgLoc, State), + gen_server2:reply(From, {ok, Msg}), + State1 + end + end. + +read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount, + file = File, offset = Offset, + total_size = TotalSize }, State) -> + {Hdl, State1} = get_read_handle(File, State), + {ok, Offset} = file_handle_cache:position(Hdl, Offset), + {ok, {MsgId, Msg}} = + case rabbit_msg_file:read(Hdl, TotalSize) of + {ok, {MsgId, _}} = Obj -> + Obj; + Rest -> + throw({error, {misread, [{old_state, State}, + {file_num, File}, + {offset, Offset}, + {read, Rest}, + {proc_dict, get()} + ]}}) + end, + ok = maybe_insert_into_cache(RefCount, MsgId, Msg), + {Msg, State1}. + +maybe_insert_into_cache(RefCount, MsgId, Msg) when RefCount > 1 -> + insert_into_cache(MsgId, Msg); +maybe_insert_into_cache(_RefCount, _MsgId, _Msg) -> + ok. + contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) -> case index_lookup(MsgId, State) of not_found -> @@ -697,10 +690,15 @@ contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) -> end end. -remove_message(MsgId, State = #msstate { sum_valid_data = SumValid }) -> +remove_message(MsgId, State = #msstate { sum_valid_data = SumValid, + current_file = CurFile }) -> #msg_location { ref_count = RefCount, file = File, offset = Offset, total_size = TotalSize } = index_lookup(MsgId, State), + true = case File =:= CurFile of + true -> ets:delete(?CUR_FILE_CACHE_ETS_NAME, MsgId); + false -> true + end, case RefCount of 1 -> ok = remove_cache_entry(MsgId), @@ -1104,6 +1102,7 @@ maybe_roll_to_new_file(Offset, locked = false, readers = 0 }), true = ets:update_element(?FILE_SUMMARY_ETS_NAME, CurFile, {#file_summary.right, NextFile}), + true = ets:delete_all_objects(?CUR_FILE_CACHE_ETS_NAME), State1 #msstate { current_file_handle = NextHdl, current_file = NextFile }; maybe_roll_to_new_file(_, State) -> |
