summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_amqqueue_sup.erl8
-rw-r--r--src/rabbit_amqqueue_sup_sup.erl5
-rw-r--r--src/rabbit_prequeue.erl153
4 files changed, 78 insertions, 96 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8f25bf2e14..0ad6af51e4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -195,8 +195,6 @@
arguments]).
recover() ->
- Marker = spawn_link(fun() -> receive stop -> ok end end),
- register(rabbit_recovery, Marker),
%% Clear out remnants of old incarnation, in case we restarted
%% faster than other nodes handled DOWN messages from us.
on_node_down(node()),
@@ -213,11 +211,7 @@ recover() ->
{rabbit_amqqueue_sup_sup,
{rabbit_amqqueue_sup_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}),
- Recovered = recover_durable_queues(
- lists:zip(DurableQueues, OrderedRecoveryTerms)),
- unlink(Marker),
- Marker ! stop,
- Recovered.
+ recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)).
stop() ->
ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup_sup),
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 591de408da..9f7060d7fa 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -36,12 +36,16 @@
%%----------------------------------------------------------------------------
-start_link(Q, Hint) ->
- ChildSpec = {rabbit_amqqueue, {rabbit_prequeue, start_link, [Q, Hint]},
+start_link(Q, StartMode) ->
+ Marker = spawn_link(fun() -> receive stop -> ok end end),
+ ChildSpec = {rabbit_amqqueue,
+ {rabbit_prequeue, start_link, [Q, StartMode, Marker]},
transient, ?MAX_WAIT, worker, [rabbit_amqqueue_process,
rabbit_mirror_queue_slave]},
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, QPid} = supervisor2:start_child(SupPid, ChildSpec),
+ unlink(Marker),
+ Marker ! stop,
{ok, SupPid, QPid}.
init([]) -> {ok, {{one_for_one, 5, 10}, []}}.
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
index 6870f7c4d2..793cb7c9f5 100644
--- a/src/rabbit_amqqueue_sup_sup.erl
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -41,8 +41,9 @@
start_link() ->
supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
-start_queue_process(Node, Q, Hint) ->
- {ok, _SupPid, QPid} = supervisor2:start_child({?SERVER, Node}, [Q, Hint]),
+start_queue_process(Node, Q, StartMode) ->
+ {ok, _SupPid, QPid} = supervisor2:start_child(
+ {?SERVER, Node}, [Q, StartMode]),
QPid.
init([]) ->
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
index 059297cb05..b084967d62 100644
--- a/src/rabbit_prequeue.erl
+++ b/src/rabbit_prequeue.erl
@@ -22,7 +22,7 @@
%% new queue, or whether we are in recovery. Thus a crashing queue
%% process can restart from here and always do the right thing.
--export([start_link/2]).
+-export([start_link/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
@@ -41,27 +41,29 @@
%%----------------------------------------------------------------------------
-start_link(Q, Hint) ->
- gen_server2:start_link(?MODULE, {Q, Hint}, []).
+start_link(Q, StartMode, Marker) ->
+ gen_server2:start_link(?MODULE, {Q, StartMode, Marker}, []).
%%----------------------------------------------------------------------------
-init({Q, Hint}) ->
+init({Q, StartMode0, Marker}) ->
%% Hand back to supervisor ASAP
gen_server2:cast(self(), init),
- {ok, {Q#amqqueue{pid = self()}, Hint}, hibernate,
+ StartMode = case is_process_alive(Marker) of
+ true -> StartMode0;
+ false -> restart
+ end,
+ {ok, {Q#amqqueue{pid = self()}, StartMode}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}}.
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}.
-handle_cast(init, {Q, Hint}) ->
- case whereis(rabbit_recovery) of
- undefined -> init_non_recovery(Q, Hint);
- _Pid -> recovery = Hint, %% assertion
- init_recovery(Q)
- end;
+handle_cast(init, {Q, declare}) -> init_declared(Q);
+handle_cast(init, {Q, recovery}) -> init_recovery(Q);
+handle_cast(init, {Q, slave}) -> init_slave(Q);
+handle_cast(init, {Q, restart}) -> init_restart(Q);
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
@@ -77,82 +79,27 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-init_non_recovery(Q = #amqqueue{name = QueueName}, Hint) ->
- Result = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_queue, QueueName}) of
- [] -> init_missing(Q, Hint);
- [ExistingQ] -> init_existing(ExistingQ)
- end
- end),
- case Result of
- {declared, DeclResult} ->
- %% We have just been declared. Block waiting for an init
- %% call so that we don't respond to any other message first
- receive {'$gen_call', From, {init, new}} ->
- case DeclResult of
- {new, Fun} ->
- Q1 = Fun(),
- rabbit_amqqueue_process:init_declared(new,From, Q1);
- {F, _} when F =:= absent; F =:= existing ->
- gen_server2:reply(From, DeclResult),
- {stop, normal, Q}
- end
- end;
- new_slave ->
- rabbit_mirror_queue_slave:init_slave(Q);
- {crash_restart, Q1} ->
- rabbit_log:error(
- "Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]),
- Self = self(),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- ok = rabbit_amqqueue:store_queue(Q1#amqqueue{pid = Self})
- end),
- rabbit_amqqueue_process:init_declared(
- {no_barrier, non_clean_shutdown}, none, Q1);
- sleep_retry ->
- timer:sleep(25),
- init_non_recovery(Q, Hint);
- master_in_recovery ->
- {stop, normal, Q}
+init_declared(Q = #amqqueue{name = QueueName}) ->
+ Decl = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_queue, QueueName}) of
+ [] -> rabbit_amqqueue:internal_declare(Q);
+ [ExistingQ] -> {existing, ExistingQ}
+ end
+ end),
+ %% We have just been declared. Block waiting for an init
+ %% call so that we don't respond to any other message first
+ receive {'$gen_call', From, {init, new}} ->
+ case Decl of
+ {new, Fun} ->
+ Q1 = Fun(),
+ rabbit_amqqueue_process:init_declared(new,From, Q1);
+ {F, _} when F =:= absent; F =:= existing ->
+ gen_server2:reply(From, Decl),
+ {stop, normal, Q}
+ end
end.
-%% The Hint is how we were originally started. Of course, if we
-%% crashed it might no longer be true - but we can only get here if
-%% there is no Mnesia record, which should mean we can't be here if we
-%% crashed.
-init_missing(Q, Hint) ->
- case Hint of
- declare -> {declared, rabbit_amqqueue:internal_declare(Q)};
- slave -> master_in_recovery %% [1]
- end.
-%% [1] This is the same concept as the master_in_recovery case in the
-%% slave startup code. Unfortunately since we start slaves with two
-%% transactions we need to check twice.
-
-init_existing(Q = #amqqueue{pid = QPid, slave_pids = SPids}) ->
- Alive = fun rabbit_misc:is_process_alive/1,
- case {Alive(QPid), node(QPid) =:= node()} of
- {true, true} -> {declared, {existing, Q}}; %% [1]
- {true, false} -> new_slave; %% [2]
- {false, _} -> case [SPid || SPid <- SPids, Alive(SPid)] of
- [] -> {crash_restart, Q}; %% [3]
- _ -> sleep_retry %% [4]
- end
- end.
-%% [1] Lost a race to declare a queue - just return the winner.
-%%
-%% [2] There is a master on another node. Regardless of whether we
-%% just crashed (as a master or slave) and restarted or were asked to
-%% start as a slave, we are now a new slave.
-%%
-%% [3] Nothing is alive. We must have just died. Try to restart as a master.
-%%
-%% [4] The current master is dead but there are alive slaves. This is
-%% not a stable situation. Sleep and wait for somebody else to make a
-%% move - those slaves should either promote one of their own or die.
-
init_recovery(Q) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> ok = rabbit_amqqueue:store_queue(Q) end),
@@ -160,3 +107,39 @@ init_recovery(Q) ->
receive {'$gen_call', From, {init, Terms}} ->
rabbit_amqqueue_process:init_declared(Terms, From, Q)
end.
+
+init_slave(Q) ->
+ rabbit_mirror_queue_slave:init_slave(Q).
+
+init_restart(#amqqueue{name = QueueName}) ->
+ {ok, Q = #amqqueue{pid = QPid,
+ slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName),
+ Local = node(QPid) =:= node(),
+ Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)],
+ case rabbit_misc:is_process_alive(QPid) of
+ true -> false = Local, %% assertion
+ rabbit_mirror_queue_slave:init_slave(Q); %% [1]
+ false -> case Local andalso Slaves =:= [] of
+ true -> crash_restart(Q); %% [2]
+ false -> timer:sleep(25),
+ init_restart(Q) %% [3]
+ end
+ end.
+%% [1] There is a master on another node. Regardless of whether we
+%% were originally a master or a slave, we are now a new slave.
+%%
+%% [2] Nothing is alive. We are the last best hope. Try to restart as a master.
+%%
+%% [3] The current master is dead but either there are alive slaves to
+%% take over or it's all happening on a different node anyway. This is
+%% not a stable situation. Sleep and wait for somebody else to make a
+%% move.
+
+crash_restart(Q = #amqqueue{name = QueueName}) ->
+ rabbit_log:error(
+ "Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]),
+ Self = self(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> ok = rabbit_amqqueue:store_queue(Q#amqqueue{pid = Self}) end),
+ rabbit_amqqueue_process:init_declared(
+ {no_barrier, non_clean_shutdown}, none, Q).