summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl18
-rw-r--r--src/rabbit_disk_queue.erl94
-rw-r--r--src/rabbit_queue_mode_manager.erl18
3 files changed, 78 insertions, 52 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index aab336ca7c..a1b5a895bd 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -130,22 +130,19 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-reply(Reply, NewState = #q { memory_report_timer = undefined }) ->
- {reply, Reply, start_memory_timer(NewState), binary};
reply(Reply, NewState) ->
- {reply, Reply, NewState, binary}.
+ {reply, Reply, start_memory_timer(NewState), binary}.
-noreply(NewState = #q { memory_report_timer = undefined }) ->
- {noreply, start_memory_timer(NewState), binary};
noreply(NewState) ->
- {noreply, NewState, binary}.
+ {noreply, start_memory_timer(NewState), binary}.
start_memory_timer() ->
{ok, TRef} = timer:apply_after(?MEMORY_REPORT_TIME_INTERVAL,
rabbit_amqqueue, report_memory, [self()]),
TRef.
start_memory_timer(State = #q { memory_report_timer = undefined }) ->
- State #q { memory_report_timer = start_memory_timer() };
+ report_memory(false,
+ State #q { memory_report_timer = start_memory_timer() });
start_memory_timer(State) ->
State.
@@ -842,8 +839,8 @@ handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) ->
handle_cast(report_memory, State) ->
%% deliberately don't call noreply/2 as we don't want to restart the timer
- {noreply, (report_memory(false, State))
- #q { memory_report_timer = undefined }, binary}.
+ %% by unsetting the timer, we force a report on the next normal message
+ {noreply, State #q { memory_report_timer = undefined }, binary}.
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
@@ -862,9 +859,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
handle_info(timeout, State) ->
- %% TODO: Once we drop support for R11B-5, we can change this to
- %% {noreply, State, hibernate};
State1 = stop_memory_timer(report_memory(true, State)),
+ %% don't call noreply/1 as that'll restart the memory_report_timer
{noreply, State1, hibernate};
handle_info(Info, State) ->
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 86a47c38bd..d8c6580f01 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -71,6 +71,7 @@
-define(FILE_SIZE_LIMIT, (256*1024*1024)).
-define(SYNC_INTERVAL, 5). %% milliseconds
+-define(HIBERNATE_AFTER_MIN, 1000).
-record(dqstate,
{msg_location_dets, %% where are messages?
@@ -88,7 +89,7 @@
read_file_handles, %% file handles for reading (LRU)
read_file_handles_limit, %% how many file handles can we open?
on_sync_froms, %% list of commiters to run on sync (reversed)
- timer_ref, %% TRef for our interval timer
+ commit_timer_ref, %% TRef for our interval timer
last_sync_offset, %% current_offset at the last time we sync'd
message_cache, %% ets message cache
memory_report_timer, %% TRef for the memory report timer
@@ -391,9 +392,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% seems to blow up if it is set private
MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]),
- {ok, TRef} = timer:apply_interval(?MEMORY_REPORT_TIME_INTERVAL,
- rabbit_disk_queue, report_memory, []),
-
+ TRef = start_memory_timer(),
InitName = "0" ++ ?FILE_EXTENSION,
State =
@@ -413,7 +412,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
read_file_handles = {dict:new(), gb_trees:empty()},
read_file_handles_limit = ReadFileHandlesLimit,
on_sync_froms = [],
- timer_ref = undefined,
+ commit_timer_ref = undefined,
last_sync_offset = 0,
message_cache = ets:new(?CACHE_ETS_NAME,
[set, private]),
@@ -441,7 +440,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
{ok, case Mode of
mixed -> State2;
disk -> to_disk_only_mode(State2)
- end}.
+ end, {binary, ?HIBERNATE_AFTER_MIN}}.
handle_call({deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, true, false, State),
@@ -521,14 +520,18 @@ handle_cast({set_mode, Mode}, State) ->
mixed -> fun to_ram_disk_mode/1
end)(State));
handle_cast(report_memory, State) ->
- Bytes = memory_use(State),
- rabbit_queue_mode_manager:report_memory(self(), 2.5 * Bytes),
- noreply(State).
+ %% call noreply1/1, not noreply/1, as we don't want to restart the
+ %% memory_report_timer
+ %% by unsetting the timer, we force a report on the next normal message
+ noreply1(State #dqstate { memory_report_timer = undefined }).
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
-handle_info(timeout, State = #dqstate { timer_ref = TRef })
- when TRef /= undefined ->
+handle_info(timeout, State = #dqstate { commit_timer_ref = undefined }) ->
+ ok = report_memory(true, State),
+ %% don't use noreply/1 or noreply1/1 as they'll restart the memory timer
+ {noreply, stop_memory_timer(State), hibernate};
+handle_info(timeout, State) ->
noreply(sync_current_file_handle(State));
handle_info(_Info, State) ->
noreply(State).
@@ -539,12 +542,10 @@ terminate(_Reason, State) ->
shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
msg_location_ets = MsgLocationEts,
current_file_handle = FileHdl,
- read_file_handles = {ReadHdls, _ReadHdlsAge},
- memory_report_timer = TRef
+ read_file_handles = {ReadHdls, _ReadHdlsAge}
}) ->
%% deliberately ignoring return codes here
- timer:cancel(TRef),
- State1 = stop_commit_timer(State),
+ State1 = stop_commit_timer(stop_memory_timer(State)),
dets:close(MsgLocationDets),
file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++
?FILE_EXTENSION_DETS)),
@@ -568,6 +569,27 @@ code_change(_OldVsn, State, _Extra) ->
%% ---- UTILITY FUNCTIONS ----
+stop_memory_timer(State = #dqstate { memory_report_timer = undefined }) ->
+ State;
+stop_memory_timer(State = #dqstate { memory_report_timer = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #dqstate { memory_report_timer = undefined }.
+
+start_memory_timer() ->
+ {ok, TRef} = timer:apply_after(?MEMORY_REPORT_TIME_INTERVAL,
+ rabbit_disk_queue, report_memory, []),
+ TRef.
+
+start_memory_timer(State = #dqstate { memory_report_timer = undefined }) ->
+ report_memory(false, State),
+ State #dqstate { memory_report_timer = start_memory_timer() };
+start_memory_timer(State) ->
+ State.
+
+report_memory(Hibernating, State) ->
+ Bytes = memory_use(State),
+ rabbit_queue_mode_manager:report_memory(self(), 2.5 * Bytes, Hibernating).
+
memory_use(#dqstate { operation_mode = ram_disk,
file_summary = FileSummary,
sequences = Sequences,
@@ -633,22 +655,30 @@ to_ram_disk_mode(State = #dqstate { operation_mode = disk_only,
mnesia_bytes_per_record = undefined,
ets_bytes_per_record = undefined }.
-noreply(NewState = #dqstate { on_sync_froms = [], timer_ref = undefined }) ->
- {noreply, NewState, infinity};
-noreply(NewState = #dqstate { timer_ref = undefined }) ->
- {noreply, start_commit_timer(NewState), 0};
-noreply(NewState = #dqstate { on_sync_froms = [] }) ->
- {noreply, stop_commit_timer(NewState), infinity};
noreply(NewState) ->
+ noreply1(start_memory_timer(NewState)).
+
+noreply1(NewState = #dqstate { on_sync_froms = [],
+ commit_timer_ref = undefined }) ->
+ {noreply, NewState, binary};
+noreply1(NewState = #dqstate { commit_timer_ref = undefined }) ->
+ {noreply, start_commit_timer(NewState), 0};
+noreply1(NewState = #dqstate { on_sync_froms = [] }) ->
+ {noreply, stop_commit_timer(NewState), binary};
+noreply1(NewState) ->
{noreply, NewState, 0}.
-reply(Reply, NewState = #dqstate { on_sync_froms = [], timer_ref = undefined }) ->
- {reply, Reply, NewState, infinity};
-reply(Reply, NewState = #dqstate { timer_ref = undefined }) ->
- {reply, Reply, start_commit_timer(NewState), 0};
-reply(Reply, NewState = #dqstate { on_sync_froms = [] }) ->
- {reply, Reply, stop_commit_timer(NewState), infinity};
reply(Reply, NewState) ->
+ reply1(Reply, start_memory_timer(NewState)).
+
+reply1(Reply, NewState = #dqstate { on_sync_froms = [],
+ commit_timer_ref = undefined }) ->
+ {reply, Reply, NewState, binary};
+reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }) ->
+ {reply, Reply, start_commit_timer(NewState), 0};
+reply1(Reply, NewState = #dqstate { on_sync_froms = [] }) ->
+ {reply, Reply, stop_commit_timer(NewState), binary};
+reply1(Reply, NewState) ->
{reply, Reply, NewState, 0}.
form_filename(Name) ->
@@ -793,15 +823,15 @@ sequence_lookup(Sequences, Q) ->
{ReadSeqId, WriteSeqId, Length}
end.
-start_commit_timer(State = #dqstate { timer_ref = undefined }) ->
+start_commit_timer(State = #dqstate { commit_timer_ref = undefined }) ->
{ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []),
- State #dqstate { timer_ref = TRef }.
+ State #dqstate { commit_timer_ref = TRef }.
-stop_commit_timer(State = #dqstate { timer_ref = undefined }) ->
+stop_commit_timer(State = #dqstate { commit_timer_ref = undefined }) ->
State;
-stop_commit_timer(State = #dqstate { timer_ref = TRef }) ->
+stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) ->
{ok, cancel} = timer:cancel(TRef),
- State #dqstate { timer_ref = undefined }.
+ State #dqstate { commit_timer_ref = undefined }.
sync_current_file_handle(State = #dqstate { current_dirty = false,
on_sync_froms = [] }) ->
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index 359ef70871..99f6e408d9 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([register/4, report_memory/2, report_memory/5, info/0,
+-export([register/4, report_memory/3, report_memory/5, info/0,
pin_to_disk/1, unpin_to_disk/1]).
-define(TOTAL_TOKENS, 1000).
@@ -54,7 +54,7 @@
-spec(start_link/0 :: () ->
({'ok', pid()} | 'ignore' | {'error', any()})).
-spec(register/4 :: (pid(), atom(), atom(), list()) -> {'ok', queue_mode()}).
--spec(report_memory/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(report_memory/3 :: (pid(), non_neg_integer(), bool()) -> 'ok').
-spec(report_memory/5 :: (pid(), non_neg_integer(),
non_neg_integer(), non_neg_integer(), bool()) ->
'ok').
@@ -131,11 +131,11 @@
%% occurring.
%%
%% The queue process deliberately reports 4 times its estimated RAM
-%% usage, and the disk_queue 2 times. In practise, this seems to work
-%% well. Note that we are deliberately running out of tokes a little
-%% early because of the fact that the mixed -> disk transition can
-%% transiently eat a lot of memory and take some time (flushing a few
-%% million messages to disk is never going to be instantaneous).
+%% usage, and the disk_queue 2.5 times. In practise, this seems to
+%% work well. Note that we are deliberately running out of tokes a
+%% little early because of the fact that the mixed -> disk transition
+%% can transiently eat a lot of memory and take some time (flushing a
+%% few million messages to disk is never going to be instantaneous).
start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
@@ -149,8 +149,8 @@ pin_to_disk(Pid) ->
unpin_to_disk(Pid) ->
gen_server2:call(?SERVER, {unpin_to_disk, Pid}).
-report_memory(Pid, Memory) ->
- report_memory(Pid, Memory, undefined, undefined, false).
+report_memory(Pid, Memory, Hibernating) ->
+ report_memory(Pid, Memory, undefined, undefined, Hibernating).
report_memory(Pid, Memory, Gain, Loss, Hibernating) ->
gen_server2:cast(?SERVER,