diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-05-13 11:43:02 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-05-13 11:43:02 +0100 |
| commit | 25995387be653239513a9e00ae2abe885b167093 (patch) | |
| tree | 67e3dbbaaee94b7d1c35b39597b3498c03703127 | |
| parent | 5f03e402dff344565656a7fc8c0252ce51b5915d (diff) | |
| download | rabbitmq-server-git-25995387be653239513a9e00ae2abe885b167093.tar.gz | |
Startup requires a match in mnesia per message we encounter on disk. This led to a 22-minute startup time for 100,000 messages. However, by dynamically adding an index to mnesia during startup, and then later removing it, this is reduced to 13.5 seconds. Note, however, that testing this with rabbit_tests:rdq_time_insane_startup() requires the disk queue to be edited so that it starts up in ram_disk mode, not disk_only mode as is the code default.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 9 |
2 files changed, 28 insertions, 10 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 16208fd058..803f358b9c 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -228,6 +228,8 @@ -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). +-spec(to_ram_disk_mode/0 :: () -> 'ok'). +-spec(to_disk_only_mode/0 :: () -> 'ok'). -endif. @@ -931,6 +933,11 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> load_from_disk(State) -> %% sorted so that smallest number is first. which also means %% eldest file (left-most) first + ok = case mnesia:add_table_index(rabbit_disk_queue, msg_id) of + {atomic, ok} -> ok; + {aborted,{already_exists,rabbit_disk_queue,_}} -> ok; + E -> E + end, {Files, TmpFiles} = get_disk_queue_files(), ok = recover_crashed_compactions(Files, TmpFiles), %% There should be no more tmp files now, so go ahead and load the @@ -948,6 +955,13 @@ load_from_disk(State) -> true, rabbit_disk_queue) end), State2 = extract_sequence_numbers(State1), + ok = case mnesia:del_table_index(rabbit_disk_queue, msg_id) of + {atomic, ok} -> ok; + %% hmm, something weird must be going on, but it's + %% probably not the end of the world + {aborted,{no_exists,rabbit_disk_queue,_}} -> ok; + E2 -> E2 + end, {ok, State2}. 
extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> @@ -995,11 +1009,12 @@ load_messages(Left, [File|Files], {ok, Messages} = scan_file_for_valid_messages(form_filename(File)), {ValidMessagesRev, ValidTotalSize} = lists:foldl( fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - case length(mnesia:dirty_match_object + case length(mnesia:dirty_index_match_object (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue_and_seq_id = '_', - is_delivered = '_'})) of + is_delivered = '_'}, + msg_id)) of 0 -> {VMAcc, VTSAcc}; RefCount -> true = dets_ets_insert_new(State, {MsgId, RefCount, File, @@ -1037,11 +1052,12 @@ recover_crashed_compactions1(Files, TmpFile) -> %% all of these messages should appear in the mnesia table, %% otherwise they wouldn't have been copied out lists:foreach(fun (MsgId) -> - true = 0 < length(mnesia:dirty_match_object + true = 0 < length(mnesia:dirty_index_match_object (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue_and_seq_id = '_', - is_delivered = '_'})) + is_delivered = '_'}, + msg_id)) end, MsgIdsTmp), {ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), @@ -1074,11 +1090,12 @@ recover_crashed_compactions1(Files, TmpFile) -> %% we're in case 4 above. 
%% check that everything in the main file is a valid message in mnesia lists:foreach(fun (MsgId) -> - true = 0 < length(mnesia:dirty_match_object + true = 0 < length(mnesia:dirty_index_match_object (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue_and_seq_id = '_', - is_delivered = '_'})) + is_delivered = '_'}, + msg_id)) end, MsgIds), %% The main file should be contiguous {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 9ce62f8636..fcd3d5f668 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -772,17 +772,17 @@ rdq_time_insane_startup() -> rabbit_disk_queue:start_link(OneGig), rabbit_disk_queue:to_ram_disk_mode(), Msg = <<>>, - List = lists:seq(1, 1024*1024), + Count = 100000, + List = lists:seq(1, Count), %% 1M empty messages, at say, 100B per message, should all fit %% within 1GB and thus in a single file - io:format("Publishing 1M empty messages...~n",[]), + io:format("Publishing ~p empty messages...~n",[Count]), [rabbit_disk_queue:tx_publish(N, Msg) || N <- List], rabbit_disk_queue:tx_commit(q, List), io:format("...done. Timing restart...~n", []), rdq_stop(), Micros = rdq_virgin(), - io:format("...startup took ~w microseconds.~n", [Micros]), - rdq_stop(). + io:format("...startup took ~w microseconds.~n", [Micros]). rdq_time_commands(Funcs) -> lists:foreach(fun (F) -> F() end, Funcs). @@ -791,6 +791,7 @@ rdq_virgin() -> {Micros, {ok, _}} = timer:tc(rabbit_disk_queue, start_link, [1024*1024]), ok = rabbit_disk_queue:stop_and_obliterate(), + timer:sleep(1000), Micros. rdq_start() -> |
