diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-05-12 13:39:06 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-05-12 13:39:06 +0100 |
| commit | 5d6ba0740a11762bbd916a637df0f2f787c7afc4 (patch) | |
| tree | 9127048fe8bb1e96e6b6869f34dd08a02dfef0e0 /src | |
| parent | 09b7163518b2b1e1ad6114ac1badee17f0171bb6 (diff) | |
| download | rabbitmq-server-git-5d6ba0740a11762bbd916a637df0f2f787c7afc4.tar.gz | |
Added to the disk queue the ability to dynamically switch between disk-only and disk+ram modes. The disk+ram mode uses disc_copies for mnesia and ets for msg_location. This results in a substantial performance improvement (minimum 5 times faster), but is RAM-limited by the number of messages. The disk-only mode uses dets and disc_only_copies for mnesia. This is much slower, but should not be limited by RAM.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 181 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 4 |
2 files changed, 127 insertions, 58 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 55840ce9d1..16208fd058 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -40,7 +40,7 @@ -export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/2, tx_cancel/1]). --export([stop/0, stop_and_obliterate/0]). +-export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]). -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). @@ -49,7 +49,7 @@ -define(WRITE_OK, 255). -define(INTEGER_SIZE_BYTES, 8). -define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). --define(MSG_LOC_DETS_NAME, rabbit_disk_queue_msg_location). +-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). -define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). -define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences). -define(FILE_EXTENSION, ".rdq"). @@ -61,7 +61,9 @@ -define(MAX_READ_FILE_HANDLES, 256). --record(dqstate, {msg_location, %% where are messages? +-record(dqstate, {msg_location_dets, %% where are messages? + msg_location_ets, %% as above, but for ets version + operation_mode, %% ram_disk | disk_only file_summary, %% what's in the files? sequences, %% next read and write for each q current_file_num, %% current file name as number @@ -259,6 +261,12 @@ stop() -> stop_and_obliterate() -> gen_server:call(?SERVER, stop_vaporise, infinity). +to_disk_only_mode() -> + gen_server:call(?SERVER, to_disk_only_mode, infinity). + +to_ram_disk_mode() -> + gen_server:call(?SERVER, to_ram_disk_mode, infinity). + %% ---- GEN-SERVER INTERNAL API ---- init([FileSizeLimit, ReadFileHandlesLimit]) -> @@ -271,19 +279,30 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% brutal_kill. %% Otherwise, the gen_server will be immediately terminated. 
process_flag(trap_exit, true), + Node = node(), + ok = + case mnesia:change_table_copy_type(rabbit_disk_queue, Node, disc_only_copies) of + {atomic, ok} -> ok; + {aborted, {already_exists, rabbit_disk_queue, Node, disc_only_copies}} -> ok; + E -> E + end, ok = filelib:ensure_dir(form_filename("nothing")), InitName = "0" ++ ?FILE_EXTENSION, - {ok, MsgLocation} = - dets:open_file(?MSG_LOC_DETS_NAME, - [{file, form_filename(atom_to_list(?MSG_LOC_DETS_NAME) ++ + {ok, MsgLocationDets} = + dets:open_file(?MSG_LOC_NAME, + [{file, form_filename(atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS)}, {min_no_slots, 1024*1024}, %% man says this should be <= 32M. But it works... {max_no_slots, 1024*1024*1024}, {type, set} ]), + MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]), + true = ets:safe_fixtable(MsgLocationEts, true), State = - #dqstate { msg_location = MsgLocation, + #dqstate { msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts, + operation_mode = disk_only, file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, [set, private]), sequences = ets:new(?SEQUENCE_ETS_NAME, @@ -323,8 +342,26 @@ handle_call(stop_vaporise, _From, State) -> lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))), {stop, normal, ok, State1 #dqstate { current_file_handle = undefined, - read_file_handles = {dict:new(), gb_trees:empty()}}}. 
+ read_file_handles = {dict:new(), gb_trees:empty()}}}; %% gen_server now calls terminate, which then calls shutdown +handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = disk_only }) -> + {reply, ok, State}; +handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = ram_disk, + msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts }) -> + {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_only_copies), + ok = dets:from_ets(MsgLocationDets, MsgLocationEts), + true = ets:delete_all_objects(MsgLocationEts), + {reply, ok, State #dqstate { operation_mode = disk_only }}; +handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = ram_disk }) -> + {reply, ok, State}; +handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = disk_only, + msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts }) -> + {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_copies), + true = ets:from_dets(MsgLocationEts, MsgLocationDets), + ok = dets:delete_all_objects(MsgLocationDets), + {reply, ok, State #dqstate { operation_mode = ram_disk }}. handle_cast({publish, Q, MsgId, MsgBody}, State) -> {ok, State1} = internal_publish(Q, MsgId, MsgBody, State), @@ -345,14 +382,16 @@ handle_info(_Info, State) -> terminate(_Reason, State) -> shutdown(State). 
-shutdown(State = #dqstate { msg_location = MsgLocation, +shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts, current_file_handle = FileHdl, read_file_handles = {ReadHdls, _ReadHdlsAge} }) -> %% deliberately ignoring return codes here - dets:close(MsgLocation), - file:delete(form_filename(atom_to_list(?MSG_LOC_DETS_NAME) ++ + dets:close(MsgLocationDets), + file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS)), + true = ets:delete_all_objects(MsgLocationEts), if FileHdl =:= undefined -> ok; true -> file:sync(FileHdl), file:close(FileHdl) @@ -374,12 +413,46 @@ form_filename(Name) -> base_directory() -> filename:join(mnesia:system_info(directory), "rabbit_disk_queue/"). +dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only }, + Key) -> + dets:lookup(MsgLocationDets, Key); +dets_ets_lookup(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, + Key) -> + ets:lookup(MsgLocationEts, Key). + +dets_ets_delete(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only }, + Key) -> + ok = dets:delete(MsgLocationDets, Key); +dets_ets_delete(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, + Key) -> + true = ets:delete(MsgLocationEts, Key), + ok. + +dets_ets_insert(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only }, + Obj) -> + ok = dets:insert(MsgLocationDets, Obj); +dets_ets_insert(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, + Obj) -> + true = ets:insert(MsgLocationEts, Obj), + ok. + +dets_ets_insert_new(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only }, + Obj) -> + true = dets:insert_new(MsgLocationDets, Obj); +dets_ets_insert_new(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, + Obj) -> + true = ets:insert_new(MsgLocationEts, Obj). 
+ +dets_ets_match_object(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only }, + Obj) -> + dets:match_object(MsgLocationDets, Obj); +dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, + Obj) -> + ets:match_object(MsgLocationEts, Obj). + %% ---- INTERNAL RAW FUNCTIONS ---- -internal_deliver(Q, State = - #dqstate { msg_location = MsgLocation, - sequences = Sequences - }) -> +internal_deliver(Q, State = #dqstate { sequences = Sequences }) -> case ets:lookup(Sequences, Q) of [] -> {ok, empty, State}; [{Q, ReadSeqId, WriteSeqId}] -> @@ -388,7 +461,7 @@ internal_deliver(Q, State = [Obj = #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] -> [{MsgId, _RefCount, File, Offset, TotalSize}] = - dets:lookup(MsgLocation, MsgId), + dets_ets_lookup(State, MsgId), {FileHdl, State1} = getReadHandle(File, State), %% read the message {ok, {MsgBody, BodySize}} = @@ -438,18 +511,17 @@ internal_ack(Q, MsgIds, State) -> %% called from tx_cancel with MnesiaDelete = false %% called from ack with MnesiaDelete = true remove_messages(Q, MsgSeqIds, MnesiaDelete, - State = #dqstate { msg_location = MsgLocation, - file_summary = FileSummary, + State = #dqstate { file_summary = FileSummary, current_file_name = CurName }) -> Files = lists:foldl( fun ({MsgId, SeqId}, Files2) -> [{MsgId, RefCount, File, Offset, TotalSize}] = - dets:lookup(MsgLocation, MsgId), + dets_ets_lookup(State, MsgId), Files3 = if 1 =:= RefCount -> - ok = dets:delete(MsgLocation, MsgId), + ok = dets_ets_delete(State, MsgId), [{File, ValidTotalSize, ContiguousTop, Left, Right}] = ets:lookup(FileSummary, File), ContiguousTop1 = lists:min([ContiguousTop, Offset]), @@ -461,8 +533,8 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete, true -> sets:add_element(File, Files2) end; 1 < RefCount -> - ok = dets:insert(MsgLocation, {MsgId, RefCount - 1, - File, Offset, TotalSize}), + ok = dets_ets_insert(State, {MsgId, RefCount - 1, + File, Offset, TotalSize}), Files2 
end, if MnesiaDelete -> @@ -475,18 +547,17 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete, {ok, State2}. internal_tx_publish(MsgId, MsgBody, - State = #dqstate { msg_location = MsgLocation, - current_file_handle = CurHdl, + State = #dqstate { current_file_handle = CurHdl, current_file_name = CurName, current_offset = CurOffset, file_summary = FileSummary }) -> - case dets:lookup(MsgLocation, MsgId) of + case dets_ets_lookup(State, MsgId) of [] -> %% New message, lots to do {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody), - true = dets:insert_new(MsgLocation, {MsgId, 1, CurName, - CurOffset, TotalSize}), + true = dets_ets_insert_new(State, {MsgId, 1, CurName, + CurOffset, TotalSize}), [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] = ets:lookup(FileSummary, CurName), ValidTotalSize1 = ValidTotalSize + TotalSize + @@ -503,14 +574,13 @@ internal_tx_publish(MsgId, MsgBody, State #dqstate {current_offset = NextOffset}); [{MsgId, RefCount, File, Offset, TotalSize}] -> %% We already know about it, just update counter - ok = dets:insert(MsgLocation, {MsgId, RefCount + 1, File, - Offset, TotalSize}), + ok = dets_ets_insert(State, {MsgId, RefCount + 1, File, + Offset, TotalSize}), {ok, State} end. 
internal_tx_commit(Q, MsgIds, - State = #dqstate { msg_location = MsgLocation, - current_file_handle = CurHdl, + State = #dqstate { current_file_handle = CurHdl, current_file_name = CurName, sequences = Sequences }) -> @@ -525,7 +595,7 @@ internal_tx_commit(Q, MsgIds, lists:foldl( fun (MsgId, {Acc, NextWriteSeqId}) -> [{MsgId, _RefCount, File, _Offset, _TotalSize}] = - dets:lookup(MsgLocation, MsgId), + dets_ets_lookup(State, MsgId), ok = mnesia:write(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, NextWriteSeqId}, @@ -687,8 +757,7 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, {Destination, DestinationValid, DestinationContiguousTop, _DestinationLeft, _DestinationRight}, State1) -> - (State = #dqstate { msg_location = MsgLocation }) = - closeFile(Source, closeFile(Destination, State1)), + State = closeFile(Source, closeFile(Destination, State1)), {ok, SourceHdl} = file:open(form_filename(Source), [read, write, raw, binary, delayed_write, read_ahead]), @@ -719,10 +788,10 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, %% Given expected access patterns, I suspect that the list should be %% naturally sorted as we require, however, we need to enforce it anyway end, sortMsgLocationsByOffset(true, - dets:match_object(MsgLocation, - {'_', '_', - Destination, - '_', '_'}))), + dets_ets_match_object(State, + {'_', '_', + Destination, + '_', '_'}))), TmpSize = DestinationValid - DestinationContiguousTop, {TmpSize, BlockStart1, BlockEnd1} = lists:foldl( @@ -735,8 +804,8 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, %% Destination, at DestinationContiguousTop %% + CurOffset FinalOffset = DestinationContiguousTop + CurOffset, - ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination, - FinalOffset, TotalSize}), + ok = dets_ets_insert(State, {MsgId, RefCount, Destination, + FinalOffset, TotalSize}), NextOffset = CurOffset + Size, if BlockStart =:= undefined -> %% base case, called only for the @@ -763,7 +832,7 @@ 
combineFiles({Source, SourceValid, _SourceContiguousTop, {ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}), {ok, BSize1} = file:copy(DestinationHdl, TmpHdl, BSize1), %% so now Tmp contains everything we need to salvage from - %% Destination, and MsgLocation has been updated to + %% Destination, and MsgLocationDets has been updated to %% reflect compaction of Destination so truncate %% Destination and copy from Tmp back to the end {ok, 0} = file:position(TmpHdl, {bof, 0}), @@ -777,9 +846,10 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, ok = file:delete(form_filename(Tmp)) end, SourceWorkList = - sortMsgLocationsByOffset(true, dets:match_object(MsgLocation, - {'_', '_', Source, - '_', '_'})), + sortMsgLocationsByOffset(true, + dets_ets_match_object(State, + {'_', '_', Source, + '_', '_'})), {ExpectedSize, BlockStart2, BlockEnd2} = lists:foldl( fun ({MsgId, RefCount, _Source, Offset, TotalSize}, @@ -787,9 +857,9 @@ combineFiles({Source, SourceValid, _SourceContiguousTop, %% CurOffset is in the DestinationFile. 
%% Offset, BlockStart and BlockEnd are in the SourceFile Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, - %% update MsgLocation to reflect change of file and offset - ok = dets:insert(MsgLocation, {MsgId, RefCount, Destination, - CurOffset, TotalSize}), + %% update MsgLocationDets to reflect change of file and offset + ok = dets_ets_insert(State, {MsgId, RefCount, Destination, + CurOffset, TotalSize}), NextOffset = CurOffset + Size, if BlockStart =:= undefined -> %% base case, called only for the first list @@ -865,8 +935,7 @@ load_from_disk(State) -> ok = recover_crashed_compactions(Files, TmpFiles), %% There should be no more tmp files now, so go ahead and load the %% whole lot - (State1 = #dqstate{ msg_location = MsgLocation }) = - load_messages(undefined, Files, State), + State1 = load_messages(undefined, Files, State), %% Finally, check there is nothing in mnesia which we haven't %% loaded {atomic, true} = mnesia:transaction( @@ -874,7 +943,7 @@ load_from_disk(State) -> ok = mnesia:read_lock_table(rabbit_disk_queue), mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId }, true) -> true = 1 =:= - length(dets:lookup(MsgLocation, MsgId)) + length(dets_ets_lookup(State1, MsgId)) end, true, rabbit_disk_queue) end), @@ -910,9 +979,9 @@ load_messages(undefined, [], State = #dqstate { file_summary = FileSummary, current_file_name = CurName }) -> true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}), State; -load_messages(Left, [], State = #dqstate { msg_location = MsgLocation }) -> +load_messages(Left, [], State) -> Num = list_to_integer(filename:rootname(Left)), - Offset = case dets:match_object(MsgLocation, {'_', '_', Left, '_', '_'}) of + Offset = case dets_ets_match_object(State, {'_', '_', Left, '_', '_'}) of [] -> 0; L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] = sortMsgLocationsByOffset(false, L), @@ -921,9 +990,7 @@ load_messages(Left, [], State = #dqstate { msg_location = MsgLocation }) -> State #dqstate { current_file_num = Num, 
current_file_name = Left, current_offset = Offset }; load_messages(Left, [File|Files], - State = #dqstate { msg_location = MsgLocation, - file_summary = FileSummary - }) -> + State = #dqstate { file_summary = FileSummary }) -> %% [{MsgId, TotalSize, FileOffset}] {ok, Messages} = scan_file_for_valid_messages(form_filename(File)), {ValidMessagesRev, ValidTotalSize} = lists:foldl( @@ -935,8 +1002,8 @@ load_messages(Left, [File|Files], is_delivered = '_'})) of 0 -> {VMAcc, VTSAcc}; RefCount -> - true = dets:insert_new(MsgLocation, {MsgId, RefCount, File, - Offset, TotalSize}), + true = dets_ets_insert_new(State, {MsgId, RefCount, File, + Offset, TotalSize}), {[{MsgId, TotalSize, Offset}|VMAcc], VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT } diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2f1d0c4339..9ce62f8636 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -770,6 +770,7 @@ rdq_time_insane_startup() -> rdq_virgin(), OneGig = 1024*1024*1024, rabbit_disk_queue:start_link(OneGig), + rabbit_disk_queue:to_ram_disk_mode(), Msg = <<>>, List = lists:seq(1, 1024*1024), %% 1M empty messages, at say, 100B per message, should all fit @@ -796,4 +797,5 @@ rdq_start() -> {ok, _} = rabbit_disk_queue:start_link(1024*1024). rdq_stop() -> - rabbit_disk_queue:stop(). + rabbit_disk_queue:stop(), + timer:sleep(1000). |
