diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 9 |
2 files changed, 28 insertions, 10 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 16208fd058..803f358b9c 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -228,6 +228,8 @@ -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). +-spec(to_ram_disk_mode/0 :: () -> 'ok'). +-spec(to_disk_only_mode/0 :: () -> 'ok'). -endif. @@ -931,6 +933,11 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> load_from_disk(State) -> %% sorted so that smallest number is first. which also means %% eldest file (left-most) first + ok = case mnesia:add_table_index(rabbit_disk_queue, msg_id) of + {atomic, ok} -> ok; + {aborted,{already_exists,rabbit_disk_queue,_}} -> ok; + E -> E + end, {Files, TmpFiles} = get_disk_queue_files(), ok = recover_crashed_compactions(Files, TmpFiles), %% There should be no more tmp files now, so go ahead and load the @@ -948,6 +955,13 @@ load_from_disk(State) -> true, rabbit_disk_queue) end), State2 = extract_sequence_numbers(State1), + ok = case mnesia:del_table_index(rabbit_disk_queue, msg_id) of + {atomic, ok} -> ok; + %% hmm, something weird must be going on, but it's + %% probably not the end of the world + {aborted,{no_exists,rabbit_disk_queue,_}} -> ok; + E2 -> E2 + end, {ok, State2}. extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> @@ -995,11 +1009,12 @@ load_messages(Left, [File|Files], {ok, Messages} = scan_file_for_valid_messages(form_filename(File)), {ValidMessagesRev, ValidTotalSize} = lists:foldl( fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - case length(mnesia:dirty_match_object + case length(mnesia:dirty_index_match_object (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue_and_seq_id = '_', - is_delivered = '_'})) of + is_delivered = '_'}, + msg_id)) of 0 -> {VMAcc, VTSAcc}; RefCount -> true = dets_ets_insert_new(State, {MsgId, RefCount, File, @@ -1037,11 +1052,12 @@ recover_crashed_compactions1(Files, TmpFile) -> %% all of these messages should appear in the mnesia table, %% otherwise they wouldn't have been copied out lists:foreach(fun (MsgId) -> - true = 0 < length(mnesia:dirty_match_object + true = 0 < length(mnesia:dirty_index_match_object (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue_and_seq_id = '_', - is_delivered = '_'})) + is_delivered = '_'}, + msg_id)) end, MsgIdsTmp), {ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), @@ -1074,11 +1090,12 @@ recover_crashed_compactions1(Files, TmpFile) -> %% we're in case 4 above. %% check that everything in the main file is a valid message in mnesia lists:foreach(fun (MsgId) -> - true = 0 < length(mnesia:dirty_match_object + true = 0 < length(mnesia:dirty_index_match_object (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue_and_seq_id = '_', - is_delivered = '_'})) + is_delivered = '_'}, + msg_id)) end, MsgIds), %% The main file should be contiguous {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 9ce62f8636..fcd3d5f668 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -772,17 +772,17 @@ rdq_time_insane_startup() -> rabbit_disk_queue:start_link(OneGig), rabbit_disk_queue:to_ram_disk_mode(), Msg = <<>>, - List = lists:seq(1, 1024*1024), + Count = 100000, + List = lists:seq(1, Count), %% 1M empty messages, at say, 100B per message, should all fit %% within 1GB and thus in a single file - io:format("Publishing 1M empty messages...~n",[]), + io:format("Publishing ~p empty messages...~n",[Count]), [rabbit_disk_queue:tx_publish(N, Msg) || N <- List], rabbit_disk_queue:tx_commit(q, List), io:format("...done. Timing restart...~n", []), rdq_stop(), Micros = rdq_virgin(), - io:format("...startup took ~w microseconds.~n", [Micros]), - rdq_stop(). + io:format("...startup took ~w microseconds.~n", [Micros]). rdq_time_commands(Funcs) -> lists:foreach(fun (F) -> F() end, Funcs). @@ -791,6 +791,7 @@ rdq_virgin() -> {Micros, {ok, _}} = timer:tc(rabbit_disk_queue, start_link, [1024*1024]), ok = rabbit_disk_queue:stop_and_obliterate(), + timer:sleep(1000), Micros. rdq_start() -> |
