summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-15 08:57:22 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-15 08:57:22 +0100
commitc8b86f5de876cb5ed859798d8628d67f3946d723 (patch)
treede28b3bd4b57ebcd54f26d5b19d8229894774596 /src
parent73d0ee7112079d2c8df0f97eb0873d7d1647e130 (diff)
downloadrabbitmq-server-git-c8b86f5de876cb5ed859798d8628d67f3946d723.tar.gz
minor refactoring and some cosmetic changes
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl140
-rw-r--r--src/rabbit_msg_store_gc.erl8
2 files changed, 71 insertions, 77 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 913cd65f82..21f1505804 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -460,8 +460,8 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
case index_lookup(Guid, CState) of
#msg_location { file = File } = MsgLocation ->
%% Still the same file.
- %% This is fine to fail (already exists)
- ets:insert_new(FileHandlesEts, {{self(), File}, open}),
+ mark_handle_open(FileHandlesEts, File),
+
CState1 = close_all_indicated(CState),
{Msg, CState2} = %% This will never be the current file
read_from_disk(MsgLocation, CState1, DedupCacheEts),
@@ -473,14 +473,6 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
end
end.
-close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } =
- CState) ->
- Objs = ets:match_object(FileHandlesEts, {{self(), '_'}, close}),
- lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) ->
- true = ets:delete(FileHandlesEts, Key),
- close_handle(File, CStateM)
- end, CState, Objs).
-
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -958,6 +950,24 @@ close_handle(Key, FHC) ->
error -> FHC
end.
+mark_handle_open(FileHandlesEts, File) ->
+ %% This is fine to fail (already exists)
+ ets:insert_new(FileHandlesEts, {{self(), File}, open}),
+ true.
+
+mark_handle_to_close(FileHandlesEts, File) ->
+ [ ets:update_element(FileHandlesEts, Key, {2, close})
+ || {Key, open} <- ets:match_object(FileHandlesEts, {{'_', File}, open}) ],
+ true.
+
+close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } =
+ CState) ->
+ Objs = ets:match_object(FileHandlesEts, {{self(), '_'}, close}),
+ lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) ->
+ true = ets:delete(FileHandlesEts, Key),
+ close_handle(File, CStateM)
+ end, CState, Objs).
+
close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_handle_cache = FHC }) ->
Self = self(),
@@ -1426,9 +1436,7 @@ maybe_roll_to_new_file(
State1 = internal_sync(State),
ok = file_handle_cache:close(CurHdl),
NextFile = CurFile + 1,
- {ok, NextHdl} = open_file(
- Dir, filenum_to_name(NextFile),
- ?WRITE_MODE),
+ {ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE),
true = ets:insert_new(FileSummaryEts, #file_summary {
file = NextFile,
valid_total_size = 0,
@@ -1476,11 +1484,6 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
maybe_compact(State) ->
State.
-mark_handle_to_close(FileHandlesEts, File) ->
- [ ets:update_element(FileHandlesEts, Key, {2, close})
- || {Key, open} <- ets:match_object(FileHandlesEts, {{'_', File}, open}) ],
- true.
-
find_files_to_gc(FileSummaryEts,
[#file_summary { file = Dst,
valid_total_size = DstValid,
@@ -1495,13 +1498,11 @@ find_files_to_gc(FileSummaryEts,
right = SrcRight }] = Next =
ets:lookup(FileSummaryEts, Src),
case SrcRight of
- undefined ->
- not_found;
- _ ->
- case DstValid + SrcValid =< ?FILE_SIZE_LIMIT of
- true -> {Src, Dst};
- false -> find_files_to_gc(FileSummaryEts, Next)
- end
+ undefined -> not_found;
+ _ -> case DstValid + SrcValid =< ?FILE_SIZE_LIMIT of
+ true -> {Src, Dst};
+ false -> find_files_to_gc(FileSummaryEts, Next)
+ end
end
end.
@@ -1536,9 +1537,7 @@ delete_file_if_empty(File, State = #msstate {
true = mark_handle_to_close(FileHandlesEts, File),
true = ets:delete(FileSummaryEts, File),
State1 = close_handle(File, State),
- ok = file:delete(form_filename(
- Dir,
- filenum_to_name(File))),
+ ok = file:delete(form_filename(Dir, filenum_to_name(File))),
State1 #msstate { sum_file_size = SumFileSize - FileSize };
_ -> State
end.
@@ -1550,20 +1549,17 @@ delete_file_if_empty(File, State = #msstate {
gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
[SrcObj = #file_summary {
readers = SrcReaders,
- valid_total_size = SrcValidData,
left = DstFile,
file_size = SrcFileSize,
locked = true }] = ets:lookup(FileSummaryEts, SrcFile),
[DstObj = #file_summary {
readers = DstReaders,
- valid_total_size = DstValidData,
right = SrcFile,
file_size = DstFileSize,
locked = true }] = ets:lookup(FileSummaryEts, DstFile),
case SrcReaders =:= 0 andalso DstReaders =:= 0 of
- true -> TotalValidData = DstValidData + SrcValidData,
- ok = combine_files(SrcObj, DstObj, State),
+ true -> TotalValidData = combine_files(SrcObj, DstObj, State),
%% don't update dest.right, because it could be
%% changing at the same time
true = ets:update_element(
@@ -1584,12 +1580,12 @@ combine_files(#file_summary { file = Source,
contiguous_top = DestinationContiguousTop,
right = Source },
State = {_FileSummaryEts, Dir, _Index, _IndexState}) ->
- SourceName = filenum_to_name(Source),
+ SourceName = filenum_to_name(Source),
DestinationName = filenum_to_name(Destination),
- {ok, SourceHdl} =
- open_file(Dir, SourceName, ?READ_AHEAD_MODE),
- {ok, DestinationHdl} =
- open_file(Dir, DestinationName, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
+ {ok, SourceHdl} = open_file(Dir, SourceName,
+ ?READ_AHEAD_MODE),
+ {ok, DestinationHdl} = open_file(Dir, DestinationName,
+ ?READ_AHEAD_MODE ++ ?WRITE_MODE),
ExpectedSize = SourceValid + DestinationValid,
%% if DestinationValid =:= DestinationContiguousTop then we don't
%% need a tmp file
@@ -1597,10 +1593,11 @@ combine_files(#file_summary { file = Source,
%% the DestinationContiguousTop to a tmp file then truncate,
%% copy back in, and then copy over from Source
%% otherwise we just truncate straight away and copy over from Source
- if DestinationContiguousTop =:= DestinationValid ->
+ case DestinationContiguousTop =:= DestinationValid of
+ true ->
ok = truncate_and_extend_file(
DestinationHdl, DestinationValid, ExpectedSize);
- true ->
+ false ->
{DestinationWorkList, DestinationValid} =
find_unremoved_messages_in_file(Destination, State),
Worklist =
@@ -1618,8 +1615,7 @@ combine_files(#file_summary { file = Source,
%% enforce it anyway
end, DestinationWorkList),
Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} = open_file(
- Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
+ {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
ok = copy_messages(
Worklist, DestinationContiguousTop, DestinationValid,
DestinationHdl, TmpHdl, Destination, State),
@@ -1644,11 +1640,11 @@ combine_files(#file_summary { file = Source,
%% tidy up
ok = file_handle_cache:close(DestinationHdl),
ok = file_handle_cache:delete(SourceHdl),
- ok.
+ ExpectedSize.
find_unremoved_messages_in_file(File,
{_FileSummaryEts, Dir, Index, IndexState}) ->
- %% Msgs here will be end-of-file at start-of-list
+ %% Messages here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
@@ -1663,11 +1659,18 @@ find_unremoved_messages_in_file(File,
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
Destination, {_FileSummaryEts, _Dir, Index, IndexState}) ->
+ Copy = fun ({BlockStart, BlockEnd}) ->
+ BSize = BlockEnd - BlockStart,
+ {ok, BlockStart} =
+ file_handle_cache:position(SourceHdl, BlockStart),
+ {ok, BSize} =
+ file_handle_cache:copy(SourceHdl, DestinationHdl, BSize)
+ end,
case
lists:foldl(
fun (#msg_location { guid = Guid, offset = Offset,
total_size = TotalSize },
- {CurOffset, BlockStart, BlockEnd}) ->
+ {CurOffset, Block = {BlockStart, BlockEnd}}) ->
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
%% update MsgLocation to reflect change of file and offset
@@ -1675,40 +1678,29 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
[{#msg_location.file, Destination},
{#msg_location.offset, CurOffset}],
IndexState),
- {BlockStart2, BlockEnd2} =
- if BlockStart =:= undefined ->
- %% base case, called only for the first list elem
- {Offset, Offset + TotalSize};
- Offset =:= BlockEnd ->
- %% extend the current block because the
- %% next msg follows straight on
- {BlockStart, BlockEnd + TotalSize};
- true ->
- %% found a gap, so actually do the work
- %% for the previous block
- BSize = BlockEnd - BlockStart,
- {ok, BlockStart} =
- file_handle_cache:position(SourceHdl,
- BlockStart),
- {ok, BSize} = file_handle_cache:copy(
- SourceHdl, DestinationHdl, BSize),
- {Offset, Offset + TotalSize}
- end,
- {CurOffset + TotalSize, BlockStart2, BlockEnd2}
- end, {InitOffset, undefined, undefined}, WorkList) of
- {FinalOffset, BlockStart1, BlockEnd1} ->
+ {CurOffset + TotalSize,
+ case BlockEnd of
+ undefined ->
+ %% base case, called only for the first list elem
+ {Offset, Offset + TotalSize};
+ Offset ->
+ %% extend the current block because the
+ %% next msg follows straight on
+ {BlockStart, BlockEnd + TotalSize};
+ _ ->
+ %% found a gap, so actually do the work for
+ %% the previous block
+ Copy(Block),
+ {Offset, Offset + TotalSize}
+ end}
+ end, {InitOffset, {undefined, undefined}}, WorkList) of
+ {FinalOffset, Block} ->
case WorkList of
[] -> ok;
- %% do the last remaining block
- _ -> BSize1 = BlockEnd1 - BlockStart1,
- {ok, BlockStart1} =
- file_handle_cache:position(SourceHdl, BlockStart1),
- {ok, BSize1} =
- file_handle_cache:copy(SourceHdl, DestinationHdl,
- BSize1),
+ _ -> Copy(Block), %% do the last remaining block
ok = file_handle_cache:sync(DestinationHdl)
end;
- {FinalOffsetZ, _BlockStart1, _BlockEnd1} ->
+ {FinalOffsetZ, _Block} ->
{gc_error, [{expected, FinalOffset},
{got, FinalOffsetZ},
{destination, Destination}]}
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 8a275c39d9..038d51c484 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -71,9 +71,11 @@ set_maximum_since_use(Pid, Age) ->
init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- {ok, #gcstate { dir = Dir, index_state = IndexState,
- index_module = IndexModule, parent = Parent,
- file_summary_ets = FileSummaryEts},
+ {ok, #gcstate { dir = Dir,
+ index_state = IndexState,
+ index_module = IndexModule,
+ parent = Parent,
+ file_summary_ets = FileSummaryEts },
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.