summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-05-27 13:09:45 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-05-27 13:09:45 +0100
commitecdb21a6b5e02000b293b20b14dcf96ef910c410 (patch)
tree2b11c92909cdfdf50fe50a0df4e767d8659cf721 /src
parent045bad63a17da8334a8b9dd5b97cf26231a8d3c3 (diff)
parentb3339ca123dd13b0725e42e508c257ca3789b8ea (diff)
downloadrabbitmq-server-git-ecdb21a6b5e02000b293b20b14dcf96ef910c410.tar.gz
stable to default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel_interceptor.erl25
-rw-r--r--src/rabbit_dead_letter.erl3
-rw-r--r--src/rabbit_mirror_queue_master.erl20
-rw-r--r--src/rabbit_mirror_queue_misc.erl49
-rw-r--r--src/rabbit_misc.erl2
5 files changed, 64 insertions, 35 deletions
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
index 81c17fbfbe..db9349acfb 100644
--- a/src/rabbit_channel_interceptor.erl
+++ b/src/rabbit_channel_interceptor.erl
@@ -33,7 +33,7 @@
-callback description() -> [proplists:property()].
-callback intercept(original_method(), rabbit_types:vhost()) ->
- rabbit_types:ok_or_error2(processed_method(), any()).
+ processed_method() | rabbit_misc:channel_or_connection_exit().
%% Whether the interceptor wishes to intercept the amqp method
-callback applies_to(intercept_method()) -> boolean().
@@ -62,20 +62,15 @@ intercept_method(M, VHost) ->
intercept_method(M, _VHost, []) ->
M;
intercept_method(M, VHost, [I]) ->
- case I:intercept(M, VHost) of
- {ok, M2} ->
- case validate_method(M, M2) of
- true ->
- M2;
- _ ->
- internal_error("Interceptor: ~p expected "
- "to return method: ~p but returned: ~p",
- [I, rabbit_misc:method_record_type(M),
- rabbit_misc:method_record_type(M2)])
- end;
- {error, Reason} ->
- internal_error("Interceptor: ~p failed with reason: ~p",
- [I, Reason])
+ M2 = I:intercept(M, VHost),
+ case validate_method(M, M2) of
+ true ->
+ M2;
+ _ ->
+ internal_error("Interceptor: ~p expected "
+ "to return method: ~p but returned: ~p",
+ [I, rabbit_misc:method_record_type(M),
+ rabbit_misc:method_record_type(M2)])
end;
intercept_method(M, _VHost, Is) ->
internal_error("More than one interceptor for method: ~p -- ~p",
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index ec32e6878d..728bc43117 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -129,6 +129,9 @@ is_cycle(Queue, Deaths) ->
{longstr, <<"rejected">>} =/=
rabbit_misc:table_lookup(D, <<"reason">>);
(_) ->
+ %% There was something we didn't expect, therefore
+ %% a client must have put it there, therefore the
+ %% cycle was not "fully automatic".
false
end, Cycle ++ [H])
end.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 2b16b9118d..24b22d4cc5 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -170,10 +170,24 @@ terminate({shutdown, dropped} = Reason,
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)};
terminate(Reason,
- State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ State = #state { name = QName,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
%% shouldn't be deleted. Most likely safe shutdown of this
- %% node. Thus just let some other slave take over.
+ %% node.
+ {ok, Q = #amqqueue{sync_slave_pids = SSPids}} =
+ rabbit_amqqueue:lookup(QName),
+ case SSPids =:= [] andalso
+ rabbit_policy:get(<<"ha-promote-on-shutdown">>, Q) =/= <<"always">> of
+ true -> %% Remove the whole queue to avoid data loss
+ rabbit_mirror_queue_misc:log_warning(
+ QName, "Stopping all nodes on master shutdown since no "
+ "synchronised slave is available~n", []),
+ stop_all_slaves(Reason, State);
+ false -> %% Just let some other slave take over.
+ ok
+ end,
State #state { backing_queue_state = BQ:terminate(Reason, BQS) }.
delete_and_terminate(Reason, State = #state { backing_queue = BQ,
@@ -181,7 +195,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,
stop_all_slaves(Reason, State),
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
-stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
+stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
MRefs = [erlang:monitor(process, Pid) || Pid <- [GM | SPids]],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index b0f092a9a0..7aec1ac81f 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -29,16 +29,19 @@
-include("rabbit.hrl").
--rabbit_boot_step({?MODULE,
- [{description, "HA policy validation"},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-mode">>, ?MODULE]}},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-params">>, ?MODULE]}},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
- {requires, rabbit_registry},
- {enables, recovery}]}).
+-rabbit_boot_step(
+ {?MODULE,
+ [{description, "HA policy validation"},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-mode">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-params">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
%%----------------------------------------------------------------------------
@@ -374,16 +377,21 @@ validate_policy(KeyList) ->
Mode = proplists:get_value(<<"ha-mode">>, KeyList, none),
Params = proplists:get_value(<<"ha-params">>, KeyList, none),
SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none),
- case {Mode, Params, SyncMode} of
- {none, none, none} ->
+ PromoteOnShutdown = proplists:get_value(
+ <<"ha-promote-on-shutdown">>, KeyList, none),
+ case {Mode, Params, SyncMode, PromoteOnShutdown} of
+ {none, none, none, none} ->
ok;
- {none, _, _} ->
- {error, "ha-mode must be specified to specify ha-params or "
- "ha-sync-mode", []};
+ {none, _, _, _} ->
+ {error, "ha-mode must be specified to specify ha-params, "
+ "ha-sync-mode or ha-promote-on-shutdown", []};
_ ->
case module(Mode) of
{ok, M} -> case M:validate_policy(Params) of
- ok -> validate_sync_mode(SyncMode);
+ ok -> case validate_sync_mode(SyncMode) of
+ ok -> validate_pos(PromoteOnShutdown);
+ E -> E
+ end;
E -> E
end;
_ -> {error, "~p is not a valid ha-mode value", [Mode]}
@@ -398,3 +406,12 @@ validate_sync_mode(SyncMode) ->
Mode -> {error, "ha-sync-mode must be \"manual\" "
"or \"automatic\", got ~p", [Mode]}
end.
+
+validate_pos(PromoteOnShutdown) ->
+ case PromoteOnShutdown of
+ <<"always">> -> ok;
+ <<"when-synced">> -> ok;
+ none -> ok;
+ Mode -> {error, "ha-promote-on-shutdown must be "
+ "\"always\" or \"when-synced\", got ~p", [Mode]}
+ end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 58e93a3f9e..18c07f86f1 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -81,7 +81,7 @@
-ifdef(use_specs).
--export_type([resource_name/0, thunk/1]).
+-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).