summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl50
-rw-r--r--src/rabbit_tests.erl3
2 files changed, 34 insertions, 19 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b9bffef6d6..0702cf3690 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -43,7 +43,7 @@
-define(SERVER, ?MODULE).
--define(FILE_SIZE_LIMIT, (256*1024*1024)).
+-define(FILE_SIZE_LIMIT, (16*1024*1024)).
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
@@ -622,13 +622,6 @@ index_delete(Key, #msstate { msg_locations = MsgLocations }) ->
true = ets:delete(MsgLocations, Key),
ok.
-index_search_by_file(File, #msstate { msg_locations = MsgLocations }) ->
- lists:sort(fun (#msg_location { offset = OffA },
- #msg_location { offset = OffB }) ->
- OffA < OffB
- end, ets:match_object(MsgLocations,
- #msg_location { file = File, _ = '_' })).
-
index_delete_by_file(File, #msstate { msg_locations = MsgLocations }) ->
MatchHead = #msg_location { file = File, _ = '_' },
ets:select_delete(MsgLocations, [{MatchHead, [], [true]}]),
@@ -798,8 +791,7 @@ find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds}.
build_index([], State) ->
- CurFile = State #msstate.current_file,
- build_index(undefined, [CurFile], [], State);
+ build_index(undefined, [State #msstate.current_file], [], State);
build_index(Files, State) ->
build_index(undefined, Files, [], State).
@@ -990,8 +982,6 @@ combine_files(#file_summary { file = Source,
ok = truncate_and_extend_file(DestinationHdl,
DestinationValid, ExpectedSize);
true ->
- Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
Worklist =
lists:dropwhile(
fun (#msg_location { offset = Offset })
@@ -1005,7 +995,9 @@ combine_files(#file_summary { file = Source,
%% that the list should be naturally sorted
%% as we require, however, we need to
%% enforce it anyway
- end, index_search_by_file(Destination, State1)),
+ end, find_unremoved_messages_in_file(Destination, State1)),
+ Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
ok = copy_messages(
Worklist, DestinationContiguousTop, DestinationValid,
DestinationHdl, TmpHdl, Destination, State1),
@@ -1024,7 +1016,7 @@ combine_files(#file_summary { file = Source,
ok = file_handle_cache:close(TmpHdl),
ok = file:delete(form_filename(Dir, Tmp))
end,
- SourceWorkList = index_search_by_file(Source, State1),
+ SourceWorkList = find_unremoved_messages_in_file(Source, State1),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
SourceHdl, DestinationHdl, Destination, State1),
%% tidy up
@@ -1033,6 +1025,19 @@ combine_files(#file_summary { file = Source,
ok = file:delete(form_filename(Dir, SourceName)),
State1.
+find_unremoved_messages_in_file(File, State = #msstate { dir = Dir }) ->
+ %% Msgs here will be end-of-file at start-of-list
+ {ok, Messages, _FileSize} =
+ scan_file_for_valid_messages(Dir, filenum_to_name(File)),
+ %% foldl will reverse so will end up with msgs in ascending offset order
+ lists:foldl(
+ fun ({MsgId, _TotalSize, _Offset}, Acc) ->
+ case index_lookup(MsgId, State) of
+ Entry = #msg_location { file = File } -> [ Entry | Acc ];
+ _ -> Acc
+ end
+ end, [], Messages).
+
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
Destination, State) ->
{FinalOffset, BlockStart1, BlockEnd1} =
@@ -1065,11 +1070,18 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{NextOffset, Offset, Offset + TotalSize}
end
end, {InitOffset, undefined, undefined}, WorkList),
- %% do the last remaining block
- BSize1 = BlockEnd1 - BlockStart1,
- {ok, BlockStart1} = file_handle_cache:position(SourceHdl, BlockStart1),
- {ok, BSize1} = file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
- ok = file_handle_cache:sync(DestinationHdl),
+ case WorkList of
+ [] ->
+ ok;
+ _ ->
+ %% do the last remaining block
+ BSize1 = BlockEnd1 - BlockStart1,
+ {ok, BlockStart1} =
+ file_handle_cache:position(SourceHdl, BlockStart1),
+ {ok, BSize1} =
+ file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
+ ok = file_handle_cache:sync(DestinationHdl)
+ end,
ok.
delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index dc81ea18b9..fe782049e5 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1005,6 +1005,9 @@ test_msg_store() ->
%% this should force some sort of sync internally otherwise misread
ok = msg_store_read(MsgIds1stHalf),
ok = rabbit_msg_store:remove(MsgIds1stHalf),
+ %% restart empty
+ ok = stop_msg_store(),
+ ok = start_msg_store_empty(), %% now safe to reuse msg_ids
%% push a lot of msgs in...
BigCount = 100000,
MsgIdsBig = lists:seq(1, BigCount),