summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-20 12:18:16 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-20 12:18:16 +0100
commitd1c8d63dc05b1f11cb99fe3c12bbb9cd70108ae2 (patch)
tree4d857fd0b233ed83c2a7e5028ca3900abf2435df /src
parent4fa8ba984fedb54df8f6760202540149a0a32488 (diff)
downloadrabbitmq-server-git-d1c8d63dc05b1f11cb99fe3c12bbb9cd70108ae2.tar.gz
removed file_detail ets table and converted all use into use of msg_location ets table.
Even though this is slightly less optimal because of the loss of doing index lookups in file_detail, this is actually slightly faster due to not having to maintain two tables. Performance: Msg Count | Msg Size | Queue Count | Startup mu s | Publish mu s | Pub mu s/msg | Pub mu s/byte | Deliver mu s | Del mu s/msg | Del mu s/byte 1024| 512| 1| 2644.0| 41061.0| 40.098633| 0.0783176422| 156031.0| 152.374023| 0.2976055145 4096| 512| 1| 74843.0| 328683.0| 80.244873| 0.1567282677| 629441.0| 153.672119| 0.3001408577 16384| 512| 1| 373729.0| 3614155.0| 220.590515| 0.4308408499| 2969499.0| 181.243835| 0.3539918661 1024| 512| 10| 1605989.0| 281004.0| 27.441797| 0.0535972595| 1936168.0| 189.078906| 0.3692947388 4096| 512| 10| 85912.0| 2940291.0| 71.784448| 0.1402040005| 7662259.0| 187.066870| 0.3653649807 16384| 512| 10| 418213.0| 37962842.0| 231.706799| 0.4525523424| 32293492.0| 197.103833| 0.3849684238 1024| 8192| 1| 1347269.0| 144988.0| 141.589844| 0.0172839165| 173906.0| 169.830078| 0.0207312107 4096| 8192| 1| 93070.0| 606369.0| 148.039307| 0.0180712044| 829812.0| 202.590820| 0.0247303247 16384| 8192| 1| 20014.0| 4976009.0| 303.711487| 0.0370741561| 3211632.0| 196.022461| 0.0239285231 1024| 8192| 10| 77291.0| 348677.0| 34.050488| 0.0041565537| 1877374.0| 183.337305| 0.0223800421 4096| 8192| 10| 104842.0| 2722730.0| 66.472900| 0.0081143677| 7787817.0| 190.132251| 0.0232095033 16384| 8192| 10| 21746.0| 44301448.0| 270.394580| 0.0330071509| 32018244.0| 195.423853| 0.0238554507 1024| 32768| 1| 120732.0| 426700.0| 416.699219| 0.0127166510| 210704.0| 205.765625| 0.0062794685 4096| 32768| 1| 9355.0| 1925633.0| 470.125244| 0.0143470839| 824304.0| 201.246094| 0.0061415434 16384| 32768| 1| 14734.0| 10371560.0| 633.029785| 0.0193185359| 3594753.0| 219.406311| 0.0066957492 1024| 32768| 10| 6052.0| 629362.0| 61.461133| 0.0018756449| 2100901.0| 205.166113| 0.0062611729 4096| 32768| 10| 5546.0| 4203683.0| 102.628979| 0.0031319879| 8899536.0| 217.273828| 0.0066306710 16384| 32768| 10| 22657.0| 50306069.0| 307.043878| 0.0093702355| 36433817.0| 222.374371| 0.0067863273 1024| 131072| 1| 7155.0| 1913696.0| 1868.843750| 0.0142581463| 444638.0| 434.216797| 0.0033128113 4096| 131072| 1| 6671.0| 8232640.0| 2009.921875| 0.0153344870| 1907439.0| 465.683350| 0.0035528820 16384| 131072| 1| 1699.0| 33886514.0| 2068.268677| 0.0157796377| 7291762.0| 445.053833| 0.0033954913 1024| 131072| 10| 7506.0| 1991032.0| 194.436719| 0.0014834344| 4564850.0| 445.786133| 0.0034010783 4096| 131072| 10| 7486.0| 9551800.0| 233.198242| 0.0017791614| 18048697.0| 440.642017| 0.0033618318 16384| 131072| 10| 2771.0| 71072559.0| 433.792474| 0.0033095739| 81144745.0| 495.268219| 0.0037785966
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl66
1 files changed, 29 insertions, 37 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 4cd146edd9..a704ff2171 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -50,7 +50,6 @@
-define(INTEGER_SIZE_BYTES, 8).
-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
-define(MSG_LOC_ETS_NAME, rabbit_disk_queue_msg_location).
--define(FILE_DETAIL_ETS_NAME, rabbit_disk_queue_file_detail).
-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").
@@ -60,7 +59,6 @@
-record(dqstate, {msg_location,
file_summary,
- file_detail,
current_file_num,
current_file_name,
current_file_handle,
@@ -106,7 +104,6 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
InitName = "0" ++ ?FILE_EXTENSION,
State = #dqstate { msg_location = ets:new(?MSG_LOC_ETS_NAME, [set, private]),
file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, [set, private]),
- file_detail = ets:new(?FILE_DETAIL_ETS_NAME, [ordered_set, private]),
current_file_num = 0,
current_file_name = InitName,
current_file_handle = undefined,
@@ -133,13 +130,11 @@ handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
handle_call(clean_stop, _From, State) ->
State1 = #dqstate { msg_location = MsgLocation,
- file_summary = FileSummary,
- file_detail = FileDetail }
+ file_summary = FileSummary }
= shutdown(State), %% tidy up file handles early
{atomic, ok} = mnesia:clear_table(rabbit_disk_queue),
true = ets:delete(MsgLocation),
true = ets:delete(FileSummary),
- true = ets:delete(FileDetail),
lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))),
{stop, normal, ok, State1 # dqstate { current_file_handle = undefined,
read_file_handles = {dict:new(), gb_trees:empty()}}}.
@@ -234,7 +229,6 @@ internal_ack(Q, MsgIds, State) ->
%% Q is only needed if MnesiaDelete = true
remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation,
file_summary = FileSummary,
- file_detail = FileDetail,
current_file_name = CurName
}) ->
Files
@@ -246,7 +240,6 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL
true = ets:delete(MsgLocation, MsgId),
[{File, ValidTotalSize, ContiguousTop, Left, Right}]
= ets:lookup(FileSummary, File),
- true = ets:delete(FileDetail, {File, Offset}),
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
true = ets:insert(FileSummary,
{File, (ValidTotalSize - TotalSize - ?FILE_PACKING_ADJUSTMENT),
@@ -272,8 +265,7 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
current_file_handle = CurHdl,
current_file_name = CurName,
current_offset = CurOffset,
- file_summary = FileSummary,
- file_detail = FileDetail
+ file_summary = FileSummary
}) ->
case ets:lookup(MsgLocation, MsgId) of
[] ->
@@ -282,7 +274,6 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, CurOffset, TotalSize}),
[{CurName, ValidTotalSize, ContiguousTop, Left, undefined}]
= ets:lookup(FileSummary, CurName),
- true = ets:insert_new(FileDetail, {{CurName, CurOffset}, TotalSize, MsgId}),
ValidTotalSize1 = ValidTotalSize + TotalSize + ?FILE_PACKING_ADJUSTMENT,
ContiguousTop1 = if CurOffset =:= ContiguousTop ->
ValidTotalSize1; % can't be any holes in this file
@@ -414,7 +405,7 @@ combineFile(File, State = #dqstate { file_size_limit = FileSizeLimit,
combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRight},
{Destination, DestinationValid, DestinationContiguousTop, _DestinationLeft, _DestinationRight},
State1) ->
- (State = #dqstate { file_detail = FileDetail, msg_location = MsgLocation })
+ (State = #dqstate { msg_location = MsgLocation })
= closeFile(Source, closeFile(Destination, State1)),
{ok, SourceHdl} = file:open(form_filename(Source), [read, write, raw, binary, delayed_write, read_ahead]),
{ok, DestinationHdl} = file:open(form_filename(Destination), [read, write, raw, binary, delayed_write, read_ahead]),
@@ -432,26 +423,27 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRig
true ->
Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
{ok, TmpHdl} = file:open(form_filename(Tmp), [read, write, raw, binary, delayed_write, read_ahead]),
- % as FileDetail is an ordered_set, we should have the lowest offsets first
- Worklist = lists:filter(fun ({{Destination2, Offset}, _TotalSize, _MsgId})
- when Destination2 =:= Destination, Offset /= DestinationContiguousTop ->
- % it cannot be that Offset == DestinationContiguousTop
- % because if it was then DestinationContiguousTop would have been
- % extended by TotalSize
- Offset > DestinationContiguousTop
- end, ets:match_object(FileDetail, {{Destination, '_'}, '_', '_'})),
+ Worklist
+ = lists:dropwhile(fun ({_, _, _, Offset, _}) when Offset /= DestinationContiguousTop ->
+ % it cannot be that Offset == DestinationContiguousTop
+ % because if it was then DestinationContiguousTop would have been
+ % extended by TotalSize
+ Offset < DestinationContiguousTop
+ % Given expected access patterns, I suspect that the list should be
+ % naturally sorted as we require, however, we need to enforce it anyway
+ end, lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
+ OffA < OffB
+ end,
+ ets:match_object(MsgLocation, {'_', '_', Destination, '_', '_'}))),
TmpSize = DestinationValid - DestinationContiguousTop,
{TmpSize, BlockStart1, BlockEnd1} =
- lists:foldl(fun ({{Destination2, Offset}, TotalSize, MsgId}, {CurOffset, BlockStart, BlockEnd}) when Destination2 =:= Destination ->
+ lists:foldl(fun ({MsgId, _RefCount, _Destination, Offset, TotalSize}, {CurOffset, BlockStart, BlockEnd}) ->
% CurOffset is in the TmpFile.
% Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!)
Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
% this message is going to end up back in Destination, at DestinationContiguousTop + CurOffset
FinalOffset = DestinationContiguousTop + CurOffset,
true = ets:update_element(MsgLocation, MsgId, {4, FinalOffset}),
- % sadly you can't use update_element to change the key:
- true = ets:delete(FileDetail, {Destination, Offset}),
- true = ets:insert_new(FileDetail, {{Destination, FinalOffset}, TotalSize, MsgId}),
NextOffset = CurOffset + Size,
if BlockStart =:= undefined ->
% base case, called only for the first list elem
@@ -472,7 +464,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRig
{ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}),
{ok, BSize1} = file:copy(DestinationHdl, TmpHdl, BSize1),
% so now Tmp contains everything we need to salvage from Destination,
- % and both FileDetail and MsgLocation have been updated to reflect compaction of Destination
+ % and MsgLocation has been updated to reflect compaction of Destination
% so truncate Destination and copy from Tmp back to the end
{ok, 0} = file:position(TmpHdl, {bof, 0}),
{ok, DestinationContiguousTop} = file:position(DestinationHdl, {bof, DestinationContiguousTop}),
@@ -486,17 +478,16 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRig
ok = file:close(TmpHdl),
ok = file:delete(form_filename(Tmp))
end,
- SourceWorkList = ets:match_object(FileDetail, {{Source, '_'}, '_', '_'}),
+ SourceWorkList = lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
+ OffA < OffB
+ end, ets:match_object(MsgLocation, {'_', '_', Source, '_', '_'})),
{ExpectedSize, BlockStart2, BlockEnd2} =
- lists:foldl(fun ({{Source2, Offset}, TotalSize, MsgId}, {CurOffset, BlockStart, BlockEnd}) when Source2 =:= Source ->
+ lists:foldl(fun ({MsgId, _RefCount, _Source, Offset, TotalSize}, {CurOffset, BlockStart, BlockEnd}) ->
% CurOffset is in the DestinationFile.
% Offset, BlockStart and BlockEnd are in the SourceFile
Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
% update MsgLocation to reflect change of file (3rd field) and offset (4th field)
true = ets:update_element(MsgLocation, MsgId, [{3, Destination}, {4, CurOffset}]),
- % can't use update_element to change key:
- true = ets:delete(FileDetail, {Source, Offset}),
- true = ets:insert_new(FileDetail, {{Destination, CurOffset}, TotalSize, MsgId}),
NextOffset = CurOffset + Size,
if BlockStart =:= undefined ->
% base case, called only for the first list elem
@@ -567,19 +558,21 @@ load_messages(undefined, [], State = #dqstate { file_summary = FileSummary,
current_file_name = CurName }) ->
true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}),
State;
-load_messages(Left, [], State = #dqstate { file_detail = FileDetail }) ->
+load_messages(Left, [], State = #dqstate { msg_location = MsgLocation }) ->
Num = list_to_integer(filename:rootname(Left)),
- Offset = case ets:match_object(FileDetail, {{Left, '_'}, '_', '_'}) of
+ Offset = case ets:match_object(MsgLocation, {'_', '_', Left, '_', '_'}) of
[] -> 0;
- L -> {{Left, Offset1}, TotalSize, _MsgId} = lists:last(L),
- Offset1 + TotalSize + ?FILE_PACKING_ADJUSTMENT
+ L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_]
+ = lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
+ OffB < OffA
+ end, L),
+ MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
end,
State # dqstate { current_file_num = Num, current_file_name = Left,
current_offset = Offset };
load_messages(Left, [File|Files],
State = #dqstate { msg_location = MsgLocation,
- file_summary = FileSummary,
- file_detail = FileDetail
+ file_summary = FileSummary
}) ->
% [{MsgId, TotalSize, FileOffset}]
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
@@ -591,7 +584,6 @@ load_messages(Left, [File|Files],
0 -> {VMAcc, VTSAcc};
RefCount ->
true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}),
- true = ets:insert_new(FileDetail, {{File, Offset}, TotalSize, MsgId}),
{[{MsgId, TotalSize, Offset}|VMAcc],
VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT
}