| author | Matthew Sackman <matthew@lshift.net> | 2009-04-11 01:05:09 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-11 01:05:09 +0100 |
| commit | b8b9896cf79bc62bc0e11da85e23922e024342de (patch) | |
| tree | 3d9c51efa8676087c16ae5c6ab023c74ddf80568 /src | |
| parent | 1882d139375274368a3ab2bffe3695c2d3f5c9bc (diff) | |
| download | rabbitmq-server-git-b8b9896cf79bc62bc0e11da85e23922e024342de.tar.gz | |
Mainly removing the unnecessary file:position call in append.
This wasn't needed, as I can track the position myself, and profiling showed that it was expensive (which I'd kinda been aware of before). Having removed it, things are quite a bit faster - though still CPU-bound with smaller messages.
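The change is easy to show in miniature. Below is a hedged sketch of the pattern only (module and function names are invented, not from this commit): rather than calling file:position/2 before every append, the writer carries the offset in its own state, pays for a single seek at startup, and advances the offset by the bytes it writes.

```erlang
%% Sketch only: a writer that tracks its own offset instead of asking the
%% file driver before every append. All names here are invented.
-module(offset_tracking_sketch).
-export([open_writer/1, append/2]).

%% Usage: {ok, W0} = open_writer("queue.rdq"),
%%        {ok, W1} = append(W0, <<"msg1">>).

open_writer(Path) ->
    {ok, Hdl} = file:open(Path, [read, write, raw, binary]),
    {ok, Offset} = file:position(Hdl, eof),  %% one seek at startup
    {ok, {Hdl, Offset}}.

%% file:write/2 leaves the position at the end of what it wrote, so adding
%% byte_size(Bin) keeps our copy of the offset in step with the real one.
append({Hdl, Offset}, Bin) when is_binary(Bin) ->
    ok = file:write(Hdl, Bin),
    {ok, {Hdl, Offset + byte_size(Bin)}}.
```

In the diff below this shows up as the new current_offset field in #dqstate, initialised by load_from_disk and bumped by TotalSize + ?FILE_PACKING_ADJUSTMENT on every publish.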
The next step is to convert the dicts in the state into ets tables.
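For what that next step would look like, here is a hedged before/after sketch (a hypothetical state_table_sketch module; none of this is in the commit): a dict is purely functional, so every update returns a new structure that has to be threaded back through the gen_server state, while an ets table is updated in place and the state only needs to carry the table id.

```erlang
%% Hypothetical sketch of converting one piece of state from dict to ets.
-module(state_table_sketch).
-export([store_with_dict/3, store_with_ets/2]).

%% dict: every store returns a fresh structure that must be threaded
%% back through the gen_server state on each call.
store_with_dict(Key, Value, Dict) ->
    dict:store(Key, Value, Dict).

%% ets: the table is created once (normally in init/1) and then mutated
%% destructively; the table id never changes, so the state stays constant
%% and an update does not rebuild the structure.
store_with_ets(Key, Value) ->
    Tid = ets:new(sketch_table, [set, private]),
    true = ets:insert(Tid, {Key, Value}),
    [{Key, Value}] = ets:lookup(Tid, Key),
    Tid.
```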
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 69 |
1 file changed, 39 insertions(+), 30 deletions(-)
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 391c9b715c..8affd7aa96 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -50,6 +50,7 @@
 -define(MSG_LOC_ETS_NAME, rabbit_disk_queue_msg_location).
 -define(FILE_EXTENSION, ".rdq").
 -define(FILE_EXTENSION_TMP, ".rdt").
+-define(FILE_PACKING_ADJUSTMENT, 1 + (2* (?INTEGER_SIZE_BYTES))).
 
 -define(SERVER, ?MODULE).
 
@@ -60,6 +61,7 @@
           current_file_num,
           current_file_name,
           current_file_handle,
+          current_offset,
           file_size_limit,
           read_file_handles,
           read_file_handles_limit
@@ -103,14 +105,16 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
                        current_file_num = 0,
                        current_file_name = InitName,
                        current_file_handle = undefined,
+                       current_offset = 0,
                        file_size_limit = FileSizeLimit,
                        read_file_handles = {dict:new(), gb_trees:empty()},
                        read_file_handles_limit = ReadFileHandlesLimit
                      },
-    {ok, State1 = #dqstate { current_file_name = CurrentName } } = load_from_disk(State),
+    {ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = load_from_disk(State),
     Path = form_filename(CurrentName),
     ok = filelib:ensure_dir(Path),
-    {ok, FileHdl} = file:open(Path, [append, raw, binary]),
+    {ok, FileHdl} = file:open(Path, [read, write, raw, binary]), %% read only needed so that we can seek
+    {ok, Offset} = file:position(FileHdl, {bof, Offset}),
     {ok, State1 # dqstate { current_file_handle = FileHdl }}.
 
 handle_call({deliver, Q, MsgId}, _From, State) ->
@@ -159,21 +163,13 @@ form_filename(Name) ->
 base_directory() ->
     filename:join(mnesia:system_info(directory), "rabbit_disk_queue/").
 
-file_packing_adjustment_bytes() ->
-    1 + (2* (?INTEGER_SIZE_BYTES)).
-
 %% ---- INTERNAL RAW FUNCTIONS ----
 
 internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
-                                              %current_file_handle = CurHdl,
-                                              %current_file_name = CurName,
                                               read_file_handles_limit = ReadFileHandlesLimit,
                                               read_file_handles = {ReadHdls, ReadHdlsAge} }) ->
     [{MsgId, _RefCount, File, Offset, _TotalSize}] = ets:lookup(MsgLocation, MsgId),
-    %if CurName =:= File -> ok = file:sync(CurHdl); % don't think this is necessary. Within a process you should always have a consistent view of a file
-    %   true -> ok
-    %end,
     % so this next bit implements an LRU for file handles. But it's a bit insane, and smells
     % of premature optimisation. So I might remove it and dump it overboard
     {FileHdl, ReadHdls1, ReadHdlsAge1}
@@ -198,7 +194,9 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
     % read the message
     {ok, {MsgBody, BodySize, _TotalSize}} = read_message_at_offset(FileHdl, Offset),
     [#dq_msg_loc {queue_and_msg_id = {MsgId, Q}, is_delivered = Delivered}] = mnesia:dirty_read(rabbit_disk_queue, {MsgId, Q}),
-    ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc {queue_and_msg_id = {MsgId, Q}, is_delivered = true}),
+    if Delivered -> ok;
+       true -> ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc {queue_and_msg_id = {MsgId, Q}, is_delivered = true})
+    end,
     {ok, {MsgBody, BodySize, Delivered}, State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}.
@@ -216,7 +214,7 @@ internal_ack(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
         = dict:find(File, FileSummary),
     FileDetail1 = dict:erase(Offset, FileDetail),
     ContiguousTop1 = lists:min([ContiguousTop, Offset]),
-    FileSummary2 = dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - file_packing_adjustment_bytes()),
+    FileSummary2 = dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
                                                       contiguous_prefix = ContiguousTop1,
                                                       detail = FileDetail1
                                                     }, FileSummary),
@@ -232,12 +230,12 @@ internal_ack(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
 internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocation,
                                                        current_file_handle = CurHdl,
                                                        current_file_name = CurName,
+                                                       current_offset = Offset,
                                                        file_summary = FileSummary
                                                      }) ->
     case ets:lookup(MsgLocation, MsgId) of
         [] -> % New message, lots to do
-            {ok, Offset} = file:position(CurHdl, cur),
             {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
             true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, Offset, TotalSize}),
             {ok, FileSum = #dqfile { valid_data = ValidTotalSize,
@@ -246,7 +244,7 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
                                      detail = FileDetail
                                    }} = dict:find(CurName, FileSummary),
             FileDetail1 = dict:store(Offset, TotalSize, FileDetail),
-            ValidTotalSize1 = ValidTotalSize + TotalSize + file_packing_adjustment_bytes(),
+            ValidTotalSize1 = ValidTotalSize + TotalSize + (?FILE_PACKING_ADJUSTMENT),
             ContiguousTop1 = if Offset =:= ContiguousTop ->
                                     ValidTotalSize; % can't be any holes in this file
                                 true -> ContiguousTop
@@ -255,8 +253,10 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
                                           contiguous_prefix = ContiguousTop1,
                                           detail = FileDetail1}, FileSummary),
-            maybe_roll_to_new_file(Offset + TotalSize + file_packing_adjustment_bytes(),
-                                   State # dqstate { file_summary = FileSummary1 });
+            maybe_roll_to_new_file(Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT),
+                                   State # dqstate { file_summary = FileSummary1,
+                                                     current_offset = Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT)
+                                                   });
         [{MsgId, RefCount, File, Offset, TotalSize}] ->
             % We already know about it, just update counter
             true = ets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}),
@@ -303,7 +303,7 @@ internal_tx_cancel(MsgIds, State = #dqstate { msg_location = MsgLocation,
                    = dict:find(File, FileSummary2),
               FileDetail1 = dict:erase(Offset, FileDetail),
               ContiguousTop1 = lists:min([ContiguousTop, Offset]),
-              dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - file_packing_adjustment_bytes()),
+              dict:store(File, FileSum #dqfile { valid_data = (ValidTotalSize - TotalSize - (?FILE_PACKING_ADJUSTMENT)),
                                                  contiguous_prefix = ContiguousTop1,
                                                  detail = FileDetail1
                                                }, FileSummary2);
@@ -333,6 +333,7 @@ maybe_roll_to_new_file(Offset, State = #dqstate { file_size_limit = FileSizeLimi
     {ok, State # dqstate { current_file_name = NextName,
                            current_file_handle = NextHdl,
                            current_file_num = NextNum,
+                           current_offset = 0,
                            file_summary = dict:store(NextName,
                                                      #dqfile { valid_data = 0,
                                                                contiguous_prefix = 0,
                                                                left = CurName,
@@ -364,9 +365,14 @@ load_from_disk(State) ->
 
 load_messages(undefined, [], State) ->
     State;
-load_messages(Left, [], State) ->
+load_messages(Left, [], State = #dqstate { file_summary = Summary }) ->
     Num = list_to_integer(filename:rootname(Left)),
-    State # dqstate { current_file_num = Num, current_file_name = Left };
+    {ok, #dqfile { detail = FileDetail }} =
+        dict:find(Left, Summary),
+    Offset = dict:fold(fun (Offset1, TotalSize, Acc) ->
+                               Acc1 = Offset1 + TotalSize + (?FILE_PACKING_ADJUSTMENT),
+                               lists:max([Acc, Acc1])
+                       end, 0, FileDetail),
+    State # dqstate { current_file_num = Num, current_file_name = Left, current_offset = Offset };
 load_messages(Left, [File|Files],
               State = #dqstate { msg_location = MsgLocation,
                                  file_summary = FileSummary
@@ -380,7 +386,7 @@ load_messages(Left, [File|Files],
                         RefCount ->
                             true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}),
                             {[{MsgId, TotalSize, Offset}|VMAcc],
-                             VTSAcc + TotalSize + file_packing_adjustment_bytes(),
+                             VTSAcc + TotalSize + (?FILE_PACKING_ADJUSTMENT),
                              dict:store(Offset, TotalSize, FileDetail1)
                             }
                     end
@@ -405,9 +411,12 @@ load_messages(Left, [File|Files],
 
 %% ---- DISK RECOVERY OF FAILED COMPACTION ----
 
-recover_crashed_compactions(_Files, []) ->
-    ok;
-recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
+recover_crashed_compactions(Files, TmpFiles) ->
+    lists:foreach(fun (TmpFile) -> ok = recover_crashed_compactions1(Files, TmpFile) end,
+                  TmpFiles),
+    ok.
+
+recover_crashed_compactions1(Files, TmpFile) ->
     GrabMsgId = fun ({MsgId, _TotalSize, _FileOffset}) -> MsgId end,
     NonTmpRelatedFile = filename:rootname(TmpFile) ++ (?FILE_EXTENSION),
     true = lists:member(NonTmpRelatedFile, Files),
@@ -461,7 +470,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
             % extending truncate.
             % Remember the head of the list will be the highest entry in the file
             [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
-            TmpSize = TmpTopOffset + TmpTopTotalSize + file_packing_adjustment_bytes(),
+            TmpSize = TmpTopOffset + TmpTopTotalSize + (?FILE_PACKING_ADJUSTMENT),
             ExpectedAbsPos = Top + TmpSize,
             {ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}),
             ok = file:truncate(MainHdl),
             % and now extend the main file as big as necessary in a single move
@@ -480,7 +489,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
             % check that everything in MsgIdsTmp is in MsgIdsMain
             true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end, MsgIdsTmp)
     end,
-    recover_crashed_compactions(Files, TmpFiles).
+    ok.
 
 % this assumes that the messages are ordered such that the highest address is at
 % the head of the list.
@@ -488,7 +497,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
 find_contiguous_block_prefix([]) -> {0, []};
 find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail]) ->
     case find_contiguous_block_prefix(Tail, Offset, [MsgId]) of
-        {ok, Acc} -> {Offset + TotalSize + file_packing_adjustment_bytes(), lists:reverse(Acc)};
+        {ok, Acc} -> {Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT), lists:reverse(Acc)};
         Res -> Res
     end.
 find_contiguous_block_prefix([], 0, Acc) ->
@@ -496,7 +505,7 @@ find_contiguous_block_prefix([], 0, Acc) ->
 find_contiguous_block_prefix([], _N, _Acc) ->
     {0, []};
 find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail], ExpectedOffset, Acc)
-  when ExpectedOffset =:= Offset + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)) -> %% Can't use file_packing_adjustment_bytes()
+  when ExpectedOffset =:= Offset + TotalSize + 1 + (2* (?INTEGER_SIZE_BYTES)) -> %% Can't use (?FILE_PACKING_ADJUSTMENT)
     find_contiguous_block_prefix(Tail, Offset, [MsgId|Acc]);
 find_contiguous_block_prefix(List, _ExpectedOffset, _Acc) ->
     find_contiguous_block_prefix(List).
@@ -573,7 +582,7 @@ read_next_file_entry(FileHdl, Offset) ->
             case {TotalSize =:= 0, MsgIdBinSize =:= 0} of
                 {true, _} -> {ok, eof}; %% Nothing we can do other than stop
                 {false, true} -> %% current message corrupted, try skipping past it
-                    ExpectedAbsPos = Offset + file_packing_adjustment_bytes() + TotalSize,
+                    ExpectedAbsPos = Offset + (?FILE_PACKING_ADJUSTMENT) + TotalSize,
                     case file:position(FileHdl, {cur, TotalSize + 1}) of
                         {ok, ExpectedAbsPos} -> {ok, {corrupted, ExpectedAbsPos}};
                         {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up
@@ -588,9 +597,9 @@ read_next_file_entry(FileHdl, Offset) ->
                     case file:read(FileHdl, 1) of
                         {ok, <<(?WRITE_OK):(?WRITE_OK_SIZE_BITS)>>} ->
                             {ok, {ok, binary_to_term(MsgId), TotalSize,
-                                  Offset + file_packing_adjustment_bytes() + TotalSize}};
+                                  Offset + (?FILE_PACKING_ADJUSTMENT) + TotalSize}};
                         {ok, _SomeOtherData} ->
-                            {ok, {corrupted, Offset + file_packing_adjustment_bytes() + TotalSize}};
+                            {ok, {corrupted, Offset + (?FILE_PACKING_ADJUSTMENT) + TotalSize}};
                         KO -> KO
                     end;
                 {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up
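One constant is worth calling out: ?FILE_PACKING_ADJUSTMENT, defined above as 1 + (2* (?INTEGER_SIZE_BYTES)), is the per-message framing overhead, i.e. two integer length fields plus the one-byte ?WRITE_OK status marker that read_next_file_entry checks. Every offset computation in the diff is the same formula. A minimal sketch of that arithmetic, assuming 8-byte integers (the actual width is not shown in this commit):

```erlang
%% Sketch of the on-disk entry accounting implied by the diff.
-module(packing_sketch).
-export([next_offset/2]).

-define(INTEGER_SIZE_BYTES, 8). %% assumed width; defined elsewhere in the real module
-define(FILE_PACKING_ADJUSTMENT, 1 + (2 * ?INTEGER_SIZE_BYTES)).

%% An entry starting at Offset holds TotalSize bytes of message id + body,
%% framed by two length integers and a trailing write-OK status byte, so
%% the next entry begins at:
next_offset(Offset, TotalSize) ->
    Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT.
```

This is also exactly what the new dict:fold in load_messages computes: the largest such end offset over a file's entries, which becomes current_offset after a restart.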
