diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-20 14:01:36 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-20 14:01:36 +0100 |
| commit | 0823f1d578a748c32b2495341650a17befbe3b28 (patch) | |
| tree | 94d00007d2ccccec1e1887aedbb8a0c751fd0edb /src | |
| parent | d1c8d63dc05b1f11cb99fe3c12bbb9cd70108ae2 (diff) | |
| download | rabbitmq-server-git-0823f1d578a748c32b2495341650a17befbe3b28.tar.gz | |
Moved the message location table to dets. Performance is now slightly more than 10 times slower than it was with ets.
This means that the test suite which used to take about 12 mins to run now takes about 2 hours.
Looks like we could now be taking up to 40ms to publish a message. Interestingly, delivery is only twice as slow as with ets; it's publish that has taken the 10+ times hit.
Worryingly, the numbers show that performance per message is not constant, and wasn't in ets either. This must be the effect of buckets in both ets and dets filling up and chaining. The dets man page does say that it organises data as a linear hash list, which is a structure I know well, and I am surprised performance is dropping off in this way - maybe this suggests poor distribution from their hashing algorithm, or poor rebalancing.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 58 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 4 |
2 files changed, 36 insertions, 26 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index a704ff2171..26aa9d4b2c 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -49,10 +49,11 @@ -define(WRITE_OK, 255). -define(INTEGER_SIZE_BYTES, 8). -define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). --define(MSG_LOC_ETS_NAME, rabbit_disk_queue_msg_location). +-define(MSG_LOC_DETS_NAME, rabbit_disk_queue_msg_location). -define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). -define(FILE_EXTENSION, ".rdq"). -define(FILE_EXTENSION_TMP, ".rdt"). +-define(FILE_EXTENSION_DETS, ".dets"). -define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))). -define(SERVER, ?MODULE). @@ -102,7 +103,14 @@ clean_stop() -> init([FileSizeLimit, ReadFileHandlesLimit]) -> process_flag(trap_exit, true), InitName = "0" ++ ?FILE_EXTENSION, - State = #dqstate { msg_location = ets:new(?MSG_LOC_ETS_NAME, [set, private]), + {ok, MsgLocation} = dets:open_file(?MSG_LOC_DETS_NAME, + [{file, form_filename(atom_to_list(?MSG_LOC_DETS_NAME) ++ ?FILE_EXTENSION_DETS)}, + {min_no_slots, 1024*1024}, + % man says this should be <= 32M. But it works... 
+ {max_no_slots, 1024*1024*1024}, + {type, set} + ]), + State = #dqstate { msg_location = MsgLocation, file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, [set, private]), current_file_num = 0, current_file_name = InitName, @@ -129,11 +137,9 @@ handle_call({tx_commit, Q, MsgIds}, _From, State) -> handle_call(stop, _From, State) -> {stop, normal, ok, State}; %% gen_server now calls terminate handle_call(clean_stop, _From, State) -> - State1 = #dqstate { msg_location = MsgLocation, - file_summary = FileSummary } + State1 = #dqstate { file_summary = FileSummary } = shutdown(State), %% tidy up file handles early {atomic, ok} = mnesia:clear_table(rabbit_disk_queue), - true = ets:delete(MsgLocation), true = ets:delete(FileSummary), lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))), {stop, normal, ok, State1 # dqstate { current_file_handle = undefined, @@ -159,10 +165,13 @@ handle_info(_Info, State) -> terminate(_Reason, State) -> shutdown(State). -shutdown(State = #dqstate { current_file_handle = FileHdl, +shutdown(State = #dqstate { msg_location = MsgLocation, + current_file_handle = FileHdl, read_file_handles = {ReadHdls, _ReadHdlsAge} }) -> % deliberately ignoring return codes here + dets:close(MsgLocation), + file:delete(form_filename(atom_to_list(?MSG_LOC_DETS_NAME) ++ ?FILE_EXTENSION_DETS)), if FileHdl =:= undefined -> ok; true -> file:sync(FileHdl), file:close(FileHdl) @@ -190,7 +199,7 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation, read_file_handles_limit = ReadFileHandlesLimit, read_file_handles = {ReadHdls, ReadHdlsAge} }) -> - [{MsgId, _RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId), + [{MsgId, _RefCount, File, Offset, TotalSize}] = dets:lookup(MsgLocation, MsgId), % so this next bit implements an LRU for file handles. But it's a bit insane, and smells % of premature optimisation. 
So I might remove it and dump it overboard {FileHdl, ReadHdls1, ReadHdlsAge1} @@ -234,10 +243,10 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL Files = lists:foldl(fun (MsgId, Files2) -> [{MsgId, RefCount, File, Offset, TotalSize}] - = ets:lookup(MsgLocation, MsgId), + = dets:lookup(MsgLocation, MsgId), Files3 = if 1 =:= RefCount -> - true = ets:delete(MsgLocation, MsgId), + ok = dets:delete(MsgLocation, MsgId), [{File, ValidTotalSize, ContiguousTop, Left, Right}] = ets:lookup(FileSummary, File), ContiguousTop1 = lists:min([ContiguousTop, Offset]), @@ -248,7 +257,7 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL true -> sets:add_element(File, Files2) end; 1 < RefCount -> - ets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), + ok = dets:insert(MsgLocation, {MsgId, RefCount - 1, File, Offset, TotalSize}), Files2 end, if MnesiaDelete -> @@ -267,11 +276,11 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio current_offset = CurOffset, file_summary = FileSummary }) -> - case ets:lookup(MsgLocation, MsgId) of + case dets:lookup(MsgLocation, MsgId) of [] -> % New message, lots to do {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody), - true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, CurOffset, TotalSize}), + true = dets:insert_new(MsgLocation, {MsgId, 1, CurName, CurOffset, TotalSize}), [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] = ets:lookup(FileSummary, CurName), ValidTotalSize1 = ValidTotalSize + TotalSize + ?FILE_PACKING_ADJUSTMENT, @@ -284,7 +293,7 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio State # dqstate {current_offset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT}); [{MsgId, RefCount, File, Offset, TotalSize}] -> % We already know about it, just update counter - true = ets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}), + ok = 
dets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}), {ok, State} end. @@ -297,7 +306,7 @@ internal_tx_commit(Q, MsgIds, State = #dqstate { msg_location = MsgLocation, fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), lists:foldl(fun (MsgId, Acc) -> [{MsgId, _RefCount, File, _Offset, _TotalSize}] = - ets:lookup(MsgLocation, MsgId), + dets:lookup(MsgLocation, MsgId), ok = mnesia:write(rabbit_disk_queue, #dq_msg_loc { msg_id_and_queue = {MsgId, Q}, is_delivered = false}, write), @@ -434,16 +443,17 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRig end, lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) -> OffA < OffB end, - ets:match_object(MsgLocation, {'_', '_', Destination, '_', '_'}))), + dets:match_object(MsgLocation, {'_', '_', Destination, '_', '_'}))), TmpSize = DestinationValid - DestinationContiguousTop, {TmpSize, BlockStart1, BlockEnd1} = - lists:foldl(fun ({MsgId, _RefCount, _Destination, Offset, TotalSize}, {CurOffset, BlockStart, BlockEnd}) -> + lists:foldl(fun ({MsgId, RefCount, _Destination, Offset, TotalSize}, {CurOffset, BlockStart, BlockEnd}) -> % CurOffset is in the TmpFile. % Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!) 
Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, % this message is going to end up back in Destination, at DestinationContiguousTop + CurOffset FinalOffset = DestinationContiguousTop + CurOffset, - true = ets:update_element(MsgLocation, MsgId, {4, FinalOffset}), + ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination, FinalOffset, TotalSize}), + NextOffset = CurOffset + Size, if BlockStart =:= undefined -> % base case, called only for the first list elem @@ -480,14 +490,14 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, _SourceLeft, _SourceRig end, SourceWorkList = lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) -> OffA < OffB - end, ets:match_object(MsgLocation, {'_', '_', Source, '_', '_'})), + end, dets:match_object(MsgLocation, {'_', '_', Source, '_', '_'})), {ExpectedSize, BlockStart2, BlockEnd2} = - lists:foldl(fun ({MsgId, _RefCount, _Source, Offset, TotalSize}, {CurOffset, BlockStart, BlockEnd}) -> + lists:foldl(fun ({MsgId, RefCount, _Source, Offset, TotalSize}, {CurOffset, BlockStart, BlockEnd}) -> % CurOffset is in the DestinationFile. 
% Offset, BlockStart and BlockEnd are in the SourceFile Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, - % update MsgLocation to reflect change of file (3rd field) and offset (4th field) - true = ets:update_element(MsgLocation, MsgId, [{3, Destination}, {4, CurOffset}]), + % update MsgLocation to reflect change of file and offset + ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination, CurOffset, TotalSize}), NextOffset = CurOffset + Size, if BlockStart =:= undefined -> % base case, called only for the first list elem @@ -550,7 +560,7 @@ load_from_disk(State) -> % There should be no more tmp files now, so go ahead and load the whole lot (State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State), % Finally, check there is nothing in mnesia which we haven't loaded - true = lists:foldl(fun ({MsgId, _Q}, true) -> true = 1 =:= length(ets:lookup(MsgLocation, MsgId)) end, + true = lists:foldl(fun ({MsgId, _Q}, true) -> true = 1 =:= length(dets:lookup(MsgLocation, MsgId)) end, true, mnesia:dirty_all_keys(rabbit_disk_queue)), {ok, State1}. 
@@ -560,7 +570,7 @@ load_messages(undefined, [], State = #dqstate { file_summary = FileSummary, State; load_messages(Left, [], State = #dqstate { msg_location = MsgLocation }) -> Num = list_to_integer(filename:rootname(Left)), - Offset = case ets:match_object(MsgLocation, {'_', '_', Left, '_', '_'}) of + Offset = case dets:match_object(MsgLocation, {'_', '_', Left, '_', '_'}) of [] -> 0; L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] = lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) -> @@ -583,7 +593,7 @@ load_messages(Left, [File|Files], is_delivered = '_'})) of 0 -> {VMAcc, VTSAcc}; RefCount -> - true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}), + true = dets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}), {[{MsgId, TotalSize, Offset}|VMAcc], VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT } diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ba74a8774f..736ddfd473 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -703,12 +703,12 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> List = lists:seq(1, MsgCount), {Publish, ok} = timer:tc(?MODULE, rdq_time_commands, [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg) || N <- List, _ <- Qs] end, - fun() -> [rabbit_disk_queue:tx_commit(Q, List) || Q <- Qs] end + fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List) || Q <- Qs] end ]]), {Deliver, ok} = timer:tc(?MODULE, rdq_time_commands, [[fun() -> [begin [begin {Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(Q, N), ok end || N <- List], rabbit_disk_queue:ack(Q, List), - rabbit_disk_queue:tx_commit(Q, []) + ok = rabbit_disk_queue:tx_commit(Q, []) end || Q <- Qs] end]]), io:format(" ~15.10B| ~14.10B| ~14.10B| ~14.1f| ~14.1f| ~14.6f| ~14.10f| ~14.1f| ~14.6f| ~14.10f~n", |
