diff options
| author | Michael Klishin <michael@novemberain.com> | 2016-05-30 00:27:45 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@novemberain.com> | 2016-05-30 00:27:45 +0300 |
| commit | 455644498e97f91c7005bed85d2519db5f506030 (patch) | |
| tree | ebf93b11a5ff4f83b37f17a0d9a6c8de4b91307f | |
| parent | e0210397f3d8328f598d5829d3aa5ba7374f72c9 (diff) | |
| parent | f14ae2212e649d3d1017e0aa3a3ae93b1778cfa8 (diff) | |
| download | rabbitmq-server-git-455644498e97f91c7005bed85d2519db5f506030.tar.gz | |
Merge pull request #814 from rabbitmq/rabbitmq-server-803
Handle concurrent application of HA policy
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | Makefile | 2 |
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 80 |
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 17 |
| -rw-r--r-- | src/rabbit_policy.erl | 4 |
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 19 |
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 115 |
6 files changed, 204 insertions, 33 deletions
@@ -82,7 +82,7 @@ TEST_DEPS += rabbitmq_amqp1_0 \ rabbitmq_web_stomp # FIXME: Remove rabbitmq_test as TEST_DEPS from here for now. -TEST_DEPS := amqp_client meck $(filter-out rabbitmq_test,$(TEST_DEPS)) +TEST_DEPS := amqp_client meck proper $(filter-out rabbitmq_test,$(TEST_DEPS)) include erlang.mk diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e6eab47bbc..2db86391a5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -54,6 +54,7 @@ max_length, max_bytes, args_policy_version, + mirroring_policy_version = 0, status }). @@ -702,7 +703,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, exclusive_consumer = Holder1}, notify_decorators(State2), case should_auto_delete(State2) of - true -> + true -> log_auto_delete( io_lib:format( "because all of its consumers (~p) were on a channel that was closed", @@ -1071,11 +1072,11 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, notify_decorators(State1), case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); - true -> + true -> log_auto_delete( io_lib:format( "because its last consumer with tag '~s' was cancelled", - [ConsumerTag]), + [ConsumerTag]), State), stop(ok, State1) end @@ -1207,22 +1208,15 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); -handle_cast(start_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - %% lookup again to get policy for init_with_existing_bq - {ok, Q} = rabbit_amqqueue:lookup(qname(State)), - true = BQ =/= rabbit_mirror_queue_master, %% assertion - BQ1 = rabbit_mirror_queue_master, - BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), - noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); - -handle_cast(stop_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - BQ = rabbit_mirror_queue_master, %% assertion - {BQ1, BQS1} = 
BQ:stop_mirroring(BQS), - noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); +handle_cast(update_mirroring, State = #q{q = Q, + mirroring_policy_version = Version}) -> + case needs_update_mirroring(Q, Version) of + false -> + noreply(State); + {Policy, NewVersion} -> + State1 = State#q{mirroring_policy_version = NewVersion}, + noreply(update_mirroring(Policy, State1)) + end; handle_cast({credit, ChPid, CTag, Credit, Drain}, State = #q{consumers = Consumers, @@ -1381,9 +1375,57 @@ log_auto_delete(Reason, #q{ q = #amqqueue{ name = Resource } }) -> Reason, [QName, VHost]). +needs_update_mirroring(Q, Version) -> + {ok, UpQ} = rabbit_amqqueue:lookup(Q#amqqueue.name), + DBVersion = UpQ#amqqueue.policy_version, + case DBVersion > Version of + true -> {rabbit_policy:get(<<"ha-mode">>, UpQ), DBVersion}; + false -> false + end. +update_mirroring(Policy, State = #q{backing_queue = BQ}) -> + case update_to(Policy, BQ) of + start_mirroring -> + start_mirroring(State); + stop_mirroring -> + stop_mirroring(State); + ignore -> + State; + update_ha_mode -> + update_ha_mode(State) + end. +update_to(undefined, rabbit_mirror_queue_master) -> + stop_mirroring; +update_to(_, rabbit_mirror_queue_master) -> + update_ha_mode; +update_to(undefined, BQ) when BQ =/= rabbit_mirror_queue_master -> + ignore; +update_to(_, BQ) when BQ =/= rabbit_mirror_queue_master -> + start_mirroring. + +start_mirroring(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + %% lookup again to get policy for init_with_existing_bq + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + true = BQ =/= rabbit_mirror_queue_master, %% assertion + BQ1 = rabbit_mirror_queue_master, + BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), + State#q{backing_queue = BQ1, + backing_queue_state = BQS1}. 
+ +stop_mirroring(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ = rabbit_mirror_queue_master, %% assertion + {BQ1, BQS1} = BQ:stop_mirroring(BQS), + State#q{backing_queue = BQ1, + backing_queue_state = BQS1}. + +update_ha_mode(State) -> + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + ok = rabbit_mirror_queue_misc:update_mirrors(Q), + State. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 849efa3611..b188298a9b 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -20,7 +20,7 @@ -export([remove_from_queue/3, on_node_up/0, add_mirrors/3, report_deaths/4, store_updated_slaves/1, initial_queue_node/2, suggested_queue_nodes/1, - is_mirrored/1, update_mirrors/2, validate_policy/1, + is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1, maybe_auto_sync/1, maybe_drop_master_after_sync/1, sync_batch_size/1, log_info/3, log_warning/3]). @@ -64,6 +64,8 @@ -spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()). -spec(update_mirrors/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). +-spec(update_mirrors/1 :: + (rabbit_types:amqqueue()) -> 'ok'). -spec(maybe_drop_master_after_sync/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(maybe_auto_sync/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(log_info/3 :: (rabbit_amqqueue:name(), string(), [any()]) -> 'ok'). @@ -384,15 +386,12 @@ update_mirrors(OldQ = #amqqueue{pid = QPid}, NewQ = #amqqueue{pid = QPid}) -> case {is_mirrored(OldQ), is_mirrored(NewQ)} of {false, false} -> ok; - {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); - {false, true} -> rabbit_amqqueue:start_mirroring(QPid); - {true, true} -> update_mirrors0(OldQ, NewQ) + _ -> rabbit_amqqueue:update_mirroring(QPid) end. 
-update_mirrors0(OldQ = #amqqueue{name = QName}, - NewQ = #amqqueue{name = QName}) -> - {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ), - {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), +update_mirrors(Q = #amqqueue{name = QName}) -> + {OldMNode, OldSNodes, _} = actual_queue_nodes(Q), + {NewMNode, NewSNodes} = suggested_queue_nodes(Q), OldNodes = [OldMNode | OldSNodes], NewNodes = [NewMNode | NewSNodes], %% When a mirror dies, remove_from_queue/2 might have to add new @@ -406,7 +405,7 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, drop_mirrors(QName, OldNodes -- NewNodes), %% This is for the case where no extra nodes were added but we changed to %% a policy requiring auto-sync. - maybe_auto_sync(NewQ), + maybe_auto_sync(Q), ok. %% The arrival of a newly synced slave may cause the master to die if diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index eb8cf63327..a9caadf972 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -276,7 +276,9 @@ update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> NewPolicy -> case rabbit_amqqueue:update( QName, fun(Q1) -> rabbit_queue_decorator:set( - Q1#amqqueue{policy = NewPolicy}) + Q1#amqqueue{policy = NewPolicy, + policy_version = + Q1#amqqueue.policy_version + 1 }) end) of #amqqueue{} = Q1 -> {Q, Q1}; not_found -> {Q, Q } diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index b99a1d12ee..0f55b9e4a9 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -52,6 +52,7 @@ -rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}). -rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}). -rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}). +-rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}). -rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}). 
%% ------------------------------------------------------------------- @@ -447,6 +448,24 @@ recoverable_slaves(Table) -> sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state]). +policy_version() -> + ok = policy_version(rabbit_queue), + ok = policy_version(rabbit_durable_queue). + +policy_version(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators, + State}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators, + State, 0} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state, + policy_version]). + %% Prior to 3.6.0, passwords were hashed using MD5, this populates %% existing records with said default. Users created with 3.6.0+ will %% have internal_user.hashing_algorithm populated by the internal diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index 5872d97d4c..c54e4c2994 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -31,6 +31,7 @@ %% The first two are change_policy, the last two are change_cluster -include_lib("common_test/include/ct.hrl"). +-include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). @@ -60,7 +61,8 @@ groups() -> ]}, {cluster_size_3, [], [ change_policy, - rapid_change + rapid_change, + random_policy ]} ]} ]. 
@@ -137,7 +139,7 @@ change_policy(Config) -> assert_slaves(A, ?QNAME, {A, [C]}, [{A, [B, C]}]), %% Clear the policy, and we go back to non-mirrored - rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY), + ok = rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY), assert_slaves(A, ?QNAME, {A, ''}), %% Test switching "away" from an unmirrored node @@ -206,7 +208,7 @@ rapid_loop(Config, Node, MRef) -> after 0 -> rabbit_ct_broker_helpers:set_ha_policy(Config, Node, ?POLICY, <<"all">>), - rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY), + ok = rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY), rapid_loop(Config, Node, MRef) end. @@ -253,6 +255,9 @@ promote_on_shutdown(Config) -> durable = true}), ok. +random_policy(Config) -> + run_proper(fun prop_random_policy/1, [Config]). + %%---------------------------------------------------------------------------- assert_slaves(RPCNode, QName, Exp) -> @@ -327,3 +332,107 @@ get_stacktrace() -> _:e -> erlang:get_stacktrace() end. + +%%---------------------------------------------------------------------------- +run_proper(Fun, Args) -> + case proper:counterexample(erlang:apply(Fun, Args), + [{numtests, 25}, + {on_output, fun(F, A) -> + io:format(user, F, A) + end}]) of + true -> + true; + Value -> + exit(Value) + end. 
+ +prop_random_policy(Config) -> + [NodeA, _, _] = Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + ?FORALL( + Policies, non_empty(list(policy_gen(Nodes))), + begin + Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA), + amqp_channel:call(Ch, #'queue.declare'{queue = ?QNAME}), + %% Add some load so mirrors can be busy synchronising + rabbit_ct_client_helpers:publish(Ch, ?QNAME, 100000), + %% Apply policies in parallel on all nodes + apply_in_parallel(Config, Nodes, Policies), + %% The last policy is the final state + Last = lists:last(Policies), + %% Give it some time to generate all internal notifications + timer:sleep(2000), + %% Ensure the owner/master is able to process a call request, + %% which means that all pending casts have been processed. + %% Use the information returned by owner/master to verify the + %% test result + Info = find_queue(?QNAME, NodeA), + %% Gets owner/master + Pid = proplists:get_value(pid, Info), + FinalInfo = rpc:call(node(Pid), gen_server, call, [Pid, info], 5000), + %% Check the result + Result = verify_policy(Last, FinalInfo), + %% Cleanup + amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}), + _ = rabbit_ct_broker_helpers:clear_policy(Config, NodeA, ?POLICY), + Result + end). + +apply_in_parallel(Config, Nodes, Policies) -> + Self = self(), + [spawn_link(fun() -> + [begin + apply_policy(Config, N, Policy) + end || Policy <- Policies], + Self ! parallel_task_done + end) || N <- Nodes], + [receive + parallel_task_done -> + ok + end || _ <- Nodes]. + +%% Proper generators +policy_gen(Nodes) -> + %% Stop mirroring needs to be called often to trigger rabbitmq-server#803 + frequency([{3, undefined}, + {1, all}, + {1, {nodes, nodes_gen(Nodes)}}, + {1, {exactly, choose(1, 3)}} + ]). + +nodes_gen(Nodes) -> + ?LET(List, non_empty(list(oneof(Nodes))), + sets:to_list(sets:from_list(List))). 
+ +%% Checks +verify_policy(undefined, Info) -> + %% If the queue is not mirrored, it returns '' + '' == proplists:get_value(slave_pids, Info); +verify_policy(all, Info) -> + 2 == length(proplists:get_value(slave_pids, Info)); +verify_policy({exactly, 1}, Info) -> + %% If the queue is mirrored, it returns a list + [] == proplists:get_value(slave_pids, Info); +verify_policy({exactly, N}, Info) -> + (N - 1) == length(proplists:get_value(slave_pids, Info)); +verify_policy({nodes, Nodes}, Info) -> + Master = node(proplists:get_value(pid, Info)), + Slaves = [node(P) || P <- proplists:get_value(slave_pids, Info)], + lists:sort(Nodes) == lists:sort([Master | Slaves]). + +%% Policies +apply_policy(Config, N, undefined) -> + _ = rabbit_ct_broker_helpers:clear_policy(Config, N, ?POLICY); +apply_policy(Config, N, all) -> + rabbit_ct_broker_helpers:set_ha_policy( + Config, N, ?POLICY, <<"all">>, + [{<<"ha-sync-mode">>, <<"automatic">>}]); +apply_policy(Config, N, {nodes, Nodes}) -> + NNodes = [rabbit_misc:atom_to_binary(Node) || Node <- Nodes], + rabbit_ct_broker_helpers:set_ha_policy( + Config, N, ?POLICY, {<<"nodes">>, NNodes}, + [{<<"ha-sync-mode">>, <<"automatic">>}]); +apply_policy(Config, N, {exactly, Exactly}) -> + rabbit_ct_broker_helpers:set_ha_policy( + Config, N, ?POLICY, {<<"exactly">>, Exactly}, + [{<<"ha-sync-mode">>, <<"automatic">>}]). |
