summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Michael Klishin <michael@clojurewerkz.org> 2017-08-03 16:16:09 +0300
committer: Michael Klishin <michael@clojurewerkz.org> 2017-08-03 16:16:09 +0300
commit: 454a26b359b050d62bb745d11e7bc3677aab29bf (patch)
tree: d4fa6f9a14794da4d78725897c1a8e577e740386
parent: ba218a735477744f7740fb4325868276dd99ea6e (diff)
parent: ac687666a1cc4fb0619792d44cad5f10dd869629 (diff)
download: rabbitmq-server-git-454a26b359b050d62bb745d11e7bc3677aab29bf.tar.gz
Merge branch 'master' into rabbitmq-management-460
-rw-r--r-- src/rabbit.erl | 6
-rw-r--r-- src/rabbit_amqqueue.erl | 50
-rw-r--r-- src/rabbit_mirror_queue_misc.erl | 11
-rw-r--r-- src/rabbit_vhost.erl | 2
-rw-r--r-- src/rabbit_vhost_sup_sup.erl | 39
-rw-r--r-- test/dynamic_ha_SUITE.erl | 87
6 files changed, 152 insertions, 43 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b166e079f4..0a0eb6b71a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -168,12 +168,6 @@
{requires, recovery},
{enables, routing_ready}]}).
--rabbit_boot_step({mirrored_queues,
- [{description, "adding mirrors to queues"},
- {mfa, {rabbit_mirror_queue_misc, on_node_up, []}},
- {requires, recovery},
- {enables, routing_ready}]}).
-
-rabbit_boot_step({routing_ready,
[{description, "message delivery logic ready"},
{requires, core_initialized}]}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5537634144..a5480f707e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -339,11 +339,17 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
{ok, Node0} -> Node0;
{error, _} -> Node
end,
-
Node1 = rabbit_mirror_queue_misc:initial_queue_node(Q, Node1),
- gen_server2:call(
- rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare),
- {init, new}, infinity).
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost, Node1) of
+ {ok, _} ->
+ gen_server2:call(
+ rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare),
+ {init, new}, infinity);
+ {error, Error} ->
+ rabbit_misc:protocol_error(internal_error,
+ "Cannot declare a queue '~s' on node '~s': ~255p",
+ [rabbit_misc:rs(QueueName), Node1, Error])
+ end.
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
@@ -456,16 +462,28 @@ with(Name, F, E) ->
with(Name, F, E, RetriesLeft) ->
case lookup(Name) of
- {ok, Q = #amqqueue{}} when RetriesLeft =:= 0 ->
+ {ok, Q = #amqqueue{state = live}} when RetriesLeft =:= 0 ->
%% Something bad happened to that queue, we are bailing out
%% on processing current request.
E({absent, Q, timeout});
+ {ok, Q = #amqqueue{state = stopped}} when RetriesLeft =:= 0 ->
+ %% The queue was stopped and not migrated
+ E({absent, Q, stopped});
+ %% The queue process has crashed with unknown error
{ok, Q = #amqqueue{state = crashed}} ->
E({absent, Q, crashed});
+ %% The queue process has been stopped by a supervisor.
+ %% In that case a synchronised slave can take over
+ %% so we should retry.
{ok, Q = #amqqueue{state = stopped}} ->
%% The queue process was stopped by the supervisor
- E({absent, Q, stopped});
- {ok, Q = #amqqueue{pid = QPid}} ->
+ rabbit_misc:with_exit_handler(
+ fun () -> retry_wait(Q, F, E, RetriesLeft) end,
+ fun () -> F(Q) end);
+ %% The queue is supposed to be active.
+ %% The master node can go away or queue can be killed
+ %% so we retry, waiting for a slave to take over.
+ {ok, Q = #amqqueue{state = live}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
%% with the QPid. F() should be written s.t. that this
@@ -473,14 +491,24 @@ with(Name, F, E, RetriesLeft) ->
%% indicates a code bug and we don't want to get stuck in
%% the retry loop.
rabbit_misc:with_exit_handler(
- fun () -> false = rabbit_mnesia:is_process_alive(QPid),
- timer:sleep(30),
- with(Name, F, E, RetriesLeft - 1)
- end, fun () -> F(Q) end);
+ fun () -> retry_wait(Q, F, E, RetriesLeft) end,
+ fun () -> F(Q) end);
{error, not_found} ->
E(not_found_or_absent_dirty(Name))
end.
+retry_wait(Q = #amqqueue{pid = QPid, name = Name, state = QState}, F, E, RetriesLeft) ->
+ case {QState, is_mirrored(Q)} of
+ %% We don't want to repeat an operation if
+ %% there are no slaves to migrate to
+ {stopped, false} ->
+ E({absent, Q, stopped});
+ _ ->
+ false = rabbit_mnesia:is_process_alive(QPid),
+ timer:sleep(30),
+ with(Name, F, E, RetriesLeft - 1)
+ end.
+
with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
with_or_die(Name, F) ->
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index dc23095fb1..725cd7d089 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -17,7 +17,7 @@
-module(rabbit_mirror_queue_misc).
-behaviour(rabbit_policy_validator).
--export([remove_from_queue/3, on_node_up/0, add_mirrors/3,
+-export([remove_from_queue/3, on_vhost_up/1, add_mirrors/3,
report_deaths/4, store_updated_slaves/1,
initial_queue_node/2, suggested_queue_nodes/1,
is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1,
@@ -53,7 +53,6 @@
-spec remove_from_queue
(rabbit_amqqueue:name(), pid(), [pid()]) ->
{'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}.
--spec on_node_up() -> 'ok'.
-spec add_mirrors(rabbit_amqqueue:name(), [node()], 'sync' | 'async') ->
'ok'.
-spec store_updated_slaves(rabbit_types:amqqueue()) ->
@@ -167,12 +166,16 @@ slaves_to_start_on_failure(Q, DeadGMPids) ->
{_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes),
NewNodes -- OldNodes.
-on_node_up() ->
+on_vhost_up(VHost) ->
QNames =
rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:foldl(
- fun (Q = #amqqueue{name = QName,
+ fun
+ (#amqqueue{name = #resource{virtual_host = OtherVhost}},
+ QNames0) when OtherVhost =/= VHost ->
+ QNames0;
+ (Q = #amqqueue{name = QName,
pid = Pid,
slave_pids = SPids}, QNames0) ->
%% We don't want to pass in the whole
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 3a214e2634..73c05389be 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -71,6 +71,8 @@ recover(VHost) ->
ok = rabbit_binding:recover(rabbit_exchange:recover(VHost),
[QName || #amqqueue{name = QName} <- Qs]),
ok = rabbit_amqqueue:start(Qs),
+ %% Start queue mirrors.
+ ok = rabbit_mirror_queue_misc:on_vhost_up(VHost),
ok.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
index 93c26d4e0f..19d7cf61b7 100644
--- a/src/rabbit_vhost_sup_sup.erl
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -23,15 +23,16 @@
-export([init/1]).
-export([start_link/0, start/0]).
--export([init_vhost/1, get_vhost_sup/1, get_vhost_sup/2, save_vhost_sup/3]).
--export([delete_on_all_nodes/1]).
--export([start_on_all_nodes/1]).
-
--export([save_vhost_process/2]).
+-export([init_vhost/1,
+ start_vhost/1, start_vhost/2,
+ get_vhost_sup/1, get_vhost_sup/2,
+ save_vhost_sup/3,
+ save_vhost_process/2]).
+-export([delete_on_all_nodes/1, start_on_all_nodes/1]).
-export([is_vhost_alive/1]).
%% Internal
--export([stop_and_delete_vhost/1, start_vhost/1]).
+-export([stop_and_delete_vhost/1]).
-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}).
@@ -61,7 +62,12 @@ init([]) ->
start_on_all_nodes(VHost) ->
NodesStart = [ {Node, start_vhost(VHost, Node)}
|| Node <- rabbit_nodes:all_running() ],
- Failures = lists:filter(fun({_, {ok, _}}) -> false; (_) -> true end, NodesStart),
+ Failures = lists:filter(fun
+ ({_, {ok, _}}) -> false;
+ ({_, {error, {already_started, _}}}) -> false;
+ (_) -> true
+ end,
+ NodesStart),
case Failures of
[] -> ok;
Errors -> {error, {failed_to_start_vhost_on_nodes, Errors}}
@@ -112,6 +118,11 @@ stop_and_delete_vhost(VHost, Node) ->
init_vhost(VHost) ->
case start_vhost(VHost) of
{ok, _} -> ok;
+ {error, {already_started, _}} ->
+ rabbit_log:warning(
+ "Attempting to start an already started vhost '~s'.",
+ [VHost]),
+ ok;
{error, {no_such_vhost, VHost}} ->
{error, {no_such_vhost, VHost}};
{error, Reason} ->
@@ -163,22 +174,16 @@ get_vhost_sup(VHost) ->
-spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}.
start_vhost(VHost, Node) ->
case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid};
- {badrpc, RpcErr} ->
- {error, RpcErr}
+ {ok, Pid} -> {ok, Pid};
+ {error, Err} -> {error, Err};
+ {badrpc, RpcErr} -> {error, RpcErr}
end.
-spec start_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
start_vhost(VHost) ->
case rabbit_vhost:exists(VHost) of
false -> {error, {no_such_vhost, VHost}};
- true ->
- case supervisor2:start_child(?MODULE, [VHost]) of
- {ok, Pid} -> {ok, Pid};
- {error, {already_started, Pid}} -> {ok, Pid};
- {error, Err} -> {error, Err}
- end
+ true -> supervisor2:start_child(?MODULE, [VHost])
end.
-spec is_vhost_alive(rabbit_types:vhost()) -> boolean().
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index 55979e243a..c70f23c066 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -58,6 +58,10 @@ groups() ->
{cluster_size_2, [], [
vhost_deletion,
promote_on_shutdown,
+ slave_recovers_after_vhost_failure,
+ slave_recovers_after_vhost_down_an_up,
+ master_migrates_on_vhost_down,
+ slave_recovers_after_vhost_down_and_master_migrated,
queue_survive_adding_dead_vhost_mirror
]},
{cluster_size_3, [], [
@@ -318,6 +322,72 @@ nodes_policy_should_pick_master_from_its_params(Config) ->
amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}),
_ = rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY).
+slave_recovers_after_vhost_failure(Config) ->
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ rabbit_ct_broker_helpers:set_ha_policy_all(Config),
+ ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+ QName = <<"slave_recovers_after_vhost_failure-q">>,
+ amqp_channel:call(ACh, #'queue.declare'{queue = QName}),
+ timer:sleep(300),
+ assert_slaves(A, QName, {A, [B]}, [{A, []}]),
+
+ %% Crash vhost on a node hosting a mirror
+ {ok, Sup} = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, get_vhost_sup, [<<"/">>]),
+ exit(Sup, foo),
+
+ assert_slaves(A, QName, {A, [B]}, [{A, []}]).
+
+slave_recovers_after_vhost_down_an_up(Config) ->
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ rabbit_ct_broker_helpers:set_ha_policy_all(Config),
+ ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+ QName = <<"slave_recovers_after_vhost_down_an_up-q">>,
+ amqp_channel:call(ACh, #'queue.declare'{queue = QName}),
+ timer:sleep(100),
+ assert_slaves(A, QName, {A, [B]}, [{A, []}]),
+
+ %% Crash vhost on a node hosting a mirror
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, B, <<"/">>),
+ %% Vhost is down now
+ false = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, is_vhost_alive, [<<"/">>]),
+ timer:sleep(300),
+ %% Vhost is back up
+ {ok, _Sup} = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]),
+
+ assert_slaves(A, QName, {A, [B]}, [{A, []}]).
+
+master_migrates_on_vhost_down(Config) ->
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ rabbit_ct_broker_helpers:set_ha_policy_all(Config),
+ ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+ QName = <<"master_migrates_on_vhost_down-q">>,
+ amqp_channel:call(ACh, #'queue.declare'{queue = QName}),
+ timer:sleep(100),
+ assert_slaves(A, QName, {A, [B]}, [{A, []}]),
+
+ %% Crash vhost on the node hosting queue master
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, A, <<"/">>),
+ timer:sleep(300),
+ assert_slaves(A, QName, {B, []}).
+
+slave_recovers_after_vhost_down_and_master_migrated(Config) ->
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ rabbit_ct_broker_helpers:set_ha_policy_all(Config),
+ ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+ QName = <<"slave_recovers_after_vhost_down_and_master_migrated-q">>,
+ amqp_channel:call(ACh, #'queue.declare'{queue = QName}),
+ timer:sleep(100),
+ assert_slaves(A, QName, {A, [B]}, [{A, []}]),
+ %% Crash vhost on the node hosting queue master
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, A, <<"/">>),
+ timer:sleep(300),
+ assert_slaves(B, QName, {B, []}),
+
+ %% Restart the vhost on the node (previously) hosting queue master
+ {ok, _Sup} = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]),
+ timer:sleep(300),
+ assert_slaves(B, QName, {B, [A]}, [{B, []}]).
+
random_policy(Config) ->
run_proper(fun prop_random_policy/1, [Config]).
@@ -356,7 +426,7 @@ promote_slave_after_standalone_restart(Config) ->
rabbit_ct_broker_helpers:stop_node(Config, B),
rabbit_ct_broker_helpers:stop_node(Config, A),
- %% Restart one slave
+ %% Restart one mirror
forget_cluster_node(Config, B, C),
forget_cluster_node(Config, B, A),
@@ -382,9 +452,11 @@ assert_slaves(RPCNode, QName, Exp) ->
assert_slaves(RPCNode, QName, Exp, PermittedIntermediate) ->
assert_slaves0(RPCNode, QName, Exp,
[{get(previous_exp_m_node), get(previous_exp_s_nodes)} |
- PermittedIntermediate]).
+ PermittedIntermediate], 1000).
-assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate) ->
+assert_slaves0(_, _, _, _, 0) ->
+ error(give_up_waiting_for_slaves);
+assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate, Attempts) ->
Q = find_queue(QName, RPCNode),
Pid = proplists:get_value(pid, Q),
SPids = proplists:get_value(slave_pids, Q),
@@ -410,7 +482,8 @@ assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate) ->
[State, {ExpMNode, ExpSNodes}]),
timer:sleep(100),
assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes},
- PermittedIntermediate)
+ PermittedIntermediate,
+ Attempts - 1)
end;
true ->
put(previous_exp_m_node, ExpMNode),
@@ -430,10 +503,14 @@ equal_list([H|T], Act) -> case lists:member(H, Act) of
end.
find_queue(QName, RPCNode) ->
+ find_queue(QName, RPCNode, 1000).
+
+find_queue(QName, RPCNode, 0) -> error({did_not_find_queue, QName, RPCNode});
+find_queue(QName, RPCNode, Attempts) ->
Qs = rpc:call(RPCNode, rabbit_amqqueue, info_all, [?VHOST], infinity),
case find_queue0(QName, Qs) of
did_not_find_queue -> timer:sleep(100),
- find_queue(QName, RPCNode);
+ find_queue(QName, RPCNode, Attempts - 1);
Q -> Q
end.