summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-18 14:19:16 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-18 14:19:16 +0100
commit2c635d61153bc3d5a6c953dc8c632129babbdc8b (patch)
tree3963a454134a5dc26211ac94b121fbd5e4de11cd
parentd51da63d259e8a687849b694d3a9ea2725b2fa56 (diff)
downloadrabbitmq-server-git-2c635d61153bc3d5a6c953dc8c632129babbdc8b.tar.gz
stop the commit timer if we're no longer dirty. This means it should no longer be a repeat timer because once it's set were either going to receive the explicit sync call or we're going to timeout on message queue at which point we're no longer dirty and so we'll then cancel the timer....
-rw-r--r--src/rabbit_disk_queue.erl40
1 files changed, 24 insertions, 16 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index c6076635f6..192995b219 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -376,8 +376,6 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% seems to blow up if it is set private
MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]),
- {ok, TRef} = timer:apply_interval(?SYNC_INTERVAL, ?MODULE, filesync, []),
-
InitName = "0" ++ ?FILE_EXTENSION,
State =
#dqstate { msg_location_dets = MsgLocationDets,
@@ -395,8 +393,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
file_size_limit = FileSizeLimit,
read_file_handles = {dict:new(), gb_trees:empty()},
read_file_handles_limit = ReadFileHandlesLimit,
- on_sync_froms = [],
- timer_ref = TRef
+ on_sync_froms = [],
+ timer_ref = undefined
},
{ok, State1 = #dqstate { current_file_name = CurrentName,
current_offset = Offset } } =
@@ -528,11 +526,10 @@ terminate(_Reason, State) ->
shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
msg_location_ets = MsgLocationEts,
current_file_handle = FileHdl,
- read_file_handles = {ReadHdls, _ReadHdlsAge},
- timer_ref = TRef
+ read_file_handles = {ReadHdls, _ReadHdlsAge}
}) ->
+ State1 = stop_commit_timer(State),
%% deliberately ignoring return codes here
- timer:cancel(TRef),
dets:close(MsgLocationDets),
file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++
?FILE_EXTENSION_DETS)),
@@ -545,11 +542,10 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
dict:fold(fun (_File, Hdl, _Acc) ->
file:close(Hdl)
end, ok, ReadHdls),
- State #dqstate { current_file_handle = undefined,
- current_dirty = false,
- read_file_handles = {dict:new(), gb_trees:empty()},
- timer_ref = undefined
- }.
+ State1 #dqstate { current_file_handle = undefined,
+ current_dirty = false,
+ read_file_handles = {dict:new(), gb_trees:empty()}
+ }.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -557,14 +553,14 @@ code_change(_OldVsn, State, _Extra) ->
%% ---- UTILITY FUNCTIONS ----
noreply(NewState = #dqstate { current_dirty = true }) ->
- {noreply, NewState, 0};
+ {noreply, start_commit_timer(NewState), 0};
noreply(NewState) ->
- {noreply, NewState, infinity}.
+ {noreply, stop_commit_timer(NewState), infinity}.
reply(Reply, NewState = #dqstate { current_dirty = true }) ->
- {reply, Reply, NewState, 0};
+ {reply, Reply, start_commit_timer(NewState), 0};
reply(Reply, NewState) ->
- {reply, Reply, NewState, infinity}.
+ {reply, Reply, stop_commit_timer(NewState), infinity}.
form_filename(Name) ->
filename:join(base_directory(), Name).
@@ -707,6 +703,18 @@ sequence_lookup(Sequences, Q) ->
{ReadSeqId, WriteSeqId, Length}
end.
+start_commit_timer(State = #dqstate { timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []),
+ State #dqstate { timer_ref = TRef };
+start_commit_timer(State) ->
+ State.
+
+stop_commit_timer(State = #dqstate { timer_ref = undefined }) ->
+ State;
+stop_commit_timer(State = #dqstate { timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #dqstate { timer_ref = undefined }.
+
sync_current_file_handle(State = #dqstate { current_dirty = false,
on_sync_froms = [] }) ->
State;