summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/rabbit_alarm.erl 104
-rw-r--r-- src/rabbit_cli.erl 6
-rw-r--r-- src/rabbit_control_main.erl 5
-rw-r--r-- src/rabbit_ctl_misc.erl 31
-rw-r--r-- src/rabbit_disk_monitor.erl 15
-rw-r--r-- src/rabbit_memory_monitor.erl 4
6 files changed, 142 insertions, 23 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 557fa31335..111f780076 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -13,6 +13,17 @@
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
%%
+%% There are two types of alarms handled by this module:
+%%
+%% * per-node resource (disk, memory) alarms for the whole cluster. If any node
+%% has an alarm, then all publishing should be disabled throughout the
+%% cluster until all alarms clear. When a node sets such an alarm,
+%% this information is automatically propagated throughout the cluster.
+%% `#alarms.alarmed_nodes' is used to track this type of alarm.
+%% * limits local to this node (file_descriptor_limit). Used for informational
+%% purposes only: logging and getting node status. This information is not propagated
+%% throughout the cluster. `#alarms.alarms' is used to track this type of alarm.
+%% @end
-module(rabbit_alarm).
@@ -28,20 +39,32 @@
-define(SERVER, ?MODULE).
--record(alarms, {alertees, alarmed_nodes, alarms}).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-record(alarms, {alertees :: dict:dict(pid(), rabbit_types:mfargs()),
+ alarmed_nodes :: dict:dict(node(), [resource_alarm_source()]),
+ alarms :: [alarm()]}).
+
+-type(local_alarm() :: 'file_descriptor_limit').
+-type(resource_alarm_source() :: 'disk' | 'node').
+-type(resource_alarm() :: {resource_limit, resource_alarm_source(), node()}).
+-type(alarm() :: local_alarm() | resource_alarm()).
+
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> [atom()]).
--spec(set_alarm/1 :: (any()) -> 'ok').
--spec(clear_alarm/1 :: (any()) -> 'ok').
+-spec(set_alarm/1 :: ({alarm(), []}) -> 'ok').
+-spec(clear_alarm/1 :: (alarm()) -> 'ok').
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
+-spec(get_alarms/0 :: () -> [{alarm(), []}]).
+
+-else.
+
+-record(alarms, {alertees, alarmed_nodes, alarms}).
-endif.
@@ -68,6 +91,10 @@ start() ->
stop() -> ok.
+%% Registers a handler that should be called on every resource alarm change.
+%% Given a call rabbit_alarm:register(Pid, {M, F, A}), the handler would be
+%% called like this: `apply(M, F, A ++ [Pid, Source, Alert])', where `Source'
+%% has the type of resource_alarm_source() and `Alert' has the type of resource_alert().
register(Pid, AlertMFA) ->
gen_event:call(?SERVER, ?MODULE, {register, Pid, AlertMFA}, infinity).
@@ -79,10 +106,10 @@ get_alarms() -> gen_event:call(?SERVER, ?MODULE, get_alarms, infinity).
on_node_up(Node) -> gen_event:notify(?SERVER, {node_up, Node}).
on_node_down(Node) -> gen_event:notify(?SERVER, {node_down, Node}).
-remote_conserve_resources(Pid, Source, true) ->
+remote_conserve_resources(Pid, Source, {true, _, _}) ->
gen_event:notify({?SERVER, node(Pid)},
{set_alarm, {{resource_limit, Source, node()}, []}});
-remote_conserve_resources(Pid, Source, false) ->
+remote_conserve_resources(Pid, Source, {false, _, _}) ->
gen_event:notify({?SERVER, node(Pid)},
{clear_alarm, {resource_limit, Source, node()}}).
@@ -98,12 +125,17 @@ handle_call({register, Pid, AlertMFA}, State = #alarms{alarmed_nodes = AN}) ->
{ok, lists:usort(lists:append([V || {_, V} <- dict:to_list(AN)])),
internal_register(Pid, AlertMFA, State)};
-handle_call(get_alarms, State = #alarms{alarms = Alarms}) ->
- {ok, Alarms, State};
+handle_call(get_alarms, State) ->
+ {ok, get_alarms(State), State};
handle_call(_Request, State) ->
{ok, not_understood, State}.
+handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) ->
+ case is_node_alarmed(Source, Node, State) of
+ true -> {ok, State};
+ false -> handle_set_resource_alarm(Source, Node, State)
+ end;
handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
case lists:member(Alarm, Alarms) of
true -> {ok, State};
@@ -111,6 +143,13 @@ handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
handle_set_alarm(Alarm, State#alarms{alarms = UpdatedAlarms})
end;
+handle_event({clear_alarm, {resource_limit, Source, Node}}, State) ->
+ case is_node_alarmed(Source, Node, State) of
+ true ->
+ handle_clear_resource_alarm(Source, Node, State);
+ false ->
+ {ok, State}
+ end;
handle_event({clear_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
case lists:keymember(Alarm, 1, Alarms) of
true -> handle_clear_alarm(
@@ -127,8 +166,16 @@ handle_event({node_up, Node}, State) ->
{register, self(), {?MODULE, remote_conserve_resources, []}}),
{ok, State};
-handle_event({node_down, Node}, State) ->
- {ok, maybe_alert(fun dict_unappend_all/3, Node, [], false, State)};
+handle_event({node_down, Node}, #alarms{alarmed_nodes = AN} = State) ->
+ AlarmsForDeadNode = case dict:find(Node, AN) of
+ {ok, V} -> V;
+ error -> []
+ end,
+ {ok, lists:foldr(fun(Source, AccState) ->
+ rabbit_log:warning("~s resource limit alarm cleared for dead node ~p~n",
+ [Source, Node]),
+ maybe_alert(fun dict_unappend/3, Node, Source, false, AccState)
+ end, State, AlarmsForDeadNode)};
handle_event({register, Pid, AlertMFA}, State) ->
{ok, internal_register(Pid, AlertMFA, State)};
@@ -158,9 +205,6 @@ dict_append(Key, Val, Dict) ->
end,
dict:store(Key, lists:usort([Val|L]), Dict).
-dict_unappend_all(Key, _Val, Dict) ->
- dict:erase(Key, Dict).
-
dict_unappend(Key, Val, Dict) ->
L = case dict:find(Key, Dict) of
{ok, V} -> V;
@@ -172,10 +216,17 @@ dict_unappend(Key, Val, Dict) ->
X -> dict:store(Key, X, Dict)
end.
-maybe_alert(UpdateFun, Node, Source, Alert,
+maybe_alert(UpdateFun, Node, Source, WasAlertAdded,
State = #alarms{alarmed_nodes = AN,
alertees = Alertees}) ->
AN1 = UpdateFun(Node, Source, AN),
+ %% Is alarm for Source still set on any node?
+ StillHasAlerts = lists:any(fun ({_Node, NodeAlerts}) -> lists:member(Source, NodeAlerts) end, dict:to_list(AN1)),
+ case StillHasAlerts of
+ true -> ok;
+ false -> rabbit_log:warning("~s resource limit alarm cleared across the cluster~n", [Source])
+ end,
+ Alert = {WasAlertAdded, StillHasAlerts, Node},
case node() of
Node -> ok = alert_remote(Alert, Alertees, Source);
_ -> ok
@@ -202,20 +253,21 @@ internal_register(Pid, {M, F, A} = AlertMFA,
State = #alarms{alertees = Alertees}) ->
_MRef = erlang:monitor(process, Pid),
case dict:find(node(), State#alarms.alarmed_nodes) of
- {ok, Sources} -> [apply(M, F, A ++ [Pid, R, true]) || R <- Sources];
+ {ok, Sources} -> [apply(M, F, A ++ [Pid, R, {true, true, node()}]) || R <- Sources];
error -> ok
end,
NewAlertees = dict:store(Pid, AlertMFA, Alertees),
State#alarms{alertees = NewAlertees}.
-handle_set_alarm({{resource_limit, Source, Node}, []}, State) ->
+handle_set_resource_alarm(Source, Node, State) ->
rabbit_log:warning(
"~s resource limit alarm set on node ~p.~n~n"
"**********************************************************~n"
"*** Publishers will be blocked until this alarm clears ***~n"
"**********************************************************~n",
[Source, Node]),
- {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)};
+ {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)}.
+
handle_set_alarm({file_descriptor_limit, []}, State) ->
rabbit_log:warning(
"file descriptor limit alarm set.~n~n"
@@ -227,13 +279,27 @@ handle_set_alarm(Alarm, State) ->
rabbit_log:warning("alarm '~p' set~n", [Alarm]),
{ok, State}.
-handle_clear_alarm({resource_limit, Source, Node}, State) ->
+handle_clear_resource_alarm(Source, Node, State) ->
rabbit_log:warning("~s resource limit alarm cleared on node ~p~n",
[Source, Node]),
- {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)};
+ {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}.
+
handle_clear_alarm(file_descriptor_limit, State) ->
rabbit_log:warning("file descriptor limit alarm cleared~n"),
{ok, State};
handle_clear_alarm(Alarm, State) ->
rabbit_log:warning("alarm '~p' cleared~n", [Alarm]),
{ok, State}.
+
+is_node_alarmed(Source, Node, #alarms{alarmed_nodes = AN}) ->
+ case dict:find(Node, AN) of
+ {ok, Sources} ->
+ lists:member(Source, Sources);
+ error ->
+ false
+ end.
+
+get_alarms(#alarms{alarms = Alarms,
+ alarmed_nodes = AN}) ->
+ Alarms ++ [ {{resource_limit, Source, Node}, []}
+ || {Node, Sources} <- dict:to_list(AN), Source <- Sources ].
diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl
index e4b3449b0b..522e91120e 100644
--- a/src/rabbit_cli.erl
+++ b/src/rabbit_cli.erl
@@ -69,6 +69,9 @@ main(ParseFun, DoFun, UsageMod) ->
case catch DoFun(Command, Node, Args, Opts) of
ok ->
rabbit_misc:quit(0);
+ {ok, Result} ->
+ rabbit_ctl_misc:print_cmd_result(Command, Result),
+ rabbit_misc:quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> %% < R15
PrintInvalidCommandError(),
usage(UsageMod);
@@ -113,6 +116,9 @@ main(ParseFun, DoFun, UsageMod) ->
print_error("operation ~w used with invalid parameter: ~p",
[Command, Args]),
usage(UsageMod);
+ {refused, Username, _, _} ->
+ print_error("failed to authenticate user \"~s\"", [Username]),
+ rabbit_misc:quit(2);
Other ->
print_error("~p", [Other]),
rabbit_misc:quit(2)
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index d06d59f572..2799d510d0 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -52,6 +52,7 @@
delete_user,
change_password,
clear_password,
+ authenticate_user,
set_user_tags,
list_users,
@@ -379,6 +380,10 @@ action(clear_password, Node, Args = [Username], _Opts, Inform) ->
Inform("Clearing password for user \"~s\"", [Username]),
call(Node, {rabbit_auth_backend_internal, clear_password, Args});
+action(authenticate_user, Node, Args = [Username, _Password], _Opts, Inform) ->
+ Inform("Authenticating user \"~s\"", [Username]),
+ call(Node, {rabbit_access_control, check_user_pass_login, Args});
+
action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) ->
Tags = [list_to_atom(T) || T <- TagsStr],
Inform("Setting tags for user \"~s\" to ~p", [Username, Tags]),
diff --git a/src/rabbit_ctl_misc.erl b/src/rabbit_ctl_misc.erl
new file mode 100644
index 0000000000..92ae111028
--- /dev/null
+++ b/src/rabbit_ctl_misc.erl
@@ -0,0 +1,31 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_ctl_misc).
+
+-export([print_cmd_result/2]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(print_cmd_result/2 :: (atom(), term()) -> string()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+print_cmd_result(authenticate_user, _Result) -> io:format("Success~n").
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index 68f6095176..3e9171b79a 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -61,7 +61,10 @@
%% timer that drives periodic checks
timer,
%% is free disk space alarm currently in effect?
- alarmed
+ alarmed,
+ %% is monitoring enabled? false on unsupported
+ %% platforms
+ enabled
}).
%%----------------------------------------------------------------------------
@@ -117,7 +120,8 @@ init([Limit]) ->
State = #state{dir = Dir,
min_interval = ?DEFAULT_MIN_DISK_CHECK_INTERVAL,
max_interval = ?DEFAULT_MAX_DISK_CHECK_INTERVAL,
- alarmed = false},
+ alarmed = false,
+ enabled = true},
case {catch get_disk_free(Dir),
vm_memory_monitor:get_total_memory()} of
{N1, N2} when is_integer(N1), is_integer(N2) ->
@@ -125,12 +129,17 @@ init([Limit]) ->
Err ->
rabbit_log:info("Disabling disk free space monitoring "
"on unsupported platform:~n~p~n", [Err]),
- {stop, unsupported_platform}
+ {ok, State#state{enabled = false}}
end.
handle_call(get_disk_free_limit, _From, State = #state{limit = Limit}) ->
{reply, Limit, State};
+handle_call({set_disk_free_limit, _}, _From, #state{enabled = false} = State) ->
+ rabbit_log:info("Cannot set disk free limit: "
+ "disabled disk free space monitoring", []),
+ {reply, ok, State};
+
handle_call({set_disk_free_limit, Limit}, _From, State) ->
{reply, ok, set_disk_limits(State, Limit)};
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 24b3ae9af4..380d950d05 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -86,7 +86,9 @@ report_ram_duration(Pid, QueueDuration) ->
stop() ->
gen_server2:cast(?SERVER, stop).
-conserve_resources(Pid, disk, Conserve) ->
+%% Paging should be enabled/disabled only in response to disk resource alarms
+%% for the current node.
+conserve_resources(Pid, disk, {_, Conserve, Node}) when node(Pid) =:= Node ->
gen_server2:cast(Pid, {disk_alarm, Conserve});
conserve_resources(_Pid, _Source, _Conserve) ->
ok.