summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-24 23:27:29 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-24 23:27:29 +0100
commit00485709f0b41a09014deb355490036a9a52c062 (patch)
tree5d448d4a0ecc6d43d44c8cd50a7be8c122135c32 /src
parent0a02dd9cd09af2132c4bf72e82a2d33daeb34cc1 (diff)
downloadrabbitmq-server-git-00485709f0b41a09014deb355490036a9a52c062.tar.gz
some more scaffolding for tokens
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_memsup_linux.erl72
-rw-r--r--src/rabbit_queue_mode_manager.erl67
2 files changed, 99 insertions, 40 deletions
diff --git a/src/rabbit_memsup_linux.erl b/src/rabbit_memsup_linux.erl
index ffdc7e9946..158df6798a 100644
--- a/src/rabbit_memsup_linux.erl
+++ b/src/rabbit_memsup_linux.erl
@@ -44,7 +44,13 @@
-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000).
--record(state, {memory_fraction, alarmed, timeout, timer}).
+-record(state, {memory_fraction,
+ alarmed,
+ timeout,
+ timer,
+ total_memory,
+ allocated_memory
+ }).
%%----------------------------------------------------------------------------
@@ -69,10 +75,13 @@ update() ->
init(_Args) ->
Fraction = os_mon:get_env(memsup, system_memory_high_watermark),
TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
- {ok, #state{alarmed = false,
- memory_fraction = Fraction,
- timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
- timer = TRef}}.
+ {ok, update(#state{alarmed = false,
+ memory_fraction = Fraction,
+ timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
+ timer = TRef,
+ total_memory = undefined,
+ allocated_memory = undefined
+ })}.
start_timer(Timeout) ->
{ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
@@ -94,11 +103,33 @@ handle_call({set_check_interval, Timeout}, _From, State) ->
{ok, cancel} = timer:cancel(State#state.timer),
{reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
-handle_call(_Request, _From, State) ->
+handle_call(get_memory_data, _From,
+ State = #state { total_memory = MemTotal,
+ allocated_memory = MemUsed }) ->
+ {reply, {MemTotal, MemUsed, undefined}, State};
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(update, State) ->
+ {noreply, update(State)};
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info(_Info, State) ->
{noreply, State}.
-handle_cast(update, State = #state{alarmed = Alarmed,
- memory_fraction = MemoryFraction}) ->
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+
+update(State = #state{alarmed = Alarmed,
+ memory_fraction = MemoryFraction}) ->
File = read_proc_file("/proc/meminfo"),
Lines = string:tokens(File, "\n"),
Dict = dict:from_list(lists:map(fun parse_line/1, Lines)),
@@ -116,21 +147,8 @@ handle_cast(update, State = #state{alarmed = Alarmed,
_ ->
ok
end,
- {noreply, State#state{alarmed = NewAlarmed}};
-
-handle_cast(_Request, State) ->
- {noreply, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%----------------------------------------------------------------------------
+ State#state{alarmed = NewAlarmed,
+ total_memory = MemTotal, allocated_memory = MemUsed}.
-define(BUFFER_SIZE, 1024).
@@ -152,5 +170,9 @@ read_proc_file(IoDevice, Acc) ->
%% A line looks like "FooBar: 123456 kB"
parse_line(Line) ->
- [Name, Value | _] = string:tokens(Line, ": "),
- {list_to_atom(Name), list_to_integer(Value)}.
+ [Name, Value | Rest] = string:tokens(Line, ": "),
+ Value1 = case Rest of
+ [] -> list_to_integer(Value); %% no units
+ ["kB"] -> list_to_integer(Value) * 1024
+ end,
+ {list_to_atom(Name), Value1}.
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index 5a3b464d02..4ed56fd33c 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -40,6 +40,12 @@
-export([register/1, report_memory/4]).
+-define(TOTAL_TOKENS, 1000).
+-define(LOW_WATER_MARK_FRACTION, 0.25).
+-define(EXPIRY_INTERVAL_MICROSECONDS, 5000000).
+-define(ACTIVITY_THRESHOLD, 10).
+-define(INITIAL_TOKEN_ALLOCATION, 10).
+
-define(SERVER, ?MODULE).
-ifdef(use_specs).
@@ -54,8 +60,10 @@
-endif.
--record(state, { mode,
- queues
+-record(state, { remaining_tokens,
+ mixed_queues,
+ disk_queues,
+ bytes_per_token
}).
start_link() ->
@@ -69,26 +77,48 @@ report_memory(Pid, Memory, Gain, Loss) ->
init([]) ->
process_flag(trap_exit, true),
- {ok, #state { mode = unlimited,
- queues = dict:new()
+ %% todo, fix up this call as os_mon may not be running
+ {MemTotal, _MemUsed, _BigProc} = memsup:get_memory_data(),
+ {ok, #state { remaining_tokens = ?TOTAL_TOKENS,
+ mixed_queues = dict:new(),
+ disk_queues = sets:new(),
+ bytes_per_token = MemTotal / ?TOTAL_TOKENS
}}.
handle_call({register, Pid}, _From,
- State = #state { queues = Qs, mode = Mode }) ->
+ State = #state { remaining_tokens = Remaining,
+ mixed_queues = Mixed,
+ disk_queues = Disk }) ->
_MRef = erlang:monitor(process, Pid),
- Result = case Mode of
- disk_only -> disk;
- _ -> mixed
- end,
- {reply, {ok, Result}, State #state { queues = dict:store(Pid, 0, Qs) }}.
-
-handle_cast(Any, State) ->
- io:format("~w~n", [Any]),
+ {Result, State1} =
+ case Remaining >= ?INITIAL_TOKEN_ALLOCATION of
+ true ->
+ {mixed, State #state { remaining_tokens =
+ Remaining - ?INITIAL_TOKEN_ALLOCATION,
+ mixed_queues = dict:store
+ (Pid, {?INITIAL_TOKEN_ALLOCATION, now()},
+ Mixed) }};
+
+ false ->
+ {disk, State #state { disk_queues =
+ sets:add_element(Pid, Disk) }}
+ end,
+ {reply, {ok, Result}, State1 }.
+
+handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost}, State) ->
{noreply, State}.
handle_info({'DOWN', _MRef, process, Pid, _Reason},
- State = #state { queues = Qs }) ->
- {noreply, State #state { queues = dict:erase(Pid, Qs) }};
+ State = #state { remaining_tokens = Remaining,
+ mixed_queues = Mixed }) ->
+ State1 = case find_queue(Pid, State) of
+ disk ->
+ State;
+ {mixed, {Tokens, _When}} ->
+ State #state { remaining_tokens = Remaining + Tokens,
+ mixed_queues = dict:erase(Pid, Mixed) }
+ end,
+ {noreply, State1};
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info(_Info, State) ->
@@ -99,3 +129,10 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+find_queue(Pid, #state { disk_queues = Disk, mixed_queues = Mixed }) ->
+ case sets:is_element(Pid, Disk) of
+ true -> disk;
+ false -> {mixed, dict:fetch(Pid, Mixed)}
+ end.
+