diff options
Diffstat (limited to 'deps/rabbit/src/rabbit_prequeue.erl')
-rw-r--r-- | deps/rabbit/src/rabbit_prequeue.erl | 100 |
1 files changed, 100 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_prequeue.erl b/deps/rabbit/src/rabbit_prequeue.erl new file mode 100644 index 0000000000..b5af8927c7 --- /dev/null +++ b/deps/rabbit/src/rabbit_prequeue.erl @@ -0,0 +1,100 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2010-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_prequeue). + +%% This is the initial gen_server that all queue processes start off +%% as. It handles the decision as to whether we need to start a new +%% mirror, a new master/unmirrored, or whether we are restarting (and +%% if so, as what). Thus a crashing queue process can restart from here +%% and always do the right thing. + +-export([start_link/3]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-behaviour(gen_server2). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). + +%%---------------------------------------------------------------------------- + +-export_type([start_mode/0]). + +-type start_mode() :: 'declare' | 'recovery' | 'slave'. + +%%---------------------------------------------------------------------------- + +-spec start_link(amqqueue:amqqueue(), start_mode(), pid()) + -> rabbit_types:ok_pid_or_error(). + +start_link(Q, StartMode, Marker) -> + gen_server2:start_link(?MODULE, {Q, StartMode, Marker}, []). + +%%---------------------------------------------------------------------------- + +init({Q, StartMode, Marker}) -> + init(Q, case {is_process_alive(Marker), StartMode} of + {true, slave} -> slave; + {true, _} -> master; + {false, _} -> restart + end). + +init(Q, master) -> rabbit_amqqueue_process:init(Q); +init(Q, slave) -> rabbit_mirror_queue_slave:init(Q); + +init(Q0, restart) when ?is_amqqueue(Q0) -> + QueueName = amqqueue:get_name(Q0), + {ok, Q1} = rabbit_amqqueue:lookup(QueueName), + QPid = amqqueue:get_pid(Q1), + SPids = amqqueue:get_slave_pids(Q1), + LocalOrMasterDown = node(QPid) =:= node() + orelse not rabbit_mnesia:on_running_node(QPid), + Slaves = [SPid || SPid <- SPids, rabbit_mnesia:is_process_alive(SPid)], + case rabbit_mnesia:is_process_alive(QPid) of + true -> false = LocalOrMasterDown, %% assertion + rabbit_mirror_queue_slave:go(self(), async), + rabbit_mirror_queue_slave:init(Q1); %% [1] + false -> case LocalOrMasterDown andalso Slaves =:= [] of + true -> crash_restart(Q1); %% [2] + false -> timer:sleep(25), + init(Q1, restart) %% [3] + end + end. +%% [1] There is a master on another node. Regardless of whether we +%% were originally a master or a mirror, we are now a new slave. +%% +%% [2] Nothing is alive. We are the last best hope. Try to restart as a master. +%% +%% [3] The current master is dead but either there are alive mirrors to +%% take over or it's all happening on a different node anyway. This is +%% not a stable situation. Sleep and wait for somebody else to make a +%% move. + +crash_restart(Q0) when ?is_amqqueue(Q0) -> + QueueName = amqqueue:get_name(Q0), + rabbit_log:error("Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]), + gen_server2:cast(self(), init), + Q1 = amqqueue:set_pid(Q0, self()), + rabbit_amqqueue_process:init(Q1). + +%%---------------------------------------------------------------------------- + +%% This gen_server2 always hands over to some other module at the end +%% of init/1. +-spec handle_call(_, _, _) -> no_return(). +handle_call(_Msg, _From, _State) -> exit(unreachable). +-spec handle_cast(_, _) -> no_return(). +handle_cast(_Msg, _State) -> exit(unreachable). +-spec handle_info(_, _) -> no_return(). +handle_info(_Msg, _State) -> exit(unreachable). +-spec terminate(_, _) -> no_return(). +terminate(_Reason, _State) -> exit(unreachable). +-spec code_change(_, _, _) -> no_return(). +code_change(_OldVsn, _State, _Extra) -> exit(unreachable). |