diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-05-27 18:02:45 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-05-27 18:02:45 +0100 |
| commit | e5f82c4168712f69adf394d218df158cd13e8fea (patch) | |
| tree | b9ef6b6e5c333fe4a4134465861d6083e0d0066f /src | |
| parent | 6246085d1566ef701379006b2ed5ce1b45b4f434 (diff) | |
| download | rabbitmq-server-git-e5f82c4168712f69adf394d218df158cd13e8fea.tar.gz | |
gen_server -> gen_server2, delete_queue calls purge_queue first in order to try and reduce horrible inefficient mnesia_match_object call. Also some refactoring and tidying.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 81 |
1 files changed, 44 insertions, 37 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index d13b6eb620..a0bc1bfdd6 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -31,7 +31,7 @@ -module(rabbit_disk_queue). --behaviour(gen_server). +-behaviour(gen_server2). -export([start_link/0]). @@ -257,63 +257,63 @@ %% ---- PUBLIC API ---- start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, - [?FILE_SIZE_LIMIT, ?MAX_READ_FILE_HANDLES], []). + gen_server2:start_link({local, ?SERVER}, ?MODULE, + [?FILE_SIZE_LIMIT, ?MAX_READ_FILE_HANDLES], []). publish(Q, MsgId, Msg) when is_binary(Msg) -> - gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}). + gen_server2:cast(?SERVER, {publish, Q, MsgId, Msg}). publish_with_seq(Q, MsgId, SeqId, Msg) when is_binary(Msg) -> - gen_server:cast(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg}). + gen_server2:cast(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg}). deliver(Q) -> - gen_server:call(?SERVER, {deliver, Q}, infinity). + gen_server2:call(?SERVER, {deliver, Q}, infinity). phantom_deliver(Q) -> - gen_server:call(?SERVER, {phantom_deliver, Q}). + gen_server2:call(?SERVER, {phantom_deliver, Q}, infinity). ack(Q, MsgSeqIds) when is_list(MsgSeqIds) -> - gen_server:cast(?SERVER, {ack, Q, MsgSeqIds}). + gen_server2:cast(?SERVER, {ack, Q, MsgSeqIds}). tx_publish(MsgId, Msg) when is_binary(Msg) -> - gen_server:cast(?SERVER, {tx_publish, MsgId, Msg}). + gen_server2:cast(?SERVER, {tx_publish, MsgId, Msg}). tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSeqIds) -> - gen_server:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity). + gen_server2:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity). tx_commit_with_seqs(Q, PubMsgSeqIds, AckSeqIds) when is_list(PubMsgSeqIds) andalso is_list(AckSeqIds) -> - gen_server:call(?SERVER, {tx_commit_with_seqs, Q, PubMsgSeqIds, AckSeqIds}, infinity). + gen_server2:call(?SERVER, {tx_commit_with_seqs, Q, PubMsgSeqIds, AckSeqIds}, infinity). tx_cancel(MsgIds) when is_list(MsgIds) -> - gen_server:cast(?SERVER, {tx_cancel, MsgIds}). + gen_server2:cast(?SERVER, {tx_cancel, MsgIds}). requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) -> - gen_server:cast(?SERVER, {requeue, Q, MsgSeqIds}). + gen_server2:cast(?SERVER, {requeue, Q, MsgSeqIds}). requeue_with_seqs(Q, MsgSeqSeqIds) when is_list(MsgSeqSeqIds) -> - gen_server:cast(?SERVER, {requeue_with_seqs, Q, MsgSeqSeqIds}). + gen_server2:cast(?SERVER, {requeue_with_seqs, Q, MsgSeqSeqIds}). purge(Q) -> - gen_server:call(?SERVER, {purge, Q}). + gen_server2:call(?SERVER, {purge, Q}, infinity). delete_queue(Q) -> - gen_server:cast(?SERVER, {delete_queue, Q}). + gen_server2:cast(?SERVER, {delete_queue, Q}). stop() -> - gen_server:call(?SERVER, stop, infinity). + gen_server2:call(?SERVER, stop, infinity). stop_and_obliterate() -> - gen_server:call(?SERVER, stop_vaporise, infinity). + gen_server2:call(?SERVER, stop_vaporise, infinity). to_disk_only_mode() -> - gen_server:call(?SERVER, to_disk_only_mode, infinity). + gen_server2:call(?SERVER, to_disk_only_mode, infinity). to_ram_disk_mode() -> - gen_server:call(?SERVER, to_ram_disk_mode, infinity). + gen_server2:call(?SERVER, to_ram_disk_mode, infinity). length(Q) -> - gen_server:call(?SERVER, {length, Q}, infinity). + gen_server2:call(?SERVER, {length, Q}, infinity). is_empty(Q) -> Length = rabbit_disk_queue:length(Q), @@ -878,9 +878,10 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) -> {ok, WriteSeqId - ReadSeqId, State2} end. -internal_delete_queue(Q, State = #dqstate { sequences = Sequences }) -> +internal_delete_queue(Q, State) -> + {ok, _Count, State1 = #dqstate { sequences = Sequences }} = internal_purge(Q, State), true = ets:delete(Sequences, Q), - {atomic, {ok, State1}} = + {atomic, {ok, State2}} = mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), @@ -895,9 +896,9 @@ internal_delete_queue(Q, State = #dqstate { sequences = Sequences }) -> fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, msg_id = MsgId }) -> {MsgId, SeqId} end, Objs), - remove_messages(Q, MsgSeqIds, txn, State) + remove_messages(Q, MsgSeqIds, txn, State1) end), - {ok, State1}. + {ok, State2}. %% ---- ROLLING OVER THE APPEND FILE ---- @@ -1196,14 +1197,26 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> %% ---- DISK RECOVERY ---- +add_index() -> + case mnesia:add_table_index(rabbit_disk_queue, msg_id) of + {atomic, ok} -> ok; + {aborted,{already_exists,rabbit_disk_queue,_}} -> ok; + E -> E + end. + +del_index() -> + case mnesia:del_table_index(rabbit_disk_queue, msg_id) of + {atomic, ok} -> ok; + %% hmm, something weird must be going on, but it's probably + %% not the end of the world + {aborted,{no_exists,rabbit_disk_queue,_}} -> ok; + E2 -> E2 + end. + load_from_disk(State) -> %% sorted so that smallest number is first. which also means %% eldest file (left-most) first - ok = case mnesia:add_table_index(rabbit_disk_queue, msg_id) of - {atomic, ok} -> ok; - {aborted,{already_exists,rabbit_disk_queue,_}} -> ok; - E -> E - end, + ok = add_index(), {Files, TmpFiles} = get_disk_queue_files(), ok = recover_crashed_compactions(Files, TmpFiles), %% There should be no more tmp files now, so go ahead and load the @@ -1221,13 +1234,7 @@ load_from_disk(State) -> true, rabbit_disk_queue) end), State2 = extract_sequence_numbers(State1), - ok = case mnesia:del_table_index(rabbit_disk_queue, msg_id) of - {atomic, ok} -> ok; - %% hmm, something weird must be going on, but it's - %% probably not the end of the world - {aborted,{no_exists,rabbit_disk_queue,_}} -> ok; - E2 -> E2 - end, + ok = del_index(), {ok, State2}. extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> |
