summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2016-01-12 14:31:56 +0300
committerMichael Klishin <michael@clojurewerkz.org>2016-01-12 14:31:56 +0300
commit577183deed496b46d9c491d3ef6ef5d3c313b879 (patch)
treea1a6bab1e63024b7bf7ca9713b3097a6bd67a37f /src
parentce484f2fa970b9fe7fe542ccedbfef7564b9de40 (diff)
parent1dcaad8f4859a04d2e6e427e9aaf4e948bdd417e (diff)
downloadrabbitmq-server-git-577183deed496b46d9c491d3ef6ef5d3c313b879.tar.gz
Merge branch 'stable' into rabbitmq-server-528
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_control_main.erl13
-rw-r--r--src/rabbit_exchange.erl86
-rw-r--r--src/rabbit_exchange_parameters.erl49
-rw-r--r--src/rabbit_runtime_parameters.erl13
4 files changed, 129 insertions, 32 deletions
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 71234fb6a0..c6e39b17f5 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -365,7 +365,10 @@ action(status, Node, [], _Opts, Inform) ->
action(cluster_status, Node, [], _Opts, Inform) ->
Inform("Cluster status of node ~p", [Node]),
- display_call_result(Node, {rabbit_mnesia, status, []});
+ Status = unsafe_rpc(Node, rabbit_mnesia, status, []),
+ io:format("~p~n", [Status ++ [{alarms,
+ [alarms_by_node(Name) || Name <- nodes_in_cluster(Node)]}]]),
+ ok;
action(environment, Node, _App, _Opts, Inform) ->
Inform("Application environment of node ~p", [Node]),
@@ -878,3 +881,11 @@ prettify_typed_amqp_value(_Type, Value) -> Value.
split_list([]) -> [];
split_list([_]) -> exit(even_list_needed);
split_list([A, B | T]) -> [{A, B} | split_list(T)].
+
+nodes_in_cluster(Node) ->
+ unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]).
+
+alarms_by_node(Name) ->
+ Status = unsafe_rpc(Name, rabbit, status, []),
+ {_, As} = lists:keyfind(alarms, 1, Status),
+ {Name, As}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 9502f3a78a..2e9afbfd2e 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -166,24 +166,37 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
XT = type_to_module(Type),
%% We want to upset things if it isn't ok
ok = XT:validate(X),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_exchange, XName}) of
- [] ->
- {new, store(X)};
- [ExistingX] ->
- {existing, ExistingX}
- end
- end,
- fun ({new, Exchange}, Tx) ->
- ok = callback(X, create, map_create_tx(Tx), [Exchange]),
- rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
- Exchange;
- ({existing, Exchange}, _Tx) ->
- Exchange;
- (Err, _Tx) ->
- Err
- end).
+ %% Avoid a channel exception if there's a race condition
+ %% with an exchange.delete operation.
+ %%
+ %% See rabbitmq/rabbitmq-federation#7.
+ case rabbit_runtime_parameters:lookup(XName#resource.virtual_host,
+ ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
+ XName#resource.name) of
+ not_found ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_exchange, XName}) of
+ [] ->
+ {new, store(X)};
+ [ExistingX] ->
+ {existing, ExistingX}
+ end
+ end,
+ fun ({new, Exchange}, Tx) ->
+ ok = callback(X, create, map_create_tx(Tx), [Exchange]),
+ rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
+ Exchange;
+ ({existing, Exchange}, _Tx) ->
+ Exchange;
+ (Err, _Tx) ->
+ Err
+ end);
+ _ ->
+ rabbit_log:warning("ignoring exchange.declare for exchange ~p,
+ exchange.delete in progress~n.", [XName]),
+ X
+ end.
map_create_tx(true) -> transaction;
map_create_tx(false) -> none.
@@ -427,18 +440,31 @@ delete(XName, IfUnused) ->
true -> fun conditional_delete/2;
false -> fun unconditional_delete/2
end,
- call_with_exchange(
- XName,
- fun (X) ->
- case Fun(X, false) of
- {deleted, X, Bs, Deletions} ->
- rabbit_binding:process_deletions(
- rabbit_binding:add_deletion(
- XName, {X, deleted, Bs}, Deletions));
- {error, _InUseOrNotFound} = E ->
- rabbit_misc:const(E)
- end
- end).
+ try
+ %% guard exchange.declare operations from failing when there's
+ %% a race condition between it and an exchange.delete.
+ %%
+ %% see rabbitmq/rabbitmq-federation#7
+ rabbit_runtime_parameters:set(XName#resource.virtual_host,
+ ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
+ XName#resource.name, true, none),
+ call_with_exchange(
+ XName,
+ fun (X) ->
+ case Fun(X, false) of
+ {deleted, X, Bs, Deletions} ->
+ rabbit_binding:process_deletions(
+ rabbit_binding:add_deletion(
+ XName, {X, deleted, Bs}, Deletions));
+ {error, _InUseOrNotFound} = E ->
+ rabbit_misc:const(E)
+ end
+ end)
+ after
+ rabbit_runtime_parameters:clear(XName#resource.virtual_host,
+ ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
+ XName#resource.name)
+ end.
validate_binding(X = #exchange{type = XType}, Binding) ->
Module = type_to_module(XType),
diff --git a/src/rabbit_exchange_parameters.erl b/src/rabbit_exchange_parameters.erl
new file mode 100644
index 0000000000..c0ca0a985b
--- /dev/null
+++ b/src/rabbit_exchange_parameters.erl
@@ -0,0 +1,49 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_exchange_parameters).
+
+-behaviour(rabbit_runtime_parameter).
+
+-include("rabbit.hrl").
+
+-export([register/0]).
+-export([validate/5, notify/4, notify_clear/3]).
+
+-import(rabbit_misc, [pget/2]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "exchange parameters"},
+ {mfa, {rabbit_exchange_parameters, register, []}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
+register() ->
+ rabbit_registry:register(runtime_parameter,
+ ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, ?MODULE),
+ %% ensure there are no leftovers from before node restart/crash
+ rabbit_runtime_parameters:clear_component(
+ ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT),
+ ok.
+
+validate(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Term, _User) ->
+ ok.
+
+notify(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Term) ->
+ ok.
+
+notify_clear(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name) ->
+ ok.
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index a9e401ed28..ba1a830df1 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -51,7 +51,7 @@
-export([parse_set/5, set/5, set_any/5, clear/3, clear_any/3, list/0, list/1,
list_component/1, list/2, list_formatted/1, list_formatted/3,
- lookup/3, value/3, value/4, info_keys/0]).
+ lookup/3, value/3, value/4, info_keys/0, clear_component/1]).
-export([set_global/2, value_global/1, value_global/2]).
@@ -171,6 +171,17 @@ clear(_, <<"policy">> , _) ->
clear(VHost, Component, Name) ->
clear_any(VHost, Component, Name).
+clear_component(Component) ->
+ case rabbit_runtime_parameters:list_component(Component) of
+ [] ->
+ ok;
+ Xs ->
+ [rabbit_runtime_parameters:clear(pget(vhost, X),
+ pget(component, X),
+ pget(name, X))|| X <- Xs],
+ ok
+ end.
+
clear_any(VHost, Component, Name) ->
Notify = fun () ->
case lookup_component(Component) of