summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl40
1 files changed, 24 insertions, 16 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index c6076635f6..192995b219 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -376,8 +376,6 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% seems to blow up if it is set private
MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]),
- {ok, TRef} = timer:apply_interval(?SYNC_INTERVAL, ?MODULE, filesync, []),
-
InitName = "0" ++ ?FILE_EXTENSION,
State =
#dqstate { msg_location_dets = MsgLocationDets,
@@ -395,8 +393,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
file_size_limit = FileSizeLimit,
read_file_handles = {dict:new(), gb_trees:empty()},
read_file_handles_limit = ReadFileHandlesLimit,
- on_sync_froms = [],
- timer_ref = TRef
+ on_sync_froms = [],
+ timer_ref = undefined
},
{ok, State1 = #dqstate { current_file_name = CurrentName,
current_offset = Offset } } =
@@ -528,11 +526,10 @@ terminate(_Reason, State) ->
shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
msg_location_ets = MsgLocationEts,
current_file_handle = FileHdl,
- read_file_handles = {ReadHdls, _ReadHdlsAge},
- timer_ref = TRef
+ read_file_handles = {ReadHdls, _ReadHdlsAge}
}) ->
+ State1 = stop_commit_timer(State),
%% deliberately ignoring return codes here
- timer:cancel(TRef),
dets:close(MsgLocationDets),
file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++
?FILE_EXTENSION_DETS)),
@@ -545,11 +542,10 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
dict:fold(fun (_File, Hdl, _Acc) ->
file:close(Hdl)
end, ok, ReadHdls),
- State #dqstate { current_file_handle = undefined,
- current_dirty = false,
- read_file_handles = {dict:new(), gb_trees:empty()},
- timer_ref = undefined
- }.
+ State1 #dqstate { current_file_handle = undefined,
+ current_dirty = false,
+ read_file_handles = {dict:new(), gb_trees:empty()}
+ }.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -557,14 +553,14 @@ code_change(_OldVsn, State, _Extra) ->
%% ---- UTILITY FUNCTIONS ----
noreply(NewState = #dqstate { current_dirty = true }) ->
- {noreply, NewState, 0};
+ {noreply, start_commit_timer(NewState), 0};
noreply(NewState) ->
- {noreply, NewState, infinity}.
+ {noreply, stop_commit_timer(NewState), infinity}.
reply(Reply, NewState = #dqstate { current_dirty = true }) ->
- {reply, Reply, NewState, 0};
+ {reply, Reply, start_commit_timer(NewState), 0};
reply(Reply, NewState) ->
- {reply, Reply, NewState, infinity}.
+ {reply, Reply, stop_commit_timer(NewState), infinity}.
form_filename(Name) ->
filename:join(base_directory(), Name).
@@ -707,6 +703,18 @@ sequence_lookup(Sequences, Q) ->
{ReadSeqId, WriteSeqId, Length}
end.
+start_commit_timer(State = #dqstate { timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []),
+ State #dqstate { timer_ref = TRef };
+start_commit_timer(State) ->
+ State.
+
+stop_commit_timer(State = #dqstate { timer_ref = undefined }) ->
+ State;
+stop_commit_timer(State = #dqstate { timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #dqstate { timer_ref = undefined }.
+
sync_current_file_handle(State = #dqstate { current_dirty = false,
on_sync_froms = [] }) ->
State;