summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl29
-rw-r--r--src/rabbit_tests.erl9
2 files changed, 28 insertions, 10 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 16208fd058..803f358b9c 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -228,6 +228,8 @@
-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
+-spec(to_ram_disk_mode/0 :: () -> 'ok').
+-spec(to_disk_only_mode/0 :: () -> 'ok').
-endif.
@@ -931,6 +933,11 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
load_from_disk(State) ->
%% sorted so that smallest number is first. which also means
%% eldest file (left-most) first
+ ok = case mnesia:add_table_index(rabbit_disk_queue, msg_id) of
+ {atomic, ok} -> ok;
+ {aborted,{already_exists,rabbit_disk_queue,_}} -> ok;
+ E -> E
+ end,
{Files, TmpFiles} = get_disk_queue_files(),
ok = recover_crashed_compactions(Files, TmpFiles),
%% There should be no more tmp files now, so go ahead and load the
@@ -948,6 +955,13 @@ load_from_disk(State) ->
true, rabbit_disk_queue)
end),
State2 = extract_sequence_numbers(State1),
+ ok = case mnesia:del_table_index(rabbit_disk_queue, msg_id) of
+ {atomic, ok} -> ok;
+ %% hmm, something weird must be going on, but it's
+ %% probably not the end of the world
+ {aborted,{no_exists,rabbit_disk_queue,_}} -> ok;
+ E2 -> E2
+ end,
{ok, State2}.
extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
@@ -995,11 +1009,12 @@ load_messages(Left, [File|Files],
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case length(mnesia:dirty_match_object
+ case length(mnesia:dirty_index_match_object
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
queue_and_seq_id = '_',
- is_delivered = '_'})) of
+ is_delivered = '_'},
+ msg_id)) of
0 -> {VMAcc, VTSAcc};
RefCount ->
true = dets_ets_insert_new(State, {MsgId, RefCount, File,
@@ -1037,11 +1052,12 @@ recover_crashed_compactions1(Files, TmpFile) ->
%% all of these messages should appear in the mnesia table,
%% otherwise they wouldn't have been copied out
lists:foreach(fun (MsgId) ->
- true = 0 < length(mnesia:dirty_match_object
+ true = 0 < length(mnesia:dirty_index_match_object
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
queue_and_seq_id = '_',
- is_delivered = '_'}))
+ is_delivered = '_'},
+ msg_id))
end, MsgIdsTmp),
{ok, UncorruptedMessages} =
scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
@@ -1074,11 +1090,12 @@ recover_crashed_compactions1(Files, TmpFile) ->
%% we're in case 4 above.
%% check that everything in the main file is a valid message in mnesia
lists:foreach(fun (MsgId) ->
- true = 0 < length(mnesia:dirty_match_object
+ true = 0 < length(mnesia:dirty_index_match_object
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
queue_and_seq_id = '_',
- is_delivered = '_'}))
+ is_delivered = '_'},
+ msg_id))
end, MsgIds),
%% The main file should be contiguous
{Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 9ce62f8636..fcd3d5f668 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -772,17 +772,17 @@ rdq_time_insane_startup() ->
rabbit_disk_queue:start_link(OneGig),
rabbit_disk_queue:to_ram_disk_mode(),
Msg = <<>>,
- List = lists:seq(1, 1024*1024),
+ Count = 100000,
+ List = lists:seq(1, Count),
%% 1M empty messages, at say, 100B per message, should all fit
%% within 1GB and thus in a single file
- io:format("Publishing 1M empty messages...~n",[]),
+ io:format("Publishing ~p empty messages...~n",[Count]),
[rabbit_disk_queue:tx_publish(N, Msg) || N <- List],
rabbit_disk_queue:tx_commit(q, List),
io:format("...done. Timing restart...~n", []),
rdq_stop(),
Micros = rdq_virgin(),
- io:format("...startup took ~w microseconds.~n", [Micros]),
- rdq_stop().
+ io:format("...startup took ~w microseconds.~n", [Micros]).
rdq_time_commands(Funcs) ->
lists:foreach(fun (F) -> F() end, Funcs).
@@ -791,6 +791,7 @@ rdq_virgin() ->
{Micros, {ok, _}} =
timer:tc(rabbit_disk_queue, start_link, [1024*1024]),
ok = rabbit_disk_queue:stop_and_obliterate(),
+ timer:sleep(1000),
Micros.
rdq_start() ->