summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-07-13 16:36:23 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-07-13 16:36:23 +0100
commit07ea1d8f2b9e88f965a41b94f8cee2ead78162c0 (patch)
treef4381e920afc8f125130f417f2856185b3feb0a6
parentd127747f95dd39cf3f5bdd499524f5e11a2b272e (diff)
downloadrabbitmq-server-git-07ea1d8f2b9e88f965a41b94f8cee2ead78162c0.tar.gz
`rabbit_alarm' on its own again, added callbacks to fhc and `vm_memory_monitor'
-rw-r--r--Makefile4
-rw-r--r--src/file_handle_cache.erl25
-rw-r--r--src/rabbit.erl13
-rw-r--r--src/rabbit_alarm.erl43
-rw-r--r--src/rabbit_disk_monitor.erl4
-rw-r--r--src/vm_memory_monitor.erl31
6 files changed, 84 insertions, 36 deletions
diff --git a/Makefile b/Makefile
index 0e3960dcfa..3681a7878d 100644
--- a/Makefile
+++ b/Makefile
@@ -217,11 +217,11 @@ stop-rabbit-on-node: all
echo "rabbit:stop()." | $(ERL_CALL)
set-resource-alarm: all
- echo "alarm_handler:set_alarm({{resource_limit, $(SOURCE), node()}, []})." | \
+ echo "rabbit_alarm:set_alarm({{resource_limit, $(SOURCE), node()}, []})." | \
$(ERL_CALL)
clear-resource-alarm: all
- echo "alarm_handler:clear_alarm({resource_limit, $(SOURCE), node()})." | \
+ echo "rabbit_alarm:clear_alarm({resource_limit, $(SOURCE), node()})." | \
$(ERL_CALL)
stop-node:
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index f3b4dbafa2..9a49cc6a4d 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -150,8 +150,8 @@
info/0, info/1]).
-export([ulimit/0]).
--export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3, prioritise_cast/2]).
+-export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2,
+ handle_info/2, terminate/2, code_change/3, prioritise_cast/2]).
-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 100).
@@ -195,7 +195,9 @@
obtain_count,
obtain_pending,
clients,
- timer_ref
+ timer_ref,
+ alarm_set,
+ alarm_clear
}).
-record(cstate,
@@ -270,6 +272,10 @@
start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
+start_link(AlarmSet, AlarmClear) ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [AlarmSet, AlarmClear],
+ [{timeout, infinity}]).
+
register_callback(M, F, A)
when is_atom(M) andalso is_atom(F) andalso is_list(A) ->
gen_server2:cast(?SERVER, {register_callback, self(), {M, F, A}}).
@@ -807,6 +813,8 @@ i(Item, _) -> throw({bad_argument, Item}).
%%----------------------------------------------------------------------------
init([]) ->
+ init([fun alarm_handler:set_alarm/1, fun alarm_handler:clear_alarm/1]);
+init([AlarmSet, AlarmClear]) ->
Limit = case application:get_env(file_handles_high_watermark) of
{ok, Watermark} when (is_integer(Watermark) andalso
Watermark > 0) ->
@@ -830,7 +838,9 @@ init([]) ->
obtain_count = 0,
obtain_pending = pending_new(),
clients = Clients,
- timer_ref = undefined }}.
+ timer_ref = undefined,
+ alarm_set = AlarmSet,
+ alarm_clear = AlarmClear }}.
prioritise_cast(Msg, _State) ->
case Msg of
@@ -1026,10 +1036,11 @@ obtain_limit_reached(#fhc_state { obtain_limit = Limit,
obtain_count = Count}) ->
Limit =/= infinity andalso Count >= Limit.
-adjust_alarm(OldState, NewState) ->
+adjust_alarm(OldState = #fhc_state { alarm_set = AlarmSet,
+ alarm_clear = AlarmClear }, NewState) ->
case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of
- {false, true} -> alarm_handler:set_alarm({file_descriptor_limit, []});
- {true, false} -> alarm_handler:clear_alarm(file_descriptor_limit);
+ {false, true} -> AlarmSet({file_descriptor_limit, []});
+ {true, false} -> AlarmClear(file_descriptor_limit);
_ -> ok
end,
NewState.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index fda489fe61..ed258c71c9 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -20,7 +20,8 @@
-export([start/0, boot/0, stop/0,
stop_and_halt/0, await_startup/0, status/0, is_running/0,
- is_running/1, environment/0, rotate_logs/1, force_event_refresh/0]).
+ is_running/1, environment/0, rotate_logs/1, force_event_refresh/0,
+ start_fhc/0]).
-export([start/2, stop/1]).
@@ -53,8 +54,7 @@
-rabbit_boot_step({file_handle_cache,
[{description, "file handle cache server"},
- {mfa, {rabbit_sup, start_restartable_child,
- [file_handle_cache]}},
+ {mfa, {rabbit, start_fhc, []}},
{requires, pre_boot},
{enables, worker_pool}]}).
@@ -730,3 +730,10 @@ config_files() ->
[File] <- Files];
error -> []
end.
+
+%% We don't want this in fhc since it references rabbit stuff. And we can't put
+%% this in the bootstep directly.
+start_fhc() ->
+ rabbit_sup:start_restartable_child(
+ file_handle_cache,
+ [fun rabbit_alarm:set_alarm/1, fun rabbit_alarm:clear_alarm/1]).
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 665e2cb9fc..69256b4da8 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -18,22 +18,28 @@
-behaviour(gen_event).
--export([start/0, stop/0, register/2, on_node_up/1, on_node_down/1]).
+-export([start_link/0, start/0, stop/0, register/2, set_alarm/1,
+ clear_alarm/1, on_node_up/1, on_node_down/1]).
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
-export([remote_conserve_resources/3]). %% Internal use only
+-define(SERVER, ?MODULE).
+
-record(alarms, {alertees, alarmed_nodes}).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()).
+-spec(set_alarm/1 :: (any()) -> 'ok').
+-spec(clear_alarm/1 :: (any()) -> 'ok').
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
@@ -41,36 +47,45 @@
%%----------------------------------------------------------------------------
+start_link() ->
+ gen_event:start_link({local, ?SERVER}).
+
start() ->
- ok = alarm_handler:add_alarm_handler(?MODULE, []),
+ ok = rabbit_sup:start_restartable_child(?MODULE),
+ ok = gen_event:add_handler(?SERVER, ?MODULE, []),
{ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
- rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]),
-
+ rabbit_sup:start_restartable_child(
+ vm_memory_monitor, [MemoryWatermark, fun rabbit_alarm:set_alarm/1,
+ fun rabbit_alarm:clear_alarm/1]),
{ok, DiskLimit} = application:get_env(disk_free_limit),
rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]),
ok.
stop() ->
- ok = alarm_handler:delete_alarm_handler(?MODULE).
+ ok.
register(Pid, HighMemMFA) ->
- gen_event:call(alarm_handler, ?MODULE,
- {register, Pid, HighMemMFA},
+ gen_event:call(?SERVER, ?MODULE, {register, Pid, HighMemMFA},
infinity).
-on_node_up(Node) -> gen_event:notify(alarm_handler, {node_up, Node}).
+set_alarm(Alarm) ->
+ gen_event:notify(?SERVER, {set_alarm, Alarm}).
-on_node_down(Node) -> gen_event:notify(alarm_handler, {node_down, Node}).
+clear_alarm(Alarm) ->
+ gen_event:notify(?SERVER, {clear_alarm, Alarm}).
+
+on_node_up(Node) -> gen_event:notify(?SERVER, {node_up, Node}).
+
+on_node_down(Node) -> gen_event:notify(?SERVER, {node_down, Node}).
-%% Can't use alarm_handler:{set,clear}_alarm because that doesn't
-%% permit notifying a remote node.
remote_conserve_resources(Pid, Source, true) ->
- gen_event:notify({alarm_handler, node(Pid)},
+ gen_event:notify({?SERVER, node(Pid)},
{set_alarm, {{resource_limit, Source, node()}, []}});
remote_conserve_resources(Pid, Source, false) ->
- gen_event:notify({alarm_handler, node(Pid)},
+ gen_event:notify({?SERVER, node(Pid)},
{clear_alarm, {resource_limit, Source, node()}}).
+
%%----------------------------------------------------------------------------
init([]) ->
@@ -105,7 +120,7 @@ handle_event({clear_alarm, file_descriptor_limit}, State) ->
handle_event({node_up, Node}, State) ->
%% Must do this via notify and not call to avoid possible deadlock.
ok = gen_event:notify(
- {alarm_handler, Node},
+ {?SERVER, Node},
{register, self(), {?MODULE, remote_conserve_resources, []}}),
{ok, State};
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index 58375abb45..e72181c061 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -149,10 +149,10 @@ internal_update(State = #state { limit = Limit,
case {Alarmed, NewAlarmed} of
{false, true} ->
emit_update_info("exceeded", CurrentFreeBytes, LimitBytes),
- alarm_handler:set_alarm({{resource_limit, disk, node()}, []});
+ rabbit_alarm:set_alarm({{resource_limit, disk, node()}, []});
{true, false} ->
emit_update_info("below limit", CurrentFreeBytes, LimitBytes),
- alarm_handler:clear_alarm({resource_limit, disk, node()});
+ rabbit_alarm:clear_alarm({resource_limit, disk, node()});
_ ->
ok
end,
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index fb184d1ab2..535208afc7 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -27,7 +27,7 @@
-behaviour(gen_server).
--export([start_link/1]).
+-export([start_link/1, start_link/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -51,7 +51,9 @@
memory_limit,
timeout,
timer,
- alarmed
+ alarmed,
+ alarm_set,
+ alarm_clear
}).
%%----------------------------------------------------------------------------
@@ -59,6 +61,8 @@
-ifdef(use_specs).
-spec(start_link/1 :: (float()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/3 :: (float(), fun ((any()) -> 'ok'),
+ fun ((any()) -> 'ok')) -> rabbit_types:ok_pid_or_error()).
-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
-spec(get_vm_limit/0 :: () -> non_neg_integer()).
-spec(get_check_interval/0 :: () -> non_neg_integer()).
@@ -99,14 +103,23 @@ get_memory_limit() ->
%% gen_server callbacks
%%----------------------------------------------------------------------------
-start_link(Args) ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
+start_link(MemFraction) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [MemFraction], []).
+
+start_link(MemFraction, AlarmSet, AlarmClear) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE,
+ [MemFraction, AlarmSet, AlarmClear], []).
init([MemFraction]) ->
+ init([MemFraction, fun alarm_handler:set_alarm/1,
+ fun alarm_handler:clear_alarm/1]);
+init([MemFraction, AlarmSet, AlarmClear]) ->
TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
State = #state { timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
timer = TRef,
- alarmed = false},
+ alarmed = false,
+ alarm_set = AlarmSet,
+ alarm_clear = AlarmClear },
{ok, set_mem_limits(State, MemFraction)}.
handle_call(get_vm_memory_high_watermark, _From, State) ->
@@ -175,16 +188,18 @@ set_mem_limits(State, MemFraction) ->
memory_limit = MemLim }).
internal_update(State = #state { memory_limit = MemLimit,
- alarmed = Alarmed}) ->
+ alarmed = Alarmed,
+ alarm_set = AlarmSet,
+ alarm_clear = AlarmClear }) ->
MemUsed = erlang:memory(total),
NewAlarmed = MemUsed > MemLimit,
case {Alarmed, NewAlarmed} of
{false, true} ->
emit_update_info(set, MemUsed, MemLimit),
- alarm_handler:set_alarm({{resource_limit, memory, node()}, []});
+ AlarmSet({{resource_limit, memory, node()}, []});
{true, false} ->
emit_update_info(clear, MemUsed, MemLimit),
- alarm_handler:clear_alarm({resource_limit, memory, node()});
+ AlarmClear({resource_limit, memory, node()});
_ ->
ok
end,