summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-17 16:47:37 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-17 16:47:37 +0000
commit219ac1df31cff229bcf94ad1e89ac54e727ff656 (patch)
tree8a8c3b436fdb20506e105b18d78a6bcc845136ac
parent78756f0f0e667a7045a4c88f1b4db0af3fc4311a (diff)
downloadrabbitmq-server-git-219ac1df31cff229bcf94ad1e89ac54e727ff656.tar.gz
Tidying of msg_store as per QA discussion with Matthias
-rw-r--r--src/rabbit_msg_store.erl108
1 files changed, 53 insertions, 55 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b42574c0f7..2ddb18262e 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -286,8 +286,54 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
{ok, State1 #msstate { current_file_handle = FileHdl }}.
-handle_call({read, MsgId}, _From, State) ->
- {Result, State1} = internal_read_message(MsgId, State),
+handle_call({read, MsgId}, _From, State =
+ #msstate { current_file = CurFile,
+ current_file_handle = CurHdl }) ->
+ {Result, State1} =
+ case index_lookup(MsgId, State) of
+ not_found -> {not_found, State};
+ #msg_location { ref_count = RefCount,
+ file = File,
+ offset = Offset,
+ total_size = TotalSize } ->
+ case fetch_and_increment_cache(MsgId, State) of
+ not_found ->
+ ok = case CurFile =:= File andalso {ok, Offset} >=
+ file_handle_cache:current_raw_offset(CurHdl) of
+ true ->
+ file_handle_cache:append_write_buffer(
+ CurHdl);
+ false ->
+ ok
+ end,
+ {Hdl, State2} = get_read_handle(File, State),
+ {ok, Offset} = file_handle_cache:position(Hdl, Offset),
+ {ok, {MsgId, Msg}} =
+ case rabbit_msg_file:read(Hdl, TotalSize) of
+ {ok, {MsgId, _}} = Obj -> Obj;
+ Rest ->
+ throw({error, {misread, [{old_state, State},
+ {file_num, File},
+ {offset, Offset},
+ {read, Rest},
+ {proc_dict, get()}
+ ]}})
+ end,
+ ok = case RefCount > 1 of
+ true ->
+ insert_into_cache(MsgId, Msg, State2);
+ false ->
+ %% it's not in the cache and we
+ %% only have one reference to the
+ %% message. So don't bother
+ %% putting it in the cache.
+ ok
+ end,
+ {{ok, Msg}, State2};
+ {Msg, _RefCount} ->
+ {{ok, Msg}, State}
+ end
+ end,
reply(Result, State1);
handle_call({contains, MsgId}, _From, State) ->
@@ -477,55 +523,10 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary }) ->
no_compact
end.
-internal_read_message(MsgId,
- State = #msstate { current_file = CurFile,
- current_file_handle = CurHdl }) ->
- case index_lookup(MsgId, State) of
- not_found -> {not_found, State};
- #msg_location { ref_count = RefCount,
- file = File,
- offset = Offset,
- total_size = TotalSize } ->
- case fetch_and_increment_cache(MsgId, State) of
- not_found ->
- {ok, CurOffset} =
- file_handle_cache:current_raw_offset(CurHdl),
- ok = case CurFile =:= File andalso Offset >= CurOffset of
- true ->
- file_handle_cache:append_write_buffer(CurHdl);
- false ->
- ok
- end,
- {Hdl, State1} = get_read_handle(File, State),
- {ok, Offset} = file_handle_cache:position(Hdl, Offset),
- {ok, {MsgId, Msg}} =
- case rabbit_msg_file:read(Hdl, TotalSize) of
- {ok, {MsgId, _}} = Obj -> Obj;
- Rest ->
- throw({error, {misread, [{old_state, State},
- {file_num, File},
- {offset, Offset},
- {read, Rest},
- {proc_dict, get()}]}})
- end,
- ok = if RefCount > 1 ->
- insert_into_cache(MsgId, Msg, State1);
- true -> ok
- %% it's not in the cache and we
- %% only have one reference to the
- %% message. So don't bother
- %% putting it in the cache.
- end,
- {{ok, Msg}, State1};
- {Msg, _RefCount} ->
- {{ok, Msg}, State}
- end
- end.
-
close_handle(Key, State = #msstate { file_handle_cache = FHC }) ->
case dict:find(Key, FHC) of
{ok, Hdl} ->
- ok = close_file(Hdl),
+ ok = file_handle_cache:close(Hdl),
State #msstate { file_handle_cache = dict:erase(Key, FHC) };
error -> State
end.
@@ -552,9 +553,6 @@ open_file(Dir, FileName, Mode) ->
file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
[{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
-close_file(Hdl) ->
- file_handle_cache:close(Hdl).
-
%%----------------------------------------------------------------------------
%% message cache helper functions
%%----------------------------------------------------------------------------
@@ -850,7 +848,7 @@ maybe_roll_to_new_file(Offset,
file_summary = FileSummary })
when Offset >= FileSizeLimit ->
State1 = sync(State),
- ok = close_file(CurHdl),
+ ok = file_handle_cache:close(CurHdl),
NextFile = CurFile + 1,
{ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE),
true = ets:update_element(FileSummary, CurFile,
@@ -993,15 +991,15 @@ combine_files(#file_summary { file = Source,
file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
%% position in DestinationHdl should now be DestinationValid
ok = file_handle_cache:sync(DestinationHdl),
- ok = close_file(TmpHdl),
+ ok = file_handle_cache:close(TmpHdl),
ok = file:delete(form_filename(Dir, Tmp))
end,
SourceWorkList = index_search_by_file(Source, State1),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
SourceHdl, DestinationHdl, Destination, State1),
%% tidy up
- ok = close_file(SourceHdl),
- ok = close_file(DestinationHdl),
+ ok = file_handle_cache:close(SourceHdl),
+ ok = file_handle_cache:close(DestinationHdl),
ok = file:delete(form_filename(Dir, SourceName)),
State1.