diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-05-29 12:43:38 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-05-29 12:43:38 +0100 |
| commit | f51c08553dda5fabb2fcb01e07dc6143d82bcbbf (patch) | |
| tree | 78b58b61404f73ea05ec0c3d30898638c655cbe4 | |
| parent | c40e02dcf6be936bc746902efe2a74006a77fea5 (diff) | |
| parent | 61db51426b622d5bff0031234f56236b5dd8bf45 (diff) | |
| download | rabbitmq-server-git-f51c08553dda5fabb2fcb01e07dc6143d82bcbbf.tar.gz | |
Merge bug26210
| -rw-r--r-- | docs/rabbitmq.config.example | 14 | ||||
| -rwxr-xr-x | quickcheck | 3 | ||||
| -rw-r--r-- | src/gm_qc.erl | 245 | ||||
| -rw-r--r-- | src/rabbit_channel_interceptor.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_dead_letter.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 49 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 2 |
8 files changed, 316 insertions, 45 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index b0e13b1b8d..26de71b70d 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -257,9 +257,13 @@ %% {certfile, "/path/to/cert.pem"}, %% {keyfile, "/path/to/key.pem"}]}]}, + %% One of 'basic', 'detailed' or 'none'. See + %% http://www.rabbitmq.com/management.html#fine-stats for more details. + %% {rates_mode, basic}, + %% Configure how long aggregated data (such as message rates and queue %% lengths) is retained. Please read the plugin's documentation in - %% https://www.rabbitmq.com/management.html#configuration for more + %% http://www.rabbitmq.com/management.html#configuration for more %% details. %% %% {sample_retention_policies, @@ -268,14 +272,6 @@ %% {detailed, [{10, 5}]}]} ]}, - {rabbitmq_management_agent, - [%% Misc/Advanced Options - %% - %% NB: Change these only if you understand what you are doing! - %% - %% {force_fine_statistics, true} - ]}, - %% ---------------------------------------------------------------------------- %% RabbitMQ Shovel Plugin %% diff --git a/quickcheck b/quickcheck index b5382d75b9..40f130919f 100755 --- a/quickcheck +++ b/quickcheck @@ -17,7 +17,8 @@ main([NodeStr, ModStr, TrialsStr]) -> case rpc:call(Node, proper, module, [Mod] ++ [[{numtests, Trials}, {constraint_tries, 200}]]) of [] -> ok; - _ -> quit(1) + R -> io:format("~p.~n", [R]), + quit(1) end; {badrpc, Reason} -> io:format("Could not contact node ~p: ~p.~n", [Node, Reason]), diff --git a/src/gm_qc.erl b/src/gm_qc.erl new file mode 100644 index 0000000000..2382f7f17a --- /dev/null +++ b/src/gm_qc.erl @@ -0,0 +1,245 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(gm_qc). +-ifdef(use_proper_qc). +%%-include("rabbit.hrl"). +-include_lib("proper/include/proper.hrl"). + +-define(GROUP, test_group). +-define(MAX_SIZE, 5). +-define(MSG_TIMEOUT, 1000000). %% micros + +-export([prop_gm_test/0]). + +-behaviour(proper_statem). +-export([initial_state/0, command/1, precondition/2, postcondition/3, + next_state/3]). + +-behaviour(gm). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). + +%% Helpers +-export([do_join/0, do_leave/1, do_send/2]). + +-record(state, {seq, msgs}). + +prop_gm_test() -> + ?FORALL(Cmds, commands(?MODULE, initial_state()), + gm_test(Cmds)). + +gm_test(Cmds) -> + {_H, State, Res} = run_commands(?MODULE, Cmds), + cleanup(State), + ?WHENFAIL( + io:format("Result: ~p~n", [Res]), + aggregate(command_names(Cmds), Res =:= ok)). + +cleanup(S) -> + #state{msgs = Msgs} = ensure_outstanding_msgs_received(S), + All = gms(Msgs), + [gm:leave(GM) || GM <- All], + [await_death(GM) || GM <- All], + ok. + +await_death(P) -> + MRef = erlang:monitor(process, P), + receive + {'DOWN', MRef, process, _, _} -> ok + end. + +%% --------------------------------------------------------------------------- +%% proper_statem +%% --------------------------------------------------------------------------- + +initial_state() -> #state{seq = 1, + msgs = dict:new()}. + +command(S = #state{msgs = Msgs}) -> + case dict:size(Msgs) of + 0 -> qc_join(S); + _ -> frequency([{1, qc_join(S)}, + {1, qc_leave(S)}, + {10, qc_send(S)}]) + end. + +qc_join(_S) -> {call,?MODULE,do_join, []}. +qc_leave(#state{msgs = Msgs}) -> {call,?MODULE,do_leave,[random(gms(Msgs))]}. +qc_send(#state{seq = N, + msgs = Msgs}) -> {call,?MODULE,do_send, [N, random(gms(Msgs))]}. + +random([]) -> will_fail_precondition; +random(L) -> lists:nth(random:uniform(length(L)), L). + +precondition(#state{msgs = Msgs}, {call, ?MODULE, do_join, []}) -> + dict:size(Msgs) < ?MAX_SIZE; + +precondition(#state{msgs = Msgs}, {call, ?MODULE, do_leave, [_GM]}) -> + dict:size(Msgs) > 0; + +precondition(#state{msgs = Msgs}, {call, ?MODULE, do_send, [_N, _GM]}) -> + dict:size(Msgs) > 0. + +postcondition(S = #state{msgs = Msgs}, {call, ?MODULE, do_join, []}, GM) -> + [begin + gm:broadcast(Existing, heartbeat), + receive + {birth, Existing, GM} -> ok + after 1000 -> + exit({birth_timeout, Existing, did_not_announce, GM}) + end + end || Existing <- gms(Msgs) -- [GM]], + assert(S); + +postcondition(S = #state{msgs = Msgs}, + {call, ?MODULE, do_leave, [Dead]}, _Res) -> + [await_death(Existing, Dead, 5) || Existing <- gms(Msgs) -- [Dead]], + assert(S#state{msgs = dict:erase(Dead, Msgs)}); + +postcondition(S = #state{}, {call, _M, _F, _A}, _Res) -> + assert(S). + +next_state(S = #state{msgs = Msgs}, GM, {call, ?MODULE, do_join, []}) -> + S#state{msgs = dict:store(GM, {gb_trees:empty(), gb_sets:empty()}, Msgs)}; + +next_state(S = #state{msgs = Msgs}, _GM, {call, ?MODULE, do_leave, [GM]}) -> + true = dict:is_key(GM, Msgs), + S#state{msgs = dict:erase(GM, Msgs)}; + +next_state(S = #state{seq = Seq, + msgs = Msgs}, Msg, {call, ?MODULE, do_send, [_, _]}) -> + TS = timestamp(), + Msgs1 = dict:map(fun (_GM, {Tree, Set}) -> + {gb_trees:insert(Msg, TS, Tree), + gb_sets:add_element({TS, Msg}, Set)} + end, Msgs), + drain(S#state{seq = Seq + 1, + msgs = Msgs1}). + +%% --------------------------------------------------------------------------- +%% GM +%% --------------------------------------------------------------------------- + +joined(Pid, _Members) -> Pid ! {joined, self()}, ok. +members_changed(Pid, Bs, Ds) -> [Pid ! {birth, self(), B} || B <- Bs], + [Pid ! {death, self(), D} || D <- Ds], + ok. +handle_msg(_Pid, _From, heartbeat) -> ok; +handle_msg(Pid, _From, Msg) -> Pid ! {gm, self(), Msg}, ok. +terminate(_Pid, _Reason) -> ok. + +%% --------------------------------------------------------------------------- +%% Helpers +%% --------------------------------------------------------------------------- + +do_join() -> + {ok, GM} = gm:start_link(?GROUP, ?MODULE, self(), + fun rabbit_misc:execute_mnesia_transaction/1), + receive + {joined, GM} -> ok + end, + GM. + +do_leave(GM) -> + gm:leave(GM). + +do_send(Seq, GM) -> + Msg = [{sequence, Seq}, + {first_gm, GM}], + gm:broadcast(GM, Msg), + Msg. + +await_death(GM, ToDie, 0) -> + exit({death_msg_timeout, GM, ToDie}); +await_death(GM, ToDie, N) -> + gm:broadcast(GM, heartbeat), + receive + {death, GM, ToDie} -> ok + after 100 -> + await_death(GM, ToDie, N - 1) + end. + +gms(Msgs) -> dict:fetch_keys(Msgs). + +drain(S = #state{msgs = Msgs}) -> + receive + {gm, GM, Msg} -> + case dict:find(GM, Msgs) of + {ok, {Tree, Set}} -> + case gb_trees:lookup(Msg, Tree) of + {value, TS} -> + Msgs1 = dict:store( + GM, {gb_trees:delete(Msg, Tree), + gb_sets:del_element({TS, Msg}, Set)}, + Msgs), + drain(S#state{msgs = Msgs1}); + none -> + %% Message from GM that joined after we + %% broadcast the message. OK. + drain(S) + end; + error -> + %% Message from GM that has already died. OK. + drain(S) + end + after 0 -> + S + end. + +assert(#state{msgs = Msgs}) -> + TS = timestamp(), + dict:fold(fun (GM, {_Tree, Set}, none) -> + case gb_sets:size(Set) of + 0 -> ok; + _ -> {TS0, Msg} = gb_sets:smallest(Set), + case TS0 + ?MSG_TIMEOUT < TS of + true -> exit({msg_timeout, + [{msg, Msg}, + {gm, GM}, + {all, gms(Msgs)}]}); + false -> ok + end + end, + none + end, none, Msgs), + true. + +ensure_outstanding_msgs_received(S) -> + case outstanding_msgs(S) of + false -> S; + true -> timer:sleep(100), + S2 = drain(S), + assert(S2), + ensure_outstanding_msgs_received(S2) + end. + +outstanding_msgs(#state{msgs = Msgs}) -> + dict:fold(fun (_GM, {_Tree, Set}, false) -> not gb_sets:is_empty(Set); + (_GM, {_Tree, _Set}, true) -> true + end, false, Msgs). + +timestamp() -> timer:now_diff(os:timestamp(), {0, 0, 0}). + +-else. + +-export([prop_disabled/0]). + +prop_disabled() -> + exit({compiled_without_proper, + "PropEr was not present during compilation of the test module. " + "Hence all tests are disabled."}). + +-endif. diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl index 81c17fbfbe..db9349acfb 100644 --- a/src/rabbit_channel_interceptor.erl +++ b/src/rabbit_channel_interceptor.erl @@ -33,7 +33,7 @@ -callback description() -> [proplists:property()]. -callback intercept(original_method(), rabbit_types:vhost()) -> - rabbit_types:ok_or_error2(processed_method(), any()). + processed_method() | rabbit_misc:channel_or_connection_exit(). %% Whether the interceptor wishes to intercept the amqp method -callback applies_to(intercept_method()) -> boolean(). @@ -62,20 +62,15 @@ intercept_method(M, VHost) -> intercept_method(M, _VHost, []) -> M; intercept_method(M, VHost, [I]) -> - case I:intercept(M, VHost) of - {ok, M2} -> - case validate_method(M, M2) of - true -> - M2; - _ -> - internal_error("Interceptor: ~p expected " - "to return method: ~p but returned: ~p", - [I, rabbit_misc:method_record_type(M), - rabbit_misc:method_record_type(M2)]) - end; - {error, Reason} -> - internal_error("Interceptor: ~p failed with reason: ~p", - [I, Reason]) + M2 = I:intercept(M, VHost), + case validate_method(M, M2) of + true -> + M2; + _ -> + internal_error("Interceptor: ~p expected " + "to return method: ~p but returned: ~p", + [I, rabbit_misc:method_record_type(M), + rabbit_misc:method_record_type(M2)]) end; intercept_method(M, _VHost, Is) -> internal_error("More than one interceptor for method: ~p -- ~p", diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index ec32e6878d..728bc43117 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -129,6 +129,9 @@ is_cycle(Queue, Deaths) -> {longstr, <<"rejected">>} =/= rabbit_misc:table_lookup(D, <<"reason">>); (_) -> + %% There was something we didn't expect, therefore + %% a client must have put it there, therefore the + %% cycle was not "fully automatic". false end, Cycle ++ [H]) end. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 2b16b9118d..24b22d4cc5 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -170,10 +170,24 @@ terminate({shutdown, dropped} = Reason, State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}; terminate(Reason, - State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + State = #state { name = QName, + backing_queue = BQ, + backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but %% shouldn't be deleted. Most likely safe shutdown of this - %% node. Thus just let some other slave take over. + %% node. + {ok, Q = #amqqueue{sync_slave_pids = SSPids}} = + rabbit_amqqueue:lookup(QName), + case SSPids =:= [] andalso + rabbit_policy:get(<<"ha-promote-on-shutdown">>, Q) =/= <<"always">> of + true -> %% Remove the whole queue to avoid data loss + rabbit_mirror_queue_misc:log_warning( + QName, "Stopping all nodes on master shutdown since no " + "synchronised slave is available~n", []), + stop_all_slaves(Reason, State); + false -> %% Just let some other slave take over. + ok + end, State #state { backing_queue_state = BQ:terminate(Reason, BQS) }. delete_and_terminate(Reason, State = #state { backing_queue = BQ, @@ -181,7 +195,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ, stop_all_slaves(Reason, State), State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}. -stop_all_slaves(Reason, #state{name = QName, gm = GM}) -> +stop_all_slaves(Reason, #state{name = QName, gm = GM}) -> {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), MRefs = [erlang:monitor(process, Pid) || Pid <- [GM | SPids]], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index b0f092a9a0..7aec1ac81f 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -29,16 +29,19 @@ -include("rabbit.hrl"). --rabbit_boot_step({?MODULE, - [{description, "HA policy validation"}, - {mfa, {rabbit_registry, register, - [policy_validator, <<"ha-mode">>, ?MODULE]}}, - {mfa, {rabbit_registry, register, - [policy_validator, <<"ha-params">>, ?MODULE]}}, - {mfa, {rabbit_registry, register, - [policy_validator, <<"ha-sync-mode">>, ?MODULE]}}, - {requires, rabbit_registry}, - {enables, recovery}]}). +-rabbit_boot_step( + {?MODULE, + [{description, "HA policy validation"}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-mode">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-params">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-sync-mode">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, recovery}]}). %%---------------------------------------------------------------------------- @@ -374,16 +377,21 @@ validate_policy(KeyList) -> Mode = proplists:get_value(<<"ha-mode">>, KeyList, none), Params = proplists:get_value(<<"ha-params">>, KeyList, none), SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none), - case {Mode, Params, SyncMode} of - {none, none, none} -> + PromoteOnShutdown = proplists:get_value( + <<"ha-promote-on-shutdown">>, KeyList, none), + case {Mode, Params, SyncMode, PromoteOnShutdown} of + {none, none, none, none} -> ok; - {none, _, _} -> - {error, "ha-mode must be specified to specify ha-params or " - "ha-sync-mode", []}; + {none, _, _, _} -> + {error, "ha-mode must be specified to specify ha-params, " + "ha-sync-mode or ha-promote-on-shutdown", []}; _ -> case module(Mode) of {ok, M} -> case M:validate_policy(Params) of - ok -> validate_sync_mode(SyncMode); + ok -> case validate_sync_mode(SyncMode) of + ok -> validate_pos(PromoteOnShutdown); + E -> E + end; E -> E end; _ -> {error, "~p is not a valid ha-mode value", [Mode]} @@ -398,3 +406,12 @@ validate_sync_mode(SyncMode) -> Mode -> {error, "ha-sync-mode must be \"manual\" " "or \"automatic\", got ~p", [Mode]} end. + +validate_pos(PromoteOnShutdown) -> + case PromoteOnShutdown of + <<"always">> -> ok; + <<"when-synced">> -> ok; + none -> ok; + Mode -> {error, "ha-promote-on-shutdown must be " + "\"always\" or \"when-synced\", got ~p", [Mode]} + end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 58e93a3f9e..18c07f86f1 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -81,7 +81,7 @@ -ifdef(use_specs). --export_type([resource_name/0, thunk/1]). +-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). |
