diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 94 | ||||
| -rw-r--r-- | src/rabbit_queue_mode_manager.erl | 18 |
3 files changed, 78 insertions, 52 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index aab336ca7c..a1b5a895bd 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -130,22 +130,19 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -reply(Reply, NewState = #q { memory_report_timer = undefined }) -> - {reply, Reply, start_memory_timer(NewState), binary}; reply(Reply, NewState) -> - {reply, Reply, NewState, binary}. + {reply, Reply, start_memory_timer(NewState), binary}. -noreply(NewState = #q { memory_report_timer = undefined }) -> - {noreply, start_memory_timer(NewState), binary}; noreply(NewState) -> - {noreply, NewState, binary}. + {noreply, start_memory_timer(NewState), binary}. start_memory_timer() -> {ok, TRef} = timer:apply_after(?MEMORY_REPORT_TIME_INTERVAL, rabbit_amqqueue, report_memory, [self()]), TRef. start_memory_timer(State = #q { memory_report_timer = undefined }) -> - State #q { memory_report_timer = start_memory_timer() }; + report_memory(false, + State #q { memory_report_timer = start_memory_timer() }); start_memory_timer(State) -> State. @@ -842,8 +839,8 @@ handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) -> handle_cast(report_memory, State) -> %% deliberately don't call noreply/2 as we don't want to restart the timer - {noreply, (report_memory(false, State)) - #q { memory_report_timer = undefined }, binary}. + %% by unsetting the timer, we force a report on the next normal message + {noreply, State #q { memory_report_timer = undefined }, binary}. handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> @@ -862,9 +859,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); handle_info(timeout, State) -> - %% TODO: Once we drop support for R11B-5, we can change this to - %% {noreply, State, hibernate}; State1 = stop_memory_timer(report_memory(true, State)), + %% don't call noreply/1 as that'll restart the memory_report_timer {noreply, State1, hibernate}; handle_info(Info, State) -> diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 86a47c38bd..d8c6580f01 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -71,6 +71,7 @@ -define(FILE_SIZE_LIMIT, (256*1024*1024)). -define(SYNC_INTERVAL, 5). %% milliseconds +-define(HIBERNATE_AFTER_MIN, 1000). -record(dqstate, {msg_location_dets, %% where are messages? @@ -88,7 +89,7 @@ read_file_handles, %% file handles for reading (LRU) read_file_handles_limit, %% how many file handles can we open? on_sync_froms, %% list of commiters to run on sync (reversed) - timer_ref, %% TRef for our interval timer + commit_timer_ref, %% TRef for our interval timer last_sync_offset, %% current_offset at the last time we sync'd message_cache, %% ets message cache memory_report_timer, %% TRef for the memory report timer @@ -391,9 +392,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% seems to blow up if it is set private MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]), - {ok, TRef} = timer:apply_interval(?MEMORY_REPORT_TIME_INTERVAL, - rabbit_disk_queue, report_memory, []), - + TRef = start_memory_timer(), InitName = "0" ++ ?FILE_EXTENSION, State = @@ -413,7 +412,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> read_file_handles = {dict:new(), gb_trees:empty()}, read_file_handles_limit = ReadFileHandlesLimit, on_sync_froms = [], - timer_ref = undefined, + commit_timer_ref = undefined, last_sync_offset = 0, message_cache = ets:new(?CACHE_ETS_NAME, [set, private]), @@ -441,7 +440,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> {ok, case Mode of mixed -> State2; disk -> to_disk_only_mode(State2) - end}. + end, {binary, ?HIBERNATE_AFTER_MIN}}. handle_call({deliver, Q}, _From, State) -> {ok, Result, State1} = internal_deliver(Q, true, false, State), @@ -521,14 +520,18 @@ handle_cast({set_mode, Mode}, State) -> mixed -> fun to_ram_disk_mode/1 end)(State)); handle_cast(report_memory, State) -> - Bytes = memory_use(State), - rabbit_queue_mode_manager:report_memory(self(), 2.5 * Bytes), - noreply(State). + %% call noreply1/1, not noreply/1, as we don't want to restart the + %% memory_report_timer + %% by unsetting the timer, we force a report on the next normal message + noreply1(State #dqstate { memory_report_timer = undefined }). handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; -handle_info(timeout, State = #dqstate { timer_ref = TRef }) - when TRef /= undefined -> +handle_info(timeout, State = #dqstate { commit_timer_ref = undefined }) -> + ok = report_memory(true, State), + %% don't use noreply/1 or noreply1/1 as they'll restart the memory timer + {noreply, stop_memory_timer(State), hibernate}; +handle_info(timeout, State) -> noreply(sync_current_file_handle(State)); handle_info(_Info, State) -> noreply(State). @@ -539,12 +542,10 @@ terminate(_Reason, State) -> shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, msg_location_ets = MsgLocationEts, current_file_handle = FileHdl, - read_file_handles = {ReadHdls, _ReadHdlsAge}, - memory_report_timer = TRef + read_file_handles = {ReadHdls, _ReadHdlsAge} }) -> %% deliberately ignoring return codes here - timer:cancel(TRef), - State1 = stop_commit_timer(State), + State1 = stop_commit_timer(stop_memory_timer(State)), dets:close(MsgLocationDets), file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS)), @@ -568,6 +569,27 @@ code_change(_OldVsn, State, _Extra) -> %% ---- UTILITY FUNCTIONS ---- +stop_memory_timer(State = #dqstate { memory_report_timer = undefined }) -> + State; +stop_memory_timer(State = #dqstate { memory_report_timer = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #dqstate { memory_report_timer = undefined }. + +start_memory_timer() -> + {ok, TRef} = timer:apply_after(?MEMORY_REPORT_TIME_INTERVAL, + rabbit_disk_queue, report_memory, []), + TRef. + +start_memory_timer(State = #dqstate { memory_report_timer = undefined }) -> + report_memory(false, State), + State #dqstate { memory_report_timer = start_memory_timer() }; +start_memory_timer(State) -> + State. + +report_memory(Hibernating, State) -> + Bytes = memory_use(State), + rabbit_queue_mode_manager:report_memory(self(), 2.5 * Bytes, Hibernating). + memory_use(#dqstate { operation_mode = ram_disk, file_summary = FileSummary, sequences = Sequences, @@ -633,22 +655,30 @@ to_ram_disk_mode(State = #dqstate { operation_mode = disk_only, mnesia_bytes_per_record = undefined, ets_bytes_per_record = undefined }. -noreply(NewState = #dqstate { on_sync_froms = [], timer_ref = undefined }) -> - {noreply, NewState, infinity}; -noreply(NewState = #dqstate { timer_ref = undefined }) -> - {noreply, start_commit_timer(NewState), 0}; -noreply(NewState = #dqstate { on_sync_froms = [] }) -> - {noreply, stop_commit_timer(NewState), infinity}; noreply(NewState) -> + noreply1(start_memory_timer(NewState)). + +noreply1(NewState = #dqstate { on_sync_froms = [], + commit_timer_ref = undefined }) -> + {noreply, NewState, binary}; +noreply1(NewState = #dqstate { commit_timer_ref = undefined }) -> + {noreply, start_commit_timer(NewState), 0}; +noreply1(NewState = #dqstate { on_sync_froms = [] }) -> + {noreply, stop_commit_timer(NewState), binary}; +noreply1(NewState) -> {noreply, NewState, 0}. -reply(Reply, NewState = #dqstate { on_sync_froms = [], timer_ref = undefined }) -> - {reply, Reply, NewState, infinity}; -reply(Reply, NewState = #dqstate { timer_ref = undefined }) -> - {reply, Reply, start_commit_timer(NewState), 0}; -reply(Reply, NewState = #dqstate { on_sync_froms = [] }) -> - {reply, Reply, stop_commit_timer(NewState), infinity}; reply(Reply, NewState) -> + reply1(Reply, start_memory_timer(NewState)). + +reply1(Reply, NewState = #dqstate { on_sync_froms = [], + commit_timer_ref = undefined }) -> + {reply, Reply, NewState, binary}; +reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }) -> + {reply, Reply, start_commit_timer(NewState), 0}; +reply1(Reply, NewState = #dqstate { on_sync_froms = [] }) -> + {reply, Reply, stop_commit_timer(NewState), binary}; +reply1(Reply, NewState) -> {reply, Reply, NewState, 0}. form_filename(Name) -> @@ -793,15 +823,15 @@ sequence_lookup(Sequences, Q) -> {ReadSeqId, WriteSeqId, Length} end. -start_commit_timer(State = #dqstate { timer_ref = undefined }) -> +start_commit_timer(State = #dqstate { commit_timer_ref = undefined }) -> {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []), - State #dqstate { timer_ref = TRef }. + State #dqstate { commit_timer_ref = TRef }. -stop_commit_timer(State = #dqstate { timer_ref = undefined }) -> +stop_commit_timer(State = #dqstate { commit_timer_ref = undefined }) -> State; -stop_commit_timer(State = #dqstate { timer_ref = TRef }) -> +stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), - State #dqstate { timer_ref = undefined }. + State #dqstate { commit_timer_ref = undefined }. sync_current_file_handle(State = #dqstate { current_dirty = false, on_sync_froms = [] }) -> diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl index 359ef70871..99f6e408d9 100644 --- a/src/rabbit_queue_mode_manager.erl +++ b/src/rabbit_queue_mode_manager.erl @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([register/4, report_memory/2, report_memory/5, info/0, +-export([register/4, report_memory/3, report_memory/5, info/0, pin_to_disk/1, unpin_to_disk/1]). -define(TOTAL_TOKENS, 1000). @@ -54,7 +54,7 @@ -spec(start_link/0 :: () -> ({'ok', pid()} | 'ignore' | {'error', any()})). -spec(register/4 :: (pid(), atom(), atom(), list()) -> {'ok', queue_mode()}). --spec(report_memory/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(report_memory/3 :: (pid(), non_neg_integer(), bool()) -> 'ok'). -spec(report_memory/5 :: (pid(), non_neg_integer(), non_neg_integer(), non_neg_integer(), bool()) -> 'ok'). @@ -131,11 +131,11 @@ %% occurring. %% %% The queue process deliberately reports 4 times its estimated RAM -%% usage, and the disk_queue 2 times. In practise, this seems to work -%% well. Note that we are deliberately running out of tokes a little -%% early because of the fact that the mixed -> disk transition can -%% transiently eat a lot of memory and take some time (flushing a few -%% million messages to disk is never going to be instantaneous). +%% usage, and the disk_queue 2.5 times. In practise, this seems to +%% work well. Note that we are deliberately running out of tokes a +%% little early because of the fact that the mixed -> disk transition +%% can transiently eat a lot of memory and take some time (flushing a +%% few million messages to disk is never going to be instantaneous). start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). @@ -149,8 +149,8 @@ pin_to_disk(Pid) -> unpin_to_disk(Pid) -> gen_server2:call(?SERVER, {unpin_to_disk, Pid}). -report_memory(Pid, Memory) -> - report_memory(Pid, Memory, undefined, undefined, false). +report_memory(Pid, Memory, Hibernating) -> + report_memory(Pid, Memory, undefined, undefined, Hibernating). report_memory(Pid, Memory, Gain, Loss, Hibernating) -> gen_server2:cast(?SERVER, |
