commit 659194e683096c9ae0e3758f6dd6b95cd72b2662
tree   438c7884787e1a772e6365d594295bc33ffd3618
parent 0056ad665a93ab048f6dce0c065ac7664d57f551
Author:    Matthew Sackman <matthew@lshift.net>  2009-04-22 12:43:50 +0100
Committer: Matthew Sackman <matthew@lshift.net>  2009-04-22 12:43:50 +0100
more reformatting and refactoring.
-rw-r--r--  src/rabbit_disk_queue.erl | 558
-rw-r--r--  src/rabbit_tests.erl      |  79
2 files changed, 328 insertions(+), 309 deletions(-)
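The bulk of this commit is mechanical: multi-line bindings that used to begin their continuation line with `=` are rewritten so that `=` ends the binding line, and the inline `lists:sort` calls are pulled out into a shared `sortMsgLocationsByOffset/2` helper. A minimal sketch of the layout convention the diff converges on (hypothetical module, not part of the diff):

```erlang
-module(style_example).
-export([open_table/0]).

%% After this commit, "=" stays at the end of the binding line and the
%% right-hand side starts on the continuation line:
open_table() ->
    {ok, Table} =
        dets:open_file(example_table,
                       [{file, "example_table.dets"}, {type, set}]),
    Table.
```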
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 190c06f00b..2952ca89ec 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -107,32 +107,32 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
     process_flag(trap_exit, true),
     ok = filelib:ensure_dir(form_filename("nothing")),
     InitName = "0" ++ ?FILE_EXTENSION,
-    {ok, MsgLocation}
-        = dets:open_file(?MSG_LOC_DETS_NAME,
-                         [{file, form_filename(atom_to_list(?MSG_LOC_DETS_NAME)
-                                               ++ ?FILE_EXTENSION_DETS)},
-                          {min_no_slots, 1024*1024},
-                          %% man says this should be <= 32M. But it works...
-                          {max_no_slots, 1024*1024*1024},
-                          {type, set}
-                         ]),
-    State
-        = #dqstate { msg_location = MsgLocation,
-                     file_summary = ets:new(?FILE_SUMMARY_ETS_NAME,
-                                            [set, private]),
-                     sequences = ets:new(?SEQUENCE_ETS_NAME,
-                                         [set, private]),
-                     current_file_num = 0,
-                     current_file_name = InitName,
-                     current_file_handle = undefined,
-                     current_offset = 0,
-                     file_size_limit = FileSizeLimit,
-                     read_file_handles = {dict:new(), gb_trees:empty()},
-                     read_file_handles_limit = ReadFileHandlesLimit
-                   },
+    {ok, MsgLocation} =
+        dets:open_file(?MSG_LOC_DETS_NAME,
+                       [{file, form_filename(atom_to_list(?MSG_LOC_DETS_NAME) ++
+                                             ?FILE_EXTENSION_DETS)},
+                        {min_no_slots, 1024*1024},
+                        %% man says this should be <= 32M. But it works...
+                        {max_no_slots, 1024*1024*1024},
+                        {type, set}
+                       ]),
+    State =
+        #dqstate { msg_location = MsgLocation,
+                   file_summary = ets:new(?FILE_SUMMARY_ETS_NAME,
+                                          [set, private]),
+                   sequences = ets:new(?SEQUENCE_ETS_NAME,
+                                       [set, private]),
+                   current_file_num = 0,
+                   current_file_name = InitName,
+                   current_file_handle = undefined,
+                   current_offset = 0,
+                   file_size_limit = FileSizeLimit,
+                   read_file_handles = {dict:new(), gb_trees:empty()},
+                   read_file_handles_limit = ReadFileHandlesLimit
+                 },
     {ok, State1 = #dqstate { current_file_name = CurrentName,
-                             current_offset = Offset } }
-        = load_from_disk(State),
+                             current_offset = Offset } } =
+        load_from_disk(State),
     Path = form_filename(CurrentName),
     %% read is only needed so that we can seek
     {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]),
@@ -149,8 +149,8 @@ handle_call(stop, _From, State) ->
     {stop, normal, ok, State}; %% gen_server now calls terminate
 handle_call(clean_stop, _From, State) ->
     State1 = #dqstate { file_summary = FileSummary,
-                        sequences = Sequences }
-        = shutdown(State), %% tidy up file handles early
+                        sequences = Sequences } =
+        shutdown(State), %% tidy up file handles early
     {atomic, ok} = mnesia:clear_table(rabbit_disk_queue),
     true = ets:delete(FileSummary),
     true = ets:delete(Sequences),
@@ -185,8 +185,8 @@ shutdown(State = #dqstate { msg_location = MsgLocation,
                           }) ->
     %% deliberately ignoring return codes here
     dets:close(MsgLocation),
-    file:delete(form_filename(atom_to_list(?MSG_LOC_DETS_NAME)
-                              ++ ?FILE_EXTENSION_DETS)),
+    file:delete(form_filename(atom_to_list(?MSG_LOC_DETS_NAME) ++
+                              ?FILE_EXTENSION_DETS)),
     if FileHdl =:= undefined -> ok;
        true -> file:sync(FileHdl),
                file:close(FileHdl)
@@ -210,53 +210,51 @@ base_directory() ->
 
 %% ---- INTERNAL RAW FUNCTIONS ----
 
-internal_deliver(Q, State
-                 = #dqstate { msg_location = MsgLocation,
-                              sequences = Sequences,
-                              read_file_handles_limit = ReadFileHandlesLimit,
-                              read_file_handles = {ReadHdls, ReadHdlsAge}
-                            }) ->
+internal_deliver(Q, State =
+                 #dqstate { msg_location = MsgLocation,
+                            sequences = Sequences,
+                            read_file_handles_limit = ReadFileHandlesLimit,
+                            read_file_handles = {ReadHdls, ReadHdlsAge}
+                          }) ->
     case ets:lookup(Sequences, Q) of
         [] -> {ok, empty, State};
         [{Q, ReadSeqId, WriteSeqId}] ->
             case mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}) of
                 [] -> {ok, empty, State};
-                [Obj
-                 = #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] ->
-                    [{MsgId, _RefCount, File, Offset, TotalSize}]
-                        = dets:lookup(MsgLocation, MsgId),
-                    {FileHdl, ReadHdls1, ReadHdlsAge1}
-                        = case dict:find(File, ReadHdls) of
-                              error ->
-                                  {ok, Hdl} = file:open(form_filename(File),
-                                                        [read, raw, binary,
-                                                         read_ahead]),
-                                  Now = now(),
-                                  case dict:size(ReadHdls) < ReadFileHandlesLimit of
-                                      true ->
-                                          {Hdl,
-                                           dict:store(File, {Hdl, Now}, ReadHdls),
-                                           gb_trees:enter(Now, File, ReadHdlsAge)};
-                                      _False ->
-                                          {_Then, OldFile, ReadHdlsAge2}
-                                              = gb_trees:take_smallest(ReadHdlsAge),
-                                          {ok, {OldHdl, _Then}}
-                                              = dict:find(OldFile, ReadHdls),
-                                          ok = file:close(OldHdl),
-                                          ReadHdls2 = dict:erase(OldFile, ReadHdls),
-                                          {Hdl,
-                                           dict:store(File, {Hdl, Now}, ReadHdls2),
-                                           gb_trees:enter(Now, File, ReadHdlsAge2)}
-                                  end;
-                              {ok, {Hdl, Then}} ->
-                                  Now = now(),
-                                  {Hdl, dict:store(File, {Hdl, Now}, ReadHdls),
-                                   gb_trees:enter(Now, File,
-                                                  gb_trees:delete(Then, ReadHdlsAge))}
-                          end,
+                [Obj =
+                 #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] ->
+                    [{MsgId, _RefCount, File, Offset, TotalSize}] =
+                        dets:lookup(MsgLocation, MsgId),
+                    Now = now(),
+                    {FileHdl, ReadHdls1, ReadHdlsAge1} =
+                        case dict:find(File, ReadHdls) of
+                            error ->
+                                {ok, Hdl} = file:open(form_filename(File),
+                                                      [read, raw, binary,
+                                                       read_ahead]),
+                                {ReadHdls2, ReadHdlsAge2} =
+                                    case dict:size(ReadHdls) < ReadFileHandlesLimit of
+                                        true ->
+                                            {ReadHdls, ReadHdlsAge};
+                                        _False ->
+                                            {_Then, OldFile, ReadHdlsAge3} =
+                                                gb_trees:take_smallest(ReadHdlsAge),
+                                            {ok, {OldHdl, _Then}} =
+                                                dict:find(OldFile, ReadHdls),
+                                            ok = file:close(OldHdl),
+                                            {dict:erase(OldFile, ReadHdls),
+                                             ReadHdlsAge3}
+                                    end,
+                                {Hdl, dict:store(File, {Hdl, Now}, ReadHdls2),
+                                 gb_trees:enter(Now, File, ReadHdlsAge2)};
+                            {ok, {Hdl, Then}} ->
+                                {Hdl, dict:store(File, {Hdl, Now}, ReadHdls),
+                                 gb_trees:enter(Now, File,
+                                                gb_trees:delete(Then, ReadHdlsAge))}
+                        end,
                     %% read the message
-                    {ok, {MsgBody, BodySize}}
-                        = read_message_at_offset(FileHdl, Offset, TotalSize),
+                    {ok, {MsgBody, BodySize}} =
+                        read_message_at_offset(FileHdl, Offset, TotalSize),
                     if Delivered -> ok;
                        true -> ok = mnesia:dirty_write(rabbit_disk_queue,
                                                        Obj #dq_msg_loc {is_delivered = true})
@@ -278,35 +276,35 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
                 file_summary = FileSummary,
                 current_file_name = CurName
               }) ->
-    Files
-        = lists:foldl(
-            fun ({MsgId, SeqId}, Files2) ->
-                    [{MsgId, RefCount, File, Offset, TotalSize}]
-                        = dets:lookup(MsgLocation, MsgId),
-                    Files3
-                        = if 1 =:= RefCount ->
-                                  ok = dets:delete(MsgLocation, MsgId),
-                                  [{File, ValidTotalSize, ContiguousTop, Left, Right}]
-                                      = ets:lookup(FileSummary, File),
-                                  ContiguousTop1 = lists:min([ContiguousTop, Offset]),
-                                  true = ets:insert(FileSummary,
-                                                    {File, (ValidTotalSize - TotalSize
-                                                            - ?FILE_PACKING_ADJUSTMENT),
-                                                     ContiguousTop1, Left, Right}),
-                                  if CurName =:= File -> Files2;
-                                     true -> sets:add_element(File, Files2)
-                                  end;
-                             1 < RefCount ->
-                                  ok = dets:insert(MsgLocation, {MsgId, RefCount - 1,
-                                                                 File, Offset, TotalSize}),
-                                  Files2
-                          end,
-                    if MnesiaDelete ->
-                            ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId});
-                       true -> ok
-                    end,
-                    Files3
-            end, sets:new(), MsgSeqIds),
+    Files =
+        lists:foldl(
+          fun ({MsgId, SeqId}, Files2) ->
+                  [{MsgId, RefCount, File, Offset, TotalSize}] =
+                      dets:lookup(MsgLocation, MsgId),
+                  Files3 =
+                      if 1 =:= RefCount ->
+                              ok = dets:delete(MsgLocation, MsgId),
+                              [{File, ValidTotalSize, ContiguousTop, Left, Right}] =
+                                  ets:lookup(FileSummary, File),
+                              ContiguousTop1 = lists:min([ContiguousTop, Offset]),
+                              true = ets:insert(FileSummary,
+                                                {File, (ValidTotalSize - TotalSize
+                                                        - ?FILE_PACKING_ADJUSTMENT),
+                                                 ContiguousTop1, Left, Right}),
+                              if CurName =:= File -> Files2;
+                                 true -> sets:add_element(File, Files2)
+                              end;
+                         1 < RefCount ->
+                              ok = dets:insert(MsgLocation, {MsgId, RefCount - 1,
+                                                             File, Offset, TotalSize}),
+                              Files2
+                      end,
+                  if MnesiaDelete ->
+                          ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId});
+                     true -> ok
+                  end,
+                  Files3
+          end, sets:new(), MsgSeqIds),
     State2 = compact(Files, State),
     {ok, State2}.
@@ -323,8 +321,8 @@ internal_tx_publish(MsgId, MsgBody,
     {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
     true = dets:insert_new(MsgLocation, {MsgId, 1, CurName,
                                          CurOffset, TotalSize}),
-    [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}]
-        = ets:lookup(FileSummary, CurName),
+    [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] =
+        ets:lookup(FileSummary, CurName),
     ValidTotalSize1 = ValidTotalSize + TotalSize +
                       ?FILE_PACKING_ADJUSTMENT,
     ContiguousTop1 = if CurOffset =:= ContiguousTop ->
@@ -349,28 +347,28 @@ internal_tx_commit(Q, MsgIds,
                    current_file_handle = CurHdl,
                    current_file_name = CurName,
                    sequences = Sequences
-                  }) ->
-    {ReadSeqId, InitWriteSeqId}
-        = case ets:lookup(Sequences, Q) of
-              [] -> {0,0};
-              [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2}
-          end,
-    {atomic, {Sync, WriteSeqId}}
-        = mnesia:transaction(
-            fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue),
-                     lists:foldl(
-                       fun (MsgId, {Acc, NextWriteSeqId}) ->
-                               [{MsgId, _RefCount, File, _Offset, _TotalSize}]
-                                   = dets:lookup(MsgLocation, MsgId),
-                               ok = mnesia:write(rabbit_disk_queue,
-                                                 #dq_msg_loc { queue_and_seq_id
-                                                               = {Q, NextWriteSeqId},
-                                                               msg_id = MsgId,
-                                                               is_delivered = false},
-                                                 write),
-                               {Acc or (CurName =:= File), NextWriteSeqId + 1}
-                       end, {false, InitWriteSeqId}, MsgIds)
-            end),
+                  }) ->
+    {ReadSeqId, InitWriteSeqId} =
+        case ets:lookup(Sequences, Q) of
+            [] -> {0,0};
+            [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2}
+        end,
+    {atomic, {Sync, WriteSeqId}} =
+        mnesia:transaction(
+          fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue),
+                   lists:foldl(
+                     fun (MsgId, {Acc, NextWriteSeqId}) ->
+                             [{MsgId, _RefCount, File, _Offset, _TotalSize}] =
+                                 dets:lookup(MsgLocation, MsgId),
+                             ok = mnesia:write(rabbit_disk_queue,
+                                               #dq_msg_loc { queue_and_seq_id =
+                                                             {Q, NextWriteSeqId},
+                                                             msg_id = MsgId,
+                                                             is_delivered = false},
+                                               write),
+                             {Acc or (CurName =:= File), NextWriteSeqId + 1}
+                     end, {false, InitWriteSeqId}, MsgIds)
+          end),
     true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}),
     if Sync -> ok = file:sync(CurHdl);
        true -> ok
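The `internal_deliver` hunk above hoists `Now = now()` out of the `case` and restructures the read-handle cache, but the underlying technique is unchanged: a `dict` maps file name to `{Handle, LastUsed}` while a `gb_trees` index maps `LastUsed` back to the file name, so the least recently used handle can be found and closed cheaply when `read_file_handles_limit` is reached. A standalone sketch of that cache (hypothetical module and names; the real state lives in `#dqstate.read_file_handles`):

```erlang
-module(lru_handles).
-export([open/3]).

%% Returns {Handle, NewCache}. Cache = {Hdls :: dict of File -> {Hdl, LastUsed},
%% Ages :: gb_trees of LastUsed -> File}. now() yields unique, increasing
%% timestamps, so it is safe as a gb_trees key (as in the original code).
open(File, {Hdls, Ages}, Limit) ->
    Now = now(),
    case dict:find(File, Hdls) of
        {ok, {Hdl, Then}} ->
            %% cache hit: refresh this handle's age
            {Hdl, {dict:store(File, {Hdl, Now}, Hdls),
                   gb_trees:enter(Now, File, gb_trees:delete(Then, Ages))}};
        error ->
            {ok, Hdl} = file:open(File, [read, raw, binary, read_ahead]),
            {Hdls1, Ages1} =
                case dict:size(Hdls) < Limit of
                    true  -> {Hdls, Ages};
                    false ->
                        %% cache full: close the least recently used handle
                        {_Then, OldFile, Ages2} = gb_trees:take_smallest(Ages),
                        {ok, {OldHdl, _}} = dict:find(OldFile, Hdls),
                        ok = file:close(OldHdl),
                        {dict:erase(OldFile, Hdls), Ages2}
                end,
            {Hdl, {dict:store(File, {Hdl, Now}, Hdls1),
                   gb_trees:enter(Now, File, Ages1)}}
    end.
```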
@@ -378,8 +376,8 @@ internal_tx_commit(Q, MsgIds,
     {ok, State}.
 
 internal_publish(Q, MsgId, MsgBody, State) ->
-    {ok, State1 = #dqstate { sequences = Sequences }}
-        = internal_tx_publish(MsgId, MsgBody, State),
+    {ok, State1 = #dqstate { sequences = Sequences }} =
+        internal_tx_publish(MsgId, MsgBody, State),
     WriteSeqId = case ets:lookup(Sequences, Q) of
                      [] -> %% previously unseen queue
                          true = ets:insert_new(Sequences, {Q, 0, 1}),
@@ -447,42 +445,42 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit,
     case ets:lookup(FileSummary, File) of
         [] -> State;
         [FileObj = {File, ValidData, _ContiguousTop, Left, Right}] ->
-            GoRight
-                = fun() ->
-                          case Right of
-                              undefined -> State;
-                              _ when not(CurName =:= Right) ->
-                                  [RightObj = {Right, RightValidData,
-                                               _RightContiguousTop, File, RightRight}]
-                                      = ets:lookup(FileSummary, Right),
-                                  RightSumData = ValidData + RightValidData,
-                                  if FileSizeLimit >= RightSumData ->
-                                          %% here, Right will be the source and so will be deleted,
-                                          %% File will be the destination
-                                          State1 = combineFiles(RightObj, FileObj,
-                                                                State),
-                                          %% this could fail if RightRight is undefined
-                                          %% left is the 4th field
-                                          ets:update_element(FileSummary,
-                                                             RightRight, {4, File}),
-                                          true = ets:insert(FileSummary, {File,
-                                                                          RightSumData,
-                                                                          RightSumData,
-                                                                          Left,
-                                                                          RightRight}),
-                                          true = ets:delete(FileSummary, Right),
-                                          State1;
-                                     true -> State
-                                  end;
-                              _ -> State
-                          end
-                  end,
+            GoRight =
+                fun() ->
+                        case Right of
+                            undefined -> State;
+                            _ when not(CurName =:= Right) ->
+                                [RightObj = {Right, RightValidData,
+                                             _RightContiguousTop, File, RightRight}] =
+                                    ets:lookup(FileSummary, Right),
+                                RightSumData = ValidData + RightValidData,
+                                if FileSizeLimit >= RightSumData ->
+                                        %% here, Right will be the source and so will be deleted,
+                                        %% File will be the destination
+                                        State1 = combineFiles(RightObj, FileObj,
+                                                              State),
+                                        %% this could fail if RightRight is undefined
+                                        %% left is the 4th field
+                                        ets:update_element(FileSummary,
+                                                           RightRight, {4, File}),
+                                        true = ets:insert(FileSummary, {File,
+                                                                        RightSumData,
+                                                                        RightSumData,
+                                                                        Left,
+                                                                        RightRight}),
+                                        true = ets:delete(FileSummary, Right),
+                                        State1;
+                                   true -> State
+                                end;
+                            _ -> State
+                        end
+                end,
             case Left of
                 undefined -> GoRight();
-                _ -> [LeftObj
-                      = {Left, LeftValidData, _LeftContiguousTop, LeftLeft, File}]
-                         = ets:lookup(FileSummary, Left),
+                _ -> [LeftObj =
+                      {Left, LeftValidData, _LeftContiguousTop, LeftLeft, File}] =
+                         ets:lookup(FileSummary, Left),
                      LeftSumData = ValidData + LeftValidData,
                      if FileSizeLimit >= LeftSumData ->
                              %% here, File will be the source and so will be deleted,
@@ -502,19 +500,27 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit,
         end
     end.
 
+sortMsgLocationsByOffset(Asc, List) ->
+    Comp = if Asc  -> fun(X, Y) -> X < Y end;
+              true -> fun(X, Y) -> X > Y end
+           end,
+    lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
+                       Comp(OffA, OffB)
+               end, List).
+
 combineFiles({Source, SourceValid, _SourceContiguousTop,
               _SourceLeft, _SourceRight},
              {Destination, DestinationValid, DestinationContiguousTop,
               _DestinationLeft, _DestinationRight},
              State1) ->
-    (State = #dqstate { msg_location = MsgLocation })
-        = closeFile(Source, closeFile(Destination, State1)),
-    {ok, SourceHdl}
-        = file:open(form_filename(Source),
-                    [read, write, raw, binary, delayed_write, read_ahead]),
-    {ok, DestinationHdl}
-        = file:open(form_filename(Destination),
-                    [read, write, raw, binary, delayed_write, read_ahead]),
+    (State = #dqstate { msg_location = MsgLocation }) =
+        closeFile(Source, closeFile(Destination, State1)),
+    {ok, SourceHdl} =
+        file:open(form_filename(Source),
+                  [read, write, raw, binary, delayed_write, read_ahead]),
+    {ok, DestinationHdl} =
+        file:open(form_filename(Destination),
+                  [read, write, raw, binary, delayed_write, read_ahead]),
     ExpectedSize = SourceValid + DestinationValid,
     %% if DestinationValid =:= DestinationContiguousTop then we don't need a tmp file
     %% if they're not equal, then we need to write out everything past the DestinationContiguousTop to a tmp file
@@ -531,56 +537,54 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
                                        {bof, DestinationValid});
        true ->
            Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
-           {ok, TmpHdl}
-               = file:open(form_filename(Tmp),
-                           [read, write, raw, binary, delayed_write, read_ahead]),
-           Worklist
-               = lists:dropwhile(
-                   fun ({_, _, _, Offset, _})
-                       when Offset /= DestinationContiguousTop ->
-                           %% it cannot be that Offset == DestinationContiguousTop
-                           %% because if it was then DestinationContiguousTop would have been
-                           %% extended by TotalSize
-                           Offset < DestinationContiguousTop
-                           %% Given expected access patterns, I suspect that the list should be
-                           %% naturally sorted as we require, however, we need to enforce it anyway
-                   end, lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
-                                           OffA < OffB
-                                   end,
-                                   dets:match_object(MsgLocation,
-                                                     {'_', '_', Destination,
-                                                      '_', '_'}))),
+           {ok, TmpHdl} =
+               file:open(form_filename(Tmp),
+                         [read, write, raw, binary, delayed_write, read_ahead]),
+           Worklist =
+               lists:dropwhile(
+                 fun ({_, _, _, Offset, _})
+                     when Offset /= DestinationContiguousTop ->
+                         %% it cannot be that Offset == DestinationContiguousTop
+                         %% because if it was then DestinationContiguousTop would have been
+                         %% extended by TotalSize
+                         Offset < DestinationContiguousTop
+                         %% Given expected access patterns, I suspect that the list should be
+                         %% naturally sorted as we require, however, we need to enforce it anyway
+                 end, sortMsgLocationsByOffset(true,
+                                               dets:match_object(MsgLocation,
+                                                                 {'_', '_',
+                                                                  Destination,
+                                                                  '_', '_'}))),
            TmpSize = DestinationValid - DestinationContiguousTop,
-           {TmpSize, BlockStart1, BlockEnd1}
-               = lists:foldl(
-                   fun ({MsgId, RefCount, _Destination, Offset, TotalSize},
-                        {CurOffset, BlockStart, BlockEnd}) ->
-                           %% CurOffset is in the TmpFile.
-                           %% Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!)
-                           Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
-                           %% this message is going to end up back in Destination, at DestinationContiguousTop + CurOffset
-                           FinalOffset = DestinationContiguousTop + CurOffset,
-                           ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination,
-                                                          FinalOffset, TotalSize}),
-
-                           NextOffset = CurOffset + Size,
-                           if BlockStart =:= undefined ->
-                                   %% base case, called only for the first list elem
-                                   {NextOffset, Offset, Offset + Size};
-                              Offset =:= BlockEnd ->
-                                   %% extend the current block because the next msg follows straight on
-                                   {NextOffset, BlockStart, BlockEnd + Size};
-                              true ->
-                                   %% found a gap, so actually do the work for the previous block
-                                   BSize = BlockEnd - BlockStart,
-                                   {ok, BlockStart}
-                                       = file:position(DestinationHdl,
-                                                       {bof, BlockStart}),
-                                   {ok, BSize} = file:copy(DestinationHdl,
-                                                           TmpHdl, BSize),
-                                   {NextOffset, Offset, Offset + Size}
-                           end
-                   end, {0, undefined, undefined}, Worklist),
+           {TmpSize, BlockStart1, BlockEnd1} =
+               lists:foldl(
+                 fun ({MsgId, RefCount, _Destination, Offset, TotalSize},
+                      {CurOffset, BlockStart, BlockEnd}) ->
+                         %% CurOffset is in the TmpFile.
+                         %% Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!)
+                         Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
+                         %% this message is going to end up back in Destination, at DestinationContiguousTop + CurOffset
+                         FinalOffset = DestinationContiguousTop + CurOffset,
+                         ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination,
+                                                        FinalOffset, TotalSize}),
+                         NextOffset = CurOffset + Size,
+                         if BlockStart =:= undefined ->
+                                 %% base case, called only for the first list elem
+                                 {NextOffset, Offset, Offset + Size};
+                            Offset =:= BlockEnd ->
+                                 %% extend the current block because the next msg follows straight on
+                                 {NextOffset, BlockStart, BlockEnd + Size};
+                            true ->
+                                 %% found a gap, so actually do the work for the previous block
+                                 BSize = BlockEnd - BlockStart,
+                                 {ok, BlockStart} =
+                                     file:position(DestinationHdl,
+                                                   {bof, BlockStart}),
+                                 {ok, BSize} = file:copy(DestinationHdl,
+                                                         TmpHdl, BSize),
+                                 {NextOffset, Offset, Offset + Size}
+                         end
+                 end, {0, undefined, undefined}, Worklist),
            %% do the last remaining block
            BSize1 = BlockEnd1 - BlockStart1,
            {ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}),
@@ -589,56 +593,54 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
            %% and MsgLocation has been updated to reflect compaction of Destination
            %% so truncate Destination and copy from Tmp back to the end
            {ok, 0} = file:position(TmpHdl, {bof, 0}),
-           {ok, DestinationContiguousTop}
-               = file:position(DestinationHdl,
-                               {bof, DestinationContiguousTop}),
+           {ok, DestinationContiguousTop} =
+               file:position(DestinationHdl,
+                             {bof, DestinationContiguousTop}),
            ok = file:truncate(DestinationHdl),
-           {ok, ExpectedSize}
-               = file:position(DestinationHdl,
-                               {bof, ExpectedSize}),
+           {ok, ExpectedSize} =
+               file:position(DestinationHdl,
+                             {bof, ExpectedSize}),
            ok = file:truncate(DestinationHdl),
-           {ok, DestinationContiguousTop}
-               = file:position(DestinationHdl,
-                               {bof, DestinationContiguousTop}),
+           {ok, DestinationContiguousTop} =
+               file:position(DestinationHdl,
+                             {bof, DestinationContiguousTop}),
            {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize),
            %% position in DestinationHdl should now be DestinationValid
            ok = file:sync(DestinationHdl),
            ok = file:close(TmpHdl),
            ok = file:delete(form_filename(Tmp))
     end,
-    SourceWorkList
-        = lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
-                             OffA < OffB
-                     end, dets:match_object(MsgLocation, {'_', '_', Source,
-                                                          '_', '_'})),
+    SourceWorkList =
+        sortMsgLocationsByOffset(true, dets:match_object(MsgLocation,
+                                                         {'_', '_', Source,
+                                                          '_', '_'})),
-    {ExpectedSize, BlockStart2, BlockEnd2}
-        = lists:foldl(fun ({MsgId, RefCount, _Source, Offset, TotalSize},
-                           {CurOffset, BlockStart, BlockEnd}) ->
-                              %% CurOffset is in the DestinationFile.
-                              %% Offset, BlockStart and BlockEnd are in the SourceFile
-                              Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
-                              %% update MsgLocation to reflect change of file and offset
-                              ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination,
-                                                             CurOffset, TotalSize}),
-                              NextOffset = CurOffset + Size,
-                              if BlockStart =:= undefined ->
-                                      %% base case, called only for the first list elem
-                                      {NextOffset, Offset, Offset + Size};
-                                 Offset =:= BlockEnd ->
-                                      %% extend the current block because the next msg follows straight on
-                                      {NextOffset, BlockStart, BlockEnd + Size};
-                                 true ->
-                                      %% found a gap, so actually do the work for the previous block
-                                      BSize = BlockEnd - BlockStart,
-                                      {ok, BlockStart}
-                                          = file:position(SourceHdl,
-                                                          {bof, BlockStart}),
-                                      {ok, BSize}
-                                          = file:copy(SourceHdl, DestinationHdl,
-                                                      BSize),
-                                      {NextOffset, Offset, Offset + Size}
-                              end
-                      end, {DestinationValid, undefined, undefined}, SourceWorkList),
+    {ExpectedSize, BlockStart2, BlockEnd2} =
+        lists:foldl(
+          fun ({MsgId, RefCount, _Source, Offset, TotalSize},
+               {CurOffset, BlockStart, BlockEnd}) ->
+                  %% CurOffset is in the DestinationFile.
+                  %% Offset, BlockStart and BlockEnd are in the SourceFile
+                  Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
+                  %% update MsgLocation to reflect change of file and offset
+                  ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination,
+                                                 CurOffset, TotalSize}),
+                  NextOffset = CurOffset + Size,
+                  if BlockStart =:= undefined ->
+                          %% base case, called only for the first list elem
+                          {NextOffset, Offset, Offset + Size};
+                     Offset =:= BlockEnd ->
+                          %% extend the current block because the next msg follows straight on
+                          {NextOffset, BlockStart, BlockEnd + Size};
+                     true ->
+                          %% found a gap, so actually do the work for the previous block
+                          BSize = BlockEnd - BlockStart,
+                          {ok, BlockStart} =
+                              file:position(SourceHdl, {bof, BlockStart}),
+                          {ok, BSize} =
+                              file:copy(SourceHdl, DestinationHdl, BSize),
+                          {NextOffset, Offset, Offset + Size}
+                  end
+          end, {DestinationValid, undefined, undefined}, SourceWorkList),
     %% do the last remaining block
     BSize2 = BlockEnd2 - BlockStart2,
     {ok, BlockStart2} = file:position(SourceHdl, {bof, BlockStart2}),
@@ -650,20 +652,21 @@ combineFiles({Source, SourceValid, _SourceContiguousTop,
     ok = file:delete(form_filename(Source)),
     State.
 
-closeFile(File, State = #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge} }) ->
+closeFile(File, State = #dqstate { read_file_handles =
+                                   {ReadHdls, ReadHdlsAge} }) ->
     case dict:find(File, ReadHdls) of
         error -> State;
         {ok, {Hdl, Then}} ->
            ok = file:close(Hdl),
-           State #dqstate { read_file_handles
-                            = { dict:erase(File, ReadHdls),
-                                gb_trees:delete(Then, ReadHdlsAge) } }
+           State #dqstate { read_file_handles =
+                            { dict:erase(File, ReadHdls),
+                              gb_trees:delete(Then, ReadHdlsAge) } }
     end.
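Both `lists:foldl` passes in `combineFiles` above rely on the same copying trick: with the work list sorted by offset, adjacent messages are coalesced into contiguous blocks, and a `file:position/2` plus `file:copy/3` pair is issued once per gap rather than once per message. A self-contained sketch of that fold (hypothetical module and `{Offset, Size}` entries; the real code threads dets updates through the same fold):

```erlang
-module(block_copy).
-export([copy_blocks/3]).

%% Entries must be {Offset, Size} pairs sorted ascending by Offset.
copy_blocks(_SrcHdl, _DstHdl, []) -> ok;
copy_blocks(SrcHdl, DstHdl, Entries) ->
    {Start, End} =
        lists:foldl(
          fun ({Offset, Size}, {undefined, undefined}) ->
                  {Offset, Offset + Size};            %% first block
              ({Offset, Size}, {Start1, End1}) when Offset =:= End1 ->
                  {Start1, End1 + Size};              %% contiguous: extend block
              ({Offset, Size}, {Start1, End1}) ->
                  ok = copy_block(SrcHdl, DstHdl, Start1, End1),
                  {Offset, Offset + Size}             %% gap: flush, start anew
          end, {undefined, undefined}, Entries),
    %% the fold leaves the final block uncopied, so flush it here
    copy_block(SrcHdl, DstHdl, Start, End).

copy_block(SrcHdl, DstHdl, Start, End) ->
    BSize = End - Start,
    {ok, Start} = file:position(SrcHdl, {bof, Start}),
    {ok, BSize} = file:copy(SrcHdl, DstHdl, BSize),
    ok.
```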
 
 delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
-    [{File, ValidData, _ContiguousTop, Left, Right}]
-        = ets:lookup(FileSummary, File),
+    [{File, ValidData, _ContiguousTop, Left, Right}] =
+        ets:lookup(FileSummary, File),
     case ValidData of
         %% we should NEVER find the current file in here
         %% hence right should always be a file, not undefined
@@ -691,15 +694,15 @@ load_from_disk(State) ->
     {Files, TmpFiles} = get_disk_queue_files(),
     ok = recover_crashed_compactions(Files, TmpFiles),
     %% There should be no more tmp files now, so go ahead and load the whole lot
-    (State1 = #dqstate{ msg_location = MsgLocation })
-        = load_messages(undefined, Files, State),
+    (State1 = #dqstate{ msg_location = MsgLocation }) =
+        load_messages(undefined, Files, State),
     %% Finally, check there is nothing in mnesia which we haven't loaded
     {atomic, true} = mnesia:transaction(
                        fun() ->
                                ok = mnesia:read_lock_table(rabbit_disk_queue),
                                mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId }, true) ->
-                                                    true = 1
-                                                        =:= length(dets:lookup(MsgLocation, MsgId))
+                                                    true = 1 =:=
+                                                        length(dets:lookup(MsgLocation, MsgId))
                                             end,
                                             true, rabbit_disk_queue)
                        end),
@@ -738,11 +741,8 @@ load_messages(Left, [], State = #dqstate { msg_location = MsgLocation }) ->
     Num = list_to_integer(filename:rootname(Left)),
     Offset = case dets:match_object(MsgLocation, {'_', '_', Left, '_', '_'}) of
                  [] -> 0;
-                 L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_]
-                          = lists:sort(fun ({_, _, _, OffA, _},
-                                            {_, _, _, OffB, _}) ->
-                                               OffB < OffA
-                                       end, L),
+                 L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] =
+                          sortMsgLocationsByOffset(false, L),
                       MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
              end,
     State # dqstate { current_file_num = Num, current_file_name = Left,
@@ -791,8 +791,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
     NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION,
     true = lists:member(NonTmpRelatedFile, Files),
     %% [{MsgId, TotalSize, FileOffset}]
-    {ok, UncorruptedMessagesTmp}
-        = scan_file_for_valid_messages(form_filename(TmpFile)),
+    {ok, UncorruptedMessagesTmp} =
+        scan_file_for_valid_messages(form_filename(TmpFile)),
     MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp),
     %% all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out
     lists:foreach(fun (MsgId) ->
@@ -802,8 +802,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
                                                             queue_and_seq_id = '_',
                                                             is_delivered = '_'}))
                   end, MsgIdsTmp),
-    {ok, UncorruptedMessages}
-        = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
+    {ok, UncorruptedMessages} =
+        scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
     MsgIds = lists:map(GrabMsgId, UncorruptedMessages),
     %% 1) It's possible that everything in the tmp file is also in the main file
     %%    such that the main file is (prefix ++ tmpfile). This means that compaction
@@ -865,8 +865,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
     ok = file:close(TmpHdl),
     ok = file:delete(TmpFile),
 
-    {ok, MainMessages}
-        = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
+    {ok, MainMessages} =
+        scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
     MsgIdsMain = lists:map(GrabMsgId, MainMessages),
     %% check that everything in MsgIds is in MsgIdsMain
     true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end,
@@ -981,8 +981,8 @@ read_next_file_entry(FileHdl, Offset) ->
             case file:position(FileHdl, {cur, TotalSize - MsgIdBinSize}) of
                 {ok, ExpectedAbsPos} ->
-                    NextOffset = Offset + TotalSize
-                        + ?FILE_PACKING_ADJUSTMENT,
+                    NextOffset = Offset + TotalSize +
+                        ?FILE_PACKING_ADJUSTMENT,
                     case file:read(FileHdl, 1) of
                         {ok, <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} ->
                             {ok, {ok, binary_to_term(MsgId),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a04c6f1bfb..cce9da1a68 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -685,7 +685,8 @@ delete_log_handlers(Handlers) ->
 test_disk_queue() ->
     % unicode chars are supported properly from r13 onwards
     io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup mu s\t| Publish mu s\t| Pub mu s/msg\t| Pub mu s/byte\t| Deliver mu s\t| Del mu s/msg\t| Del mu s/byte~n", []),
-    [begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize), timer:sleep(1000) end || % 1000 milliseconds
+    [begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize),
+           timer:sleep(1000) end || % 1000 milliseconds
         MsgSize <- [512, 8192, 32768, 131072],
         Qs <- [[1], lists:seq(1,10)], %, lists:seq(1,100), lists:seq(1,1000)],
         MsgCount <- [1024, 4096, 16384]
@@ -700,20 +701,29 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
     QCount = length(Qs),
     Msg = <<0:(8*MsgSizeBytes)>>,
     List = lists:seq(1, MsgCount),
-    {Publish, ok} = timer:tc(?MODULE, rdq_time_commands,
-                             [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg) || N <- List, _ <- Qs] end,
-                               fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List) || Q <- Qs] end
-                              ]]),
-    {Deliver, ok} = timer:tc(?MODULE, rdq_time_commands,
-                             [[fun() -> [begin SeqIds = [begin {N, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(Q), SeqId end || N <- List],
-                                                rabbit_disk_queue:ack(Q, SeqIds),
-                                                ok = rabbit_disk_queue:tx_commit(Q, [])
-                                         end || Q <- Qs]
-                               end]]),
+    {Publish, ok} =
+        timer:tc(?MODULE, rdq_time_commands,
+                 [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg)
+                             || N <- List, _ <- Qs] end,
+                   fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List)
+                             || Q <- Qs] end
+                  ]]),
+    {Deliver, ok} =
+        timer:tc(?MODULE, rdq_time_commands,
+                 [[fun() -> [begin SeqIds =
+                                       [begin {N, Msg, MsgSizeBytes, false, SeqId} =
+                                                  rabbit_disk_queue:deliver(Q), SeqId end
+                                        || N <- List],
+                                   rabbit_disk_queue:ack(Q, SeqIds),
+                                   ok = rabbit_disk_queue:tx_commit(Q, [])
+                             end || Q <- Qs]
+                  end]]),
     io:format(" ~15.10B| ~14.10B| ~14.10B| ~14.1f| ~14.1f| ~14.6f| ~14.10f| ~14.1f| ~14.6f| ~14.10f~n",
               [MsgCount, MsgSizeBytes, QCount, float(Startup),
-               float(Publish), (Publish / (MsgCount * QCount)), (Publish / (MsgCount * QCount * MsgSizeBytes)),
-               float(Deliver), (Deliver / (MsgCount * QCount)), (Deliver / (MsgCount * QCount * MsgSizeBytes))]),
+               float(Publish), (Publish / (MsgCount * QCount)),
+               (Publish / (MsgCount * QCount * MsgSizeBytes)),
+               float(Deliver), (Deliver / (MsgCount * QCount)),
+               (Deliver / (MsgCount * QCount * MsgSizeBytes))]),
     rdq_stop().
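The timings printed above come from `timer:tc/3`, which returns `{Microseconds, Result}`; the per-message and per-byte columns simply divide that by the work done. A minimal sketch of the arithmetic (hypothetical module; the R13-era `timer:tc` requires the `{M, F, A}` form, hence the `erlang:apply` indirection):

```erlang
-module(timing_example).
-export([time_per_unit/3]).

%% Runs Fun once and returns {MicrosPerMsg, MicrosPerByte}.
time_per_unit(MsgCount, MsgSizeBytes, Fun) when is_function(Fun, 0) ->
    {Micros, _Result} = timer:tc(erlang, apply, [Fun, []]),
    {Micros / MsgCount, Micros / (MsgCount * MsgSizeBytes)}.
```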
 
 % we know each file is going to be 1024*1024*10 bytes in size (10MB), so make sure we have
@@ -728,22 +738,30 @@ rdq_stress_gc(MsgCount) ->
     rabbit_disk_queue:tx_commit(q, List),
     StartChunk = round(MsgCount / 20), % 5%
     AckList =
-        lists:reverse(lists:foldl(fun (E, Acc) -> case Acc of
-                                                      [] -> [E];
-                                                      [F|_Fs] ->
-                                                          case E rem F of
-                                                              0 -> Acc;
-                                                              _ -> [E|Acc]
-                                                          end
-                                                  end
-                                  end, [], lists:flatten([lists:seq(N,MsgCount,N) || N <- lists:seq(StartChunk,MsgCount)])))
-        ++ lists:seq(1, (StartChunk - 1)),
-    MsgIdToSeqDict
-        = lists:foldl(fun (_, Acc) ->
-                              {MsgId, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(q),
-                              dict:store(MsgId, SeqId, Acc)
-                      end, dict:new(), List),
-    rabbit_disk_queue:ack(q, [begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict), SeqId end || MsgId <- AckList]),
+        lists:reverse(
+          lists:foldl(
+            fun (E, Acc) ->
+                    case Acc of
+                        [] -> [E];
+                        [F|_Fs] ->
+                            case E rem F of
+                                0 -> Acc;
+                                _ -> [E|Acc]
+                            end
+                    end
+            end, [], lists:flatten([lists:seq(N,MsgCount,N)
+                                    || N <- lists:seq(StartChunk,MsgCount)]))) ++
+        lists:seq(1, (StartChunk - 1)),
+    MsgIdToSeqDict =
+        lists:foldl(
+          fun (_, Acc) ->
+                  {MsgId, Msg, MsgSizeBytes, false, SeqId} =
+                      rabbit_disk_queue:deliver(q),
+                  dict:store(MsgId, SeqId, Acc)
+          end, dict:new(), List),
+    rabbit_disk_queue:ack(q, [begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict),
+                                    SeqId end
+                              || MsgId <- AckList]),
     rabbit_disk_queue:tx_commit(q, []),
     rdq_stop(),
     passed.
@@ -752,7 +770,8 @@ rdq_time_commands(Funcs) ->
     lists:foreach(fun (F) -> F() end, Funcs).
 
 rdq_virgin() ->
-    {Micros, {ok, _}} = timer:tc(rabbit_disk_queue, start_link, [1024*1024*10, 1000]),
+    {Micros, {ok, _}} =
+        timer:tc(rabbit_disk_queue, start_link, [1024*1024*10, 1000]),
     ok = rabbit_disk_queue:clean_stop(),
     Micros.
