summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-05-27 18:02:45 +0100
committerMatthew Sackman <matthew@lshift.net>2009-05-27 18:02:45 +0100
commite5f82c4168712f69adf394d218df158cd13e8fea (patch)
treeb9ef6b6e5c333fe4a4134465861d6083e0d0066f /src
parent6246085d1566ef701379006b2ed5ce1b45b4f434 (diff)
downloadrabbitmq-server-git-e5f82c4168712f69adf394d218df158cd13e8fea.tar.gz
gen_server -> gen_server2, delete_queue calls purge_queue first in order to try and reduce horrible inefficient mnesia_match_object call. Also some refactoring and tidying.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl81
1 files changed, 44 insertions, 37 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index d13b6eb620..a0bc1bfdd6 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -31,7 +31,7 @@
-module(rabbit_disk_queue).
--behaviour(gen_server).
+-behaviour(gen_server2).
-export([start_link/0]).
@@ -257,63 +257,63 @@
%% ---- PUBLIC API ----
start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE,
- [?FILE_SIZE_LIMIT, ?MAX_READ_FILE_HANDLES], []).
+ gen_server2:start_link({local, ?SERVER}, ?MODULE,
+ [?FILE_SIZE_LIMIT, ?MAX_READ_FILE_HANDLES], []).
publish(Q, MsgId, Msg) when is_binary(Msg) ->
- gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}).
+ gen_server2:cast(?SERVER, {publish, Q, MsgId, Msg}).
publish_with_seq(Q, MsgId, SeqId, Msg) when is_binary(Msg) ->
- gen_server:cast(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg}).
+ gen_server2:cast(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg}).
deliver(Q) ->
- gen_server:call(?SERVER, {deliver, Q}, infinity).
+ gen_server2:call(?SERVER, {deliver, Q}, infinity).
phantom_deliver(Q) ->
- gen_server:call(?SERVER, {phantom_deliver, Q}).
+ gen_server2:call(?SERVER, {phantom_deliver, Q}, infinity).
ack(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
- gen_server:cast(?SERVER, {ack, Q, MsgSeqIds}).
+ gen_server2:cast(?SERVER, {ack, Q, MsgSeqIds}).
tx_publish(MsgId, Msg) when is_binary(Msg) ->
- gen_server:cast(?SERVER, {tx_publish, MsgId, Msg}).
+ gen_server2:cast(?SERVER, {tx_publish, MsgId, Msg}).
tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSeqIds) ->
- gen_server:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity).
+ gen_server2:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity).
tx_commit_with_seqs(Q, PubMsgSeqIds, AckSeqIds)
when is_list(PubMsgSeqIds) andalso is_list(AckSeqIds) ->
- gen_server:call(?SERVER, {tx_commit_with_seqs, Q, PubMsgSeqIds, AckSeqIds}, infinity).
+ gen_server2:call(?SERVER, {tx_commit_with_seqs, Q, PubMsgSeqIds, AckSeqIds}, infinity).
tx_cancel(MsgIds) when is_list(MsgIds) ->
- gen_server:cast(?SERVER, {tx_cancel, MsgIds}).
+ gen_server2:cast(?SERVER, {tx_cancel, MsgIds}).
requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
- gen_server:cast(?SERVER, {requeue, Q, MsgSeqIds}).
+ gen_server2:cast(?SERVER, {requeue, Q, MsgSeqIds}).
requeue_with_seqs(Q, MsgSeqSeqIds) when is_list(MsgSeqSeqIds) ->
- gen_server:cast(?SERVER, {requeue_with_seqs, Q, MsgSeqSeqIds}).
+ gen_server2:cast(?SERVER, {requeue_with_seqs, Q, MsgSeqSeqIds}).
purge(Q) ->
- gen_server:call(?SERVER, {purge, Q}).
+ gen_server2:call(?SERVER, {purge, Q}, infinity).
delete_queue(Q) ->
- gen_server:cast(?SERVER, {delete_queue, Q}).
+ gen_server2:cast(?SERVER, {delete_queue, Q}).
stop() ->
- gen_server:call(?SERVER, stop, infinity).
+ gen_server2:call(?SERVER, stop, infinity).
stop_and_obliterate() ->
- gen_server:call(?SERVER, stop_vaporise, infinity).
+ gen_server2:call(?SERVER, stop_vaporise, infinity).
to_disk_only_mode() ->
- gen_server:call(?SERVER, to_disk_only_mode, infinity).
+ gen_server2:call(?SERVER, to_disk_only_mode, infinity).
to_ram_disk_mode() ->
- gen_server:call(?SERVER, to_ram_disk_mode, infinity).
+ gen_server2:call(?SERVER, to_ram_disk_mode, infinity).
length(Q) ->
- gen_server:call(?SERVER, {length, Q}, infinity).
+ gen_server2:call(?SERVER, {length, Q}, infinity).
is_empty(Q) ->
Length = rabbit_disk_queue:length(Q),
@@ -878,9 +878,10 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
{ok, WriteSeqId - ReadSeqId, State2}
end.
-internal_delete_queue(Q, State = #dqstate { sequences = Sequences }) ->
+internal_delete_queue(Q, State) ->
+ {ok, _Count, State1 = #dqstate { sequences = Sequences }} = internal_purge(Q, State),
true = ets:delete(Sequences, Q),
- {atomic, {ok, State1}} =
+ {atomic, {ok, State2}} =
mnesia:transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
@@ -895,9 +896,9 @@ internal_delete_queue(Q, State = #dqstate { sequences = Sequences }) ->
fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, msg_id = MsgId }) ->
{MsgId, SeqId}
end, Objs),
- remove_messages(Q, MsgSeqIds, txn, State)
+ remove_messages(Q, MsgSeqIds, txn, State1)
end),
- {ok, State1}.
+ {ok, State2}.
%% ---- ROLLING OVER THE APPEND FILE ----
@@ -1196,14 +1197,26 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
%% ---- DISK RECOVERY ----
+add_index() ->
+ case mnesia:add_table_index(rabbit_disk_queue, msg_id) of
+ {atomic, ok} -> ok;
+ {aborted,{already_exists,rabbit_disk_queue,_}} -> ok;
+ E -> E
+ end.
+
+del_index() ->
+ case mnesia:del_table_index(rabbit_disk_queue, msg_id) of
+ {atomic, ok} -> ok;
+ %% hmm, something weird must be going on, but it's probably
+ %% not the end of the world
+ {aborted,{no_exists,rabbit_disk_queue,_}} -> ok;
+ E2 -> E2
+ end.
+
load_from_disk(State) ->
%% sorted so that smallest number is first. which also means
%% eldest file (left-most) first
- ok = case mnesia:add_table_index(rabbit_disk_queue, msg_id) of
- {atomic, ok} -> ok;
- {aborted,{already_exists,rabbit_disk_queue,_}} -> ok;
- E -> E
- end,
+ ok = add_index(),
{Files, TmpFiles} = get_disk_queue_files(),
ok = recover_crashed_compactions(Files, TmpFiles),
%% There should be no more tmp files now, so go ahead and load the
@@ -1221,13 +1234,7 @@ load_from_disk(State) ->
true, rabbit_disk_queue)
end),
State2 = extract_sequence_numbers(State1),
- ok = case mnesia:del_table_index(rabbit_disk_queue, msg_id) of
- {atomic, ok} -> ok;
- %% hmm, something weird must be going on, but it's
- %% probably not the end of the world
- {aborted,{no_exists,rabbit_disk_queue,_}} -> ok;
- E2 -> E2
- end,
+ ok = del_index(),
{ok, State2}.
extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->