diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2016-01-12 14:31:56 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2016-01-12 14:31:56 +0300 |
| commit | 577183deed496b46d9c491d3ef6ef5d3c313b879 (patch) | |
| tree | a1a6bab1e63024b7bf7ca9713b3097a6bd67a37f /src | |
| parent | ce484f2fa970b9fe7fe542ccedbfef7564b9de40 (diff) | |
| parent | 1dcaad8f4859a04d2e6e427e9aaf4e948bdd417e (diff) | |
| download | rabbitmq-server-git-577183deed496b46d9c491d3ef6ef5d3c313b879.tar.gz | |
Merge branch 'stable' into rabbitmq-server-528
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_control_main.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 86 | ||||
| -rw-r--r-- | src/rabbit_exchange_parameters.erl | 49 | ||||
| -rw-r--r-- | src/rabbit_runtime_parameters.erl | 13 |
4 files changed, 129 insertions, 32 deletions
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 71234fb6a0..c6e39b17f5 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -365,7 +365,10 @@ action(status, Node, [], _Opts, Inform) -> action(cluster_status, Node, [], _Opts, Inform) -> Inform("Cluster status of node ~p", [Node]), - display_call_result(Node, {rabbit_mnesia, status, []}); + Status = unsafe_rpc(Node, rabbit_mnesia, status, []), + io:format("~p~n", [Status ++ [{alarms, + [alarms_by_node(Name) || Name <- nodes_in_cluster(Node)]}]]), + ok; action(environment, Node, _App, _Opts, Inform) -> Inform("Application environment of node ~p", [Node]), @@ -878,3 +881,11 @@ prettify_typed_amqp_value(_Type, Value) -> Value. split_list([]) -> []; split_list([_]) -> exit(even_list_needed); split_list([A, B | T]) -> [{A, B} | split_list(T)]. + +nodes_in_cluster(Node) -> + unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]). + +alarms_by_node(Name) -> + Status = unsafe_rpc(Name, rabbit, status, []), + {_, As} = lists:keyfind(alarms, 1, Status), + {Name, As}. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 9502f3a78a..2e9afbfd2e 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -166,24 +166,37 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> XT = type_to_module(Type), %% We want to upset things if it isn't ok ok = XT:validate(X), - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_exchange, XName}) of - [] -> - {new, store(X)}; - [ExistingX] -> - {existing, ExistingX} - end - end, - fun ({new, Exchange}, Tx) -> - ok = callback(X, create, map_create_tx(Tx), [Exchange]), - rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), - Exchange; - ({existing, Exchange}, _Tx) -> - Exchange; - (Err, _Tx) -> - Err - end). + %% Avoid a channel exception if there's a race condition + %% with an exchange.delete operation. + %% + %% See rabbitmq/rabbitmq-federation#7. + case rabbit_runtime_parameters:lookup(XName#resource.virtual_host, + ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, + XName#resource.name) of + not_found -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_exchange, XName}) of + [] -> + {new, store(X)}; + [ExistingX] -> + {existing, ExistingX} + end + end, + fun ({new, Exchange}, Tx) -> + ok = callback(X, create, map_create_tx(Tx), [Exchange]), + rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), + Exchange; + ({existing, Exchange}, _Tx) -> + Exchange; + (Err, _Tx) -> + Err + end); + _ -> + rabbit_log:warning("ignoring exchange.declare for exchange ~p, + exchange.delete in progress~n.", [XName]), + X + end. map_create_tx(true) -> transaction; map_create_tx(false) -> none. @@ -427,18 +440,31 @@ delete(XName, IfUnused) -> true -> fun conditional_delete/2; false -> fun unconditional_delete/2 end, - call_with_exchange( - XName, - fun (X) -> - case Fun(X, false) of - {deleted, X, Bs, Deletions} -> - rabbit_binding:process_deletions( - rabbit_binding:add_deletion( - XName, {X, deleted, Bs}, Deletions)); - {error, _InUseOrNotFound} = E -> - rabbit_misc:const(E) - end - end). + try + %% guard exchange.declare operations from failing when there's + %% a race condition between it and an exchange.delete. + %% + %% see rabbitmq/rabbitmq-federation#7 + rabbit_runtime_parameters:set(XName#resource.virtual_host, + ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, + XName#resource.name, true, none), + call_with_exchange( + XName, + fun (X) -> + case Fun(X, false) of + {deleted, X, Bs, Deletions} -> + rabbit_binding:process_deletions( + rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions)); + {error, _InUseOrNotFound} = E -> + rabbit_misc:const(E) + end + end) + after + rabbit_runtime_parameters:clear(XName#resource.virtual_host, + ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, + XName#resource.name) + end. validate_binding(X = #exchange{type = XType}, Binding) -> Module = type_to_module(XType), diff --git a/src/rabbit_exchange_parameters.erl b/src/rabbit_exchange_parameters.erl new file mode 100644 index 0000000000..c0ca0a985b --- /dev/null +++ b/src/rabbit_exchange_parameters.erl @@ -0,0 +1,49 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_exchange_parameters). + +-behaviour(rabbit_runtime_parameter). + +-include("rabbit.hrl"). + +-export([register/0]). +-export([validate/5, notify/4, notify_clear/3]). + +-import(rabbit_misc, [pget/2]). + +-rabbit_boot_step({?MODULE, + [{description, "exchange parameters"}, + {mfa, {rabbit_exchange_parameters, register, []}}, + {requires, rabbit_registry}, + {enables, recovery}]}). + +register() -> + rabbit_registry:register(runtime_parameter, + ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, ?MODULE), + %% ensure there are no leftovers from before node restart/crash + rabbit_runtime_parameters:clear_component( + ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT), + ok. + +validate(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Term, _User) -> + ok. + +notify(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Term) -> + ok. + +notify_clear(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name) -> + ok. diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index a9e401ed28..ba1a830df1 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -51,7 +51,7 @@ -export([parse_set/5, set/5, set_any/5, clear/3, clear_any/3, list/0, list/1, list_component/1, list/2, list_formatted/1, list_formatted/3, - lookup/3, value/3, value/4, info_keys/0]). + lookup/3, value/3, value/4, info_keys/0, clear_component/1]). -export([set_global/2, value_global/1, value_global/2]). @@ -171,6 +171,17 @@ clear(_, <<"policy">> , _) -> clear(VHost, Component, Name) -> clear_any(VHost, Component, Name). +clear_component(Component) -> + case rabbit_runtime_parameters:list_component(Component) of + [] -> + ok; + Xs -> + [rabbit_runtime_parameters:clear(pget(vhost, X), + pget(component, X), + pget(name, X))|| X <- Xs], + ok + end. + clear_any(VHost, Component, Name) -> Notify = fun () -> case lookup_component(Component) of |
