summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Matthias Radestock <matthias@lshift.net> 2009-09-08 15:26:54 +0100
committer: Matthias Radestock <matthias@lshift.net> 2009-09-08 15:26:54 +0100
commit: cd6aed9e02dd24c59a4ddd62184ee7b0e290622c (patch)
tree: 0cf6c84181ee51fd77e2976181892861d65496c9
parent: 4c56adbe121efad7803bf33b3a713abfd35bcdca (diff)
download: rabbitmq-server-git-cd6aed9e02dd24c59a4ddd62184ee7b0e290622c.tar.gz
refer to files by their number not their name
This makes matching faster and keeps record sizes smaller. It also means we can get rid of one bit of state.
-rw-r--r-- src/rabbit_msg_store.erl 172
1 files changed, 84 insertions, 88 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 241453c99f..d5959f9f38 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -43,8 +43,7 @@
msg_location_dets, %% where are messages?
msg_location_ets, %% as above, but for ets version
file_summary, %% what's in the files?
- current_file_num, %% current file name as number
- current_file_name, %% current file name
+ current_file, %% current file name as number
current_file_handle, %% current file handle
current_offset, %% current offset within current file
current_dirty, %% has the current file been written to
@@ -94,8 +93,7 @@
msg_location_dets :: dets_table(),
msg_location_ets :: ets_table(),
file_summary :: ets_table(),
- current_file_num :: non_neg_integer(),
- current_file_name :: file_path(),
+ current_file :: non_neg_integer(),
current_file_handle :: io_device(),
current_offset :: non_neg_integer(),
current_dirty :: boolean(),
@@ -278,7 +276,7 @@ init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun,
MsgLocationEts = ets:new(?MSG_LOC_NAME,
[set, protected, {keypos, #msg_location.msg_id}]),
- InitName = "0" ++ ?FILE_EXTENSION,
+ InitFile = 0,
HandleCache = rabbit_file_handle_cache:init(ReadFileHandlesLimit,
?BINARY_MODE ++ [read]),
FileSummary = ets:new(?FILE_SUMMARY_ETS_NAME,
@@ -290,8 +288,7 @@ init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun,
msg_location_dets = MsgLocationDets,
msg_location_ets = MsgLocationEts,
file_summary = FileSummary,
- current_file_num = 0,
- current_file_name = InitName,
+ current_file = InitFile,
current_file_handle = undefined,
current_offset = 0,
current_dirty = false,
@@ -302,26 +299,27 @@ init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun,
ets_bytes_per_record = EtsBytesPerRecord
},
- Files =
+ FileNames =
sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)),
- TmpFiles =
+ TmpFileNames =
sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)),
- ok = recover_crashed_compactions(RefCountFun, Dir, Files, TmpFiles),
+ ok = recover_crashed_compactions(RefCountFun, Dir, FileNames, TmpFileNames),
%% There should be no more tmp files now, so go ahead and load the
%% whole lot
- State1 = #msstate { current_file_name = CurrentName,
- current_offset = Offset } =
+ Files = [filename_to_num(FileName) || FileName <- FileNames],
+ State1 = #msstate { current_file = CurFile, current_offset = Offset } =
load_messages(RefCountFun, Files, State),
%% read is only needed so that we can seek
- {ok, FileHdl} = open_file(Dir, CurrentName, ?WRITE_MODE ++ [read]),
+ {ok, FileHdl} = open_file(Dir, filenum_to_name(CurFile),
+ ?WRITE_MODE ++ [read]),
{ok, Offset} = file:position(FileHdl, Offset),
State1 #msstate { current_file_handle = FileHdl }.
write(MsgId, Msg, Attrs,
State = #msstate { current_file_handle = CurHdl,
- current_file_name = CurName,
+ current_file = CurFile,
current_offset = CurOffset,
file_summary = FileSummary }) ->
case dets_ets_lookup(State, MsgId) of
@@ -330,13 +328,13 @@ write(MsgId, Msg, Attrs,
{ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg, Attrs),
true = dets_ets_insert_new(
State, #msg_location {
- msg_id = MsgId, ref_count = 1, file = CurName,
+ msg_id = MsgId, ref_count = 1, file = CurFile,
offset = CurOffset, total_size = TotalSize,
attrs = Attrs }),
[FSEntry = #file_summary { valid_total_size = ValidTotalSize,
contiguous_top = ContiguousTop,
right = undefined }] =
- ets:lookup(FileSummary, CurName),
+ ets:lookup(FileSummary, CurFile),
ValidTotalSize1 = ValidTotalSize + TotalSize,
ContiguousTop1 = if CurOffset =:= ContiguousTop ->
%% can't be any holes in this file
@@ -380,7 +378,7 @@ read(MsgId, State) ->
throw({error,
{misread,
[{old_state, State},
- {file, File},
+ {file_num, File},
{offset, Offset},
{read, Rest}]}})
end,
@@ -407,13 +405,13 @@ attrs(MsgId, State) ->
[#msg_location { msg_id = MsgId, attrs = Attrs }] -> Attrs
end.
-remove(MsgIds, State = #msstate { current_file_name = CurName }) ->
+remove(MsgIds, State = #msstate { current_file = CurFile }) ->
compact(sets:to_list(
lists:foldl(
fun (MsgId, Files1) ->
case remove_message(MsgId, State) of
{compact, File} ->
- if CurName =:= File -> Files1;
+ if CurFile =:= File -> Files1;
true -> sets:add_element(File, Files1)
end;
no_compact -> Files1
@@ -427,8 +425,8 @@ release(MsgIds, State) ->
needs_sync(_MsgIds, #msstate { current_dirty = false }) ->
false;
-needs_sync(MsgIds, State = #msstate { current_file_name = CurFile,
- last_sync_offset = SyncOffset }) ->
+needs_sync(MsgIds, State = #msstate { current_file = CurFile,
+ last_sync_offset = SyncOffset }) ->
lists:any(fun (MsgId) ->
[#msg_location { msg_id = MsgId, file = File,
offset = Offset }] =
@@ -520,21 +518,21 @@ to_ram_disk_mode(State = #msstate { operation_mode = disk_only,
%% general helper functions
%%----------------------------------------------------------------------------
-form_filename(Dir, Name) ->
- filename:join(Dir, Name).
+form_filename(Dir, Name) -> filename:join(Dir, Name).
+
+filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
+
+filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)).
msg_location_dets_file(Dir) ->
form_filename(Dir, atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS).
-open_file(Dir, File, Mode) ->
- file:open(form_filename(Dir, File), ?BINARY_MODE ++ Mode).
+open_file(Dir, FileName, Mode) ->
+ file:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode).
-sort_file_names(Files) ->
- lists:sort(fun (A, B) ->
- ANum = list_to_integer(filename:rootname(A)),
- BNum = list_to_integer(filename:rootname(B)),
- ANum < BNum
- end, Files).
+sort_file_names(FileNames) ->
+ lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
+ FileNames).
preallocate(Hdl, FileSizeLimit, FinalPos) ->
{ok, FileSizeLimit} = file:position(Hdl, FileSizeLimit),
@@ -550,14 +548,14 @@ truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
with_read_handle_at(File, Offset, Fun,
State = #msstate { dir = Dir,
read_file_handle_cache = HC,
- current_file_name = CurName,
+ current_file = CurFile,
current_dirty = IsDirty,
last_sync_offset = SyncOffset }) ->
- State1 = if CurName =:= File andalso IsDirty andalso Offset >= SyncOffset ->
+ State1 = if CurFile == File andalso IsDirty andalso Offset >= SyncOffset ->
sync(State);
true -> State
end,
- FilePath = form_filename(Dir, File),
+ FilePath = form_filename(Dir, filenum_to_name(File)),
{Result, HC1} =
rabbit_file_handle_cache:with_file_handle_at(FilePath, Offset, Fun, HC),
{Result, State1 #msstate { read_file_handle_cache = HC1 }}.
@@ -672,24 +670,24 @@ dets_ets_match_object(#msstate { msg_location_ets = MsgLocationEts,
%% recovery
%%----------------------------------------------------------------------------
-recover_crashed_compactions(RefCountFun, Dir, Files, TmpFiles) ->
- lists:foreach(fun (TmpFile) ->
+recover_crashed_compactions(RefCountFun, Dir, FileNames, TmpFileNames) ->
+ lists:foreach(fun (TmpFileName) ->
ok = recover_crashed_compactions1(
- RefCountFun, Dir, Files, TmpFile)
+ RefCountFun, Dir, FileNames, TmpFileName)
end,
- TmpFiles),
+ TmpFileNames),
ok.
-recover_crashed_compactions1(RefCountFun, Dir, Files, TmpFile) ->
- NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION,
- true = lists:member(NonTmpRelatedFile, Files),
+recover_crashed_compactions1(RefCountFun, Dir, FileNames, TmpFileName) ->
+ NonTmpRelatedFileName = filename:rootname(TmpFileName) ++ ?FILE_EXTENSION,
+ true = lists:member(NonTmpRelatedFileName, FileNames),
{ok, UncorruptedMessagesTmp, MsgIdsTmp} =
- scan_file_for_valid_messages_msg_ids(Dir, TmpFile),
+ scan_file_for_valid_messages_msg_ids(Dir, TmpFileName),
%% all of these messages should be referenced
%% otherwise they wouldn't have been copied out
verify_messages_referenced(RefCountFun, MsgIdsTmp),
{ok, UncorruptedMessages, MsgIds} =
- scan_file_for_valid_messages_msg_ids(Dir, NonTmpRelatedFile),
+ scan_file_for_valid_messages_msg_ids(Dir, NonTmpRelatedFileName),
%% 1) It's possible that everything in the tmp file is also in the
%% main file such that the main file is (prefix ++
%% tmpfile). This means that compaction failed immediately
@@ -716,7 +714,7 @@ recover_crashed_compactions1(RefCountFun, Dir, Files, TmpFile) ->
%% consist only of valid messages. Plan: Truncate the main file
%% back to before any of the files in the tmp file and copy
%% them over again
- TmpPath = form_filename(Dir, TmpFile),
+ TmpPath = form_filename(Dir, TmpFileName),
case is_sublist(MsgIdsTmp, MsgIds) of
true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
%% note this also catches the case when the tmp file
@@ -750,7 +748,7 @@ recover_crashed_compactions1(RefCountFun, Dir, Files, TmpFile) ->
%% are in the tmp file
true = is_disjoint(MsgIds1, MsgIdsTmp),
%% must open with read flag, otherwise will stomp over contents
- {ok, MainHdl} = open_file(Dir, NonTmpRelatedFile,
+ {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
?WRITE_MODE ++ [read]),
%% Wipe out any rubbish at the end of the file. Remember
%% the head of the list will be the highest entry in the
@@ -761,7 +759,7 @@ recover_crashed_compactions1(RefCountFun, Dir, Files, TmpFile) ->
%% move. If we run out of disk space, this truncate could
%% fail, but we still aren't risking losing data
ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize),
- {ok, TmpHdl} = open_file(Dir, TmpFile, ?READ_MODE),
+ {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_MODE),
{ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize),
ok = file:sync(MainHdl),
ok = file:close(MainHdl),
@@ -769,7 +767,8 @@ recover_crashed_compactions1(RefCountFun, Dir, Files, TmpFile) ->
ok = file:delete(TmpPath),
{ok, _MainMessages, MsgIdsMain} =
- scan_file_for_valid_messages_msg_ids(Dir, NonTmpRelatedFile),
+ scan_file_for_valid_messages_msg_ids(
+ Dir, NonTmpRelatedFileName),
%% check that everything in MsgIds1 is in MsgIdsMain
true = is_sublist(MsgIds1, MsgIdsMain),
%% check that everything in MsgIdsTmp is in MsgIdsMain
@@ -786,13 +785,13 @@ is_disjoint(SmallerL, BiggerL) ->
verify_messages_referenced(RefCountFun, MsgIds) ->
lists:foreach(fun (MsgId) -> false = RefCountFun(MsgId) == 0 end, MsgIds).
-scan_file_for_valid_messages_msg_ids(Dir, File) ->
- {ok, Messages} = scan_file_for_valid_messages(Dir, File),
+scan_file_for_valid_messages_msg_ids(Dir, FileName) ->
+ {ok, Messages} = scan_file_for_valid_messages(Dir, FileName),
{ok, Messages,
[MsgId || {MsgId, _Attrs, _TotalSize, _FileOffset} <- Messages]}.
-scan_file_for_valid_messages(Dir, File) ->
- case open_file(Dir, File, ?READ_MODE) of
+scan_file_for_valid_messages(Dir, FileName) ->
+ case open_file(Dir, FileName, ?READ_MODE) of
{ok, Hdl} ->
Valid = rabbit_msg_file:scan(Hdl),
%% if something really bad's happened, the close could fail,
@@ -800,7 +799,8 @@ scan_file_for_valid_messages(Dir, File) ->
file:close(Hdl),
Valid;
{error, enoent} -> {ok, []};
- {error, Reason} -> throw({error, {unable_to_scan_file, File, Reason}})
+ {error, Reason} -> throw({error,
+ {unable_to_scan_file, FileName, Reason}})
end.
%% Takes the list in *ascending* order (i.e. eldest message
@@ -820,13 +820,12 @@ find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds}.
load_messages(RefCountFun, [], State) ->
- CurrentFile = State #msstate.current_file_name,
- load_messages(RefCountFun, undefined, [CurrentFile], State);
+ CurFile = State #msstate.current_file,
+ load_messages(RefCountFun, undefined, [CurFile], State);
load_messages(RefCountFun, Files, State) ->
load_messages(RefCountFun, undefined, Files, State).
load_messages(_RefCountFun, Left, [], State) ->
- Num = list_to_integer(filename:rootname(Left)),
Offset =
case sort_msg_locations_by_offset(desc, Left, State) of
[] -> 0;
@@ -834,11 +833,10 @@ load_messages(_RefCountFun, Left, [], State) ->
total_size = TotalSize } | _] ->
MaxOffset + TotalSize
end,
- State #msstate { current_file_num = Num, current_file_name = Left,
- current_offset = Offset };
+ State #msstate { current_file = Left, current_offset = Offset };
load_messages(RefCountFun, Left, [File|Files],
State = #msstate { dir = Dir, file_summary = FileSummary }) ->
- {ok, Messages} = scan_file_for_valid_messages(Dir, File),
+ {ok, Messages} = scan_file_for_valid_messages(Dir, filenum_to_name(File)),
{ValidMessages, ValidTotalSize} = lists:foldl(
fun (Obj = {MsgId, Attrs, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
case RefCountFun(MsgId) of
@@ -873,36 +871,31 @@ load_messages(RefCountFun, Left, [File|Files],
maybe_roll_to_new_file(Offset,
State = #msstate { dir = Dir,
file_size_limit = FileSizeLimit,
- current_file_name = CurName,
current_file_handle = CurHdl,
- current_file_num = CurNum,
- file_summary = FileSummary
- }
- ) when Offset >= FileSizeLimit ->
+ current_file = CurFile,
+ file_summary = FileSummary })
+ when Offset >= FileSizeLimit ->
State1 = sync(State),
ok = file:close(CurHdl),
- NextNum = CurNum + 1,
- NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION,
- {ok, NextHdl} = open_file(Dir, NextName, ?WRITE_MODE),
- true = ets:update_element(FileSummary, CurName,
- {#file_summary.right, NextName}),
+ NextFile = CurFile + 1,
+ {ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE),
+ true = ets:update_element(FileSummary, CurFile,
+ {#file_summary.right, NextFile}),
true = ets:insert_new(
FileSummary, #file_summary {
- file = NextName, valid_total_size = 0, contiguous_top = 0,
- left = CurName, right = undefined }),
- State2 = State1 #msstate { current_file_name = NextName,
- current_file_handle = NextHdl,
- current_file_num = NextNum,
- current_offset = 0,
- last_sync_offset = 0
- },
- compact([CurName], State2);
+ file = NextFile, valid_total_size = 0, contiguous_top = 0,
+ left = CurFile, right = undefined }),
+ State2 = State1 #msstate { current_file_handle = NextHdl,
+ current_file = NextFile,
+ current_offset = 0,
+ last_sync_offset = 0 },
+ compact([CurFile], State2);
maybe_roll_to_new_file(_, State) ->
State.
compact(Files, State) ->
%% smallest number, hence eldest, hence left-most, first
- SortedFiles = sort_file_names(Files),
+ SortedFiles = lists:sort(Files),
%% foldl reverses, so now youngest/right-most first
RemainingFiles =
lists:foldl(fun (File, Acc) ->
@@ -920,7 +913,7 @@ compact(Files, State) ->
%% we merge right then this file is the destination and the right file
%% is the source.
combine_file(File, State = #msstate { file_summary = FileSummary,
- current_file_name = CurName }) ->
+ current_file = CurFile }) ->
%% the file we're looking at may no longer exist as it may have
%% been deleted within the current GC run
case ets:lookup(FileSummary, File) of
@@ -930,7 +923,7 @@ combine_file(File, State = #msstate { file_summary = FileSummary,
fun() ->
case Right of
undefined -> State;
- _ when not (CurName == Right) ->
+ _ when not (CurFile == Right) ->
[FSRight] = ets:lookup(FileSummary, Right),
{_, State1} = adjust_meta_and_combine(
FSEntry, FSRight, State),
@@ -991,9 +984,11 @@ combine_files(#file_summary { file = Source,
contiguous_top = DestinationContiguousTop,
right = Source },
State = #msstate { dir = Dir }) ->
- State1 = close_file(Source, close_file(Destination, State)),
- {ok, SourceHdl} = open_file(Dir, Source, ?READ_MODE),
- {ok, DestinationHdl} = open_file(Dir, Destination,
+ SourceName = filenum_to_name(Source),
+ DestinationName = filenum_to_name(Destination),
+ State1 = close_file(SourceName, close_file(DestinationName, State)),
+ {ok, SourceHdl} = open_file(Dir, SourceName, ?READ_MODE),
+ {ok, DestinationHdl} = open_file(Dir, DestinationName,
?READ_MODE ++ ?WRITE_MODE),
ExpectedSize = SourceValid + DestinationValid,
%% if DestinationValid =:= DestinationContiguousTop then we don't
@@ -1006,7 +1001,7 @@ combine_files(#file_summary { file = Source,
ok = truncate_and_extend_file(DestinationHdl,
DestinationValid, ExpectedSize);
true ->
- Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
+ Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
{ok, TmpHdl} = open_file(Dir, Tmp, ?READ_MODE ++ ?WRITE_MODE),
Worklist =
lists:dropwhile(
@@ -1045,7 +1040,7 @@ combine_files(#file_summary { file = Source,
%% tidy up
ok = file:close(SourceHdl),
ok = file:close(DestinationHdl),
- ok = file:delete(form_filename(Dir, Source)),
+ ok = file:delete(form_filename(Dir, SourceName)),
State1.
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
@@ -1087,8 +1082,9 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
ok = file:sync(DestinationHdl),
ok.
-close_file(File, State = #msstate { dir = Dir, read_file_handle_cache = HC }) ->
- HC1 = rabbit_file_handle_cache:close_file(form_filename(Dir, File), HC),
+close_file(FileName,
+ State = #msstate { dir = Dir, read_file_handle_cache = HC }) ->
+ HC1 = rabbit_file_handle_cache:close_file(form_filename(Dir, FileName), HC),
State #msstate { read_file_handle_cache = HC1 }.
delete_file_if_empty(File,
@@ -1113,7 +1109,7 @@ delete_file_if_empty(File,
{#file_summary.right, Right})
end,
true = ets:delete(FileSummary, File),
- ok = file:delete(form_filename(Dir, File)),
+ ok = file:delete(form_filename(Dir, filenum_to_name(File))),
true;
_ -> false
end.