diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_sup.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_sup_sup.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_prequeue.erl | 153 |
4 files changed, 78 insertions, 96 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8f25bf2e14..0ad6af51e4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -195,8 +195,6 @@ arguments]). recover() -> - Marker = spawn_link(fun() -> receive stop -> ok end end), - register(rabbit_recovery, Marker), %% Clear out remnants of old incarnation, in case we restarted %% faster than other nodes handled DOWN messages from us. on_node_down(node()), @@ -213,11 +211,7 @@ recover() -> {rabbit_amqqueue_sup_sup, {rabbit_amqqueue_sup_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}), - Recovered = recover_durable_queues( - lists:zip(DurableQueues, OrderedRecoveryTerms)), - unlink(Marker), - Marker ! stop, - Recovered. + recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)). stop() -> ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup_sup), diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 591de408da..9f7060d7fa 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -36,12 +36,16 @@ %%---------------------------------------------------------------------------- -start_link(Q, Hint) -> - ChildSpec = {rabbit_amqqueue, {rabbit_prequeue, start_link, [Q, Hint]}, +start_link(Q, StartMode) -> + Marker = spawn_link(fun() -> receive stop -> ok end end), + ChildSpec = {rabbit_amqqueue, + {rabbit_prequeue, start_link, [Q, StartMode, Marker]}, transient, ?MAX_WAIT, worker, [rabbit_amqqueue_process, rabbit_mirror_queue_slave]}, {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, QPid} = supervisor2:start_child(SupPid, ChildSpec), + unlink(Marker), + Marker ! stop, {ok, SupPid, QPid}. init([]) -> {ok, {{one_for_one, 5, 10}, []}}. diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl index 6870f7c4d2..793cb7c9f5 100644 --- a/src/rabbit_amqqueue_sup_sup.erl +++ b/src/rabbit_amqqueue_sup_sup.erl @@ -41,8 +41,9 @@ start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). -start_queue_process(Node, Q, Hint) -> - {ok, _SupPid, QPid} = supervisor2:start_child({?SERVER, Node}, [Q, Hint]), +start_queue_process(Node, Q, StartMode) -> + {ok, _SupPid, QPid} = supervisor2:start_child( + {?SERVER, Node}, [Q, StartMode]), QPid. init([]) -> diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index 059297cb05..b084967d62 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -22,7 +22,7 @@ %% new queue, or whether we are in recovery. Thus a crashing queue %% process can restart from here and always do the right thing. --export([start_link/2]). +-export([start_link/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -41,27 +41,29 @@ %%---------------------------------------------------------------------------- -start_link(Q, Hint) -> - gen_server2:start_link(?MODULE, {Q, Hint}, []). +start_link(Q, StartMode, Marker) -> + gen_server2:start_link(?MODULE, {Q, StartMode, Marker}, []). %%---------------------------------------------------------------------------- -init({Q, Hint}) -> +init({Q, StartMode0, Marker}) -> %% Hand back to supervisor ASAP gen_server2:cast(self(), init), - {ok, {Q#amqqueue{pid = self()}, Hint}, hibernate, + StartMode = case is_process_alive(Marker) of + true -> StartMode0; + false -> restart + end, + {ok, {Q#amqqueue{pid = self()}, StartMode}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}. -handle_cast(init, {Q, Hint}) -> - case whereis(rabbit_recovery) of - undefined -> init_non_recovery(Q, Hint); - _Pid -> recovery = Hint, %% assertion - init_recovery(Q) - end; +handle_cast(init, {Q, declare}) -> init_declared(Q); +handle_cast(init, {Q, recovery}) -> init_recovery(Q); +handle_cast(init, {Q, slave}) -> init_slave(Q); +handle_cast(init, {Q, restart}) -> init_restart(Q); handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. @@ -77,82 +79,27 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -init_non_recovery(Q = #amqqueue{name = QueueName}, Hint) -> - Result = rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> init_missing(Q, Hint); - [ExistingQ] -> init_existing(ExistingQ) - end - end), - case Result of - {declared, DeclResult} -> - %% We have just been declared. Block waiting for an init - %% call so that we don't respond to any other message first - receive {'$gen_call', From, {init, new}} -> - case DeclResult of - {new, Fun} -> - Q1 = Fun(), - rabbit_amqqueue_process:init_declared(new,From, Q1); - {F, _} when F =:= absent; F =:= existing -> - gen_server2:reply(From, DeclResult), - {stop, normal, Q} - end - end; - new_slave -> - rabbit_mirror_queue_slave:init_slave(Q); - {crash_restart, Q1} -> - rabbit_log:error( - "Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]), - Self = self(), - rabbit_misc:execute_mnesia_transaction( - fun () -> - ok = rabbit_amqqueue:store_queue(Q1#amqqueue{pid = Self}) - end), - rabbit_amqqueue_process:init_declared( - {no_barrier, non_clean_shutdown}, none, Q1); - sleep_retry -> - timer:sleep(25), - init_non_recovery(Q, Hint); - master_in_recovery -> - {stop, normal, Q} +init_declared(Q = #amqqueue{name = QueueName}) -> + Decl = rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> rabbit_amqqueue:internal_declare(Q); + [ExistingQ] -> {existing, ExistingQ} + end + end), + %% We have just been declared. Block waiting for an init + %% call so that we don't respond to any other message first + receive {'$gen_call', From, {init, new}} -> + case Decl of + {new, Fun} -> + Q1 = Fun(), + rabbit_amqqueue_process:init_declared(new,From, Q1); + {F, _} when F =:= absent; F =:= existing -> + gen_server2:reply(From, Decl), + {stop, normal, Q} + end end. -%% The Hint is how we were originally started. Of course, if we -%% crashed it might no longer be true - but we can only get here if -%% there is no Mnesia record, which should mean we can't be here if we -%% crashed. -init_missing(Q, Hint) -> - case Hint of - declare -> {declared, rabbit_amqqueue:internal_declare(Q)}; - slave -> master_in_recovery %% [1] - end. -%% [1] This is the same concept as the master_in_recovery case in the -%% slave startup code. Unfortunately since we start slaves with two -%% transactions we need to check twice. - -init_existing(Q = #amqqueue{pid = QPid, slave_pids = SPids}) -> - Alive = fun rabbit_misc:is_process_alive/1, - case {Alive(QPid), node(QPid) =:= node()} of - {true, true} -> {declared, {existing, Q}}; %% [1] - {true, false} -> new_slave; %% [2] - {false, _} -> case [SPid || SPid <- SPids, Alive(SPid)] of - [] -> {crash_restart, Q}; %% [3] - _ -> sleep_retry %% [4] - end - end. -%% [1] Lost a race to declare a queue - just return the winner. -%% -%% [2] There is a master on another node. Regardless of whether we -%% just crashed (as a master or slave) and restarted or were asked to -%% start as a slave, we are now a new slave. -%% -%% [3] Nothing is alive. We must have just died. Try to restart as a master. -%% -%% [4] The current master is dead but there are alive slaves. This is -%% not a stable situation. Sleep and wait for somebody else to make a -%% move - those slaves should either promote one of their own or die. - init_recovery(Q) -> rabbit_misc:execute_mnesia_transaction( fun () -> ok = rabbit_amqqueue:store_queue(Q) end), @@ -160,3 +107,39 @@ init_recovery(Q) -> receive {'$gen_call', From, {init, Terms}} -> rabbit_amqqueue_process:init_declared(Terms, From, Q) end. + +init_slave(Q) -> + rabbit_mirror_queue_slave:init_slave(Q). + +init_restart(#amqqueue{name = QueueName}) -> + {ok, Q = #amqqueue{pid = QPid, + slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName), + Local = node(QPid) =:= node(), + Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)], + case rabbit_misc:is_process_alive(QPid) of + true -> false = Local, %% assertion + rabbit_mirror_queue_slave:init_slave(Q); %% [1] + false -> case Local andalso Slaves =:= [] of + true -> crash_restart(Q); %% [2] + false -> timer:sleep(25), + init_restart(Q) %% [3] + end + end. +%% [1] There is a master on another node. Regardless of whether we +%% were originally a master or a slave, we are now a new slave. +%% +%% [2] Nothing is alive. We are the last best hope. Try to restart as a master. +%% +%% [3] The current master is dead but either there are alive slaves to +%% take over or it's all happening on a different node anyway. This is +%% not a stable situation. Sleep and wait for somebody else to make a +%% move. + +crash_restart(Q = #amqqueue{name = QueueName}) -> + rabbit_log:error( + "Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]), + Self = self(), + rabbit_misc:execute_mnesia_transaction( + fun () -> ok = rabbit_amqqueue:store_queue(Q#amqqueue{pid = Self}) end), + rabbit_amqqueue_process:init_declared( + {no_barrier, non_clean_shutdown}, none, Q). |
