diff options
| -rw-r--r-- | src/rabbit_msg_store.erl | 50 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 3 |
2 files changed, 34 insertions, 19 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index b9bffef6d6..0702cf3690 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -43,7 +43,7 @@ -define(SERVER, ?MODULE). --define(FILE_SIZE_LIMIT, (256*1024*1024)). +-define(FILE_SIZE_LIMIT, (16*1024*1024)). -define(SYNC_INTERVAL, 5). %% milliseconds -define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB @@ -622,13 +622,6 @@ index_delete(Key, #msstate { msg_locations = MsgLocations }) -> true = ets:delete(MsgLocations, Key), ok. -index_search_by_file(File, #msstate { msg_locations = MsgLocations }) -> - lists:sort(fun (#msg_location { offset = OffA }, - #msg_location { offset = OffB }) -> - OffA < OffB - end, ets:match_object(MsgLocations, - #msg_location { file = File, _ = '_' })). - index_delete_by_file(File, #msstate { msg_locations = MsgLocations }) -> MatchHead = #msg_location { file = File, _ = '_' }, ets:select_delete(MsgLocations, [{MatchHead, [], [true]}]), @@ -798,8 +791,7 @@ find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) -> {ExpectedOffset, MsgIds}. build_index([], State) -> - CurFile = State #msstate.current_file, - build_index(undefined, [CurFile], [], State); + build_index(undefined, [State #msstate.current_file], [], State); build_index(Files, State) -> build_index(undefined, Files, [], State). @@ -990,8 +982,6 @@ combine_files(#file_summary { file = Source, ok = truncate_and_extend_file(DestinationHdl, DestinationValid, ExpectedSize); true -> - Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, - {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), Worklist = lists:dropwhile( fun (#msg_location { offset = Offset }) @@ -1005,7 +995,9 @@ combine_files(#file_summary { file = Source, %% that the list should be naturally sorted %% as we require, however, we need to %% enforce it anyway - end, index_search_by_file(Destination, State1)), + end, find_unremoved_messages_in_file(Destination, State1)), + Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, + {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), ok = copy_messages( Worklist, DestinationContiguousTop, DestinationValid, DestinationHdl, TmpHdl, Destination, State1), @@ -1024,7 +1016,7 @@ combine_files(#file_summary { file = Source, ok = file_handle_cache:close(TmpHdl), ok = file:delete(form_filename(Dir, Tmp)) end, - SourceWorkList = index_search_by_file(Source, State1), + SourceWorkList = find_unremoved_messages_in_file(Source, State1), ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, SourceHdl, DestinationHdl, Destination, State1), %% tidy up @@ -1033,6 +1025,19 @@ combine_files(#file_summary { file = Source, ok = file:delete(form_filename(Dir, SourceName)), State1. +find_unremoved_messages_in_file(File, State = #msstate { dir = Dir }) -> + %% Msgs here will be end-of-file at start-of-list + {ok, Messages, _FileSize} = + scan_file_for_valid_messages(Dir, filenum_to_name(File)), + %% foldl will reverse so will end up with msgs in ascending offset order + lists:foldl( + fun ({MsgId, _TotalSize, _Offset}, Acc) -> + case index_lookup(MsgId, State) of + Entry = #msg_location { file = File } -> [ Entry | Acc ]; + _ -> Acc + end + end, [], Messages). + copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, Destination, State) -> {FinalOffset, BlockStart1, BlockEnd1} = @@ -1065,11 +1070,18 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, {NextOffset, Offset, Offset + TotalSize} end end, {InitOffset, undefined, undefined}, WorkList), - %% do the last remaining block - BSize1 = BlockEnd1 - BlockStart1, - {ok, BlockStart1} = file_handle_cache:position(SourceHdl, BlockStart1), - {ok, BSize1} = file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1), - ok = file_handle_cache:sync(DestinationHdl), + case WorkList of + [] -> + ok; + _ -> + %% do the last remaining block + BSize1 = BlockEnd1 - BlockStart1, + {ok, BlockStart1} = + file_handle_cache:position(SourceHdl, BlockStart1), + {ok, BSize1} = + file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1), + ok = file_handle_cache:sync(DestinationHdl) + end, ok. delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index dc81ea18b9..fe782049e5 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1005,6 +1005,9 @@ test_msg_store() -> %% this should force some sort of sync internally otherwise misread ok = msg_store_read(MsgIds1stHalf), ok = rabbit_msg_store:remove(MsgIds1stHalf), + %% restart empty + ok = stop_msg_store(), + ok = start_msg_store_empty(), %% now safe to reuse msg_ids %% push a lot of msgs in... BigCount = 100000, MsgIdsBig = lists:seq(1, BigCount), |
