diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-17 16:47:37 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-17 16:47:37 +0000 |
| commit | 219ac1df31cff229bcf94ad1e89ac54e727ff656 (patch) | |
| tree | 8a8c3b436fdb20506e105b18d78a6bcc845136ac | |
| parent | 78756f0f0e667a7045a4c88f1b4db0af3fc4311a (diff) | |
| download | rabbitmq-server-git-219ac1df31cff229bcf94ad1e89ac54e727ff656.tar.gz | |
Tidying of msg_store as per QA discussion with Matthias
| -rw-r--r-- | src/rabbit_msg_store.erl | 108 |
1 files changed, 53 insertions, 55 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index b42574c0f7..2ddb18262e 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -286,8 +286,54 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> {ok, State1 #msstate { current_file_handle = FileHdl }}. -handle_call({read, MsgId}, _From, State) -> - {Result, State1} = internal_read_message(MsgId, State), +handle_call({read, MsgId}, _From, State = + #msstate { current_file = CurFile, + current_file_handle = CurHdl }) -> + {Result, State1} = + case index_lookup(MsgId, State) of + not_found -> {not_found, State}; + #msg_location { ref_count = RefCount, + file = File, + offset = Offset, + total_size = TotalSize } -> + case fetch_and_increment_cache(MsgId, State) of + not_found -> + ok = case CurFile =:= File andalso {ok, Offset} >= + file_handle_cache:current_raw_offset(CurHdl) of + true -> + file_handle_cache:append_write_buffer( + CurHdl); + false -> + ok + end, + {Hdl, State2} = get_read_handle(File, State), + {ok, Offset} = file_handle_cache:position(Hdl, Offset), + {ok, {MsgId, Msg}} = + case rabbit_msg_file:read(Hdl, TotalSize) of + {ok, {MsgId, _}} = Obj -> Obj; + Rest -> + throw({error, {misread, [{old_state, State}, + {file_num, File}, + {offset, Offset}, + {read, Rest}, + {proc_dict, get()} + ]}}) + end, + ok = case RefCount > 1 of + true -> + insert_into_cache(MsgId, Msg, State2); + false -> + %% it's not in the cache and we + %% only have one reference to the + %% message. So don't bother + %% putting it in the cache. + ok + end, + {{ok, Msg}, State2}; + {Msg, _RefCount} -> + {{ok, Msg}, State} + end + end, reply(Result, State1); handle_call({contains, MsgId}, _From, State) -> @@ -477,55 +523,10 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary }) -> no_compact end. -internal_read_message(MsgId, - State = #msstate { current_file = CurFile, - current_file_handle = CurHdl }) -> - case index_lookup(MsgId, State) of - not_found -> {not_found, State}; - #msg_location { ref_count = RefCount, - file = File, - offset = Offset, - total_size = TotalSize } -> - case fetch_and_increment_cache(MsgId, State) of - not_found -> - {ok, CurOffset} = - file_handle_cache:current_raw_offset(CurHdl), - ok = case CurFile =:= File andalso Offset >= CurOffset of - true -> - file_handle_cache:append_write_buffer(CurHdl); - false -> - ok - end, - {Hdl, State1} = get_read_handle(File, State), - {ok, Offset} = file_handle_cache:position(Hdl, Offset), - {ok, {MsgId, Msg}} = - case rabbit_msg_file:read(Hdl, TotalSize) of - {ok, {MsgId, _}} = Obj -> Obj; - Rest -> - throw({error, {misread, [{old_state, State}, - {file_num, File}, - {offset, Offset}, - {read, Rest}, - {proc_dict, get()}]}}) - end, - ok = if RefCount > 1 -> - insert_into_cache(MsgId, Msg, State1); - true -> ok - %% it's not in the cache and we - %% only have one reference to the - %% message. So don't bother - %% putting it in the cache. - end, - {{ok, Msg}, State1}; - {Msg, _RefCount} -> - {{ok, Msg}, State} - end - end. - close_handle(Key, State = #msstate { file_handle_cache = FHC }) -> case dict:find(Key, FHC) of {ok, Hdl} -> - ok = close_file(Hdl), + ok = file_handle_cache:close(Hdl), State #msstate { file_handle_cache = dict:erase(Key, FHC) }; error -> State end. @@ -552,9 +553,6 @@ open_file(Dir, FileName, Mode) -> file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). -close_file(Hdl) -> - file_handle_cache:close(Hdl). - %%---------------------------------------------------------------------------- %% message cache helper functions %%---------------------------------------------------------------------------- @@ -850,7 +848,7 @@ maybe_roll_to_new_file(Offset, file_summary = FileSummary }) when Offset >= FileSizeLimit -> State1 = sync(State), - ok = close_file(CurHdl), + ok = file_handle_cache:close(CurHdl), NextFile = CurFile + 1, {ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE), true = ets:update_element(FileSummary, CurFile, @@ -993,15 +991,15 @@ combine_files(#file_summary { file = Source, file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), %% position in DestinationHdl should now be DestinationValid ok = file_handle_cache:sync(DestinationHdl), - ok = close_file(TmpHdl), + ok = file_handle_cache:close(TmpHdl), ok = file:delete(form_filename(Dir, Tmp)) end, SourceWorkList = index_search_by_file(Source, State1), ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, SourceHdl, DestinationHdl, Destination, State1), %% tidy up - ok = close_file(SourceHdl), - ok = close_file(DestinationHdl), + ok = file_handle_cache:close(SourceHdl), + ok = file_handle_cache:close(DestinationHdl), ok = file:delete(form_filename(Dir, SourceName)), State1. |
