diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-09-08 15:26:54 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-09-08 15:26:54 +0100 |
| commit | cd6aed9e02dd24c59a4ddd62184ee7b0e290622c (patch) | |
| tree | 0cf6c84181ee51fd77e2976181892861d65496c9 | |
| parent | 4c56adbe121efad7803bf33b3a713abfd35bcdca (diff) | |
| download | rabbitmq-server-git-cd6aed9e02dd24c59a4ddd62184ee7b0e290622c.tar.gz | |
refer to files by their number not their name
This makes matching faster and keeps record sizes smaller. It also
means we can get rid of one bit of state.
| -rw-r--r-- | src/rabbit_msg_store.erl | 172 |
1 files changed, 84 insertions, 88 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 241453c99f..d5959f9f38 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -43,8 +43,7 @@ msg_location_dets, %% where are messages? msg_location_ets, %% as above, but for ets version file_summary, %% what's in the files? - current_file_num, %% current file name as number - current_file_name, %% current file name + current_file, %% current file name as number current_file_handle, %% current file handle current_offset, %% current offset within current file current_dirty, %% has the current file been written to @@ -94,8 +93,7 @@ msg_location_dets :: dets_table(), msg_location_ets :: ets_table(), file_summary :: ets_table(), - current_file_num :: non_neg_integer(), - current_file_name :: file_path(), + current_file :: non_neg_integer(), current_file_handle :: io_device(), current_offset :: non_neg_integer(), current_dirty :: boolean(), @@ -278,7 +276,7 @@ init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun, MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected, {keypos, #msg_location.msg_id}]), - InitName = "0" ++ ?FILE_EXTENSION, + InitFile = 0, HandleCache = rabbit_file_handle_cache:init(ReadFileHandlesLimit, ?BINARY_MODE ++ [read]), FileSummary = ets:new(?FILE_SUMMARY_ETS_NAME, @@ -290,8 +288,7 @@ init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun, msg_location_dets = MsgLocationDets, msg_location_ets = MsgLocationEts, file_summary = FileSummary, - current_file_num = 0, - current_file_name = InitName, + current_file = InitFile, current_file_handle = undefined, current_offset = 0, current_dirty = false, @@ -302,26 +299,27 @@ init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun, ets_bytes_per_record = EtsBytesPerRecord }, - Files = + FileNames = sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)), - TmpFiles = + TmpFileNames = sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)), - ok = recover_crashed_compactions(RefCountFun, Dir, Files, TmpFiles), + ok = recover_crashed_compactions(RefCountFun, Dir, FileNames, TmpFileNames), %% There should be no more tmp files now, so go ahead and load the %% whole lot - State1 = #msstate { current_file_name = CurrentName, - current_offset = Offset } = + Files = [filename_to_num(FileName) || FileName <- FileNames], + State1 = #msstate { current_file = CurFile, current_offset = Offset } = load_messages(RefCountFun, Files, State), %% read is only needed so that we can seek - {ok, FileHdl} = open_file(Dir, CurrentName, ?WRITE_MODE ++ [read]), + {ok, FileHdl} = open_file(Dir, filenum_to_name(CurFile), + ?WRITE_MODE ++ [read]), {ok, Offset} = file:position(FileHdl, Offset), State1 #msstate { current_file_handle = FileHdl }. write(MsgId, Msg, Attrs, State = #msstate { current_file_handle = CurHdl, - current_file_name = CurName, + current_file = CurFile, current_offset = CurOffset, file_summary = FileSummary }) -> case dets_ets_lookup(State, MsgId) of @@ -330,13 +328,13 @@ write(MsgId, Msg, Attrs, {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg, Attrs), true = dets_ets_insert_new( State, #msg_location { - msg_id = MsgId, ref_count = 1, file = CurName, + msg_id = MsgId, ref_count = 1, file = CurFile, offset = CurOffset, total_size = TotalSize, attrs = Attrs }), [FSEntry = #file_summary { valid_total_size = ValidTotalSize, contiguous_top = ContiguousTop, right = undefined }] = - ets:lookup(FileSummary, CurName), + ets:lookup(FileSummary, CurFile), ValidTotalSize1 = ValidTotalSize + TotalSize, ContiguousTop1 = if CurOffset =:= ContiguousTop -> %% can't be any holes in this file @@ -380,7 +378,7 @@ read(MsgId, State) -> throw({error, {misread, [{old_state, State}, - {file, File}, + {file_num, File}, {offset, Offset}, {read, Rest}]}}) end, @@ -407,13 +405,13 @@ attrs(MsgId, State) -> [#msg_location { msg_id = MsgId, attrs = Attrs }] -> Attrs end. -remove(MsgIds, State = #msstate { current_file_name = CurName }) -> +remove(MsgIds, State = #msstate { current_file = CurFile }) -> compact(sets:to_list( lists:foldl( fun (MsgId, Files1) -> case remove_message(MsgId, State) of {compact, File} -> - if CurName =:= File -> Files1; + if CurFile =:= File -> Files1; true -> sets:add_element(File, Files1) end; no_compact -> Files1 @@ -427,8 +425,8 @@ release(MsgIds, State) -> needs_sync(_MsgIds, #msstate { current_dirty = false }) -> false; -needs_sync(MsgIds, State = #msstate { current_file_name = CurFile, - last_sync_offset = SyncOffset }) -> +needs_sync(MsgIds, State = #msstate { current_file = CurFile, + last_sync_offset = SyncOffset }) -> lists:any(fun (MsgId) -> [#msg_location { msg_id = MsgId, file = File, offset = Offset }] = @@ -520,21 +518,21 @@ to_ram_disk_mode(State = #msstate { operation_mode = disk_only, %% general helper functions %%---------------------------------------------------------------------------- -form_filename(Dir, Name) -> - filename:join(Dir, Name). +form_filename(Dir, Name) -> filename:join(Dir, Name). + +filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. + +filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)). msg_location_dets_file(Dir) -> form_filename(Dir, atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS). -open_file(Dir, File, Mode) -> - file:open(form_filename(Dir, File), ?BINARY_MODE ++ Mode). +open_file(Dir, FileName, Mode) -> + file:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode). -sort_file_names(Files) -> - lists:sort(fun (A, B) -> - ANum = list_to_integer(filename:rootname(A)), - BNum = list_to_integer(filename:rootname(B)), - ANum < BNum - end, Files). +sort_file_names(FileNames) -> + lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end, + FileNames). preallocate(Hdl, FileSizeLimit, FinalPos) -> {ok, FileSizeLimit} = file:position(Hdl, FileSizeLimit), @@ -550,14 +548,14 @@ truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> with_read_handle_at(File, Offset, Fun, State = #msstate { dir = Dir, read_file_handle_cache = HC, - current_file_name = CurName, + current_file = CurFile, current_dirty = IsDirty, last_sync_offset = SyncOffset }) -> - State1 = if CurName =:= File andalso IsDirty andalso Offset >= SyncOffset -> + State1 = if CurFile == File andalso IsDirty andalso Offset >= SyncOffset -> sync(State); true -> State end, - FilePath = form_filename(Dir, File), + FilePath = form_filename(Dir, filenum_to_name(File)), {Result, HC1} = rabbit_file_handle_cache:with_file_handle_at(FilePath, Offset, Fun, HC), {Result, State1 #msstate { read_file_handle_cache = HC1 }}. @@ -672,24 +670,24 @@ dets_ets_match_object(#msstate { msg_location_ets = MsgLocationEts, %% recovery %%---------------------------------------------------------------------------- -recover_crashed_compactions(RefCountFun, Dir, Files, TmpFiles) -> - lists:foreach(fun (TmpFile) -> +recover_crashed_compactions(RefCountFun, Dir, FileNames, TmpFileNames) -> + lists:foreach(fun (TmpFileName) -> ok = recover_crashed_compactions1( - RefCountFun, Dir, Files, TmpFile) + RefCountFun, Dir, FileNames, TmpFileName) end, - TmpFiles), + TmpFileNames), ok. -recover_crashed_compactions1(RefCountFun, Dir, Files, TmpFile) -> - NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION, - true = lists:member(NonTmpRelatedFile, Files), +recover_crashed_compactions1(RefCountFun, Dir, FileNames, TmpFileName) -> + NonTmpRelatedFileName = filename:rootname(TmpFileName) ++ ?FILE_EXTENSION, + true = lists:member(NonTmpRelatedFileName, FileNames), {ok, UncorruptedMessagesTmp, MsgIdsTmp} = - scan_file_for_valid_messages_msg_ids(Dir, TmpFile), + scan_file_for_valid_messages_msg_ids(Dir, TmpFileName), %% all of these messages should be referenced %% otherwise they wouldn't have been copied out verify_messages_referenced(RefCountFun, MsgIdsTmp), {ok, UncorruptedMessages, MsgIds} = - scan_file_for_valid_messages_msg_ids(Dir, NonTmpRelatedFile), + scan_file_for_valid_messages_msg_ids(Dir, NonTmpRelatedFileName), %% 1) It's possible that everything in the tmp file is also in the %% main file such that the main file is (prefix ++ %% tmpfile). This means that compaction failed immediately @@ -716,7 +714,7 @@ recover_crashed_compactions1(RefCountFun, Dir, Files, TmpFile) -> %% consist only of valid messages. Plan: Truncate the main file %% back to before any of the files in the tmp file and copy %% them over again - TmpPath = form_filename(Dir, TmpFile), + TmpPath = form_filename(Dir, TmpFileName), case is_sublist(MsgIdsTmp, MsgIds) of true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file %% note this also catches the case when the tmp file @@ -750,7 +748,7 @@ recover_crashed_compactions1(RefCountFun, Dir, Files, TmpFile) -> %% are in the tmp file true = is_disjoint(MsgIds1, MsgIdsTmp), %% must open with read flag, otherwise will stomp over contents - {ok, MainHdl} = open_file(Dir, NonTmpRelatedFile, + {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName, ?WRITE_MODE ++ [read]), %% Wipe out any rubbish at the end of the file. Remember %% the head of the list will be the highest entry in the @@ -761,7 +759,7 @@ recover_crashed_compactions1(RefCountFun, Dir, Files, TmpFile) -> %% move. If we run out of disk space, this truncate could %% fail, but we still aren't risking losing data ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize), - {ok, TmpHdl} = open_file(Dir, TmpFile, ?READ_MODE), + {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_MODE), {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize), ok = file:sync(MainHdl), ok = file:close(MainHdl), @@ -769,7 +767,8 @@ recover_crashed_compactions1(RefCountFun, Dir, Files, TmpFile) -> ok = file:delete(TmpPath), {ok, _MainMessages, MsgIdsMain} = - scan_file_for_valid_messages_msg_ids(Dir, NonTmpRelatedFile), + scan_file_for_valid_messages_msg_ids( + Dir, NonTmpRelatedFileName), %% check that everything in MsgIds1 is in MsgIdsMain true = is_sublist(MsgIds1, MsgIdsMain), %% check that everything in MsgIdsTmp is in MsgIdsMain @@ -786,13 +785,13 @@ is_disjoint(SmallerL, BiggerL) -> verify_messages_referenced(RefCountFun, MsgIds) -> lists:foreach(fun (MsgId) -> false = RefCountFun(MsgId) == 0 end, MsgIds). -scan_file_for_valid_messages_msg_ids(Dir, File) -> - {ok, Messages} = scan_file_for_valid_messages(Dir, File), +scan_file_for_valid_messages_msg_ids(Dir, FileName) -> + {ok, Messages} = scan_file_for_valid_messages(Dir, FileName), {ok, Messages, [MsgId || {MsgId, _Attrs, _TotalSize, _FileOffset} <- Messages]}. -scan_file_for_valid_messages(Dir, File) -> - case open_file(Dir, File, ?READ_MODE) of +scan_file_for_valid_messages(Dir, FileName) -> + case open_file(Dir, FileName, ?READ_MODE) of {ok, Hdl} -> Valid = rabbit_msg_file:scan(Hdl), %% if something really bad's happened, the close could fail, @@ -800,7 +799,8 @@ scan_file_for_valid_messages(Dir, File) -> file:close(Hdl), Valid; {error, enoent} -> {ok, []}; - {error, Reason} -> throw({error, {unable_to_scan_file, File, Reason}}) + {error, Reason} -> throw({error, + {unable_to_scan_file, FileName, Reason}}) end. %% Takes the list in *ascending* order (i.e. eldest message @@ -820,13 +820,12 @@ find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) -> {ExpectedOffset, MsgIds}. load_messages(RefCountFun, [], State) -> - CurrentFile = State #msstate.current_file_name, - load_messages(RefCountFun, undefined, [CurrentFile], State); + CurFile = State #msstate.current_file, + load_messages(RefCountFun, undefined, [CurFile], State); load_messages(RefCountFun, Files, State) -> load_messages(RefCountFun, undefined, Files, State). load_messages(_RefCountFun, Left, [], State) -> - Num = list_to_integer(filename:rootname(Left)), Offset = case sort_msg_locations_by_offset(desc, Left, State) of [] -> 0; @@ -834,11 +833,10 @@ load_messages(_RefCountFun, Left, [], State) -> total_size = TotalSize } | _] -> MaxOffset + TotalSize end, - State #msstate { current_file_num = Num, current_file_name = Left, - current_offset = Offset }; + State #msstate { current_file = Left, current_offset = Offset }; load_messages(RefCountFun, Left, [File|Files], State = #msstate { dir = Dir, file_summary = FileSummary }) -> - {ok, Messages} = scan_file_for_valid_messages(Dir, File), + {ok, Messages} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), {ValidMessages, ValidTotalSize} = lists:foldl( fun (Obj = {MsgId, Attrs, TotalSize, Offset}, {VMAcc, VTSAcc}) -> case RefCountFun(MsgId) of @@ -873,36 +871,31 @@ load_messages(RefCountFun, Left, [File|Files], maybe_roll_to_new_file(Offset, State = #msstate { dir = Dir, file_size_limit = FileSizeLimit, - current_file_name = CurName, current_file_handle = CurHdl, - current_file_num = CurNum, - file_summary = FileSummary - } - ) when Offset >= FileSizeLimit -> + current_file = CurFile, + file_summary = FileSummary }) + when Offset >= FileSizeLimit -> State1 = sync(State), ok = file:close(CurHdl), - NextNum = CurNum + 1, - NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION, - {ok, NextHdl} = open_file(Dir, NextName, ?WRITE_MODE), - true = ets:update_element(FileSummary, CurName, - {#file_summary.right, NextName}), + NextFile = CurFile + 1, + {ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE), + true = ets:update_element(FileSummary, CurFile, + {#file_summary.right, NextFile}), true = ets:insert_new( FileSummary, #file_summary { - file = NextName, valid_total_size = 0, contiguous_top = 0, - left = CurName, right = undefined }), - State2 = State1 #msstate { current_file_name = NextName, - current_file_handle = NextHdl, - current_file_num = NextNum, - current_offset = 0, - last_sync_offset = 0 - }, - compact([CurName], State2); + file = NextFile, valid_total_size = 0, contiguous_top = 0, + left = CurFile, right = undefined }), + State2 = State1 #msstate { current_file_handle = NextHdl, + current_file = NextFile, + current_offset = 0, + last_sync_offset = 0 }, + compact([CurFile], State2); maybe_roll_to_new_file(_, State) -> State. compact(Files, State) -> %% smallest number, hence eldest, hence left-most, first - SortedFiles = sort_file_names(Files), + SortedFiles = lists:sort(Files), %% foldl reverses, so now youngest/right-most first RemainingFiles = lists:foldl(fun (File, Acc) -> @@ -920,7 +913,7 @@ compact(Files, State) -> %% we merge right then this file is the destination and the right file %% is the source. combine_file(File, State = #msstate { file_summary = FileSummary, - current_file_name = CurName }) -> + current_file = CurFile }) -> %% the file we're looking at may no longer exist as it may have %% been deleted within the current GC run case ets:lookup(FileSummary, File) of @@ -930,7 +923,7 @@ combine_file(File, State = #msstate { file_summary = FileSummary, fun() -> case Right of undefined -> State; - _ when not (CurName == Right) -> + _ when not (CurFile == Right) -> [FSRight] = ets:lookup(FileSummary, Right), {_, State1} = adjust_meta_and_combine( FSEntry, FSRight, State), @@ -991,9 +984,11 @@ combine_files(#file_summary { file = Source, contiguous_top = DestinationContiguousTop, right = Source }, State = #msstate { dir = Dir }) -> - State1 = close_file(Source, close_file(Destination, State)), - {ok, SourceHdl} = open_file(Dir, Source, ?READ_MODE), - {ok, DestinationHdl} = open_file(Dir, Destination, + SourceName = filenum_to_name(Source), + DestinationName = filenum_to_name(Destination), + State1 = close_file(SourceName, close_file(DestinationName, State)), + {ok, SourceHdl} = open_file(Dir, SourceName, ?READ_MODE), + {ok, DestinationHdl} = open_file(Dir, DestinationName, ?READ_MODE ++ ?WRITE_MODE), ExpectedSize = SourceValid + DestinationValid, %% if DestinationValid =:= DestinationContiguousTop then we don't @@ -1006,7 +1001,7 @@ combine_files(#file_summary { file = Source, ok = truncate_and_extend_file(DestinationHdl, DestinationValid, ExpectedSize); true -> - Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, + Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_MODE ++ ?WRITE_MODE), Worklist = lists:dropwhile( @@ -1045,7 +1040,7 @@ combine_files(#file_summary { file = Source, %% tidy up ok = file:close(SourceHdl), ok = file:close(DestinationHdl), - ok = file:delete(form_filename(Dir, Source)), + ok = file:delete(form_filename(Dir, SourceName)), State1. copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, @@ -1087,8 +1082,9 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, ok = file:sync(DestinationHdl), ok. -close_file(File, State = #msstate { dir = Dir, read_file_handle_cache = HC }) -> - HC1 = rabbit_file_handle_cache:close_file(form_filename(Dir, File), HC), +close_file(FileName, + State = #msstate { dir = Dir, read_file_handle_cache = HC }) -> + HC1 = rabbit_file_handle_cache:close_file(form_filename(Dir, FileName), HC), State #msstate { read_file_handle_cache = HC1 }. delete_file_if_empty(File, @@ -1113,7 +1109,7 @@ delete_file_if_empty(File, {#file_summary.right, Right}) end, true = ets:delete(FileSummary, File), - ok = file:delete(form_filename(Dir, File)), + ok = file:delete(form_filename(Dir, filenum_to_name(File))), true; _ -> false end. |
