diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-05-27 13:09:45 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-05-27 13:09:45 +0100 |
| commit | ecdb21a6b5e02000b293b20b14dcf96ef910c410 (patch) | |
| tree | 2b11c92909cdfdf50fe50a0df4e767d8659cf721 /src | |
| parent | 045bad63a17da8334a8b9dd5b97cf26231a8d3c3 (diff) | |
| parent | b3339ca123dd13b0725e42e508c257ca3789b8ea (diff) | |
| download | rabbitmq-server-git-ecdb21a6b5e02000b293b20b14dcf96ef910c410.tar.gz | |
stable to default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel_interceptor.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_dead_letter.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 49 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 2 |
5 files changed, 64 insertions, 35 deletions
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl index 81c17fbfbe..db9349acfb 100644 --- a/src/rabbit_channel_interceptor.erl +++ b/src/rabbit_channel_interceptor.erl @@ -33,7 +33,7 @@ -callback description() -> [proplists:property()]. -callback intercept(original_method(), rabbit_types:vhost()) -> - rabbit_types:ok_or_error2(processed_method(), any()). + processed_method() | rabbit_misc:channel_or_connection_exit(). %% Whether the interceptor wishes to intercept the amqp method -callback applies_to(intercept_method()) -> boolean(). @@ -62,20 +62,15 @@ intercept_method(M, VHost) -> intercept_method(M, _VHost, []) -> M; intercept_method(M, VHost, [I]) -> - case I:intercept(M, VHost) of - {ok, M2} -> - case validate_method(M, M2) of - true -> - M2; - _ -> - internal_error("Interceptor: ~p expected " - "to return method: ~p but returned: ~p", - [I, rabbit_misc:method_record_type(M), - rabbit_misc:method_record_type(M2)]) - end; - {error, Reason} -> - internal_error("Interceptor: ~p failed with reason: ~p", - [I, Reason]) + M2 = I:intercept(M, VHost), + case validate_method(M, M2) of + true -> + M2; + _ -> + internal_error("Interceptor: ~p expected " + "to return method: ~p but returned: ~p", + [I, rabbit_misc:method_record_type(M), + rabbit_misc:method_record_type(M2)]) end; intercept_method(M, _VHost, Is) -> internal_error("More than one interceptor for method: ~p -- ~p", diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index ec32e6878d..728bc43117 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -129,6 +129,9 @@ is_cycle(Queue, Deaths) -> {longstr, <<"rejected">>} =/= rabbit_misc:table_lookup(D, <<"reason">>); (_) -> + %% There was something we didn't expect, therefore + %% a client must have put it there, therefore the + %% cycle was not "fully automatic". false end, Cycle ++ [H]) end. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 2b16b9118d..24b22d4cc5 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -170,10 +170,24 @@ terminate({shutdown, dropped} = Reason, State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}; terminate(Reason, - State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + State = #state { name = QName, + backing_queue = BQ, + backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but %% shouldn't be deleted. Most likely safe shutdown of this - %% node. Thus just let some other slave take over. + %% node. + {ok, Q = #amqqueue{sync_slave_pids = SSPids}} = + rabbit_amqqueue:lookup(QName), + case SSPids =:= [] andalso + rabbit_policy:get(<<"ha-promote-on-shutdown">>, Q) =/= <<"always">> of + true -> %% Remove the whole queue to avoid data loss + rabbit_mirror_queue_misc:log_warning( + QName, "Stopping all nodes on master shutdown since no " + "synchronised slave is available~n", []), + stop_all_slaves(Reason, State); + false -> %% Just let some other slave take over. + ok + end, State #state { backing_queue_state = BQ:terminate(Reason, BQS) }. delete_and_terminate(Reason, State = #state { backing_queue = BQ, @@ -181,7 +195,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ, stop_all_slaves(Reason, State), State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}. -stop_all_slaves(Reason, #state{name = QName, gm = GM}) -> +stop_all_slaves(Reason, #state{name = QName, gm = GM}) -> {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), MRefs = [erlang:monitor(process, Pid) || Pid <- [GM | SPids]], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index b0f092a9a0..7aec1ac81f 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -29,16 +29,19 @@ -include("rabbit.hrl"). --rabbit_boot_step({?MODULE, - [{description, "HA policy validation"}, - {mfa, {rabbit_registry, register, - [policy_validator, <<"ha-mode">>, ?MODULE]}}, - {mfa, {rabbit_registry, register, - [policy_validator, <<"ha-params">>, ?MODULE]}}, - {mfa, {rabbit_registry, register, - [policy_validator, <<"ha-sync-mode">>, ?MODULE]}}, - {requires, rabbit_registry}, - {enables, recovery}]}). +-rabbit_boot_step( + {?MODULE, + [{description, "HA policy validation"}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-mode">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-params">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-sync-mode">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, recovery}]}). %%---------------------------------------------------------------------------- @@ -374,16 +377,21 @@ validate_policy(KeyList) -> Mode = proplists:get_value(<<"ha-mode">>, KeyList, none), Params = proplists:get_value(<<"ha-params">>, KeyList, none), SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none), - case {Mode, Params, SyncMode} of - {none, none, none} -> + PromoteOnShutdown = proplists:get_value( + <<"ha-promote-on-shutdown">>, KeyList, none), + case {Mode, Params, SyncMode, PromoteOnShutdown} of + {none, none, none, none} -> ok; - {none, _, _} -> - {error, "ha-mode must be specified to specify ha-params or " - "ha-sync-mode", []}; + {none, _, _, _} -> + {error, "ha-mode must be specified to specify ha-params, " + "ha-sync-mode or ha-promote-on-shutdown", []}; _ -> case module(Mode) of {ok, M} -> case M:validate_policy(Params) of - ok -> validate_sync_mode(SyncMode); + ok -> case validate_sync_mode(SyncMode) of + ok -> validate_pos(PromoteOnShutdown); + E -> E + end; E -> E end; _ -> {error, "~p is not a valid ha-mode value", [Mode]} @@ -398,3 +406,12 @@ validate_sync_mode(SyncMode) -> Mode -> {error, "ha-sync-mode must be \"manual\" " "or \"automatic\", got ~p", [Mode]} end. + +validate_pos(PromoteOnShutdown) -> + case PromoteOnShutdown of + <<"always">> -> ok; + <<"when-synced">> -> ok; + none -> ok; + Mode -> {error, "ha-promote-on-shutdown must be " + "\"always\" or \"when-synced\", got ~p", [Mode]} + end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 58e93a3f9e..18c07f86f1 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -81,7 +81,7 @@ -ifdef(use_specs). --export_type([resource_name/0, thunk/1]). +-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). |
