summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-12-17 13:08:01 +0000
committerMatthew Sackman <matthew@lshift.net>2009-12-17 13:08:01 +0000
commit719a8ff67308eb1b80a61ada6bfae7b7ef8da64b (patch)
tree976e6ee6ab8765f30db88cd108e79e18c8f87d3d
parent9542e4d24e3968ed973a549370a120b9d222ecde (diff)
downloadrabbitmq-server-git-719a8ff67308eb1b80a61ada6bfae7b7ef8da64b.tar.gz
Reworked the GC of msg_store so that it scans the files themselves for their content, rather than a select on ets. This bounds the time it can take (ets could have many billions of other entries in it), and also makes it simpler to make the msg_location pluggable => toke. Also reduce the msg file size to 16MB from 256MB as tests show that although max write speed drops (more fsyncs and fclose), the GC is much faster. This may go back up a bit when lazy+background GC arrives.
-rw-r--r--src/rabbit_msg_store.erl50
-rw-r--r--src/rabbit_tests.erl3
2 files changed, 34 insertions, 19 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b9bffef6d6..0702cf3690 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -43,7 +43,7 @@
-define(SERVER, ?MODULE).
--define(FILE_SIZE_LIMIT, (256*1024*1024)).
+-define(FILE_SIZE_LIMIT, (16*1024*1024)).
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
@@ -622,13 +622,6 @@ index_delete(Key, #msstate { msg_locations = MsgLocations }) ->
true = ets:delete(MsgLocations, Key),
ok.
-index_search_by_file(File, #msstate { msg_locations = MsgLocations }) ->
- lists:sort(fun (#msg_location { offset = OffA },
- #msg_location { offset = OffB }) ->
- OffA < OffB
- end, ets:match_object(MsgLocations,
- #msg_location { file = File, _ = '_' })).
-
index_delete_by_file(File, #msstate { msg_locations = MsgLocations }) ->
MatchHead = #msg_location { file = File, _ = '_' },
ets:select_delete(MsgLocations, [{MatchHead, [], [true]}]),
@@ -798,8 +791,7 @@ find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds}.
build_index([], State) ->
- CurFile = State #msstate.current_file,
- build_index(undefined, [CurFile], [], State);
+ build_index(undefined, [State #msstate.current_file], [], State);
build_index(Files, State) ->
build_index(undefined, Files, [], State).
@@ -990,8 +982,6 @@ combine_files(#file_summary { file = Source,
ok = truncate_and_extend_file(DestinationHdl,
DestinationValid, ExpectedSize);
true ->
- Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
Worklist =
lists:dropwhile(
fun (#msg_location { offset = Offset })
@@ -1005,7 +995,9 @@ combine_files(#file_summary { file = Source,
%% that the list should be naturally sorted
%% as we require, however, we need to
%% enforce it anyway
- end, index_search_by_file(Destination, State1)),
+ end, find_unremoved_messages_in_file(Destination, State1)),
+ Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
ok = copy_messages(
Worklist, DestinationContiguousTop, DestinationValid,
DestinationHdl, TmpHdl, Destination, State1),
@@ -1024,7 +1016,7 @@ combine_files(#file_summary { file = Source,
ok = file_handle_cache:close(TmpHdl),
ok = file:delete(form_filename(Dir, Tmp))
end,
- SourceWorkList = index_search_by_file(Source, State1),
+ SourceWorkList = find_unremoved_messages_in_file(Source, State1),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
SourceHdl, DestinationHdl, Destination, State1),
%% tidy up
@@ -1033,6 +1025,19 @@ combine_files(#file_summary { file = Source,
ok = file:delete(form_filename(Dir, SourceName)),
State1.
+find_unremoved_messages_in_file(File, State = #msstate { dir = Dir }) ->
+ %% Msgs here will be end-of-file at start-of-list
+ {ok, Messages, _FileSize} =
+ scan_file_for_valid_messages(Dir, filenum_to_name(File)),
+ %% foldl will reverse so will end up with msgs in ascending offset order
+ lists:foldl(
+ fun ({MsgId, _TotalSize, _Offset}, Acc) ->
+ case index_lookup(MsgId, State) of
+ Entry = #msg_location { file = File } -> [ Entry | Acc ];
+ _ -> Acc
+ end
+ end, [], Messages).
+
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
Destination, State) ->
{FinalOffset, BlockStart1, BlockEnd1} =
@@ -1065,11 +1070,18 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{NextOffset, Offset, Offset + TotalSize}
end
end, {InitOffset, undefined, undefined}, WorkList),
- %% do the last remaining block
- BSize1 = BlockEnd1 - BlockStart1,
- {ok, BlockStart1} = file_handle_cache:position(SourceHdl, BlockStart1),
- {ok, BSize1} = file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
- ok = file_handle_cache:sync(DestinationHdl),
+ case WorkList of
+ [] ->
+ ok;
+ _ ->
+ %% do the last remaining block
+ BSize1 = BlockEnd1 - BlockStart1,
+ {ok, BlockStart1} =
+ file_handle_cache:position(SourceHdl, BlockStart1),
+ {ok, BSize1} =
+ file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
+ ok = file_handle_cache:sync(DestinationHdl)
+ end,
ok.
delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index dc81ea18b9..fe782049e5 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1005,6 +1005,9 @@ test_msg_store() ->
%% this should force some sort of sync internally otherwise misread
ok = msg_store_read(MsgIds1stHalf),
ok = rabbit_msg_store:remove(MsgIds1stHalf),
+ %% restart empty
+ ok = stop_msg_store(),
+ ok = start_msg_store_empty(), %% now safe to reuse msg_ids
%% push a lot of msgs in...
BigCount = 100000,
MsgIdsBig = lists:seq(1, BigCount),