summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2016-09-03 01:24:06 +0300
committerMichael Klishin <michael@clojurewerkz.org>2016-09-03 01:24:06 +0300
commitf8371f775321ec5a0e15776bfa1d0072c9732c84 (patch)
tree5696defc11a8c7268f787e374249796699909332
parent1e3c0b48edee2fdd4ed713faabf4ee5521ea7e59 (diff)
parenteabdd68997169ce7a5fc25adce43e05028cb1b49 (diff)
downloadrabbitmq-server-git-f8371f775321ec5a0e15776bfa1d0072c9732c84.tar.gz
Merge branch 'rabbitmq-server-803-stable' into stablerabbitmq_v3_6_6_milestone4
-rw-r--r--Makefile2
-rw-r--r--src/rabbit_amqqueue_process.erl77
-rw-r--r--src/rabbit_mirror_queue_misc.erl15
-rw-r--r--src/rabbit_policy.erl4
-rw-r--r--src/rabbit_upgrade_functions.erl19
-rw-r--r--test/dynamic_ha_SUITE.erl154
6 files changed, 242 insertions, 29 deletions
diff --git a/Makefile b/Makefile
index 179af0f93b..0885e5d48d 100644
--- a/Makefile
+++ b/Makefile
@@ -5,7 +5,7 @@ VERSION ?= $(call get_app_version,src/$(PROJECT).app.src)
PACKAGES_DIR ?= $(abspath PACKAGES)
DEPS = ranch $(PLUGINS)
-TEST_DEPS = amqp_client meck
+TEST_DEPS = amqp_client meck proper
define usage_xml_to_erl
$(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, src/rabbit_%_usage.erl, $(subst -,_,$(1))))
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 66df42987c..29f8e53f08 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -85,6 +85,7 @@
%% e.g. message expiration messages from previously set up timers
%% that may or may not be still valid
args_policy_version,
+ mirroring_policy_version = 0,
%% running | flow | idle
status
}).
@@ -1225,22 +1226,15 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast(start_mirroring, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- %% lookup again to get policy for init_with_existing_bq
- {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
- true = BQ =/= rabbit_mirror_queue_master, %% assertion
- BQ1 = rabbit_mirror_queue_master,
- BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
- noreply(State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
-
-handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- BQ = rabbit_mirror_queue_master, %% assertion
- {BQ1, BQS1} = BQ:stop_mirroring(BQS),
- noreply(State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
+handle_cast(update_mirroring, State = #q{q = Q,
+ mirroring_policy_version = Version}) ->
+ case needs_update_mirroring(Q, Version) of
+ false ->
+ noreply(State);
+ {Policy, NewVersion} ->
+ State1 = State#q{mirroring_policy_version = NewVersion},
+ noreply(update_mirroring(Policy, State1))
+ end;
handle_cast({credit, ChPid, CTag, Credit, Drain},
State = #q{consumers = Consumers,
@@ -1383,3 +1377,54 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
{hibernate, stop_rate_timer(State1)}.
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
+needs_update_mirroring(Q, Version) ->
+ {ok, UpQ} = rabbit_amqqueue:lookup(Q#amqqueue.name),
+ DBVersion = UpQ#amqqueue.policy_version,
+ case DBVersion > Version of
+ true -> {rabbit_policy:get(<<"ha-mode">>, UpQ), DBVersion};
+ false -> false
+ end.
+
+update_mirroring(Policy, State = #q{backing_queue = BQ}) ->
+ case update_to(Policy, BQ) of
+ start_mirroring ->
+ start_mirroring(State);
+ stop_mirroring ->
+ stop_mirroring(State);
+ ignore ->
+ State;
+ update_ha_mode ->
+ update_ha_mode(State)
+ end.
+
+update_to(undefined, rabbit_mirror_queue_master) ->
+ stop_mirroring;
+update_to(_, rabbit_mirror_queue_master) ->
+ update_ha_mode;
+update_to(undefined, BQ) when BQ =/= rabbit_mirror_queue_master ->
+ ignore;
+update_to(_, BQ) when BQ =/= rabbit_mirror_queue_master ->
+ start_mirroring.
+
+start_mirroring(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ %% lookup again to get policy for init_with_existing_bq
+ {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
+ true = BQ =/= rabbit_mirror_queue_master, %% assertion
+ BQ1 = rabbit_mirror_queue_master,
+ BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
+ State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1}.
+
+stop_mirroring(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQ = rabbit_mirror_queue_master, %% assertion
+ {BQ1, BQS1} = BQ:stop_mirroring(BQS),
+ State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1}.
+
+update_ha_mode(State) ->
+ {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
+ ok = rabbit_mirror_queue_misc:update_mirrors(Q),
+ State.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 4205fabb83..375a0366dd 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -20,7 +20,7 @@
-export([remove_from_queue/3, on_node_up/0, add_mirrors/3,
report_deaths/4, store_updated_slaves/1,
initial_queue_node/2, suggested_queue_nodes/1,
- is_mirrored/1, update_mirrors/2, validate_policy/1,
+ is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
sync_batch_size/1, log_info/3, log_warning/3]).
@@ -402,15 +402,12 @@ update_mirrors(OldQ = #amqqueue{pid = QPid},
NewQ = #amqqueue{pid = QPid}) ->
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
{false, false} -> ok;
- {true, false} -> rabbit_amqqueue:stop_mirroring(QPid);
- {false, true} -> rabbit_amqqueue:start_mirroring(QPid);
- {true, true} -> update_mirrors0(OldQ, NewQ)
+ _ -> rabbit_amqqueue:update_mirroring(QPid)
end.
-update_mirrors0(OldQ = #amqqueue{name = QName},
- NewQ = #amqqueue{name = QName}) ->
- {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ),
- {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ),
+update_mirrors(Q = #amqqueue{name = QName}) ->
+ {OldMNode, OldSNodes, _} = actual_queue_nodes(Q),
+ {NewMNode, NewSNodes} = suggested_queue_nodes(Q),
OldNodes = [OldMNode | OldSNodes],
NewNodes = [NewMNode | NewSNodes],
%% When a mirror dies, remove_from_queue/2 might have to add new
@@ -424,7 +421,7 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
drop_mirrors(QName, OldNodes -- NewNodes),
%% This is for the case where no extra nodes were added but we changed to
%% a policy requiring auto-sync.
- maybe_auto_sync(NewQ),
+ maybe_auto_sync(Q),
ok.
%% The arrival of a newly synced slave may cause the master to die if
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index eb8cf63327..a9caadf972 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -276,7 +276,9 @@ update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
NewPolicy -> case rabbit_amqqueue:update(
QName, fun(Q1) ->
rabbit_queue_decorator:set(
- Q1#amqqueue{policy = NewPolicy})
+ Q1#amqqueue{policy = NewPolicy,
+ policy_version =
+ Q1#amqqueue.policy_version + 1 })
end) of
#amqqueue{} = Q1 -> {Q, Q1};
not_found -> {Q, Q }
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 67c2a84a0e..8609a0e424 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -51,6 +51,7 @@
-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}).
-rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}).
-rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}).
+-rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}).
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
%% -------------------------------------------------------------------
@@ -433,6 +434,24 @@ recoverable_slaves(Table) ->
sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators,
state]).
+policy_version() ->
+ ok = policy_version(rabbit_queue),
+ ok = policy_version(rabbit_durable_queue).
+
+policy_version(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ State}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ State, 0}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state,
+ policy_version]).
+
%% Prior to 3.6.0, passwords were hashed using MD5, this populates
%% existing records with said default. Users created with 3.6.0+ will
%% have internal_user.hashing_algorithm populated by the internal
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index 5872d97d4c..bba7fad707 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -31,6 +31,7 @@
%% The first two are change_policy, the last two are change_cluster
-include_lib("common_test/include/ct.hrl").
+-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
@@ -61,6 +62,10 @@ groups() ->
{cluster_size_3, [], [
change_policy,
rapid_change
+ % FIXME: Re-enable those tests when the know issues are
+ % fixed.
+ %failing_random_policies,
+ %random_policy
]}
]}
].
@@ -137,7 +142,7 @@ change_policy(Config) ->
assert_slaves(A, ?QNAME, {A, [C]}, [{A, [B, C]}]),
%% Clear the policy, and we go back to non-mirrored
- rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY),
assert_slaves(A, ?QNAME, {A, ''}),
%% Test switching "away" from an unmirrored node
@@ -206,7 +211,7 @@ rapid_loop(Config, Node, MRef) ->
after 0 ->
rabbit_ct_broker_helpers:set_ha_policy(Config, Node, ?POLICY,
<<"all">>),
- rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY),
rapid_loop(Config, Node, MRef)
end.
@@ -253,6 +258,23 @@ promote_on_shutdown(Config) ->
durable = true}),
ok.
+random_policy(Config) ->
+ run_proper(fun prop_random_policy/1, [Config]).
+
+failing_random_policies(Config) ->
+ [A, B | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config,
+ nodename),
+ %% Those set of policies were found as failing by PropEr in the
+ %% `random_policy` test above. We add them explicitely here to make
+ %% sure they get tested.
+ ?assertEqual(true, test_random_policy(Config, Nodes,
+ [{nodes, [A, B]}, {nodes, [A]}])),
+ ?assertEqual(true, test_random_policy(Config, Nodes,
+ [{exactly, 3}, undefined, all, {nodes, [B]}])),
+ ?assertEqual(true, test_random_policy(Config, Nodes,
+ [all, undefined, {exactly, 2}, all, {exactly, 3}, {exactly, 3},
+ undefined, {exactly, 3}, all])).
+
%%----------------------------------------------------------------------------
assert_slaves(RPCNode, QName, Exp) ->
@@ -327,3 +349,131 @@ get_stacktrace() ->
_:e ->
erlang:get_stacktrace()
end.
+
+%%----------------------------------------------------------------------------
+run_proper(Fun, Args) ->
+ ?assertEqual(true,
+ proper:counterexample(erlang:apply(Fun, Args),
+ [{numtests, 25},
+ {on_output, fun(F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) end}])).
+
+prop_random_policy(Config) ->
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ ?FORALL(
+ Policies, non_empty(list(policy_gen(Nodes))),
+ test_random_policy(Config, Nodes, Policies)).
+
+test_random_policy(Config, Nodes, Policies) ->
+ [NodeA | _] = Nodes,
+ Ch = rabbit_ct_client_helpers:open_channel(Config, NodeA),
+ amqp_channel:call(Ch, #'queue.declare'{queue = ?QNAME}),
+ %% Add some load so mirrors can be busy synchronising
+ rabbit_ct_client_helpers:publish(Ch, ?QNAME, 100000),
+ %% Apply policies in parallel on all nodes
+ apply_in_parallel(Config, Nodes, Policies),
+ %% Give it some time to generate all internal notifications
+ timer:sleep(2000),
+ %% Check the result
+ Result = wait_for_last_policy(?QNAME, NodeA, Policies, 30),
+ %% Cleanup
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}),
+ _ = rabbit_ct_broker_helpers:clear_policy(Config, NodeA, ?POLICY),
+ Result.
+
+apply_in_parallel(Config, Nodes, Policies) ->
+ Self = self(),
+ [spawn_link(fun() ->
+ [begin
+ apply_policy(Config, N, Policy)
+ end || Policy <- Policies],
+ Self ! parallel_task_done
+ end) || N <- Nodes],
+ [receive
+ parallel_task_done ->
+ ok
+ end || _ <- Nodes].
+
+%% Proper generators
+policy_gen(Nodes) ->
+ %% Stop mirroring needs to be called often to trigger rabbitmq-server#803
+ frequency([{3, undefined},
+ {1, all},
+ {1, {nodes, nodes_gen(Nodes)}},
+ {1, {exactly, choose(1, 3)}}
+ ]).
+
+nodes_gen(Nodes) ->
+ ?LET(List, non_empty(list(oneof(Nodes))),
+ sets:to_list(sets:from_list(List))).
+
+%% Checks
+wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries) ->
+ %% Ensure the owner/master is able to process a call request,
+ %% which means that all pending casts have been processed.
+ %% Use the information returned by owner/master to verify the
+ %% test result
+ Info = find_queue(QueueName, NodeA),
+ Pid = proplists:get_value(pid, Info),
+ Node = node(Pid),
+ %% Gets owner/master
+ case rpc:call(Node, gen_server, call, [Pid, info], 5000) of
+ {badrpc, _} ->
+ %% The queue is probably being migrated to another node.
+ %% Let's wait a bit longer.
+ timer:sleep(1000),
+ wait_for_last_policy(QueueName, NodeA, TestedPolicies, Tries - 1);
+ FinalInfo ->
+ %% The last policy is the final state
+ LastPolicy = lists:last(TestedPolicies),
+ case verify_policy(LastPolicy, FinalInfo) of
+ true ->
+ true;
+ false when Tries =:= 1 ->
+ Policies = rpc:call(Node, rabbit_policy, list, [], 5000),
+ ct:pal(
+ "Last policy not applied:~n"
+ " Queue node: ~s (~p)~n"
+ " Queue info: ~p~n"
+ " Configured policies: ~p~n"
+ " Tested policies: ~p",
+ [Node, Pid, FinalInfo, Policies, TestedPolicies]),
+ false;
+ false ->
+ timer:sleep(1000),
+ wait_for_last_policy(QueueName, NodeA, TestedPolicies,
+ Tries - 1)
+ end
+ end.
+
+verify_policy(undefined, Info) ->
+ %% If the queue is not mirrored, it returns ''
+ '' == proplists:get_value(slave_pids, Info);
+verify_policy(all, Info) ->
+ 2 == length(proplists:get_value(slave_pids, Info));
+verify_policy({exactly, 1}, Info) ->
+ %% If the queue is mirrored, it returns a list
+ [] == proplists:get_value(slave_pids, Info);
+verify_policy({exactly, N}, Info) ->
+ (N - 1) == length(proplists:get_value(slave_pids, Info));
+verify_policy({nodes, Nodes}, Info) ->
+ Master = node(proplists:get_value(pid, Info)),
+ Slaves = [node(P) || P <- proplists:get_value(slave_pids, Info)],
+ lists:sort(Nodes) == lists:sort([Master | Slaves]).
+
+%% Policies
+apply_policy(Config, N, undefined) ->
+ _ = rabbit_ct_broker_helpers:clear_policy(Config, N, ?POLICY);
+apply_policy(Config, N, all) ->
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, N, ?POLICY, <<"all">>,
+ [{<<"ha-sync-mode">>, <<"automatic">>}]);
+apply_policy(Config, N, {nodes, Nodes}) ->
+ NNodes = [rabbit_misc:atom_to_binary(Node) || Node <- Nodes],
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, N, ?POLICY, {<<"nodes">>, NNodes},
+ [{<<"ha-sync-mode">>, <<"automatic">>}]);
+apply_policy(Config, N, {exactly, Exactly}) ->
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, N, ?POLICY, {<<"exactly">>, Exactly},
+ [{<<"ha-sync-mode">>, <<"automatic">>}]).