summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-04-27 05:12:25 +0100
committerMatthias Radestock <matthias@lshift.net>2010-04-27 05:12:25 +0100
commita872bd426153d9d8a7d3f255f92241c9ca1ad926 (patch)
tree2818939f5174705a9f9daf930cb5de44fe352c6e /src
parent2abce2db6cb2af6f0f55efdc8f409f8b7a0470e5 (diff)
downloadrabbitmq-server-git-a872bd426153d9d8a7d3f255f92241c9ca1ad926.tar.gz
turn queue recovery upside down
instead of the persister pushing recovered messages to the queues, the queues pull recovered messages from the persister. This allows us to perform queue recovery before recording the existince of the queues in mnesia, and thus prevents access to potentially uninitialised queues from other cluster nodes. It also makes the dependency between queues and the persister one way.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_amqqueue.erl39
-rw-r--r--src/rabbit_amqqueue_process.erl21
-rw-r--r--src/rabbit_persister.erl122
4 files changed, 79 insertions, 111 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index f2dce30334..17e18e0ec6 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -128,12 +128,8 @@
-rabbit_boot_step({queue_sup_queue_recovery,
[{description, "queue supervisor and queue recovery"},
{mfa, {rabbit_amqqueue, start, []}},
- {requires, empty_db_check}]}).
-
--rabbit_boot_step({persister,
- [{mfa, {rabbit_sup, start_child,
- [rabbit_persister]}},
- {requires, queue_sup_queue_recovery}]}).
+ {requires, empty_db_check},
+ {enables, routing_ready}]}).
-rabbit_boot_step({routing_ready,
[{description, "message delivery logic ready"}]}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f627883677..5f045b2764 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -35,7 +35,7 @@
-export([internal_declare/2, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
- stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
+ stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
-export([claim_queue/2]).
@@ -88,7 +88,6 @@
{'error', 'not_empty'}).
-spec(purge/1 :: (amqqueue()) -> qlen()).
-spec(deliver/2 :: (pid(), delivery()) -> boolean()).
--spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
-spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
@@ -118,6 +117,9 @@
start() ->
DurableQueues = find_durable_queues(),
+ ok = rabbit_sup:start_child(
+ rabbit_persister,
+ [[QName || #amqqueue{name = QName} <- DurableQueues]]),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
@@ -137,27 +139,13 @@ find_durable_queues() ->
end).
recover_durable_queues(DurableQueues) ->
- lists:foldl(
- fun (RecoveredQ, Acc) ->
- Q = start_queue_process(RecoveredQ),
- %% We need to catch the case where a client connected to
- %% another node has deleted the queue (and possibly
- %% re-created it).
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:match_object(
- rabbit_durable_queue, RecoveredQ,
- read) of
- [_] -> ok = store_queue(Q),
- true;
- [] -> false
- end
- end) of
- true -> [Q | Acc];
- false -> exit(Q#amqqueue.pid, shutdown),
- Acc
- end
- end, [], DurableQueues).
+ Qs = [start_queue_process(Q) || Q <- DurableQueues],
+ %% Issue inits to *all* the queues so that they all init at the same time
+ [ok = gen_server2:cast(Q#amqqueue.pid, {init, true}) || Q <- Qs],
+ [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs],
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> [ok = store_queue(Q) || Q <- Qs] end),
+ Qs.
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -165,6 +153,8 @@ declare(QueueName, Durable, AutoDelete, Args) ->
auto_delete = AutoDelete,
arguments = Args,
pid = none}),
+ ok = gen_server2:cast(Q#amqqueue.pid, {init, false}),
+ ok = gen_server2:call(Q#amqqueue.pid, sync, infinity),
internal_declare(Q, true).
internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
@@ -279,9 +269,6 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}),
true.
-redeliver(QPid, Messages) ->
- gen_server2:cast(QPid, {redeliver, Messages}).
-
requeue(QPid, MsgIds, ChPid) ->
gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 449e79eaf7..18c98f14eb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -92,7 +92,7 @@
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
info_keys() -> ?INFO_KEYS.
-
+
%%----------------------------------------------------------------------------
init(Q) ->
@@ -102,11 +102,13 @@ init(Q) ->
exclusive_consumer = none,
has_had_consumers = false,
next_msg_id = 1,
- message_buffer = queue:new(),
+ message_buffer = undefined,
active_consumers = queue:new(),
blocked_consumers = queue:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+terminate(_Reason, #q{message_buffer = undefined}) ->
+ ok;
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
QName = qname(State),
@@ -541,6 +543,9 @@ i(Item, _) ->
%---------------------------------------------------------------------------
+handle_call(sync, _From, State) ->
+ reply(ok, State);
+
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -748,6 +753,15 @@ handle_call({claim_queue, ReaderPid}, _From,
reply(locked, State)
end.
+handle_cast({init, Recover}, State = #q{message_buffer = undefined}) ->
+ Messages = case Recover of
+ true -> rabbit_persister:queue_content(qname(State));
+ false -> []
+ end,
+ noreply(State#q{message_buffer = queue:from_list(Messages)});
+handle_cast(init, State) ->
+ noreply(State);
+
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
@@ -775,9 +789,6 @@ handle_cast({rollback, Txn, ChPid}, State) ->
record_current_channel_tx(ChPid, none),
noreply(State);
-handle_cast({redeliver, Messages}, State) ->
- noreply(deliver_or_enqueue_n(Messages, State));
-
handle_cast({requeue, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index a9e0cab928..a8e41baf74 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -33,14 +33,14 @@
-behaviour(gen_server).
--export([start_link/0]).
+-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([transaction/1, extend_transaction/2, dirty_work/1,
commit_transaction/1, rollback_transaction/1,
- force_snapshot/0]).
+ force_snapshot/0, queue_content/1]).
-include("rabbit.hrl").
@@ -52,8 +52,7 @@
-define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}).
-record(pstate, {log_handle, entry_count, deadline,
- pending_logs, pending_replies,
- snapshot}).
+ pending_logs, pending_replies, snapshot}).
%% two tables for efficient persistency
%% one maps a key to a message
@@ -72,20 +71,22 @@
{deliver, pmsg()} |
{ack, pmsg()}).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/1 :: ([queue_name()]) ->
+ {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(transaction/1 :: ([work_item()]) -> 'ok').
-spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok').
-spec(dirty_work/1 :: ([work_item()]) -> 'ok').
-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(force_snapshot/0 :: () -> 'ok').
+-spec(queue_content/1 :: (queue_name()) -> [{message(), boolean()}]).
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+start_link(DurableQueues) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [DurableQueues], []).
transaction(MessageList) ->
?LOGDEBUG("transaction ~p~n", [MessageList]),
@@ -111,15 +112,18 @@ rollback_transaction(TxnKey) ->
force_snapshot() ->
gen_server:call(?SERVER, force_snapshot, infinity).
+queue_content(QName) ->
+ gen_server:call(?SERVER, {queue_content, QName}, infinity).
+
%%--------------------------------------------------------------------
-init(_Args) ->
+init([DurableQueues]) ->
process_flag(trap_exit, true),
FileName = base_filename(),
ok = filelib:ensure_dir(FileName),
Snapshot = #psnapshot{transactions = dict:new(),
messages = ets:new(messages, []),
- queues = ets:new(queues, []),
+ queues = ets:new(queues, [ordered_set]),
next_seq_id = 0},
LogHandle =
case disk_log:open([{name, rabbit_persister},
@@ -135,7 +139,8 @@ init(_Args) ->
[Recovered, Bad]),
LH
end,
- {Res, NewSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
+ {Res, NewSnapshot} =
+ internal_load_snapshot(LogHandle, DurableQueues, Snapshot),
case Res of
ok ->
ok = take_snapshot(LogHandle, NewSnapshot);
@@ -143,12 +148,12 @@ init(_Args) ->
rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
ok = take_snapshot_and_save_old(LogHandle, NewSnapshot)
end,
- State = #pstate{log_handle = LogHandle,
- entry_count = 0,
- deadline = infinity,
- pending_logs = [],
- pending_replies = [],
- snapshot = NewSnapshot},
+ State = #pstate{log_handle = LogHandle,
+ entry_count = 0,
+ deadline = infinity,
+ pending_logs = [],
+ pending_replies = [],
+ snapshot = NewSnapshot},
{ok, State}.
handle_call({transaction, Key, MessageList}, From, State) ->
@@ -158,6 +163,13 @@ handle_call({commit_transaction, TxnKey}, From, State) ->
do_noreply(internal_commit(From, TxnKey, State));
handle_call(force_snapshot, _From, State) ->
do_reply(ok, flush(true, State));
+handle_call({queue_content, QName}, _From,
+ State = #pstate{snapshot = #psnapshot{messages = Messages,
+ queues = Queues}}) ->
+ MatchSpec= [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}],
+ do_reply([{ets:lookup_element(Messages, K, 2), D} ||
+ {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))],
+ State);
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -339,10 +351,10 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
next_seq_id = NextSeqId}) ->
%% Avoid infinite growth of the table by removing messages not
%% bound to a queue anymore
- prune_table(Messages, ets:foldl(
- fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
- sets:add_element(PKey, S)
- end, sets:new(), Queues)),
+ PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
+ sets:add_element(PKey, S)
+ end, sets:new(), Queues),
+ prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end),
InnerSnapshot = {{txns, Ts},
{messages, ets:tab2list(Messages)},
{queues, ets:tab2list(Queues)},
@@ -351,20 +363,21 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
{persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
term_to_binary(InnerSnapshot)}.
-prune_table(Tab, Keys) ->
+prune_table(Tab, Pred) ->
true = ets:safe_fixtable(Tab, true),
- ok = prune_table(Tab, Keys, ets:first(Tab)),
+ ok = prune_table(Tab, Pred, ets:first(Tab)),
true = ets:safe_fixtable(Tab, false).
-prune_table(_Tab, _Keys, '$end_of_table') -> ok;
-prune_table(Tab, Keys, Key) ->
- case sets:is_element(Key, Keys) of
+prune_table(_Tab, _Pred, '$end_of_table') -> ok;
+prune_table(Tab, Pred, Key) ->
+ case Pred(Key) of
true -> ok;
false -> ets:delete(Tab, Key)
end,
- prune_table(Tab, Keys, ets:next(Tab, Key)).
+ prune_table(Tab, Pred, ets:next(Tab, Key)).
internal_load_snapshot(LogHandle,
+ DurableQueues,
Snapshot = #psnapshot{messages = Messages,
queues = Queues}) ->
{K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
@@ -378,11 +391,18 @@ internal_load_snapshot(LogHandle,
Snapshot#psnapshot{
transactions = Ts,
next_seq_id = NextSeqId}),
- Snapshot2 = requeue_messages(Snapshot1),
+ %% Remove all entries for queues that no longer exist.
+ %% Note that the 'messages' table is pruned when the next
+ %% snapshot is taken.
+ DurableQueuesSet = sets:from_list(DurableQueues),
+ prune_table(Snapshot1#psnapshot.queues,
+ fun ({QName, _PKey}) ->
+ sets:is_element(QName, DurableQueuesSet)
+ end),
%% uncompleted transactions are discarded - this is TRTTD
%% since we only get into this code on node restart, so
%% any uncompleted transactions will have been aborted.
- {ok, Snapshot2#psnapshot{transactions = dict:new()}};
+ {ok, Snapshot1#psnapshot{transactions = dict:new()}};
{error, Reason} -> {{error, Reason}, Snapshot}
end.
@@ -394,52 +414,6 @@ check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) ->
check_version(_Other) ->
{error, unrecognised_persister_log_format}.
-requeue_messages(Snapshot = #psnapshot{messages = Messages,
- queues = Queues}) ->
- Work = ets:foldl(
- fun ({{QName, PKey}, Delivered, SeqId}, Acc) ->
- rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc)
- end, dict:new(), Queues),
- %% unstable parallel map, because order doesn't matter
- L = lists:append(
- rabbit_misc:upmap(
- %% we do as much work as possible in spawned worker
- %% processes, but we need to make sure the ets:inserts are
- %% performed in self()
- fun ({QName, Requeues}) ->
- requeue(QName, Requeues, Messages)
- end, dict:to_list(Work))),
- NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L],
- NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L],
- ets:delete_all_objects(Messages),
- ets:delete_all_objects(Queues),
- true = ets:insert(Messages, NewMessages),
- true = ets:insert(Queues, NewQueues),
- %% contains the mutated messages and queues tables
- Snapshot.
-
-requeue(QName, Requeues, Messages) ->
- case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue{pid = QPid}} ->
- RequeueMessages =
- [{SeqId, QName, PKey, Message, Delivered} ||
- {SeqId, PKey, Delivered} <- Requeues,
- {_, Message} <- ets:lookup(Messages, PKey)],
- rabbit_amqqueue:redeliver(
- QPid,
- %% Messages published by the same process receive
- %% persistence keys that are monotonically
- %% increasing. Since message ordering is defined on a
- %% per-channel basis, and channels are bound to specific
- %% processes, sorting the list does provide the correct
- %% ordering properties.
- [{Message, Delivered} || {_, _, _, Message, Delivered} <-
- lists:sort(RequeueMessages)]),
- RequeueMessages;
- {error, not_found} ->
- []
- end.
-
replay([], LogHandle, K, Snapshot) ->
case disk_log:chunk(LogHandle, K) of
{K1, Items} ->