summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-05-13 11:43:02 +0100
committerMatthew Sackman <matthew@lshift.net>2009-05-13 11:43:02 +0100
commit25995387be653239513a9e00ae2abe885b167093 (patch)
tree67e3dbbaaee94b7d1c35b39597b3498c03703127
parent5f03e402dff344565656a7fc8c0252ce51b5915d (diff)
downloadrabbitmq-server-git-25995387be653239513a9e00ae2abe885b167093.tar.gz
Startup requires a match in mnesia per message we encounter on disk. This lead to a 22minute startup time for 100000 messages. However, by dynamically adding an index during startup to mnesia, and then later removing it, this is reduced to 13.5 seconds. Note however, that to test this with rabbit_tests:rdq_time_insane_startup() requires the disk queue to be edited so that it starts up in ram_disk mode, not disk_only mode as is the code default.
-rw-r--r--src/rabbit_disk_queue.erl29
-rw-r--r--src/rabbit_tests.erl9
2 files changed, 28 insertions, 10 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 16208fd058..803f358b9c 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -228,6 +228,8 @@
-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
+-spec(to_ram_disk_mode/0 :: () -> 'ok').
+-spec(to_disk_only_mode/0 :: () -> 'ok').
-endif.
@@ -931,6 +933,11 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
load_from_disk(State) ->
%% sorted so that smallest number is first. which also means
%% eldest file (left-most) first
+ ok = case mnesia:add_table_index(rabbit_disk_queue, msg_id) of
+ {atomic, ok} -> ok;
+ {aborted,{already_exists,rabbit_disk_queue,_}} -> ok;
+ E -> E
+ end,
{Files, TmpFiles} = get_disk_queue_files(),
ok = recover_crashed_compactions(Files, TmpFiles),
%% There should be no more tmp files now, so go ahead and load the
@@ -948,6 +955,13 @@ load_from_disk(State) ->
true, rabbit_disk_queue)
end),
State2 = extract_sequence_numbers(State1),
+ ok = case mnesia:del_table_index(rabbit_disk_queue, msg_id) of
+ {atomic, ok} -> ok;
+ %% hmm, something weird must be going on, but it's
+ %% probably not the end of the world
+ {aborted,{no_exists,rabbit_disk_queue,_}} -> ok;
+ E2 -> E2
+ end,
{ok, State2}.
extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
@@ -995,11 +1009,12 @@ load_messages(Left, [File|Files],
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case length(mnesia:dirty_match_object
+ case length(mnesia:dirty_index_match_object
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
queue_and_seq_id = '_',
- is_delivered = '_'})) of
+ is_delivered = '_'},
+ msg_id)) of
0 -> {VMAcc, VTSAcc};
RefCount ->
true = dets_ets_insert_new(State, {MsgId, RefCount, File,
@@ -1037,11 +1052,12 @@ recover_crashed_compactions1(Files, TmpFile) ->
%% all of these messages should appear in the mnesia table,
%% otherwise they wouldn't have been copied out
lists:foreach(fun (MsgId) ->
- true = 0 < length(mnesia:dirty_match_object
+ true = 0 < length(mnesia:dirty_index_match_object
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
queue_and_seq_id = '_',
- is_delivered = '_'}))
+ is_delivered = '_'},
+ msg_id))
end, MsgIdsTmp),
{ok, UncorruptedMessages} =
scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
@@ -1074,11 +1090,12 @@ recover_crashed_compactions1(Files, TmpFile) ->
%% we're in case 4 above.
%% check that everything in the main file is a valid message in mnesia
lists:foreach(fun (MsgId) ->
- true = 0 < length(mnesia:dirty_match_object
+ true = 0 < length(mnesia:dirty_index_match_object
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
queue_and_seq_id = '_',
- is_delivered = '_'}))
+ is_delivered = '_'},
+ msg_id))
end, MsgIds),
%% The main file should be contiguous
{Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 9ce62f8636..fcd3d5f668 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -772,17 +772,17 @@ rdq_time_insane_startup() ->
rabbit_disk_queue:start_link(OneGig),
rabbit_disk_queue:to_ram_disk_mode(),
Msg = <<>>,
- List = lists:seq(1, 1024*1024),
+ Count = 100000,
+ List = lists:seq(1, Count),
%% 1M empty messages, at say, 100B per message, should all fit
%% within 1GB and thus in a single file
- io:format("Publishing 1M empty messages...~n",[]),
+ io:format("Publishing ~p empty messages...~n",[Count]),
[rabbit_disk_queue:tx_publish(N, Msg) || N <- List],
rabbit_disk_queue:tx_commit(q, List),
io:format("...done. Timing restart...~n", []),
rdq_stop(),
Micros = rdq_virgin(),
- io:format("...startup took ~w microseconds.~n", [Micros]),
- rdq_stop().
+ io:format("...startup took ~w microseconds.~n", [Micros]).
rdq_time_commands(Funcs) ->
lists:foreach(fun (F) -> F() end, Funcs).
@@ -791,6 +791,7 @@ rdq_virgin() ->
{Micros, {ok, _}} =
timer:tc(rabbit_disk_queue, start_link, [1024*1024]),
ok = rabbit_disk_queue:stop_and_obliterate(),
+ timer:sleep(1000),
Micros.
rdq_start() ->