diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2017-08-03 16:16:09 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2017-08-03 16:16:09 +0300 |
| commit | 454a26b359b050d62bb745d11e7bc3677aab29bf (patch) | |
| tree | d4fa6f9a14794da4d78725897c1a8e577e740386 | |
| parent | ba218a735477744f7740fb4325868276dd99ea6e (diff) | |
| parent | ac687666a1cc4fb0619792d44cad5f10dd869629 (diff) | |
| download | rabbitmq-server-git-454a26b359b050d62bb745d11e7bc3677aab29bf.tar.gz | |
Merge branch 'master' into rabbitmq-management-460
| -rw-r--r-- | src/rabbit.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 50 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_sup.erl | 39 | ||||
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 87 |
6 files changed, 152 insertions, 43 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index b166e079f4..0a0eb6b71a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -168,12 +168,6 @@ {requires, recovery}, {enables, routing_ready}]}). --rabbit_boot_step({mirrored_queues, - [{description, "adding mirrors to queues"}, - {mfa, {rabbit_mirror_queue_misc, on_node_up, []}}, - {requires, recovery}, - {enables, routing_ready}]}). - -rabbit_boot_step({routing_ready, [{description, "message delivery logic ready"}, {requires, core_initialized}]}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5537634144..a5480f707e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -339,11 +339,17 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args, {ok, Node0} -> Node0; {error, _} -> Node end, - Node1 = rabbit_mirror_queue_misc:initial_queue_node(Q, Node1), - gen_server2:call( - rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare), - {init, new}, infinity). + case rabbit_vhost_sup_sup:get_vhost_sup(VHost, Node1) of + {ok, _} -> + gen_server2:call( + rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare), + {init, new}, infinity); + {error, Error} -> + rabbit_misc:protocol_error(internal_error, + "Cannot declare a queue '~s' on node '~s': ~255p", + [rabbit_misc:rs(QueueName), Node1, Error]) + end. internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -456,16 +462,28 @@ with(Name, F, E) -> with(Name, F, E, RetriesLeft) -> case lookup(Name) of - {ok, Q = #amqqueue{}} when RetriesLeft =:= 0 -> + {ok, Q = #amqqueue{state = live}} when RetriesLeft =:= 0 -> %% Something bad happened to that queue, we are bailing out %% on processing current request. E({absent, Q, timeout}); + {ok, Q = #amqqueue{state = stopped}} when RetriesLeft =:= 0 -> + %% The queue was stopped and not migrated + E({absent, Q, stopped}); + %% The queue process has crashed with unknown error {ok, Q = #amqqueue{state = crashed}} -> E({absent, Q, crashed}); + %% The queue process has been stopped by a supervisor. + %% In that case a synchronised slave can take over + %% so we should retry. {ok, Q = #amqqueue{state = stopped}} -> %% The queue process was stopped by the supervisor - E({absent, Q, stopped}); - {ok, Q = #amqqueue{pid = QPid}} -> + rabbit_misc:with_exit_handler( + fun () -> retry_wait(Q, F, E, RetriesLeft) end, + fun () -> F(Q) end); + %% The queue is supposed to be active. + %% The master node can go away or queue can be killed + %% so we retry, waiting for a slave to take over. + {ok, Q = #amqqueue{state = live}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do %% with the QPid. F() should be written s.t. that this @@ -473,14 +491,24 @@ with(Name, F, E, RetriesLeft) -> %% indicates a code bug and we don't want to get stuck in %% the retry loop. rabbit_misc:with_exit_handler( - fun () -> false = rabbit_mnesia:is_process_alive(QPid), - timer:sleep(30), - with(Name, F, E, RetriesLeft - 1) - end, fun () -> F(Q) end); + fun () -> retry_wait(Q, F, E, RetriesLeft) end, + fun () -> F(Q) end); {error, not_found} -> E(not_found_or_absent_dirty(Name)) end. +retry_wait(Q = #amqqueue{pid = QPid, name = Name, state = QState}, F, E, RetriesLeft) -> + case {QState, is_mirrored(Q)} of + %% We don't want to repeat an operation if + %% there are no slaves to migrate to + {stopped, false} -> + E({absent, Q, stopped}); + _ -> + false = rabbit_mnesia:is_process_alive(QPid), + timer:sleep(30), + with(Name, F, E, RetriesLeft - 1) + end. + with(Name, F) -> with(Name, F, fun (E) -> {error, E} end). with_or_die(Name, F) -> diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index dc23095fb1..725cd7d089 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -17,7 +17,7 @@ -module(rabbit_mirror_queue_misc). -behaviour(rabbit_policy_validator). --export([remove_from_queue/3, on_node_up/0, add_mirrors/3, +-export([remove_from_queue/3, on_vhost_up/1, add_mirrors/3, report_deaths/4, store_updated_slaves/1, initial_queue_node/2, suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1, @@ -53,7 +53,6 @@ -spec remove_from_queue (rabbit_amqqueue:name(), pid(), [pid()]) -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}. --spec on_node_up() -> 'ok'. -spec add_mirrors(rabbit_amqqueue:name(), [node()], 'sync' | 'async') -> 'ok'. -spec store_updated_slaves(rabbit_types:amqqueue()) -> @@ -167,12 +166,16 @@ slaves_to_start_on_failure(Q, DeadGMPids) -> {_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes), NewNodes -- OldNodes. -on_node_up() -> +on_vhost_up(VHost) -> QNames = rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:foldl( - fun (Q = #amqqueue{name = QName, + fun + (#amqqueue{name = #resource{virtual_host = OtherVhost}}, + QNames0) when OtherVhost =/= VHost -> + QNames0; + (Q = #amqqueue{name = QName, pid = Pid, slave_pids = SPids}, QNames0) -> %% We don't want to pass in the whole diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 3a214e2634..73c05389be 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -71,6 +71,8 @@ recover(VHost) -> ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), [QName || #amqqueue{name = QName} <- Qs]), ok = rabbit_amqqueue:start(Qs), + %% Start queue mirrors. + ok = rabbit_mirror_queue_misc:on_vhost_up(VHost), ok. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl index 93c26d4e0f..19d7cf61b7 100644 --- a/src/rabbit_vhost_sup_sup.erl +++ b/src/rabbit_vhost_sup_sup.erl @@ -23,15 +23,16 @@ -export([init/1]). -export([start_link/0, start/0]). --export([init_vhost/1, get_vhost_sup/1, get_vhost_sup/2, save_vhost_sup/3]). --export([delete_on_all_nodes/1]). --export([start_on_all_nodes/1]). - --export([save_vhost_process/2]). +-export([init_vhost/1, + start_vhost/1, start_vhost/2, + get_vhost_sup/1, get_vhost_sup/2, + save_vhost_sup/3, + save_vhost_process/2]). +-export([delete_on_all_nodes/1, start_on_all_nodes/1]). -export([is_vhost_alive/1]). %% Internal --export([stop_and_delete_vhost/1, start_vhost/1]). +-export([stop_and_delete_vhost/1]). -record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}). @@ -61,7 +62,12 @@ init([]) -> start_on_all_nodes(VHost) -> NodesStart = [ {Node, start_vhost(VHost, Node)} || Node <- rabbit_nodes:all_running() ], - Failures = lists:filter(fun({_, {ok, _}}) -> false; (_) -> true end, NodesStart), + Failures = lists:filter(fun + ({_, {ok, _}}) -> false; + ({_, {error, {already_started, _}}}) -> false; + (_) -> true + end, + NodesStart), case Failures of [] -> ok; Errors -> {error, {failed_to_start_vhost_on_nodes, Errors}} @@ -112,6 +118,11 @@ stop_and_delete_vhost(VHost, Node) -> init_vhost(VHost) -> case start_vhost(VHost) of {ok, _} -> ok; + {error, {already_started, _}} -> + rabbit_log:warning( + "Attempting to start an already started vhost '~s'.", + [VHost]), + ok; {error, {no_such_vhost, VHost}} -> {error, {no_such_vhost, VHost}}; {error, Reason} -> @@ -163,22 +174,16 @@ get_vhost_sup(VHost) -> -spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}. start_vhost(VHost, Node) -> case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of - {ok, Pid} when is_pid(Pid) -> - {ok, Pid}; - {badrpc, RpcErr} -> - {error, RpcErr} + {ok, Pid} -> {ok, Pid}; + {error, Err} -> {error, Err}; + {badrpc, RpcErr} -> {error, RpcErr} end. -spec start_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}. start_vhost(VHost) -> case rabbit_vhost:exists(VHost) of false -> {error, {no_such_vhost, VHost}}; - true -> - case supervisor2:start_child(?MODULE, [VHost]) of - {ok, Pid} -> {ok, Pid}; - {error, {already_started, Pid}} -> {ok, Pid}; - {error, Err} -> {error, Err} - end + true -> supervisor2:start_child(?MODULE, [VHost]) end. -spec is_vhost_alive(rabbit_types:vhost()) -> boolean(). diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index 55979e243a..c70f23c066 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -58,6 +58,10 @@ groups() -> {cluster_size_2, [], [ vhost_deletion, promote_on_shutdown, + slave_recovers_after_vhost_failure, + slave_recovers_after_vhost_down_an_up, + master_migrates_on_vhost_down, + slave_recovers_after_vhost_down_and_master_migrated, queue_survive_adding_dead_vhost_mirror ]}, {cluster_size_3, [], [ @@ -318,6 +322,72 @@ nodes_policy_should_pick_master_from_its_params(Config) -> amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}), _ = rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY). +slave_recovers_after_vhost_failure(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + rabbit_ct_broker_helpers:set_ha_policy_all(Config), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + QName = <<"slave_recovers_after_vhost_failure-q">>, + amqp_channel:call(ACh, #'queue.declare'{queue = QName}), + timer:sleep(300), + assert_slaves(A, QName, {A, [B]}, [{A, []}]), + + %% Crash vhost on a node hosting a mirror + {ok, Sup} = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, get_vhost_sup, [<<"/">>]), + exit(Sup, foo), + + assert_slaves(A, QName, {A, [B]}, [{A, []}]). + +slave_recovers_after_vhost_down_an_up(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + rabbit_ct_broker_helpers:set_ha_policy_all(Config), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + QName = <<"slave_recovers_after_vhost_down_an_up-q">>, + amqp_channel:call(ACh, #'queue.declare'{queue = QName}), + timer:sleep(100), + assert_slaves(A, QName, {A, [B]}, [{A, []}]), + + %% Crash vhost on a node hosting a mirror + rabbit_ct_broker_helpers:force_vhost_failure(Config, B, <<"/">>), + %% Vhost is down now + false = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, is_vhost_alive, [<<"/">>]), + timer:sleep(300), + %% Vhost is back up + {ok, _Sup} = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]), + + assert_slaves(A, QName, {A, [B]}, [{A, []}]). + +master_migrates_on_vhost_down(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + rabbit_ct_broker_helpers:set_ha_policy_all(Config), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + QName = <<"master_migrates_on_vhost_down-q">>, + amqp_channel:call(ACh, #'queue.declare'{queue = QName}), + timer:sleep(100), + assert_slaves(A, QName, {A, [B]}, [{A, []}]), + + %% Crash vhost on the node hosting queue master + rabbit_ct_broker_helpers:force_vhost_failure(Config, A, <<"/">>), + timer:sleep(300), + assert_slaves(A, QName, {B, []}). + +slave_recovers_after_vhost_down_and_master_migrated(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + rabbit_ct_broker_helpers:set_ha_policy_all(Config), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + QName = <<"slave_recovers_after_vhost_down_and_master_migrated-q">>, + amqp_channel:call(ACh, #'queue.declare'{queue = QName}), + timer:sleep(100), + assert_slaves(A, QName, {A, [B]}, [{A, []}]), + %% Crash vhost on the node hosting queue master + rabbit_ct_broker_helpers:force_vhost_failure(Config, A, <<"/">>), + timer:sleep(300), + assert_slaves(B, QName, {B, []}), + + %% Restart the vhost on the node (previously) hosting queue master + {ok, _Sup} = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]), + timer:sleep(300), + assert_slaves(B, QName, {B, [A]}, [{B, []}]). + random_policy(Config) -> run_proper(fun prop_random_policy/1, [Config]). @@ -356,7 +426,7 @@ promote_slave_after_standalone_restart(Config) -> rabbit_ct_broker_helpers:stop_node(Config, B), rabbit_ct_broker_helpers:stop_node(Config, A), - %% Restart one slave + %% Restart one mirror forget_cluster_node(Config, B, C), forget_cluster_node(Config, B, A), @@ -382,9 +452,11 @@ assert_slaves(RPCNode, QName, Exp) -> assert_slaves(RPCNode, QName, Exp, PermittedIntermediate) -> assert_slaves0(RPCNode, QName, Exp, [{get(previous_exp_m_node), get(previous_exp_s_nodes)} | - PermittedIntermediate]). + PermittedIntermediate], 1000). -assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate) -> +assert_slaves0(_, _, _, _, 0) -> + error(give_up_waiting_for_slaves); +assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate, Attempts) -> Q = find_queue(QName, RPCNode), Pid = proplists:get_value(pid, Q), SPids = proplists:get_value(slave_pids, Q), @@ -410,7 +482,8 @@ assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate) -> [State, {ExpMNode, ExpSNodes}]), timer:sleep(100), assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, - PermittedIntermediate) + PermittedIntermediate, + Attempts - 1) end; true -> put(previous_exp_m_node, ExpMNode), @@ -430,10 +503,14 @@ equal_list([H|T], Act) -> case lists:member(H, Act) of end. find_queue(QName, RPCNode) -> + find_queue(QName, RPCNode, 1000). + +find_queue(QName, RPCNode, 0) -> error({did_not_find_queue, QName, RPCNode}); +find_queue(QName, RPCNode, Attempts) -> Qs = rpc:call(RPCNode, rabbit_amqqueue, info_all, [?VHOST], infinity), case find_queue0(QName, Qs) of did_not_find_queue -> timer:sleep(100), - find_queue(QName, RPCNode); + find_queue(QName, RPCNode, Attempts - 1); Q -> Q end. |
