diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 40 |
1 files changed, 24 insertions, 16 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index c6076635f6..192995b219 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -376,8 +376,6 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% seems to blow up if it is set private MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]), - {ok, TRef} = timer:apply_interval(?SYNC_INTERVAL, ?MODULE, filesync, []), - InitName = "0" ++ ?FILE_EXTENSION, State = #dqstate { msg_location_dets = MsgLocationDets, @@ -395,8 +393,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> file_size_limit = FileSizeLimit, read_file_handles = {dict:new(), gb_trees:empty()}, read_file_handles_limit = ReadFileHandlesLimit, - on_sync_froms = [], - timer_ref = TRef + on_sync_froms = [], + timer_ref = undefined }, {ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = @@ -528,11 +526,10 @@ terminate(_Reason, State) -> shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, msg_location_ets = MsgLocationEts, current_file_handle = FileHdl, - read_file_handles = {ReadHdls, _ReadHdlsAge}, - timer_ref = TRef + read_file_handles = {ReadHdls, _ReadHdlsAge} }) -> + State1 = stop_commit_timer(State), %% deliberately ignoring return codes here - timer:cancel(TRef), dets:close(MsgLocationDets), file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS)), @@ -545,11 +542,10 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, dict:fold(fun (_File, Hdl, _Acc) -> file:close(Hdl) end, ok, ReadHdls), - State #dqstate { current_file_handle = undefined, - current_dirty = false, - read_file_handles = {dict:new(), gb_trees:empty()}, - timer_ref = undefined - }. + State1 #dqstate { current_file_handle = undefined, + current_dirty = false, + read_file_handles = {dict:new(), gb_trees:empty()} + }. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -557,14 +553,14 @@ code_change(_OldVsn, State, _Extra) -> %% ---- UTILITY FUNCTIONS ---- noreply(NewState = #dqstate { current_dirty = true }) -> - {noreply, NewState, 0}; + {noreply, start_commit_timer(NewState), 0}; noreply(NewState) -> - {noreply, NewState, infinity}. + {noreply, stop_commit_timer(NewState), infinity}. reply(Reply, NewState = #dqstate { current_dirty = true }) -> - {reply, Reply, NewState, 0}; + {reply, Reply, start_commit_timer(NewState), 0}; reply(Reply, NewState) -> - {reply, Reply, NewState, infinity}. + {reply, Reply, stop_commit_timer(NewState), infinity}. form_filename(Name) -> filename:join(base_directory(), Name). @@ -707,6 +703,18 @@ sequence_lookup(Sequences, Q) -> {ReadSeqId, WriteSeqId, Length} end. +start_commit_timer(State = #dqstate { timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []), + State #dqstate { timer_ref = TRef }; +start_commit_timer(State) -> + State. + +stop_commit_timer(State = #dqstate { timer_ref = undefined }) -> + State; +stop_commit_timer(State = #dqstate { timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #dqstate { timer_ref = undefined }. + sync_current_file_handle(State = #dqstate { current_dirty = false, on_sync_froms = [] }) -> State; |
