diff options
| author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-07-13 16:36:23 +0100 |
|---|---|---|
| committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-07-13 16:36:23 +0100 |
| commit | 07ea1d8f2b9e88f965a41b94f8cee2ead78162c0 (patch) | |
| tree | f4381e920afc8f125130f417f2856185b3feb0a6 | |
| parent | d127747f95dd39cf3f5bdd499524f5e11a2b272e (diff) | |
| download | rabbitmq-server-git-07ea1d8f2b9e88f965a41b94f8cee2ead78162c0.tar.gz | |
`rabbit_alarm' on its own again, added callbacks to fhc and `vm_memory_monitor'
| -rw-r--r-- | Makefile | 4 | ||||
| -rw-r--r-- | src/file_handle_cache.erl | 25 | ||||
| -rw-r--r-- | src/rabbit.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_alarm.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_disk_monitor.erl | 4 | ||||
| -rw-r--r-- | src/vm_memory_monitor.erl | 31 |
6 files changed, 84 insertions, 36 deletions
@@ -217,11 +217,11 @@ stop-rabbit-on-node: all echo "rabbit:stop()." | $(ERL_CALL) set-resource-alarm: all - echo "alarm_handler:set_alarm({{resource_limit, $(SOURCE), node()}, []})." | \ + echo "rabbit_alarm:set_alarm({{resource_limit, $(SOURCE), node()}, []})." | \ $(ERL_CALL) clear-resource-alarm: all - echo "alarm_handler:clear_alarm({resource_limit, $(SOURCE), node()})." | \ + echo "rabbit_alarm:clear_alarm({resource_limit, $(SOURCE), node()})." | \ $(ERL_CALL) stop-node: diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index f3b4dbafa2..9a49cc6a4d 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -150,8 +150,8 @@ info/0, info/1]). -export([ulimit/0]). --export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, prioritise_cast/2]). +-export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2, + handle_info/2, terminate/2, code_change/3, prioritise_cast/2]). -define(SERVER, ?MODULE). -define(RESERVED_FOR_OTHERS, 100). @@ -195,7 +195,9 @@ obtain_count, obtain_pending, clients, - timer_ref + timer_ref, + alarm_set, + alarm_clear }). -record(cstate, @@ -270,6 +272,10 @@ start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]). +start_link(AlarmSet, AlarmClear) -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [AlarmSet, AlarmClear], + [{timeout, infinity}]). + register_callback(M, F, A) when is_atom(M) andalso is_atom(F) andalso is_list(A) -> gen_server2:cast(?SERVER, {register_callback, self(), {M, F, A}}). @@ -807,6 +813,8 @@ i(Item, _) -> throw({bad_argument, Item}). %%---------------------------------------------------------------------------- init([]) -> + init([fun alarm_handler:set_alarm/1, fun alarm_handler:clear_alarm/1]); +init([AlarmSet, AlarmClear]) -> Limit = case application:get_env(file_handles_high_watermark) of {ok, Watermark} when (is_integer(Watermark) andalso Watermark > 0) -> @@ -830,7 +838,9 @@ init([]) -> obtain_count = 0, obtain_pending = pending_new(), clients = Clients, - timer_ref = undefined }}. + timer_ref = undefined, + alarm_set = AlarmSet, + alarm_clear = AlarmClear }}. prioritise_cast(Msg, _State) -> case Msg of @@ -1026,10 +1036,11 @@ obtain_limit_reached(#fhc_state { obtain_limit = Limit, obtain_count = Count}) -> Limit =/= infinity andalso Count >= Limit. -adjust_alarm(OldState, NewState) -> +adjust_alarm(OldState = #fhc_state { alarm_set = AlarmSet, + alarm_clear = AlarmClear }, NewState) -> case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of - {false, true} -> alarm_handler:set_alarm({file_descriptor_limit, []}); - {true, false} -> alarm_handler:clear_alarm(file_descriptor_limit); + {false, true} -> AlarmSet({file_descriptor_limit, []}); + {true, false} -> AlarmClear(file_descriptor_limit); _ -> ok end, NewState. diff --git a/src/rabbit.erl b/src/rabbit.erl index fda489fe61..ed258c71c9 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -20,7 +20,8 @@ -export([start/0, boot/0, stop/0, stop_and_halt/0, await_startup/0, status/0, is_running/0, - is_running/1, environment/0, rotate_logs/1, force_event_refresh/0]). + is_running/1, environment/0, rotate_logs/1, force_event_refresh/0, + start_fhc/0]). -export([start/2, stop/1]). @@ -53,8 +54,7 @@ -rabbit_boot_step({file_handle_cache, [{description, "file handle cache server"}, - {mfa, {rabbit_sup, start_restartable_child, - [file_handle_cache]}}, + {mfa, {rabbit, start_fhc, []}}, {requires, pre_boot}, {enables, worker_pool}]}). @@ -730,3 +730,10 @@ config_files() -> [File] <- Files]; error -> [] end. + +%% We don't want this in fhc since it references rabbit stuff. And we can't put +%% this in the bootstep directly. +start_fhc() -> + rabbit_sup:start_restartable_child( + file_handle_cache, + [fun rabbit_alarm:set_alarm/1, fun rabbit_alarm:clear_alarm/1]). diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 665e2cb9fc..69256b4da8 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -18,22 +18,28 @@ -behaviour(gen_event). --export([start/0, stop/0, register/2, on_node_up/1, on_node_down/1]). +-export([start_link/0, start/0, stop/0, register/2, set_alarm/1, + clear_alarm/1, on_node_up/1, on_node_down/1]). -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). -export([remote_conserve_resources/3]). %% Internal use only +-define(SERVER, ?MODULE). + -record(alarms, {alertees, alarmed_nodes}). %%---------------------------------------------------------------------------- -ifdef(use_specs). +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()). +-spec(set_alarm/1 :: (any()) -> 'ok'). +-spec(clear_alarm/1 :: (any()) -> 'ok'). -spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). @@ -41,36 +47,45 @@ %%---------------------------------------------------------------------------- +start_link() -> + gen_event:start_link({local, ?SERVER}). + start() -> - ok = alarm_handler:add_alarm_handler(?MODULE, []), + ok = rabbit_sup:start_restartable_child(?MODULE), + ok = gen_event:add_handler(?SERVER, ?MODULE, []), {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), - rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]), - + rabbit_sup:start_restartable_child( + vm_memory_monitor, [MemoryWatermark, fun rabbit_alarm:set_alarm/1, + fun rabbit_alarm:clear_alarm/1]), {ok, DiskLimit} = application:get_env(disk_free_limit), rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]), ok. stop() -> - ok = alarm_handler:delete_alarm_handler(?MODULE). + ok. register(Pid, HighMemMFA) -> - gen_event:call(alarm_handler, ?MODULE, - {register, Pid, HighMemMFA}, + gen_event:call(?SERVER, ?MODULE, {register, Pid, HighMemMFA}, infinity). -on_node_up(Node) -> gen_event:notify(alarm_handler, {node_up, Node}). +set_alarm(Alarm) -> + gen_event:notify(?SERVER, {set_alarm, Alarm}). -on_node_down(Node) -> gen_event:notify(alarm_handler, {node_down, Node}). +clear_alarm(Alarm) -> + gen_event:notify(?SERVER, {clear_alarm, Alarm}). + +on_node_up(Node) -> gen_event:notify(?SERVER, {node_up, Node}). + +on_node_down(Node) -> gen_event:notify(?SERVER, {node_down, Node}). -%% Can't use alarm_handler:{set,clear}_alarm because that doesn't -%% permit notifying a remote node. remote_conserve_resources(Pid, Source, true) -> - gen_event:notify({alarm_handler, node(Pid)}, + gen_event:notify({?SERVER, node(Pid)}, {set_alarm, {{resource_limit, Source, node()}, []}}); remote_conserve_resources(Pid, Source, false) -> - gen_event:notify({alarm_handler, node(Pid)}, + gen_event:notify({?SERVER, node(Pid)}, {clear_alarm, {resource_limit, Source, node()}}). + %%---------------------------------------------------------------------------- init([]) -> @@ -105,7 +120,7 @@ handle_event({clear_alarm, file_descriptor_limit}, State) -> handle_event({node_up, Node}, State) -> %% Must do this via notify and not call to avoid possible deadlock. ok = gen_event:notify( - {alarm_handler, Node}, + {?SERVER, Node}, {register, self(), {?MODULE, remote_conserve_resources, []}}), {ok, State}; diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index 58375abb45..e72181c061 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -149,10 +149,10 @@ internal_update(State = #state { limit = Limit, case {Alarmed, NewAlarmed} of {false, true} -> emit_update_info("exceeded", CurrentFreeBytes, LimitBytes), - alarm_handler:set_alarm({{resource_limit, disk, node()}, []}); + rabbit_alarm:set_alarm({{resource_limit, disk, node()}, []}); {true, false} -> emit_update_info("below limit", CurrentFreeBytes, LimitBytes), - alarm_handler:clear_alarm({resource_limit, disk, node()}); + rabbit_alarm:clear_alarm({resource_limit, disk, node()}); _ -> ok end, diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index fb184d1ab2..535208afc7 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -27,7 +27,7 @@ -behaviour(gen_server). --export([start_link/1]). +-export([start_link/1, start_link/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -51,7 +51,9 @@ memory_limit, timeout, timer, - alarmed + alarmed, + alarm_set, + alarm_clear }). %%---------------------------------------------------------------------------- @@ -59,6 +61,8 @@ -ifdef(use_specs). -spec(start_link/1 :: (float()) -> rabbit_types:ok_pid_or_error()). +-spec(start_link/3 :: (float(), fun ((any()) -> 'ok'), + fun ((any()) -> 'ok')) -> rabbit_types:ok_pid_or_error()). -spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')). -spec(get_vm_limit/0 :: () -> non_neg_integer()). -spec(get_check_interval/0 :: () -> non_neg_integer()). @@ -99,14 +103,23 @@ get_memory_limit() -> %% gen_server callbacks %%---------------------------------------------------------------------------- -start_link(Args) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). +start_link(MemFraction) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [MemFraction], []). + +start_link(MemFraction, AlarmSet, AlarmClear) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, + [MemFraction, AlarmSet, AlarmClear], []). init([MemFraction]) -> + init([MemFraction, fun alarm_handler:set_alarm/1, + fun alarm_handler:clear_alarm/1]); +init([MemFraction, AlarmSet, AlarmClear]) -> TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), State = #state { timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, timer = TRef, - alarmed = false}, + alarmed = false, + alarm_set = AlarmSet, + alarm_clear = AlarmClear }, {ok, set_mem_limits(State, MemFraction)}. handle_call(get_vm_memory_high_watermark, _From, State) -> @@ -175,16 +188,18 @@ set_mem_limits(State, MemFraction) -> memory_limit = MemLim }). internal_update(State = #state { memory_limit = MemLimit, - alarmed = Alarmed}) -> + alarmed = Alarmed, + alarm_set = AlarmSet, + alarm_clear = AlarmClear }) -> MemUsed = erlang:memory(total), NewAlarmed = MemUsed > MemLimit, case {Alarmed, NewAlarmed} of {false, true} -> emit_update_info(set, MemUsed, MemLimit), - alarm_handler:set_alarm({{resource_limit, memory, node()}, []}); + AlarmSet({{resource_limit, memory, node()}, []}); {true, false} -> emit_update_info(clear, MemUsed, MemLimit), - alarm_handler:clear_alarm({resource_limit, memory, node()}); + AlarmClear({resource_limit, memory, node()}); _ -> ok end, |
