diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-20 12:18:16 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-20 12:18:16 +0100 |
| commit | d1c8d63dc05b1f11cb99fe3c12bbb9cd70108ae2 (patch) | |
| tree | 4d857fd0b233ed83c2a7e5028ca3900abf2435df /src | |
| parent | 4fa8ba984fedb54df8f6760202540149a0a32488 (diff) | |
| download | rabbitmq-server-git-d1c8d63dc05b1f11cb99fe3c12bbb9cd70108ae2.tar.gz | |
Removed the file_detail ets table and converted all of its uses into uses of the msg_location ets table.
Even though this is slightly less optimal algorithmically — we lose the ability to do index lookups in file_detail — it is actually slightly faster in practice, because we no longer have to maintain two tables. Performance:
Msg Count | Msg Size | Queue Count | Startup µs | Publish µs | Pub µs/msg | Pub µs/byte | Deliver µs | Del µs/msg | Del µs/byte
1024| 512| 1| 2644.0| 41061.0| 40.098633| 0.0783176422| 156031.0| 152.374023| 0.2976055145
4096| 512| 1| 74843.0| 328683.0| 80.244873| 0.1567282677| 629441.0| 153.672119| 0.3001408577
16384| 512| 1| 373729.0| 3614155.0| 220.590515| 0.4308408499| 2969499.0| 181.243835| 0.3539918661
1024| 512| 10| 1605989.0| 281004.0| 27.441797| 0.0535972595| 1936168.0| 189.078906| 0.3692947388
4096| 512| 10| 85912.0| 2940291.0| 71.784448| 0.1402040005| 7662259.0| 187.066870| 0.3653649807
16384| 512| 10| 418213.0| 37962842.0| 231.706799| 0.4525523424| 32293492.0| 197.103833| 0.3849684238
1024| 8192| 1| 1347269.0| 144988.0| 141.589844| 0.0172839165| 173906.0| 169.830078| 0.0207312107
4096| 8192| 1| 93070.0| 606369.0| 148.039307| 0.0180712044| 829812.0| 202.590820| 0.0247303247
16384| 8192| 1| 20014.0| 4976009.0| 303.711487| 0.0370741561| 3211632.0| 196.022461| 0.0239285231
1024| 8192| 10| 77291.0| 348677.0| 34.050488| 0.0041565537| 1877374.0| 183.337305| 0.0223800421
4096| 8192| 10| 104842.0| 2722730.0| 66.472900| 0.0081143677| 7787817.0| 190.132251| 0.0232095033
16384| 8192| 10| 21746.0| 44301448.0| 270.394580| 0.0330071509| 32018244.0| 195.423853| 0.0238554507
1024| 32768| 1| 120732.0| 426700.0| 416.699219| 0.0127166510| 210704.0| 205.765625| 0.0062794685
4096| 32768| 1| 9355.0| 1925633.0| 470.125244| 0.0143470839| 824304.0| 201.246094| 0.0061415434
16384| 32768| 1| 14734.0| 10371560.0| 633.029785| 0.0193185359| 3594753.0| 219.406311| 0.0066957492
1024| 32768| 10| 6052.0| 629362.0| 61.461133| 0.0018756449| 2100901.0| 205.166113| 0.0062611729
4096| 32768| 10| 5546.0| 4203683.0| 102.628979| 0.0031319879| 8899536.0| 217.273828| 0.0066306710
16384| 32768| 10| 22657.0| 50306069.0| 307.043878| 0.0093702355| 36433817.0| 222.374371| 0.0067863273
1024| 131072| 1| 7155.0| 1913696.0| 1868.843750| 0.0142581463| 444638.0| 434.216797| 0.0033128113
4096| 131072| 1| 6671.0| 8232640.0| 2009.921875| 0.0153344870| 1907439.0| 465.683350| 0.0035528820
16384| 131072| 1| 1699.0| 33886514.0| 2068.268677| 0.0157796377| 7291762.0| 445.053833| 0.0033954913
1024| 131072| 10| 7506.0| 1991032.0| 194.436719| 0.0014834344| 4564850.0| 445.786133| 0.0034010783
4096| 131072| 10| 7486.0| 9551800.0| 233.198242| 0.0017791614| 18048697.0| 440.642017| 0.0033618318
16384| 131072| 10| 2771.0| 71072559.0| 433.792474| 0.0033095739| 81144745.0| 495.268219| 0.0037785966
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 66 |
1 files changed, 29 insertions, 37 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 4cd146edd9..a704ff2171 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -50,7 +50,6 @@ -define(INTEGER_SIZE_BYTES, 8). -define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). -define(MSG_LOC_ETS_NAME, rabbit_disk_queue_msg_location). --define(FILE_DETAIL_ETS_NAME, rabbit_disk_queue_file_detail). -define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). -define(FILE_EXTENSION, ".rdq"). -define(FILE_EXTENSION_TMP, ".rdt"). @@ -60,7 +59,6 @@ -record(dqstate, {msg_location, file_summary, - file_detail, current_file_num, current_file_name, current_file_handle, @@ -106,7 +104,6 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> InitName = "0" ++ ?FILE_EXTENSION, State = #dqstate { msg_location = ets:new(?MSG_LOC_ETS_NAME, [set, private]), file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, [set, private]), - file_detail = ets:new(?FILE_DETAIL_ETS_NAME, [ordered_set, private]), current_file_num = 0, current_file_name = InitName, current_file_handle = undefined, @@ -133,13 +130,11 @@ handle_call(stop, _From, State) -> {stop, normal, ok, State}; %% gen_server now calls terminate handle_call(clean_stop, _From, State) -> State1 = #dqstate { msg_location = MsgLocation, - file_summary = FileSummary, - file_detail = FileDetail } + file_summary = FileSummary } = shutdown(State), %% tidy up file handles early {atomic, ok} = mnesia:clear_table(rabbit_disk_queue), true = ets:delete(MsgLocation), true = ets:delete(FileSummary), - true = ets:delete(FileDetail), lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))), {stop, normal, ok, State1 # dqstate { current_file_handle = undefined, read_file_handles = {dict:new(), gb_trees:empty()}}}. 
@@ -234,7 +229,6 @@ internal_ack(Q, MsgIds, State) -> %% Q is only needed if MnesiaDelete = true remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation, file_summary = FileSummary, - file_detail = FileDetail, current_file_name = CurName }) -> Files @@ -246,7 +240,6 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL true = ets:delete(MsgLocation, MsgId), [{File, ValidTotalSize, ContiguousTop, Left, Right}] = ets:lookup(FileSummary, File), - true = ets:delete(FileDetail, {File, Offset}), ContiguousTop1 = lists:min([ContiguousTop, Offset]), true = ets:insert(FileSummary, {File, (ValidTotalSize - TotalSize - ?FILE_PACKING_ADJUSTMENT), @@ -272,8 +265,7 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio current_file_handle = CurHdl, current_file_name = CurName, current_offset = CurOffset, - file_summary = FileSummary, - file_detail = FileDetail + file_summary = FileSummary }) -> case ets:lookup(MsgLocation, MsgId) of [] -> @@ -282,7 +274,6 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, CurOffset, TotalSize}), [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] = ets:lookup(FileSummary, CurName), - true = ets:insert_new(FileDetail, {{CurName, CurOffset}, TotalSize, MsgId}), ValidTotalSize1 = ValidTotalSize + TotalSize + ?FILE_PACKING_ADJUSTMENT, ContiguousTop1 = if CurOffset =:= ContiguousTop -> ValidTotalSize1; % can't be any holes in this file @@ -414,7 +405,7 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit, combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRight}, {Destination, DestinationValid, DestinationContiguousTop, _DestinationLeft, _DestinationRight}, State1) -> - (State = #dqstate { file_detail = FileDetail, msg_location = MsgLocation }) + (State = #dqstate { msg_location = MsgLocation }) = closeFile(Source, 
closeFile(Destination, State1)), {ok, SourceHdl} = file:open(form_filename(Source), [read, write, raw, binary, delayed_write, read_ahead]), {ok, DestinationHdl} = file:open(form_filename(Destination), [read, write, raw, binary, delayed_write, read_ahead]), @@ -432,26 +423,27 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRig true -> Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, {ok, TmpHdl} = file:open(form_filename(Tmp), [read, write, raw, binary, delayed_write, read_ahead]), - % as FileDetail is an ordered_set, we should have the lowest offsets first - Worklist = lists:filter(fun ({{Destination2, Offset}, _TotalSize, _MsgId}) - when Destination2 =:= Destination, Offset /= DestinationContiguousTop -> - % it cannot be that Offset == DestinationContiguousTop - % because if it was then DestinationContiguousTop would have been - % extended by TotalSize - Offset > DestinationContiguousTop - end, ets:match_object(FileDetail, {{Destination, '_'}, '_', '_'})), + Worklist + = lists:dropwhile(fun ({_, _, _, Offset, _}) when Offset /= DestinationContiguousTop -> + % it cannot be that Offset == DestinationContiguousTop + % because if it was then DestinationContiguousTop would have been + % extended by TotalSize + Offset < DestinationContiguousTop + % Given expected access patterns, I suspect that the list should be + % naturally sorted as we require, however, we need to enforce it anyway + end, lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) -> + OffA < OffB + end, + ets:match_object(MsgLocation, {'_', '_', Destination, '_', '_'}))), TmpSize = DestinationValid - DestinationContiguousTop, {TmpSize, BlockStart1, BlockEnd1} = - lists:foldl(fun ({{Destination2, Offset}, TotalSize, MsgId}, {CurOffset, BlockStart, BlockEnd}) when Destination2 =:= Destination -> + lists:foldl(fun ({MsgId, _RefCount, _Destination, Offset, TotalSize}, {CurOffset, BlockStart, BlockEnd}) -> % CurOffset is in the TmpFile. 
% Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!) Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, % this message is going to end up back in Destination, at DestinationContiguousTop + CurOffset FinalOffset = DestinationContiguousTop + CurOffset, true = ets:update_element(MsgLocation, MsgId, {4, FinalOffset}), - % sadly you can't use update_element to change the key: - true = ets:delete(FileDetail, {Destination, Offset}), - true = ets:insert_new(FileDetail, {{Destination, FinalOffset}, TotalSize, MsgId}), NextOffset = CurOffset + Size, if BlockStart =:= undefined -> % base case, called only for the first list elem @@ -472,7 +464,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRig {ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}), {ok, BSize1} = file:copy(DestinationHdl, TmpHdl, BSize1), % so now Tmp contains everything we need to salvage from Destination, - % and both FileDetail and MsgLocation have been updated to reflect compaction of Destination + % and MsgLocation has been updated to reflect compaction of Destination % so truncate Destination and copy from Tmp back to the end {ok, 0} = file:position(TmpHdl, {bof, 0}), {ok, DestinationContiguousTop} = file:position(DestinationHdl, {bof, DestinationContiguousTop}), @@ -486,17 +478,16 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRig ok = file:close(TmpHdl), ok = file:delete(form_filename(Tmp)) end, - SourceWorkList = ets:match_object(FileDetail, {{Source, '_'}, '_', '_'}), + SourceWorkList = lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) -> + OffA < OffB + end, ets:match_object(MsgLocation, {'_', '_', Source, '_', '_'})), {ExpectedSize, BlockStart2, BlockEnd2} = - lists:foldl(fun ({{Source2, Offset}, TotalSize, MsgId}, {CurOffset, BlockStart, BlockEnd}) when Source2 =:= Source -> + lists:foldl(fun ({MsgId, _RefCount, _Source, Offset, TotalSize}, {CurOffset, BlockStart, BlockEnd}) 
-> % CurOffset is in the DestinationFile. % Offset, BlockStart and BlockEnd are in the SourceFile Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, % update MsgLocation to reflect change of file (3rd field) and offset (4th field) true = ets:update_element(MsgLocation, MsgId, [{3, Destination}, {4, CurOffset}]), - % can't use update_element to change key: - true = ets:delete(FileDetail, {Source, Offset}), - true = ets:insert_new(FileDetail, {{Destination, CurOffset}, TotalSize, MsgId}), NextOffset = CurOffset + Size, if BlockStart =:= undefined -> % base case, called only for the first list elem @@ -567,19 +558,21 @@ load_messages(undefined, [], State = #dqstate { file_summary = FileSummary, current_file_name = CurName }) -> true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}), State; -load_messages(Left, [], State = #dqstate { file_detail = FileDetail }) -> +load_messages(Left, [], State = #dqstate { msg_location = MsgLocation }) -> Num = list_to_integer(filename:rootname(Left)), - Offset = case ets:match_object(FileDetail, {{Left, '_'}, '_', '_'}) of + Offset = case ets:match_object(MsgLocation, {'_', '_', Left, '_', '_'}) of [] -> 0; - L -> {{Left, Offset1}, TotalSize, _MsgId} = lists:last(L), - Offset1 + TotalSize + ?FILE_PACKING_ADJUSTMENT + L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] + = lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) -> + OffB < OffA + end, L), + MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT end, State # dqstate { current_file_num = Num, current_file_name = Left, current_offset = Offset }; load_messages(Left, [File|Files], State = #dqstate { msg_location = MsgLocation, - file_summary = FileSummary, - file_detail = FileDetail + file_summary = FileSummary }) -> % [{MsgId, TotalSize, FileOffset}] {ok, Messages} = scan_file_for_valid_messages(form_filename(File)), @@ -591,7 +584,6 @@ load_messages(Left, [File|Files], 0 -> {VMAcc, VTSAcc}; RefCount -> true = ets:insert_new(MsgLocation, {MsgId, RefCount, 
File, Offset, TotalSize}), - true = ets:insert_new(FileDetail, {{File, Offset}, TotalSize, MsgId}), {[{MsgId, TotalSize, Offset}|VMAcc], VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT } |
