diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2017-07-31 11:23:22 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2017-07-31 16:48:21 +0100 |
| commit | d39d7e09b2deca27a774da18540f07afb79ded01 (patch) | |
| tree | 2eaf0b68dfa5928e9a21ca83cee8bd861f566805 | |
| parent | 98566cdd9c22f556c732b8c91059ca44f5a5ed34 (diff) | |
| download | rabbitmq-server-git-d39d7e09b2deca27a774da18540f07afb79ded01.tar.gz | |
Start slave queues after vhost recover, instead of node start.
Vhost supervisors can crash and restart without crashing the
node, so the slave queues on this vhosts should be started
after the vhsot recovery instead of node boot process.
Fixes #1314
[#149484151]
| -rw-r--r-- | src/rabbit.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 2 | ||||
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 86 |
4 files changed, 90 insertions, 15 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index b166e079f4..0a0eb6b71a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -168,12 +168,6 @@ {requires, recovery}, {enables, routing_ready}]}). --rabbit_boot_step({mirrored_queues, - [{description, "adding mirrors to queues"}, - {mfa, {rabbit_mirror_queue_misc, on_node_up, []}}, - {requires, recovery}, - {enables, routing_ready}]}). - -rabbit_boot_step({routing_ready, [{description, "message delivery logic ready"}, {requires, core_initialized}]}). diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 59522da4a9..dab98c740e 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -17,7 +17,7 @@ -module(rabbit_mirror_queue_misc). -behaviour(rabbit_policy_validator). --export([remove_from_queue/3, on_node_up/0, add_mirrors/3, +-export([remove_from_queue/3, on_vhost_up/1, add_mirrors/3, report_deaths/4, store_updated_slaves/1, initial_queue_node/2, suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1, @@ -53,7 +53,6 @@ -spec remove_from_queue (rabbit_amqqueue:name(), pid(), [pid()]) -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}. --spec on_node_up() -> 'ok'. -spec add_mirrors(rabbit_amqqueue:name(), [node()], 'sync' | 'async') -> 'ok'. -spec store_updated_slaves(rabbit_types:amqqueue()) -> @@ -167,12 +166,16 @@ slaves_to_start_on_failure(Q, DeadGMPids) -> {_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes), NewNodes -- OldNodes. -on_node_up() -> +on_vhost_up(VHost) -> QNames = rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:foldl( - fun (Q = #amqqueue{name = QName, + fun + (#amqqueue{name = #resource{virtual_host = OtherVhost}}, + QNames0) when OtherVhost =/= VHost -> + QNames0; + (Q = #amqqueue{name = QName, pid = Pid, slave_pids = SPids}, QNames0) -> %% We don't want to pass in the whole diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 30557fc7be..c6ee2bf08f 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -71,6 +71,8 @@ recover(VHost) -> ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), [QName || #amqqueue{name = QName} <- Qs]), ok = rabbit_amqqueue:start(Qs), + %% Start slaves. + ok = rabbit_mirror_queue_misc:on_vhost_up(VHost), ok. %%---------------------------------------------------------------------------- diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index b2f212fe75..f1ff2f6685 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -57,7 +57,11 @@ groups() -> {clustered, [], [ {cluster_size_2, [], [ vhost_deletion, - promote_on_shutdown + promote_on_shutdown, + slave_recovers_after_vhost_failure, + slave_recovers_after_vhost_down_an_up, + master_migrates_on_vhost_down, + slave_recovers_after_vhost_down_and_master_migrated ]}, {cluster_size_3, [], [ change_policy, @@ -303,6 +307,71 @@ nodes_policy_should_pick_master_from_its_params(Config) -> amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}), _ = rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY). +slave_recovers_after_vhost_failure(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + rabbit_ct_broker_helpers:set_ha_policy_all(Config), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + QName = <<"slave_recovers_after_vhost_failure-q">>, + amqp_channel:call(ACh, #'queue.declare'{queue = QName}), + timer:sleep(300), + assert_slaves(A, QName, {A, [B]}, [{A, []}]), + + %% Crash vhost on slave node + {ok, Sup} = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, vhost_sup, [<<"/">>]), + exit(Sup, foo), + + assert_slaves(A, QName, {A, [B]}, [{A, []}]). + +slave_recovers_after_vhost_down_an_up(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + rabbit_ct_broker_helpers:set_ha_policy_all(Config), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + QName = <<"slave_recovers_after_vhost_down_an_up-q">>, + amqp_channel:call(ACh, #'queue.declare'{queue = QName}), + timer:sleep(100), + assert_slaves(A, QName, {A, [B]}, [{A, []}]), + + %% Crash vhost on slave node + rabbit_ct_broker_helpers:force_vhost_failure(Config, B, <<"/">>), + %% Vhost is down now + false = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, is_vhost_alive, [<<"/">>]), + %% Vhost is back up + {ok, _Sup} = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, vhost_sup, [<<"/">>]), + + assert_slaves(A, QName, {A, [B]}, [{A, []}]). + +master_migrates_on_vhost_down(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + rabbit_ct_broker_helpers:set_ha_policy_all(Config), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + QName = <<"master_migrates_on_vhost_down-q">>, + amqp_channel:call(ACh, #'queue.declare'{queue = QName}), + timer:sleep(100), + assert_slaves(A, QName, {A, [B]}, [{A, []}]), + + %% Crash vhost on master node + rabbit_ct_broker_helpers:force_vhost_failure(Config, A, <<"/">>), + timer:sleep(300), + assert_slaves(A, QName, {B, []}). + +slave_recovers_after_vhost_down_and_master_migrated(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + rabbit_ct_broker_helpers:set_ha_policy_all(Config), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + QName = <<"slave_recovers_after_vhost_down_and_master_migrated-q">>, + amqp_channel:call(ACh, #'queue.declare'{queue = QName}), + timer:sleep(100), + assert_slaves(A, QName, {A, [B]}, [{A, []}]), + %% Crash vhost on master node + rabbit_ct_broker_helpers:force_vhost_failure(Config, A, <<"/">>), + timer:sleep(300), + assert_slaves(B, QName, {B, []}), + + %% Restart the vhost on (former) master node + {ok, _Sup} = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_vhost_sup_sup, vhost_sup, [<<"/">>]), + timer:sleep(300), + assert_slaves(B, QName, {B, [A]}, [{B, []}]). + random_policy(Config) -> run_proper(fun prop_random_policy/1, [Config]). @@ -367,9 +436,11 @@ assert_slaves(RPCNode, QName, Exp) -> assert_slaves(RPCNode, QName, Exp, PermittedIntermediate) -> assert_slaves0(RPCNode, QName, Exp, [{get(previous_exp_m_node), get(previous_exp_s_nodes)} | - PermittedIntermediate]). + PermittedIntermediate], 1000). -assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate) -> +assert_slaves0(_, _, _, _, 0) -> + error(give_up_waiting_for_slaves); +assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate, Attempts) -> Q = find_queue(QName, RPCNode), Pid = proplists:get_value(pid, Q), SPids = proplists:get_value(slave_pids, Q), @@ -395,7 +466,8 @@ assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate) -> [State, {ExpMNode, ExpSNodes}]), timer:sleep(100), assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, - PermittedIntermediate) + PermittedIntermediate, + Attempts - 1) end; true -> put(previous_exp_m_node, ExpMNode), @@ -415,10 +487,14 @@ equal_list([H|T], Act) -> case lists:member(H, Act) of end. find_queue(QName, RPCNode) -> + find_queue(QName, RPCNode, 1000). + +find_queue(QName, RPCNode, 0) -> error({did_not_find_queue, QName, RPCNode}); +find_queue(QName, RPCNode, Attempts) -> Qs = rpc:call(RPCNode, rabbit_amqqueue, info_all, [?VHOST], infinity), case find_queue0(QName, Qs) of did_not_find_queue -> timer:sleep(100), - find_queue(QName, RPCNode); + find_queue(QName, RPCNode, Attempts - 1); Q -> Q end. |
