diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_alarm.erl | 104 | ||||
| -rw-r--r-- | src/rabbit_cli.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_ctl_misc.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_disk_monitor.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 4 |
6 files changed, 142 insertions, 23 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 557fa31335..111f780076 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -13,6 +13,17 @@ %% The Initial Developer of the Original Code is GoPivotal, Inc. %% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. %% +%% There are two types of alarms handled by this module: +%% +%% * per-node resource (disk, memory) alarms for the whole cluster. If any node +%% has an alarm, then all publishing should be disabled througout the +%% cluster until all alarms clear. When a node sets such an alarm, +%% this information is automatically propagated throughout the cluster. +%% `#alarms.alarmed_nodes' is being used to track this type of alarms. +%% * limits local to this node (file_descriptor_limit). Used for information +%% purposes only: logging and getting node status. This information is not propagated +%% throughout the cluster. `#alarms.alarms' is being used to track this type of alarms. +%% @end -module(rabbit_alarm). @@ -28,20 +39,32 @@ -define(SERVER, ?MODULE). --record(alarms, {alertees, alarmed_nodes, alarms}). - %%---------------------------------------------------------------------------- -ifdef(use_specs). +-record(alarms, {alertees :: dict:dict(pid(), rabbit_types:mfargs()), + alarmed_nodes :: dict:dict(node(), [resource_alarm_source()]), + alarms :: [alarm()]}). + +-type(local_alarm() :: 'file_descriptor_limit'). +-type(resource_alarm_source() :: 'disk' | 'node'). +-type(resource_alarm() :: {resource_limit, resource_alarm_source(), node()}). +-type(alarm() :: local_alarm() | resource_alarm()). + -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(register/2 :: (pid(), rabbit_types:mfargs()) -> [atom()]). --spec(set_alarm/1 :: (any()) -> 'ok'). --spec(clear_alarm/1 :: (any()) -> 'ok'). +-spec(set_alarm/1 :: ({alarm(), []}) -> 'ok'). +-spec(clear_alarm/1 :: (alarm()) -> 'ok'). -spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). +-spec(get_alarms/0 :: () -> [{alarm(), []}]). + +-else. + +-record(alarms, {alertees, alarmed_nodes, alarms}). -endif. @@ -68,6 +91,10 @@ start() -> stop() -> ok. +%% Registers a handler that should be called on every resource alarm change. +%% Given a call rabbit_alarm:register(Pid, {M, F, A}), the handler would be +%% called like this: `apply(M, F, A ++ [Pid, Source, Alert])', where `Source' +%% has the type of resource_alarm_source() and `Alert' has the type of resource_alert(). register(Pid, AlertMFA) -> gen_event:call(?SERVER, ?MODULE, {register, Pid, AlertMFA}, infinity). @@ -79,10 +106,10 @@ get_alarms() -> gen_event:call(?SERVER, ?MODULE, get_alarms, infinity). on_node_up(Node) -> gen_event:notify(?SERVER, {node_up, Node}). on_node_down(Node) -> gen_event:notify(?SERVER, {node_down, Node}). -remote_conserve_resources(Pid, Source, true) -> +remote_conserve_resources(Pid, Source, {true, _, _}) -> gen_event:notify({?SERVER, node(Pid)}, {set_alarm, {{resource_limit, Source, node()}, []}}); -remote_conserve_resources(Pid, Source, false) -> +remote_conserve_resources(Pid, Source, {false, _, _}) -> gen_event:notify({?SERVER, node(Pid)}, {clear_alarm, {resource_limit, Source, node()}}). @@ -98,12 +125,17 @@ handle_call({register, Pid, AlertMFA}, State = #alarms{alarmed_nodes = AN}) -> {ok, lists:usort(lists:append([V || {_, V} <- dict:to_list(AN)])), internal_register(Pid, AlertMFA, State)}; -handle_call(get_alarms, State = #alarms{alarms = Alarms}) -> - {ok, Alarms, State}; +handle_call(get_alarms, State) -> + {ok, get_alarms(State), State}; handle_call(_Request, State) -> {ok, not_understood, State}. +handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) -> + case is_node_alarmed(Source, Node, State) of + true -> {ok, State}; + false -> handle_set_resource_alarm(Source, Node, State) + end; handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) -> case lists:member(Alarm, Alarms) of true -> {ok, State}; @@ -111,6 +143,13 @@ handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) -> handle_set_alarm(Alarm, State#alarms{alarms = UpdatedAlarms}) end; +handle_event({clear_alarm, {resource_limit, Source, Node}}, State) -> + case is_node_alarmed(Source, Node, State) of + true -> + handle_clear_resource_alarm(Source, Node, State); + false -> + {ok, State} + end; handle_event({clear_alarm, Alarm}, State = #alarms{alarms = Alarms}) -> case lists:keymember(Alarm, 1, Alarms) of true -> handle_clear_alarm( @@ -127,8 +166,16 @@ handle_event({node_up, Node}, State) -> {register, self(), {?MODULE, remote_conserve_resources, []}}), {ok, State}; -handle_event({node_down, Node}, State) -> - {ok, maybe_alert(fun dict_unappend_all/3, Node, [], false, State)}; +handle_event({node_down, Node}, #alarms{alarmed_nodes = AN} = State) -> + AlarmsForDeadNode = case dict:find(Node, AN) of + {ok, V} -> V; + error -> [] + end, + {ok, lists:foldr(fun(Source, AccState) -> + rabbit_log:warning("~s resource limit alarm cleared for dead node ~p~n", + [Source, Node]), + maybe_alert(fun dict_unappend/3, Node, Source, false, AccState) + end, State, AlarmsForDeadNode)}; handle_event({register, Pid, AlertMFA}, State) -> {ok, internal_register(Pid, AlertMFA, State)}; @@ -158,9 +205,6 @@ dict_append(Key, Val, Dict) -> end, dict:store(Key, lists:usort([Val|L]), Dict). -dict_unappend_all(Key, _Val, Dict) -> - dict:erase(Key, Dict). - dict_unappend(Key, Val, Dict) -> L = case dict:find(Key, Dict) of {ok, V} -> V; @@ -172,10 +216,17 @@ dict_unappend(Key, Val, Dict) -> X -> dict:store(Key, X, Dict) end. -maybe_alert(UpdateFun, Node, Source, Alert, +maybe_alert(UpdateFun, Node, Source, WasAlertAdded, State = #alarms{alarmed_nodes = AN, alertees = Alertees}) -> AN1 = UpdateFun(Node, Source, AN), + %% Is alarm for Source still set on any node? + StillHasAlerts = lists:any(fun ({_Node, NodeAlerts}) -> lists:member(Source, NodeAlerts) end, dict:to_list(AN1)), + case StillHasAlerts of + true -> ok; + false -> rabbit_log:warning("~s resource limit alarm cleared across the cluster~n", [Source]) + end, + Alert = {WasAlertAdded, StillHasAlerts, Node}, case node() of Node -> ok = alert_remote(Alert, Alertees, Source); _ -> ok @@ -202,20 +253,21 @@ internal_register(Pid, {M, F, A} = AlertMFA, State = #alarms{alertees = Alertees}) -> _MRef = erlang:monitor(process, Pid), case dict:find(node(), State#alarms.alarmed_nodes) of - {ok, Sources} -> [apply(M, F, A ++ [Pid, R, true]) || R <- Sources]; + {ok, Sources} -> [apply(M, F, A ++ [Pid, R, {true, true, node()}]) || R <- Sources]; error -> ok end, NewAlertees = dict:store(Pid, AlertMFA, Alertees), State#alarms{alertees = NewAlertees}. -handle_set_alarm({{resource_limit, Source, Node}, []}, State) -> +handle_set_resource_alarm(Source, Node, State) -> rabbit_log:warning( "~s resource limit alarm set on node ~p.~n~n" "**********************************************************~n" "*** Publishers will be blocked until this alarm clears ***~n" "**********************************************************~n", [Source, Node]), - {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)}; + {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)}. + handle_set_alarm({file_descriptor_limit, []}, State) -> rabbit_log:warning( "file descriptor limit alarm set.~n~n" @@ -227,13 +279,27 @@ handle_set_alarm(Alarm, State) -> rabbit_log:warning("alarm '~p' set~n", [Alarm]), {ok, State}. -handle_clear_alarm({resource_limit, Source, Node}, State) -> +handle_clear_resource_alarm(Source, Node, State) -> rabbit_log:warning("~s resource limit alarm cleared on node ~p~n", [Source, Node]), - {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}; + {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}. + handle_clear_alarm(file_descriptor_limit, State) -> rabbit_log:warning("file descriptor limit alarm cleared~n"), {ok, State}; handle_clear_alarm(Alarm, State) -> rabbit_log:warning("alarm '~p' cleared~n", [Alarm]), {ok, State}. + +is_node_alarmed(Source, Node, #alarms{alarmed_nodes = AN}) -> + case dict:find(Node, AN) of + {ok, Sources} -> + lists:member(Source, Sources); + error -> + false + end. + +get_alarms(#alarms{alarms = Alarms, + alarmed_nodes = AN}) -> + Alarms ++ [ {{resource_limit, Source, Node}, []} + || {Node, Sources} <- dict:to_list(AN), Source <- Sources ]. diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl index e4b3449b0b..522e91120e 100644 --- a/src/rabbit_cli.erl +++ b/src/rabbit_cli.erl @@ -69,6 +69,9 @@ main(ParseFun, DoFun, UsageMod) -> case catch DoFun(Command, Node, Args, Opts) of ok -> rabbit_misc:quit(0); + {ok, Result} -> + rabbit_ctl_misc:print_cmd_result(Command, Result), + rabbit_misc:quit(0); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> %% < R15 PrintInvalidCommandError(), usage(UsageMod); @@ -113,6 +116,9 @@ main(ParseFun, DoFun, UsageMod) -> print_error("operation ~w used with invalid parameter: ~p", [Command, Args]), usage(UsageMod); + {refused, Username, _, _} -> + print_error("failed to authenticate user \"~s\"", [Username]), + rabbit_misc:quit(2); Other -> print_error("~p", [Other]), rabbit_misc:quit(2) diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index d06d59f572..2799d510d0 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -52,6 +52,7 @@ delete_user, change_password, clear_password, + authenticate_user, set_user_tags, list_users, @@ -379,6 +380,10 @@ action(clear_password, Node, Args = [Username], _Opts, Inform) -> Inform("Clearing password for user \"~s\"", [Username]), call(Node, {rabbit_auth_backend_internal, clear_password, Args}); +action(authenticate_user, Node, Args = [Username, _Password], _Opts, Inform) -> + Inform("Authenticating user \"~s\"", [Username]), + call(Node, {rabbit_access_control, check_user_pass_login, Args}); + action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) -> Tags = [list_to_atom(T) || T <- TagsStr], Inform("Setting tags for user \"~s\" to ~p", [Username, Tags]), diff --git a/src/rabbit_ctl_misc.erl b/src/rabbit_ctl_misc.erl new file mode 100644 index 0000000000..92ae111028 --- /dev/null +++ b/src/rabbit_ctl_misc.erl @@ -0,0 +1,31 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_ctl_misc). + +-export([print_cmd_result/2]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(print_cmd_result/2 :: (atom(), term()) -> string()). + +-endif. + +%%---------------------------------------------------------------------------- + +print_cmd_result(authenticate_user, _Result) -> io:format("Success~n"). diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index 68f6095176..3e9171b79a 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -61,7 +61,10 @@ %% timer that drives periodic checks timer, %% is free disk space alarm currently in effect? - alarmed + alarmed, + %% is monitoring enabled? false on unsupported + %% platforms + enabled }). %%---------------------------------------------------------------------------- @@ -117,7 +120,8 @@ init([Limit]) -> State = #state{dir = Dir, min_interval = ?DEFAULT_MIN_DISK_CHECK_INTERVAL, max_interval = ?DEFAULT_MAX_DISK_CHECK_INTERVAL, - alarmed = false}, + alarmed = false, + enabled = true}, case {catch get_disk_free(Dir), vm_memory_monitor:get_total_memory()} of {N1, N2} when is_integer(N1), is_integer(N2) -> @@ -125,12 +129,17 @@ init([Limit]) -> Err -> rabbit_log:info("Disabling disk free space monitoring " "on unsupported platform:~n~p~n", [Err]), - {stop, unsupported_platform} + {ok, State#state{enabled = false}} end. handle_call(get_disk_free_limit, _From, State = #state{limit = Limit}) -> {reply, Limit, State}; +handle_call({set_disk_free_limit, _}, _From, #state{enabled = false} = State) -> + rabbit_log:info("Cannot set disk free limit: " + "disabled disk free space monitoring", []), + {reply, ok, State}; + handle_call({set_disk_free_limit, Limit}, _From, State) -> {reply, ok, set_disk_limits(State, Limit)}; diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 24b3ae9af4..380d950d05 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -86,7 +86,9 @@ report_ram_duration(Pid, QueueDuration) -> stop() -> gen_server2:cast(?SERVER, stop). -conserve_resources(Pid, disk, Conserve) -> +%% Paging should be enabled/disabled only in response to disk resource alarms +%% for the current node. +conserve_resources(Pid, disk, {_, Conserve, Node}) when node(Pid) =:= Node -> gen_server2:cast(Pid, {disk_alarm, Conserve}); conserve_resources(_Pid, _Source, _Conserve) -> ok. |
