summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2016-08-18 12:17:34 +0300
committerMichael Klishin <mklishin@pivotal.io>2016-08-18 12:17:34 +0300
commit754feab6936ff1954964bf04423a2758899b91dc (patch)
treed4d72c1ef7e84d994c52805a955e38d6967efd99 /src
parentdad8511a6409816d3d8012fdb70213122bb690e4 (diff)
parent9f1c12f8ab553aab2da435f82a805da8b08c9149 (diff)
downloadrabbitmq-server-git-754feab6936ff1954964bf04423a2758899b91dc.tar.gz
Merge branch 'master' into rabbitmq-server-500-squashed
Conflicts: src/rabbit_control_main.erl
Diffstat (limited to 'src')
-rw-r--r--src/gm.erl5
-rw-r--r--src/rabbit_autoheal.erl11
-rw-r--r--src/rabbit_cli.erl35
-rw-r--r--src/rabbit_control_main.erl73
4 files changed, 84 insertions, 40 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 3554f01d56..985b5bfa6d 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -757,7 +757,10 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
catch
lost_membership ->
{stop, normal, State}
- end.
+ end;
+handle_info(_, State) ->
+ %% Discard any unexpected messages, such as late replies from neighbour_call/2
+ noreply(State).
terminate(Reason, #state { module = Module, callback_args = Args }) ->
Module:handle_terminate(Args, Reason).
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
index 5865ba8227..db4d41221e 100644
--- a/src/rabbit_autoheal.erl
+++ b/src/rabbit_autoheal.erl
@@ -297,6 +297,17 @@ winner_finish(Notify) ->
send(leader(), {autoheal_finished, node()}),
not_healing.
+%% XXX This can enter infinite loop, if mnesia was somehow restarted
+%% outside of our control - i.e. somebody started app back by hand or
+%% completely restarted node. One possible solution would be something
+%% like this (but it needs some more pondering and is left for some
+%% other patch):
+%% - monitor top-level mnesia supervisors of all losers
+%% - notify loosers about the fact that they are indeed loosers
+%% - wait for all monitors to go 'DOWN' (+ maybe some timeout on the whole process)
+%% - do one round of parallel rpc calls to check whether mnesia is still stoppend on all
+%% loosers
+%% - If everything is still stopped, continue autoheall process. Or cancel it otherwise.
wait_for_mnesia_shutdown([Node | Rest] = AllNodes) ->
case rpc:call(Node, mnesia, system_info, [is_running]) of
no ->
diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl
index 862725f550..65e8563ddf 100644
--- a/src/rabbit_cli.erl
+++ b/src/rabbit_cli.erl
@@ -18,7 +18,7 @@
-include("rabbit_cli.hrl").
-export([main/3, start_distribution/0, start_distribution/1,
- parse_arguments/4, filter_opts/2,
+ parse_arguments/4, mutually_exclusive_flags/3,
rpc_call/4, rpc_call/5, rpc_call/7]).
%%----------------------------------------------------------------------------
@@ -42,8 +42,7 @@
[{string(), optdef()}], string(), [string()]) ->
parse_result().
--spec filter_opts([{option_name(), option_value()}], [option_name()]) ->
- [boolean()].
+-spec mutually_exclusive_flags([{option_name(), option_value()}], term(), [{option_name(), term()}]) -> {ok, term()} | {error, string()}.
-spec rpc_call(node(), atom(), atom(), [any()]) -> any().
-spec rpc_call(node(), atom(), atom(), [any()], number()) -> any().
@@ -266,20 +265,22 @@ process_opts(Defs, C, [A | As], Found, KVs, Outs) ->
{none, _, _} -> no_command
end.
-%% When we have a set of flags that are used for filtering, we want by
-%% default to include every such option in our output. But if a user
-%% explicitly specified any such flag, we want to include only items
-%% which he has requested.
-filter_opts(CurrentOptionValues, AllOptionNames) ->
- Explicit = lists:map(fun(OptName) ->
- proplists:get_bool(OptName, CurrentOptionValues)
- end,
- AllOptionNames),
- case lists:member(true, Explicit) of
- true ->
- Explicit;
- false ->
- lists:duplicate(length(AllOptionNames), true)
+mutually_exclusive_flags(CurrentOptionValues, Default, FlagsAndValues) ->
+ PresentFlags = lists:filtermap(fun({OptName, _} = _O) ->
+ proplists:get_bool(OptName, CurrentOptionValues)
+ end,
+ FlagsAndValues),
+ case PresentFlags of
+ [] ->
+ {ok, Default};
+ [{_, Value}] ->
+ {ok, Value};
+ _ ->
+ Names = [ [$', N, $'] || {N, _} <- PresentFlags ],
+ CommaSeparated = string:join(lists:droplast(Names), ", "),
+ AndOneMore = lists:last(Names),
+ Msg = io_lib:format("Options ~s and ~s are mutually exclusive", [CommaSeparated, AndOneMore]),
+ {error, lists:flatten(Msg)}
end.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index ae98e51443..0e99ac0f7e 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -76,8 +76,7 @@
{set_vhost_limits, [?VHOST_DEF]},
{clear_vhost_limits, [?VHOST_DEF]},
-
- {list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF]},
+ {list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF, ?LOCAL_DEF]},
{list_exchanges, [?VHOST_DEF]},
{list_bindings, [?VHOST_DEF]},
{list_connections, [?VHOST_DEF]},
@@ -647,26 +646,47 @@ action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout)
[{timeout, Timeout}, to_bin_utf8, is_escaped]);
action(list_queues, Node, Args, Opts, Inform, Timeout) ->
- Inform("Listing queues", []),
- %% User options
- [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- ArgAtoms = default_if_empty(Args, [name, messages]),
-
- %% Data for emission
- Nodes = nodes_in_cluster(Node, Timeout),
- OnlineChunks = if Online -> length(Nodes); true -> 0 end,
- OfflineChunks = if Offline -> 1; true -> 0 end,
- ChunksOpt = {chunks, OnlineChunks + OfflineChunks},
- TimeoutOpt = {timeout, Timeout},
- EmissionRef = make_ref(),
- EmissionRefOpt = {ref, EmissionRef},
-
- _ = Online andalso start_emission(Node, {rabbit_amqqueue, emit_info_all, [Nodes, VHostArg, ArgAtoms]},
- [TimeoutOpt, EmissionRefOpt]),
- _ = Offline andalso start_emission(Node, {rabbit_amqqueue, emit_info_down, [VHostArg, ArgAtoms]},
- [TimeoutOpt, EmissionRefOpt]),
- display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]);
+ case rabbit_cli:mutually_exclusive_flags(
+ Opts, all, [{?ONLINE_OPT, online}
+ ,{?OFFLINE_OPT, offline}
+ ,{?LOCAL_OPT, local}]) of
+ {ok, Filter} ->
+ Inform("Listing queues", []),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ ArgAtoms = default_if_empty(Args, [name, messages]),
+
+ %% Data for emission
+ Nodes = nodes_in_cluster(Node, Timeout),
+ ChunksOpt = {chunks, get_number_of_chunks(Filter, Nodes)},
+ TimeoutOpt = {timeout, Timeout},
+ EmissionRef = make_ref(),
+ EmissionRefOpt = {ref, EmissionRef},
+
+ case Filter of
+ all ->
+ start_emission(Node, {rabbit_amqqueue, emit_info_all,
+ [Nodes, VHostArg, ArgAtoms]},
+ [TimeoutOpt, EmissionRefOpt]),
+ start_emission(Node, {rabbit_amqqueue, emit_info_down,
+ [VHostArg, ArgAtoms]},
+ [TimeoutOpt, EmissionRefOpt]);
+ online ->
+ start_emission(Node, {rabbit_amqqueue, emit_info_all,
+ [Nodes, VHostArg, ArgAtoms]},
+ [TimeoutOpt, EmissionRefOpt]);
+ offline ->
+ start_emission(Node, {rabbit_amqqueue, emit_info_down,
+ [VHostArg, ArgAtoms]},
+ [TimeoutOpt, EmissionRefOpt]);
+ local ->
+ start_emission(Node, {rabbit_amqqueue, emit_info_local,
+ [VHostArg, ArgAtoms]},
+ [TimeoutOpt, EmissionRefOpt])
+ end,
+ display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]);
+ {error, ErrStr} ->
+ {error_string, ErrStr}
+ end;
action(list_exchanges, Node, Args, Opts, Inform, Timeout) ->
Inform("Listing exchanges", []),
@@ -1008,3 +1028,12 @@ alarms_by_node(Name) ->
{_, As} = lists:keyfind(alarms, 1, Status),
{Name, As}
end.
+
+get_number_of_chunks(all, Nodes) ->
+ length(Nodes) + 1;
+get_number_of_chunks(online, Nodes) ->
+ length(Nodes);
+get_number_of_chunks(offline, _) ->
+ 1;
+get_number_of_chunks(local, _) ->
+ 1.