summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_prequeue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/src/rabbit_prequeue.erl')
-rw-r--r--deps/rabbit/src/rabbit_prequeue.erl100
1 files changed, 100 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_prequeue.erl b/deps/rabbit/src/rabbit_prequeue.erl
new file mode 100644
index 0000000000..b5af8927c7
--- /dev/null
+++ b/deps/rabbit/src/rabbit_prequeue.erl
@@ -0,0 +1,100 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2010-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_prequeue).
+
+%% This is the initial gen_server that all queue processes start off
+%% as. It handles the decision as to whether we need to start a new
+%% mirror, a new master/unmirrored, or whether we are restarting (and
+%% if so, as what). Thus a crashing queue process can restart from here
+%% and always do the right thing.
+
+-export([start_link/3]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-behaviour(gen_server2).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
+
+%%----------------------------------------------------------------------------
+
+-export_type([start_mode/0]).
+
+-type start_mode() :: 'declare' | 'recovery' | 'slave'.
+
+%%----------------------------------------------------------------------------
+
+-spec start_link(amqqueue:amqqueue(), start_mode(), pid())
+ -> rabbit_types:ok_pid_or_error().
+
+start_link(Q, StartMode, Marker) ->
+ gen_server2:start_link(?MODULE, {Q, StartMode, Marker}, []).
+
+%%----------------------------------------------------------------------------
+
+init({Q, StartMode, Marker}) ->
+ init(Q, case {is_process_alive(Marker), StartMode} of
+ {true, slave} -> slave;
+ {true, _} -> master;
+ {false, _} -> restart
+ end).
+
+init(Q, master) -> rabbit_amqqueue_process:init(Q);
+init(Q, slave) -> rabbit_mirror_queue_slave:init(Q);
+
+init(Q0, restart) when ?is_amqqueue(Q0) ->
+ QueueName = amqqueue:get_name(Q0),
+ {ok, Q1} = rabbit_amqqueue:lookup(QueueName),
+ QPid = amqqueue:get_pid(Q1),
+ SPids = amqqueue:get_slave_pids(Q1),
+ LocalOrMasterDown = node(QPid) =:= node()
+ orelse not rabbit_mnesia:on_running_node(QPid),
+ Slaves = [SPid || SPid <- SPids, rabbit_mnesia:is_process_alive(SPid)],
+ case rabbit_mnesia:is_process_alive(QPid) of
+ true -> false = LocalOrMasterDown, %% assertion
+ rabbit_mirror_queue_slave:go(self(), async),
+ rabbit_mirror_queue_slave:init(Q1); %% [1]
+ false -> case LocalOrMasterDown andalso Slaves =:= [] of
+ true -> crash_restart(Q1); %% [2]
+ false -> timer:sleep(25),
+ init(Q1, restart) %% [3]
+ end
+ end.
+%% [1] There is a master on another node. Regardless of whether we
+%% were originally a master or a mirror, we are now a new slave.
+%%
+%% [2] Nothing is alive. We are the last best hope. Try to restart as a master.
+%%
+%% [3] The current master is dead but either there are alive mirrors to
+%% take over or it's all happening on a different node anyway. This is
+%% not a stable situation. Sleep and wait for somebody else to make a
+%% move.
+
+crash_restart(Q0) when ?is_amqqueue(Q0) ->
+ QueueName = amqqueue:get_name(Q0),
+ rabbit_log:error("Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]),
+ gen_server2:cast(self(), init),
+ Q1 = amqqueue:set_pid(Q0, self()),
+ rabbit_amqqueue_process:init(Q1).
+
+%%----------------------------------------------------------------------------
+
+%% This gen_server2 always hands over to some other module at the end
+%% of init/1.
+-spec handle_call(_, _, _) -> no_return().
+handle_call(_Msg, _From, _State) -> exit(unreachable).
+-spec handle_cast(_, _) -> no_return().
+handle_cast(_Msg, _State) -> exit(unreachable).
+-spec handle_info(_, _) -> no_return().
+handle_info(_Msg, _State) -> exit(unreachable).
+-spec terminate(_, _) -> no_return().
+terminate(_Reason, _State) -> exit(unreachable).
+-spec code_change(_, _, _) -> no_return().
+code_change(_OldVsn, _State, _Extra) -> exit(unreachable).