summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-05-29 12:43:38 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-05-29 12:43:38 +0100
commitf51c08553dda5fabb2fcb01e07dc6143d82bcbbf (patch)
tree78b58b61404f73ea05ec0c3d30898638c655cbe4 /src
parentc40e02dcf6be936bc746902efe2a74006a77fea5 (diff)
parent61db51426b622d5bff0031234f56236b5dd8bf45 (diff)
downloadrabbitmq-server-git-f51c08553dda5fabb2fcb01e07dc6143d82bcbbf.tar.gz
Merge bug26210
Diffstat (limited to 'src')
-rw-r--r--src/gm_qc.erl245
-rw-r--r--src/rabbit_channel_interceptor.erl25
-rw-r--r--src/rabbit_dead_letter.erl3
-rw-r--r--src/rabbit_mirror_queue_master.erl20
-rw-r--r--src/rabbit_mirror_queue_misc.erl49
-rw-r--r--src/rabbit_misc.erl2
6 files changed, 309 insertions, 35 deletions
diff --git a/src/gm_qc.erl b/src/gm_qc.erl
new file mode 100644
index 0000000000..2382f7f17a
--- /dev/null
+++ b/src/gm_qc.erl
@@ -0,0 +1,245 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(gm_qc).
+-ifdef(use_proper_qc).
+%%-include("rabbit.hrl").
+-include_lib("proper/include/proper.hrl").
+
+-define(GROUP, test_group).
+-define(MAX_SIZE, 5).
+-define(MSG_TIMEOUT, 1000000). %% micros
+
+-export([prop_gm_test/0]).
+
+-behaviour(proper_statem).
+-export([initial_state/0, command/1, precondition/2, postcondition/3,
+ next_state/3]).
+
+-behaviour(gm).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+
+%% Helpers
+-export([do_join/0, do_leave/1, do_send/2]).
+
+-record(state, {seq, msgs}).
+
+prop_gm_test() ->
+ ?FORALL(Cmds, commands(?MODULE, initial_state()),
+ gm_test(Cmds)).
+
+gm_test(Cmds) ->
+ {_H, State, Res} = run_commands(?MODULE, Cmds),
+ cleanup(State),
+ ?WHENFAIL(
+ io:format("Result: ~p~n", [Res]),
+ aggregate(command_names(Cmds), Res =:= ok)).
+
+cleanup(S) ->
+ #state{msgs = Msgs} = ensure_outstanding_msgs_received(S),
+ All = gms(Msgs),
+ [gm:leave(GM) || GM <- All],
+ [await_death(GM) || GM <- All],
+ ok.
+
+await_death(P) ->
+ MRef = erlang:monitor(process, P),
+ receive
+ {'DOWN', MRef, process, _, _} -> ok
+ end.
+
+%% ---------------------------------------------------------------------------
+%% proper_statem
+%% ---------------------------------------------------------------------------
+
+initial_state() -> #state{seq = 1,
+ msgs = dict:new()}.
+
+command(S = #state{msgs = Msgs}) ->
+ case dict:size(Msgs) of
+ 0 -> qc_join(S);
+ _ -> frequency([{1, qc_join(S)},
+ {1, qc_leave(S)},
+ {10, qc_send(S)}])
+ end.
+
+qc_join(_S) -> {call,?MODULE,do_join, []}.
+qc_leave(#state{msgs = Msgs}) -> {call,?MODULE,do_leave,[random(gms(Msgs))]}.
+qc_send(#state{seq = N,
+ msgs = Msgs}) -> {call,?MODULE,do_send, [N, random(gms(Msgs))]}.
+
+random([]) -> will_fail_precondition;
+random(L) -> lists:nth(random:uniform(length(L)), L).
+
+precondition(#state{msgs = Msgs}, {call, ?MODULE, do_join, []}) ->
+ dict:size(Msgs) < ?MAX_SIZE;
+
+precondition(#state{msgs = Msgs}, {call, ?MODULE, do_leave, [_GM]}) ->
+ dict:size(Msgs) > 0;
+
+precondition(#state{msgs = Msgs}, {call, ?MODULE, do_send, [_N, _GM]}) ->
+ dict:size(Msgs) > 0.
+
+postcondition(S = #state{msgs = Msgs}, {call, ?MODULE, do_join, []}, GM) ->
+ [begin
+ gm:broadcast(Existing, heartbeat),
+ receive
+ {birth, Existing, GM} -> ok
+ after 1000 ->
+ exit({birth_timeout, Existing, did_not_announce, GM})
+ end
+ end || Existing <- gms(Msgs) -- [GM]],
+ assert(S);
+
+postcondition(S = #state{msgs = Msgs},
+ {call, ?MODULE, do_leave, [Dead]}, _Res) ->
+ [await_death(Existing, Dead, 5) || Existing <- gms(Msgs) -- [Dead]],
+ assert(S#state{msgs = dict:erase(Dead, Msgs)});
+
+postcondition(S = #state{}, {call, _M, _F, _A}, _Res) ->
+ assert(S).
+
+next_state(S = #state{msgs = Msgs}, GM, {call, ?MODULE, do_join, []}) ->
+ S#state{msgs = dict:store(GM, {gb_trees:empty(), gb_sets:empty()}, Msgs)};
+
+next_state(S = #state{msgs = Msgs}, _GM, {call, ?MODULE, do_leave, [GM]}) ->
+ true = dict:is_key(GM, Msgs),
+ S#state{msgs = dict:erase(GM, Msgs)};
+
+next_state(S = #state{seq = Seq,
+ msgs = Msgs}, Msg, {call, ?MODULE, do_send, [_, _]}) ->
+ TS = timestamp(),
+ Msgs1 = dict:map(fun (_GM, {Tree, Set}) ->
+ {gb_trees:insert(Msg, TS, Tree),
+ gb_sets:add_element({TS, Msg}, Set)}
+ end, Msgs),
+ drain(S#state{seq = Seq + 1,
+ msgs = Msgs1}).
+
+%% ---------------------------------------------------------------------------
+%% GM
+%% ---------------------------------------------------------------------------
+
+joined(Pid, _Members) -> Pid ! {joined, self()}, ok.
+members_changed(Pid, Bs, Ds) -> [Pid ! {birth, self(), B} || B <- Bs],
+ [Pid ! {death, self(), D} || D <- Ds],
+ ok.
+handle_msg(_Pid, _From, heartbeat) -> ok;
+handle_msg(Pid, _From, Msg) -> Pid ! {gm, self(), Msg}, ok.
+terminate(_Pid, _Reason) -> ok.
+
+%% ---------------------------------------------------------------------------
+%% Helpers
+%% ---------------------------------------------------------------------------
+
+do_join() ->
+ {ok, GM} = gm:start_link(?GROUP, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
+ receive
+ {joined, GM} -> ok
+ end,
+ GM.
+
+do_leave(GM) ->
+ gm:leave(GM).
+
+do_send(Seq, GM) ->
+ Msg = [{sequence, Seq},
+ {first_gm, GM}],
+ gm:broadcast(GM, Msg),
+ Msg.
+
+await_death(GM, ToDie, 0) ->
+ exit({death_msg_timeout, GM, ToDie});
+await_death(GM, ToDie, N) ->
+ gm:broadcast(GM, heartbeat),
+ receive
+ {death, GM, ToDie} -> ok
+ after 100 ->
+ await_death(GM, ToDie, N - 1)
+ end.
+
+gms(Msgs) -> dict:fetch_keys(Msgs).
+
+drain(S = #state{msgs = Msgs}) ->
+ receive
+ {gm, GM, Msg} ->
+ case dict:find(GM, Msgs) of
+ {ok, {Tree, Set}} ->
+ case gb_trees:lookup(Msg, Tree) of
+ {value, TS} ->
+ Msgs1 = dict:store(
+ GM, {gb_trees:delete(Msg, Tree),
+ gb_sets:del_element({TS, Msg}, Set)},
+ Msgs),
+ drain(S#state{msgs = Msgs1});
+ none ->
+ %% Message from GM that joined after we
+ %% broadcast the message. OK.
+ drain(S)
+ end;
+ error ->
+ %% Message from GM that has already died. OK.
+ drain(S)
+ end
+ after 0 ->
+ S
+ end.
+
+assert(#state{msgs = Msgs}) ->
+ TS = timestamp(),
+ dict:fold(fun (GM, {_Tree, Set}, none) ->
+ case gb_sets:size(Set) of
+ 0 -> ok;
+ _ -> {TS0, Msg} = gb_sets:smallest(Set),
+ case TS0 + ?MSG_TIMEOUT < TS of
+ true -> exit({msg_timeout,
+ [{msg, Msg},
+ {gm, GM},
+ {all, gms(Msgs)}]});
+ false -> ok
+ end
+ end,
+ none
+ end, none, Msgs),
+ true.
+
+ensure_outstanding_msgs_received(S) ->
+ case outstanding_msgs(S) of
+ false -> S;
+ true -> timer:sleep(100),
+ S2 = drain(S),
+ assert(S2),
+ ensure_outstanding_msgs_received(S2)
+ end.
+
+outstanding_msgs(#state{msgs = Msgs}) ->
+ dict:fold(fun (_GM, {_Tree, Set}, false) -> not gb_sets:is_empty(Set);
+ (_GM, {_Tree, _Set}, true) -> true
+ end, false, Msgs).
+
+timestamp() -> timer:now_diff(os:timestamp(), {0, 0, 0}).
+
+-else.
+
+-export([prop_disabled/0]).
+
+prop_disabled() ->
+ exit({compiled_without_proper,
+ "PropEr was not present during compilation of the test module. "
+ "Hence all tests are disabled."}).
+
+-endif.
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
index 81c17fbfbe..db9349acfb 100644
--- a/src/rabbit_channel_interceptor.erl
+++ b/src/rabbit_channel_interceptor.erl
@@ -33,7 +33,7 @@
-callback description() -> [proplists:property()].
-callback intercept(original_method(), rabbit_types:vhost()) ->
- rabbit_types:ok_or_error2(processed_method(), any()).
+ processed_method() | rabbit_misc:channel_or_connection_exit().
%% Whether the interceptor wishes to intercept the amqp method
-callback applies_to(intercept_method()) -> boolean().
@@ -62,20 +62,15 @@ intercept_method(M, VHost) ->
intercept_method(M, _VHost, []) ->
M;
intercept_method(M, VHost, [I]) ->
- case I:intercept(M, VHost) of
- {ok, M2} ->
- case validate_method(M, M2) of
- true ->
- M2;
- _ ->
- internal_error("Interceptor: ~p expected "
- "to return method: ~p but returned: ~p",
- [I, rabbit_misc:method_record_type(M),
- rabbit_misc:method_record_type(M2)])
- end;
- {error, Reason} ->
- internal_error("Interceptor: ~p failed with reason: ~p",
- [I, Reason])
+ M2 = I:intercept(M, VHost),
+ case validate_method(M, M2) of
+ true ->
+ M2;
+ _ ->
+ internal_error("Interceptor: ~p expected "
+ "to return method: ~p but returned: ~p",
+ [I, rabbit_misc:method_record_type(M),
+ rabbit_misc:method_record_type(M2)])
end;
intercept_method(M, _VHost, Is) ->
internal_error("More than one interceptor for method: ~p -- ~p",
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index ec32e6878d..728bc43117 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -129,6 +129,9 @@ is_cycle(Queue, Deaths) ->
{longstr, <<"rejected">>} =/=
rabbit_misc:table_lookup(D, <<"reason">>);
(_) ->
+ %% There was something we didn't expect, therefore
+ %% a client must have put it there, therefore the
+ %% cycle was not "fully automatic".
false
end, Cycle ++ [H])
end.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 2b16b9118d..24b22d4cc5 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -170,10 +170,24 @@ terminate({shutdown, dropped} = Reason,
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)};
terminate(Reason,
- State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ State = #state { name = QName,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
%% shouldn't be deleted. Most likely safe shutdown of this
- %% node. Thus just let some other slave take over.
+ %% node.
+ {ok, Q = #amqqueue{sync_slave_pids = SSPids}} =
+ rabbit_amqqueue:lookup(QName),
+ case SSPids =:= [] andalso
+ rabbit_policy:get(<<"ha-promote-on-shutdown">>, Q) =/= <<"always">> of
+ true -> %% Remove the whole queue to avoid data loss
+ rabbit_mirror_queue_misc:log_warning(
+ QName, "Stopping all nodes on master shutdown since no "
+ "synchronised slave is available~n", []),
+ stop_all_slaves(Reason, State);
+ false -> %% Just let some other slave take over.
+ ok
+ end,
State #state { backing_queue_state = BQ:terminate(Reason, BQS) }.
delete_and_terminate(Reason, State = #state { backing_queue = BQ,
@@ -181,7 +195,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,
stop_all_slaves(Reason, State),
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
-stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
+stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
MRefs = [erlang:monitor(process, Pid) || Pid <- [GM | SPids]],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index b0f092a9a0..7aec1ac81f 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -29,16 +29,19 @@
-include("rabbit.hrl").
--rabbit_boot_step({?MODULE,
- [{description, "HA policy validation"},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-mode">>, ?MODULE]}},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-params">>, ?MODULE]}},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
- {requires, rabbit_registry},
- {enables, recovery}]}).
+-rabbit_boot_step(
+ {?MODULE,
+ [{description, "HA policy validation"},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-mode">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-params">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
%%----------------------------------------------------------------------------
@@ -374,16 +377,21 @@ validate_policy(KeyList) ->
Mode = proplists:get_value(<<"ha-mode">>, KeyList, none),
Params = proplists:get_value(<<"ha-params">>, KeyList, none),
SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none),
- case {Mode, Params, SyncMode} of
- {none, none, none} ->
+ PromoteOnShutdown = proplists:get_value(
+ <<"ha-promote-on-shutdown">>, KeyList, none),
+ case {Mode, Params, SyncMode, PromoteOnShutdown} of
+ {none, none, none, none} ->
ok;
- {none, _, _} ->
- {error, "ha-mode must be specified to specify ha-params or "
- "ha-sync-mode", []};
+ {none, _, _, _} ->
+ {error, "ha-mode must be specified to specify ha-params, "
+ "ha-sync-mode or ha-promote-on-shutdown", []};
_ ->
case module(Mode) of
{ok, M} -> case M:validate_policy(Params) of
- ok -> validate_sync_mode(SyncMode);
+ ok -> case validate_sync_mode(SyncMode) of
+ ok -> validate_pos(PromoteOnShutdown);
+ E -> E
+ end;
E -> E
end;
_ -> {error, "~p is not a valid ha-mode value", [Mode]}
@@ -398,3 +406,12 @@ validate_sync_mode(SyncMode) ->
Mode -> {error, "ha-sync-mode must be \"manual\" "
"or \"automatic\", got ~p", [Mode]}
end.
+
+validate_pos(PromoteOnShutdown) ->
+ case PromoteOnShutdown of
+ <<"always">> -> ok;
+ <<"when-synced">> -> ok;
+ none -> ok;
+ Mode -> {error, "ha-promote-on-shutdown must be "
+ "\"always\" or \"when-synced\", got ~p", [Mode]}
+ end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 58e93a3f9e..18c07f86f1 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -81,7 +81,7 @@
-ifdef(use_specs).
--export_type([resource_name/0, thunk/1]).
+-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).