summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-20 14:01:36 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-20 14:01:36 +0100
commit0823f1d578a748c32b2495341650a17befbe3b28 (patch)
tree94d00007d2ccccec1e1887aedbb8a0c751fd0edb /src
parentd1c8d63dc05b1f11cb99fe3c12bbb9cd70108ae2 (diff)
downloadrabbitmq-server-git-0823f1d578a748c32b2495341650a17befbe3b28.tar.gz
moved the message location table to dets. performance is slightly more than 10 times slower.
This means that the test suite, which used to take about 12 mins to run, now takes about 2 hours. Looks like we could now be talking up to 40ms to publish a message. Interestingly, delivery is only twice as slow as with ets; it's publish that's taken the 10+ times hit. Worryingly, the numbers show that performance per message is not constant, and wasn't in ets either. This must be the effect of buckets in both ets and dets filling up and chaining. The dets man page does say that it organises data as a linear hash list, which is a structure I know well, and I am surprised performance is dropping off in this way — maybe it suggests poor distribution in their hashing algorithm, or costly rebalancing.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl58
-rw-r--r--src/rabbit_tests.erl4
2 files changed, 36 insertions, 26 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index a704ff2171..26aa9d4b2c 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -49,10 +49,11 @@
-define(WRITE_OK, 255).
-define(INTEGER_SIZE_BYTES, 8).
-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
--define(MSG_LOC_ETS_NAME, rabbit_disk_queue_msg_location).
+-define(MSG_LOC_DETS_NAME, rabbit_disk_queue_msg_location).
-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").
+-define(FILE_EXTENSION_DETS, ".dets").
-define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))).
-define(SERVER, ?MODULE).
@@ -102,7 +103,14 @@ clean_stop() ->
init([FileSizeLimit, ReadFileHandlesLimit]) ->
process_flag(trap_exit, true),
InitName = "0" ++ ?FILE_EXTENSION,
- State = #dqstate { msg_location = ets:new(?MSG_LOC_ETS_NAME, [set, private]),
+ {ok, MsgLocation} = dets:open_file(?MSG_LOC_DETS_NAME,
+ [{file, form_filename(atom_to_list(?MSG_LOC_DETS_NAME) ++ ?FILE_EXTENSION_DETS)},
+ {min_no_slots, 1024*1024},
+ % man says this should be <= 32M. But it works...
+ {max_no_slots, 1024*1024*1024},
+ {type, set}
+ ]),
+ State = #dqstate { msg_location = MsgLocation,
file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, [set, private]),
current_file_num = 0,
current_file_name = InitName,
@@ -129,11 +137,9 @@ handle_call({tx_commit, Q, MsgIds}, _From, State) ->
handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
handle_call(clean_stop, _From, State) ->
- State1 = #dqstate { msg_location = MsgLocation,
- file_summary = FileSummary }
+ State1 = #dqstate { file_summary = FileSummary }
= shutdown(State), %% tidy up file handles early
{atomic, ok} = mnesia:clear_table(rabbit_disk_queue),
- true = ets:delete(MsgLocation),
true = ets:delete(FileSummary),
lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))),
{stop, normal, ok, State1 # dqstate { current_file_handle = undefined,
@@ -159,10 +165,13 @@ handle_info(_Info, State) ->
terminate(_Reason, State) ->
shutdown(State).
-shutdown(State = #dqstate { current_file_handle = FileHdl,
+shutdown(State = #dqstate { msg_location = MsgLocation,
+ current_file_handle = FileHdl,
read_file_handles = {ReadHdls, _ReadHdlsAge}
}) ->
% deliberately ignoring return codes here
+ dets:close(MsgLocation),
+ file:delete(form_filename(atom_to_list(?MSG_LOC_DETS_NAME) ++ ?FILE_EXTENSION_DETS)),
if FileHdl =:= undefined -> ok;
true -> file:sync(FileHdl),
file:close(FileHdl)
@@ -190,7 +199,7 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
read_file_handles_limit = ReadFileHandlesLimit,
read_file_handles = {ReadHdls, ReadHdlsAge}
}) ->
- [{MsgId, _RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId),
+ [{MsgId, _RefCount, File, Offset, TotalSize}] = dets:lookup(MsgLocation, MsgId),
% so this next bit implements an LRU for file handles. But it's a bit insane, and smells
% of premature optimisation. So I might remove it and dump it overboard
{FileHdl, ReadHdls1, ReadHdlsAge1}
@@ -234,10 +243,10 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL
Files
= lists:foldl(fun (MsgId, Files2) ->
[{MsgId, RefCount, File, Offset, TotalSize}]
- = ets:lookup(MsgLocation, MsgId),
+ = dets:lookup(MsgLocation, MsgId),
Files3 =
if 1 =:= RefCount ->
- true = ets:delete(MsgLocation, MsgId),
+ ok = dets:delete(MsgLocation, MsgId),
[{File, ValidTotalSize, ContiguousTop, Left, Right}]
= ets:lookup(FileSummary, File),
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
@@ -248,7 +257,7 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL
true -> sets:add_element(File, Files2)
end;
1 < RefCount ->
- ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
+ ok = dets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}),
Files2
end,
if MnesiaDelete ->
@@ -267,11 +276,11 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
current_offset = CurOffset,
file_summary = FileSummary
}) ->
- case ets:lookup(MsgLocation, MsgId) of
+ case dets:lookup(MsgLocation, MsgId) of
[] ->
% New message, lots to do
{ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
- true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, CurOffset, TotalSize}),
+ true = dets:insert_new(MsgLocation, {MsgId, 1, CurName, CurOffset, TotalSize}),
[{CurName, ValidTotalSize, ContiguousTop, Left, undefined}]
= ets:lookup(FileSummary, CurName),
ValidTotalSize1 = ValidTotalSize + TotalSize + ?FILE_PACKING_ADJUSTMENT,
@@ -284,7 +293,7 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
State # dqstate {current_offset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT});
[{MsgId, RefCount, File, Offset, TotalSize}] ->
% We already know about it, just update counter
- true = ets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}),
+ ok = dets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}),
{ok, State}
end.
@@ -297,7 +306,7 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation,
fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue),
lists:foldl(fun (MsgId, Acc) ->
[{MsgId, _RefCount, File, _Offset, _TotalSize}] =
- ets:lookup(MsgLocation, MsgId),
+ dets:lookup(MsgLocation, MsgId),
ok = mnesia:write(rabbit_disk_queue,
#dq_msg_loc { msg_id_and_queue = {MsgId, Q},
is_delivered = false}, write),
@@ -434,16 +443,17 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRig
end, lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
OffA < OffB
end,
- ets:match_object(MsgLocation, {'_', '_', Destination, '_', '_'}))),
+ dets:match_object(MsgLocation, {'_', '_', Destination, '_', '_'}))),
TmpSize = DestinationValid - DestinationContiguousTop,
{TmpSize, BlockStart1, BlockEnd1} =
- lists:foldl(fun ({MsgId, _RefCount, _Destination, Offset, TotalSize}, {CurOffset, BlockStart, BlockEnd}) ->
+ lists:foldl(fun ({MsgId, RefCount, _Destination, Offset, TotalSize}, {CurOffset, BlockStart, BlockEnd}) ->
% CurOffset is in the TmpFile.
% Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!)
Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
% this message is going to end up back in Destination, at DestinationContiguousTop + CurOffset
FinalOffset = DestinationContiguousTop + CurOffset,
- true = ets:update_element(MsgLocation, MsgId, {4, FinalOffset}),
+ ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination, FinalOffset, TotalSize}),
+
NextOffset = CurOffset + Size,
if BlockStart =:= undefined ->
% base case, called only for the first list elem
@@ -480,14 +490,14 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRig
end,
SourceWorkList = lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
OffA < OffB
- end, ets:match_object(MsgLocation, {'_', '_', Source, '_', '_'})),
+ end, dets:match_object(MsgLocation, {'_', '_', Source, '_', '_'})),
{ExpectedSize, BlockStart2, BlockEnd2} =
- lists:foldl(fun ({MsgId, _RefCount, _Source, Offset, TotalSize}, {CurOffset, BlockStart, BlockEnd}) ->
+ lists:foldl(fun ({MsgId, RefCount, _Source, Offset, TotalSize}, {CurOffset, BlockStart, BlockEnd}) ->
% CurOffset is in the DestinationFile.
% Offset, BlockStart and BlockEnd are in the SourceFile
Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
- % update MsgLocation to reflect change of file (3rd field) and offset (4th field)
- true = ets:update_element(MsgLocation, MsgId, [{3, Destination}, {4, CurOffset}]),
+ % update MsgLocation to reflect change of file and offset
+ ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination, CurOffset, TotalSize}),
NextOffset = CurOffset + Size,
if BlockStart =:= undefined ->
% base case, called only for the first list elem
@@ -550,7 +560,7 @@ load_from_disk(State) ->
% There should be no more tmp files now, so go ahead and load the whole lot
(State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State),
% Finally, check there is nothing in mnesia which we haven't loaded
- true = lists:foldl(fun ({MsgId, _Q}, true) -> true = 1 =:= length(ets:lookup(MsgLocation, MsgId)) end,
+ true = lists:foldl(fun ({MsgId, _Q}, true) -> true = 1 =:= length(dets:lookup(MsgLocation, MsgId)) end,
true, mnesia:dirty_all_keys(rabbit_disk_queue)),
{ok, State1}.
@@ -560,7 +570,7 @@ load_messages(undefined, [], State = #dqstate { file_summary = FileSummary,
State;
load_messages(Left, [], State = #dqstate { msg_location = MsgLocation }) ->
Num = list_to_integer(filename:rootname(Left)),
- Offset = case ets:match_object(MsgLocation, {'_', '_', Left, '_', '_'}) of
+ Offset = case dets:match_object(MsgLocation, {'_', '_', Left, '_', '_'}) of
[] -> 0;
L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_]
= lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
@@ -583,7 +593,7 @@ load_messages(Left, [File|Files],
is_delivered = '_'})) of
0 -> {VMAcc, VTSAcc};
RefCount ->
- true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}),
+ true = dets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}),
{[{MsgId, TotalSize, Offset}|VMAcc],
VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT
}
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ba74a8774f..736ddfd473 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -703,12 +703,12 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
List = lists:seq(1, MsgCount),
{Publish, ok} = timer:tc(?MODULE, rdq_time_commands,
[[fun() -> [rabbit_disk_queue:tx_publish(N, Msg) || N <- List, _ <- Qs] end,
- fun() -> [rabbit_disk_queue:tx_commit(Q, List) || Q <- Qs] end
+ fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List) || Q <- Qs] end
]]),
{Deliver, ok} = timer:tc(?MODULE, rdq_time_commands,
[[fun() -> [begin [begin {Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(Q, N), ok end || N <- List],
rabbit_disk_queue:ack(Q, List),
- rabbit_disk_queue:tx_commit(Q, [])
+ ok = rabbit_disk_queue:tx_commit(Q, [])
end || Q <- Qs]
end]]),
io:format(" ~15.10B| ~14.10B| ~14.10B| ~14.1f| ~14.1f| ~14.6f| ~14.10f| ~14.1f| ~14.6f| ~14.10f~n",