diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-24 23:27:29 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-24 23:27:29 +0100 |
| commit | 00485709f0b41a09014deb355490036a9a52c062 (patch) | |
| tree | 5d448d4a0ecc6d43d44c8cd50a7be8c122135c32 /src | |
| parent | 0a02dd9cd09af2132c4bf72e82a2d33daeb34cc1 (diff) | |
| download | rabbitmq-server-git-00485709f0b41a09014deb355490036a9a52c062.tar.gz | |
some more scaffolding for tokens
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_memsup_linux.erl | 72 | ||||
| -rw-r--r-- | src/rabbit_queue_mode_manager.erl | 67 |
2 files changed, 99 insertions, 40 deletions
diff --git a/src/rabbit_memsup_linux.erl b/src/rabbit_memsup_linux.erl index ffdc7e9946..158df6798a 100644 --- a/src/rabbit_memsup_linux.erl +++ b/src/rabbit_memsup_linux.erl @@ -44,7 +44,13 @@ -define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). --record(state, {memory_fraction, alarmed, timeout, timer}). +-record(state, {memory_fraction, + alarmed, + timeout, + timer, + total_memory, + allocated_memory + }). %%---------------------------------------------------------------------------- @@ -69,10 +75,13 @@ update() -> init(_Args) -> Fraction = os_mon:get_env(memsup, system_memory_high_watermark), TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), - {ok, #state{alarmed = false, - memory_fraction = Fraction, - timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, - timer = TRef}}. + {ok, update(#state{alarmed = false, + memory_fraction = Fraction, + timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, + timer = TRef, + total_memory = undefined, + allocated_memory = undefined + })}. start_timer(Timeout) -> {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), @@ -94,11 +103,33 @@ handle_call({set_check_interval, Timeout}, _From, State) -> {ok, cancel} = timer:cancel(State#state.timer), {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; -handle_call(_Request, _From, State) -> +handle_call(get_memory_data, _From, + State = #state { total_memory = MemTotal, + allocated_memory = MemUsed }) -> + {reply, {MemTotal, MemUsed, undefined}, State}; + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(update, State) -> + {noreply, update(State)}; + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Info, State) -> {noreply, State}. -handle_cast(update, State = #state{alarmed = Alarmed, - memory_fraction = MemoryFraction}) -> +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +update(State = #state{alarmed = Alarmed, + memory_fraction = MemoryFraction}) -> File = read_proc_file("/proc/meminfo"), Lines = string:tokens(File, "\n"), Dict = dict:from_list(lists:map(fun parse_line/1, Lines)), @@ -116,21 +147,8 @@ handle_cast(update, State = #state{alarmed = Alarmed, _ -> ok end, - {noreply, State#state{alarmed = NewAlarmed}}; - -handle_cast(_Request, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%---------------------------------------------------------------------------- + State#state{alarmed = NewAlarmed, + total_memory = MemTotal, allocated_memory = MemUsed}. -define(BUFFER_SIZE, 1024). @@ -152,5 +170,9 @@ read_proc_file(IoDevice, Acc) -> %% A line looks like "FooBar: 123456 kB" parse_line(Line) -> - [Name, Value | _] = string:tokens(Line, ": "), - {list_to_atom(Name), list_to_integer(Value)}. + [Name, Value | Rest] = string:tokens(Line, ": "), + Value1 = case Rest of + [] -> list_to_integer(Value); %% no units + ["kB"] -> list_to_integer(Value) * 1024 + end, + {list_to_atom(Name), Value1}. diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl index 5a3b464d02..4ed56fd33c 100644 --- a/src/rabbit_queue_mode_manager.erl +++ b/src/rabbit_queue_mode_manager.erl @@ -40,6 +40,12 @@ -export([register/1, report_memory/4]). +-define(TOTAL_TOKENS, 1000). +-define(LOW_WATER_MARK_FRACTION, 0.25). +-define(EXPIRY_INTERVAL_MICROSECONDS, 5000000). +-define(ACTIVITY_THRESHOLD, 10). +-define(INITIAL_TOKEN_ALLOCATION, 10). + -define(SERVER, ?MODULE). -ifdef(use_specs). @@ -54,8 +60,10 @@ -endif. --record(state, { mode, - queues +-record(state, { remaining_tokens, + mixed_queues, + disk_queues, + bytes_per_token }). start_link() -> @@ -69,26 +77,48 @@ report_memory(Pid, Memory, Gain, Loss) -> init([]) -> process_flag(trap_exit, true), - {ok, #state { mode = unlimited, - queues = dict:new() + %% todo, fix up this call as os_mon may not be running + {MemTotal, _MemUsed, _BigProc} = memsup:get_memory_data(), + {ok, #state { remaining_tokens = ?TOTAL_TOKENS, + mixed_queues = dict:new(), + disk_queues = sets:new(), + bytes_per_token = MemTotal / ?TOTAL_TOKENS }}. handle_call({register, Pid}, _From, - State = #state { queues = Qs, mode = Mode }) -> + State = #state { remaining_tokens = Remaining, + mixed_queues = Mixed, + disk_queues = Disk }) -> _MRef = erlang:monitor(process, Pid), - Result = case Mode of - disk_only -> disk; - _ -> mixed - end, - {reply, {ok, Result}, State #state { queues = dict:store(Pid, 0, Qs) }}. - -handle_cast(Any, State) -> - io:format("~w~n", [Any]), + {Result, State1} = + case Remaining >= ?INITIAL_TOKEN_ALLOCATION of + true -> + {mixed, State #state { remaining_tokens = + Remaining - ?INITIAL_TOKEN_ALLOCATION, + mixed_queues = dict:store + (Pid, {?INITIAL_TOKEN_ALLOCATION, now()}, + Mixed) }}; + + false -> + {disk, State #state { disk_queues = + sets:add_element(Pid, Disk) }} + end, + {reply, {ok, Result}, State1 }. + +handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost}, State) -> {noreply, State}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, - State = #state { queues = Qs }) -> - {noreply, State #state { queues = dict:erase(Pid, Qs) }}; + State = #state { remaining_tokens = Remaining, + mixed_queues = Mixed }) -> + State1 = case find_queue(Pid, State) of + disk -> + State; + {mixed, {Tokens, _When}} -> + State #state { remaining_tokens = Remaining + Tokens, + mixed_queues = dict:erase(Pid, Mixed) } + end, + {noreply, State1}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info(_Info, State) -> @@ -99,3 +129,10 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +find_queue(Pid, #state { disk_queues = Disk, mixed_queues = Mixed }) -> + case sets:is_element(Pid, Disk) of + true -> disk; + false -> {mixed, dict:fetch(Pid, Mixed)} + end. + |
