diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-29 18:01:10 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-29 18:01:10 +0100 |
| commit | 5c64ee0c50de0d9b3f6e382b9ec8d66ea5a4d51b (patch) | |
| tree | 7487fcde855afe77ee5cf29b05c521b86705e677 /src | |
| parent | 10aa354784bb711c8c0329a5623fc9a6a90c40d2 (diff) | |
| download | rabbitmq-server-git-5c64ee0c50de0d9b3f6e382b9ec8d66ea5a4d51b.tar.gz | |
mmmm. It maybe sort of works. Needs work though
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 76 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_queue_mode_manager.erl | 156 |
6 files changed, 181 insertions, 85 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 7d5e2a796f..9587238835 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -149,9 +149,7 @@ start(normal, []) -> end}, {"disk queue", fun () -> - ok = start_child(rabbit_disk_queue), - %% TODO, CHANGE ME, waiting on bug 20980 - ok = rabbit_disk_queue:to_ram_disk_mode() + ok = start_child(rabbit_disk_queue) end}, {"recovery", fun () -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c045b3cae3..92272f0cdc 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -42,7 +42,7 @@ -export([notify_sent/2, unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). --export([set_mode/3]). +-export([set_mode/3, set_mode/2, report_memory/1]). -import(mnesia). -import(gen_server2). @@ -105,10 +105,12 @@ -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(set_mode/3 :: (vhost(), amqqueue(), ('disk' | 'mixed')) -> 'ok'). +-spec(set_mode/2 :: (pid(), ('disk' | 'mixed')) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). +-spec(report_memory/1 :: (pid()) -> 'ok'). -endif. @@ -229,7 +231,13 @@ set_mode(VHostPath, Queue, ModeBin) when is_binary(VHostPath) andalso is_binary(Queue) -> Mode = list_to_atom(binary_to_list(ModeBin)), with(rabbit_misc:r(VHostPath, queue, Queue), - fun(Q) -> gen_server2:pcast(Q #amqqueue.pid, 10, {set_mode, Mode}) end). + fun(Q) -> set_mode(Q #amqqueue.pid, Mode) end). + +set_mode(QPid, Mode) -> + gen_server2:pcast(QPid, 10, {set_mode, Mode}). + +report_memory(QPid) -> + gen_server2:cast(QPid, report_memory). info(#amqqueue{ pid = QPid }) -> gen_server2:pcall(QPid, 9, info, infinity). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b6353beff6..2bd170a265 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -37,8 +37,7 @@ -define(UNSENT_MESSAGE_LIMIT, 100). -define(HIBERNATE_AFTER, 1000). --define(MEMORY_REPORT_INTERVAL, 500). --define(MEMORY_REPORT_TIME_INTERVAL, 1000000). %% 1 second in microseconds +-define(MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds -export([start_link/1]). @@ -58,8 +57,7 @@ next_msg_id, active_consumers, blocked_consumers, - memory_report_counter, - old_memory_report + memory_report_timer }). -record(consumer, {tag, ack_required}). @@ -112,8 +110,7 @@ init(Q = #amqqueue { name = QName, durable = Durable }) -> next_msg_id = 1, active_consumers = queue:new(), blocked_consumers = queue:new(), - memory_report_counter = 0, - old_memory_report = {1, now()} + memory_report_timer = start_memory_timer() }, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> @@ -124,6 +121,7 @@ terminate(_Reason, State) -> rollback_transaction(Txn, State1) end, State, all_tx()), rabbit_mixed_queue:delete_queue(NewState #q.mixed_state), + stop_memory_timer(NewState), ok = rabbit_amqqueue:internal_delete(QName). code_change(_OldVsn, State, _Extra) -> @@ -131,16 +129,30 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -reply(Reply, NewState = #q { memory_report_counter = 0 }) -> - {reply, Reply, report_memory(NewState), ?HIBERNATE_AFTER}; -reply(Reply, NewState = #q { memory_report_counter = C }) -> - {reply, Reply, NewState #q { memory_report_counter = C - 1 }, - ?HIBERNATE_AFTER}. - -noreply(NewState = #q { memory_report_counter = 0}) -> - {noreply, report_memory(NewState), ?HIBERNATE_AFTER}; -noreply(NewState = #q { memory_report_counter = C}) -> - {noreply, NewState #q { memory_report_counter = C - 1 }, ?HIBERNATE_AFTER}. +reply(Reply, NewState = #q { memory_report_timer = undefined }) -> + {reply, Reply, start_memory_timer(NewState), ?HIBERNATE_AFTER}; +reply(Reply, NewState) -> + {reply, Reply, NewState, ?HIBERNATE_AFTER}. + +noreply(NewState = #q { memory_report_timer = undefined }) -> + {noreply, start_memory_timer(NewState), ?HIBERNATE_AFTER}; +noreply(NewState) -> + {noreply, NewState, ?HIBERNATE_AFTER}. + +start_memory_timer() -> + {ok, TRef} = timer:apply_interval(?MEMORY_REPORT_TIME_INTERVAL, + rabbit_amqqueue, report_memory, [self()]), + TRef. +start_memory_timer(State = #q { memory_report_timer = undefined }) -> + State #q { memory_report_timer = start_memory_timer() }; +start_memory_timer(State) -> + State. + +stop_memory_timer(State = #q { memory_report_timer = undefined }) -> + State; +stop_memory_timer(State = #q { memory_report_timer = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #q { memory_report_timer = undefined }. lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -543,24 +555,15 @@ i(memory, _) -> i(Item, _) -> throw({bad_argument, Item}). -report_memory(State = #q { old_memory_report = {OldMem, Then}, - mixed_state = MS }) -> +report_memory(State = #q { mixed_state = MS }) -> {MSize, Gain, Loss} = rabbit_mixed_queue:estimate_queue_memory(MS), NewMem = case MSize of 0 -> 1; %% avoid / 0 N -> N end, - State1 = State #q { memory_report_counter = ?MEMORY_REPORT_INTERVAL }, - Now = now(), - case ((NewMem / OldMem) > 1.1 orelse (OldMem / NewMem) > 1.1) andalso - (?MEMORY_REPORT_TIME_INTERVAL < timer:now_diff(Now, Then)) of - true -> - rabbit_queue_mode_manager:report_memory(self(), NewMem, Gain, Loss), - State1 #q { old_memory_report = {NewMem, Now}, - mixed_state = rabbit_mixed_queue:reset_counters(MS) }; - false -> State1 - end. + rabbit_queue_mode_manager:report_memory(self(), NewMem, Gain, Loss), + State #q { mixed_state = rabbit_mixed_queue:reset_counters(MS) }. %--------------------------------------------------------------------------- @@ -834,8 +837,10 @@ handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) -> disk -> fun rabbit_mixed_queue:to_disk_only_mode/2; mixed -> fun rabbit_mixed_queue:to_mixed_mode/2 end)(PendingMessages, MS), - noreply(State #q { mixed_state = MS1 }). - + noreply(State #q { mixed_state = MS1 }); + +handle_cast(report_memory, State) -> + noreply(report_memory(State)). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> @@ -853,16 +858,11 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); -handle_info(timeout, State = #q { memory_report_counter = Count }) - when Count == ?MEMORY_REPORT_INTERVAL -> - %% Have to do the +1 because the timeout below, with noreply, will -1 +handle_info(timeout, State) -> %% TODO: Once we drop support for R11B-5, we can change this to %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]); - -handle_info(timeout, State) -> - State1 = report_memory(State), - noreply(State1 #q { memory_report_counter = 1 + ?MEMORY_REPORT_INTERVAL }); + State1 = stop_memory_timer(report_memory(State)), + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]); handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 4333f667cd..8db8f24932 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -46,7 +46,7 @@ -export([length/1, filesync/0, cache_info/0]). --export([stop/0, stop_and_obliterate/0, change_memory_footprint/2, +-export([stop/0, stop_and_obliterate/0, conserve_memory/2, to_disk_only_mode/0, to_ram_disk_mode/0]). -include("rabbit.hrl"). @@ -270,7 +270,7 @@ -spec(length/1 :: (queue_name()) -> non_neg_integer()). -spec(filesync/0 :: () -> 'ok'). -spec(cache_info/0 :: () -> [{atom(), term()}]). --spec(change_memory_footprint/2 :: (pid(), bool()) -> 'ok'). +-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok'). -endif. @@ -345,8 +345,8 @@ filesync() -> cache_info() -> gen_server2:call(?SERVER, cache_info, infinity). -change_memory_footprint(_Pid, Conserve) -> - gen_server2:pcast(?SERVER, 9, {change_memory_footprint, Conserve}). +conserve_memory(_Pid, Conserve) -> + gen_server2:pcast(?SERVER, 9, {conserve_memory, Conserve}). %% ---- GEN-SERVER INTERNAL API ---- @@ -360,11 +360,11 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% brutal_kill. %% Otherwise, the gen_server will be immediately terminated. process_flag(trap_exit, true), - ok = rabbit_alarm:register(self(), {?MODULE, change_memory_footprint, []}), + ok = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), Node = node(), ok = case mnesia:change_table_copy_type(rabbit_disk_queue, Node, - disc_only_copies) of + disc_copies) of {atomic, ok} -> ok; {aborted, {already_exists, rabbit_disk_queue, Node, disc_only_copies}} -> ok; @@ -391,7 +391,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> State = #dqstate { msg_location_dets = MsgLocationDets, msg_location_ets = MsgLocationEts, - operation_mode = disk_only, + operation_mode = ram_disk, file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, [set, private]), sequences = ets:new(?SEQUENCE_ETS_NAME, @@ -502,7 +502,7 @@ handle_cast({delete_queue, Q}, State) -> noreply(State1); handle_cast(filesync, State) -> noreply(sync_current_file_handle(State)); -handle_cast({change_memory_footprint, Conserve}, State) -> +handle_cast({conserve_memory, Conserve}, State) -> noreply((case Conserve of true -> fun to_disk_only_mode/1; false -> fun to_ram_disk_mode/1 diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 12fede1728..d171cf186f 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -502,7 +502,7 @@ is_empty(#mqstate { length = Length }) -> estimate_queue_memory(#mqstate { memory_size = Size, memory_gain = Gain, memory_loss = Loss }) -> - {Size, Gain, Loss}. + {2*Size, Gain, Loss}. reset_counters(State) -> State #mqstate { memory_gain = 0, memory_loss = 0 }. diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl index 4ed56fd33c..3a55833b34 100644 --- a/src/rabbit_queue_mode_manager.erl +++ b/src/rabbit_queue_mode_manager.erl @@ -42,8 +42,7 @@ -define(TOTAL_TOKENS, 1000). -define(LOW_WATER_MARK_FRACTION, 0.25). --define(EXPIRY_INTERVAL_MICROSECONDS, 5000000). --define(ACTIVITY_THRESHOLD, 10). +-define(ACTIVITY_THRESHOLD, 25). -define(INITIAL_TOKEN_ALLOCATION, 10). -define(SERVER, ?MODULE). @@ -60,10 +59,10 @@ -endif. --record(state, { remaining_tokens, +-record(state, { available_tokens, + available_etokens, mixed_queues, - disk_queues, - bytes_per_token + tokens_per_byte }). start_link() -> @@ -78,44 +77,131 @@ report_memory(Pid, Memory, Gain, Loss) -> init([]) -> process_flag(trap_exit, true), %% todo, fix up this call as os_mon may not be running - {MemTotal, _MemUsed, _BigProc} = memsup:get_memory_data(), - {ok, #state { remaining_tokens = ?TOTAL_TOKENS, + {MemTotal, MemUsed, _BigProc} = memsup:get_memory_data(), + MemAvail = MemTotal - MemUsed, + Avail = ceil(?TOTAL_TOKENS * (1 - ?LOW_WATER_MARK_FRACTION)), + EAvail = ?TOTAL_TOKENS - Avail, + {ok, #state { available_tokens = Avail, + available_etokens = EAvail, mixed_queues = dict:new(), - disk_queues = sets:new(), - bytes_per_token = MemTotal / ?TOTAL_TOKENS + tokens_per_byte = ?TOTAL_TOKENS / MemAvail }}. handle_call({register, Pid}, _From, - State = #state { remaining_tokens = Remaining, - mixed_queues = Mixed, - disk_queues = Disk }) -> + State = #state { available_tokens = Avail, + mixed_queues = Mixed }) -> _MRef = erlang:monitor(process, Pid), {Result, State1} = - case Remaining >= ?INITIAL_TOKEN_ALLOCATION of + case ?INITIAL_TOKEN_ALLOCATION > Avail of true -> - {mixed, State #state { remaining_tokens = - Remaining - ?INITIAL_TOKEN_ALLOCATION, - mixed_queues = dict:store - (Pid, {?INITIAL_TOKEN_ALLOCATION, now()}, - Mixed) }}; - + {disk, State}; false -> - {disk, State #state { disk_queues = - sets:add_element(Pid, Disk) }} + {mixed, State #state { mixed_queues = dict:store + (Pid, {?INITIAL_TOKEN_ALLOCATION, 0}, Mixed) }} end, - {reply, {ok, Result}, State1 }. - -handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost}, State) -> - {noreply, State}. + {reply, {ok, Result}, State1}. + +handle_cast(O = {report_memory, Pid, Memory, BytesGained, BytesLost}, + State = #state { available_tokens = Avail, + available_etokens = EAvail, + tokens_per_byte = TPB, + mixed_queues = Mixed + }) -> + Req = ceil(Memory * TPB), + io:format("~w : ~w ~w ~n", [Pid, Memory, Req]), + LowRate = (BytesGained < ?ACTIVITY_THRESHOLD) + andalso (BytesLost < ?ACTIVITY_THRESHOLD), + io:format("~w ~w~n", [O, LowRate]), + State1 = + case find_queue(Pid, State) of + disk -> + case Req > Avail orelse (2*Req) > (Avail + EAvail) orelse + LowRate of + true -> State; %% remain as disk queue + false -> + %% go to mixed, allocate double Req, and use Extra + rabbit_amqqueue:set_mode(Pid, mixed), + Alloc = lists:min([2*Req, Avail]), + EAlloc = (2*Req) - Alloc, + State #state { available_tokens = Avail - Alloc, + available_etokens = EAvail - EAlloc, + mixed_queues = dict:store + (Pid, {Alloc, EAlloc}, Mixed) + } + end; + {mixed, {OAlloc, OEAlloc}} -> + io:format("~w ; ~w ~w ~n", [Pid, OAlloc, OEAlloc]), + Avail1 = Avail + OAlloc, + EAvail1 = EAvail + OEAlloc, + case Req > (OAlloc + OEAlloc) of + true -> %% getting bigger + case Req > Avail1 of + true -> %% go to disk + rabbit_amqqueue:set_mode(Pid, disk), + State #state { available_tokens = Avail1, + available_etokens = EAvail1, + mixed_queues = + dict:erase(Pid, Mixed) }; + false -> %% request not too big, stay mixed + State #state { available_tokens = Avail1 - Req, + available_etokens = EAvail1, + mixed_queues = dict:store + (Pid, {Req, 0}, Mixed) } + end; + false -> %% getting smaller (or staying same) + case 0 =:= OEAlloc of + true -> + case Req > Avail1 orelse LowRate of + true -> %% go to disk + rabbit_amqqueue:set_mode(Pid, disk), + State #state { available_tokens = Avail1, + available_etokens = EAvail1, + mixed_queues = + dict:erase(Pid, Mixed) }; + false -> %% request not too big, stay mixed + State #state { available_tokens = Avail1 - Req, + available_etokens = EAvail1, + mixed_queues = dict:store + (Pid, {Req, 0}, Mixed) } + end; + false -> + case Req > Avail1 of + true -> + EReq = Req - Avail1, + case EReq > EAvail1 of + true -> %% go to disk + rabbit_amqqueue:set_mode(Pid, disk), + State #state { available_tokens = Avail1, + available_etokens = EAvail1, + mixed_queues = + dict:erase(Pid, Mixed) }; + false -> %% request not too big, stay mixed + State #state { available_tokens = 0, + available_etokens = EAvail1 - EReq, + mixed_queues = dict:store + (Pid, {Avail1, EReq}, Mixed) } + end; + false -> %% request not too big, stay mixed + State #state { available_tokens = Avail1 - Req, + available_etokens = EAvail1, + mixed_queues = dict:store + (Pid, {Req, 0}, Mixed) } + end + end + end + end, + {noreply, State1}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, - State = #state { remaining_tokens = Remaining, + State = #state { available_tokens = Avail, + available_etokens = EAvail, mixed_queues = Mixed }) -> State1 = case find_queue(Pid, State) of disk -> State; - {mixed, {Tokens, _When}} -> - State #state { remaining_tokens = Remaining + Tokens, + {mixed, {Alloc, EAlloc}} -> + State #state { available_tokens = Avail + Alloc, + available_etokens = EAvail + EAlloc, mixed_queues = dict:erase(Pid, Mixed) } end, {noreply, State1}; @@ -130,9 +216,13 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -find_queue(Pid, #state { disk_queues = Disk, mixed_queues = Mixed }) -> - case sets:is_element(Pid, Disk) of - true -> disk; - false -> {mixed, dict:fetch(Pid, Mixed)} +find_queue(Pid, #state { mixed_queues = Mixed }) -> + case dict:find(Pid, Mixed) of + {ok, Value} -> {mixed, Value}; + error -> disk end. - + +ceil(N) when N - trunc(N) > 0 -> + 1 + trunc(N); +ceil(N) -> + N. |
