summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author    Matthias Radestock <matthias@lshift.net>  2009-10-08 03:09:49 +0100
committer Matthias Radestock <matthias@lshift.net>  2009-10-08 03:09:49 +0100
commit    bfb9afede0a4393a47664de78f42321e0b01ba76 (patch)
tree      8f6ff2443aeea16b4d0518d8a6321630042a7635 /src
parent    98a679a1e632dbeb768e8dc436d9bf7aac91e582 (diff)
download  rabbitmq-server-git-bfb9afede0a4393a47664de78f42321e0b01ba76.tar.gz
make msg_store responsible for sync'ing
The API to the msg_store has changed: now instead of asking whether a sync is needed for a set of msg ids, and subsequently requesting a sync, we request a sync for a set of msg ids and supply a callback that is invoked when that sync is done. That way the msg_store can make its own decisions on when to sync, and less logic is required by callers. During queue deletion we must remove *all* queue messages from the store, including those that are part of committed transactions for which the disk_queue has not yet received the sync callback. To do that we keep a record of these messages in a dict in the state. The dict also ensures that we do not act on a sync callback involving a queue which has since been deleted and perhaps recreated.
Diffstat (limited to 'src')
-rw-r--r--  src/rabbit_disk_queue.erl  183
-rw-r--r--  src/rabbit_msg_store.erl    95
2 files changed, 148 insertions(+), 130 deletions(-)
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 02a8ed8c3c..893fae8e5c 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -44,8 +44,6 @@
prefetch/1
]).
--export([filesync/0]).
-
-export([stop/0, stop_and_obliterate/0]).
%%----------------------------------------------------------------------------
@@ -63,16 +61,14 @@
is_persistent = true
}).
--define(SYNC_INTERVAL, 5). %% milliseconds
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(SERVER, ?MODULE).
-record(dqstate,
- {sequences, %% next read and write for each q
- on_sync_txns, %% list of commiters to run on sync (reversed)
- commit_timer_ref %% TRef for our interval timer
+ { sequences, %% next read and write for each q
+ pending_commits %% dict of txns waiting for msg_store
}).
%%----------------------------------------------------------------------------
@@ -109,7 +105,6 @@
A, queue_name()) -> A).
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
--spec(filesync/0 :: () -> 'ok').
-endif.
@@ -173,8 +168,10 @@ stop() ->
stop_and_obliterate() ->
gen_server2:call(?SERVER, stop_vaporise, infinity).
-filesync() ->
- gen_server2:pcall(?SERVER, 9, filesync).
+%% private
+
+finalise_commit(TxId) ->
+ gen_server2:cast(?SERVER, {finalise_commit, TxId}).
%%----------------------------------------------------------------------------
%% gen_server behaviour
@@ -203,9 +200,7 @@ init([]) ->
Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]),
ok = extract_sequence_numbers(Sequences),
- State = #dqstate { sequences = Sequences,
- on_sync_txns = [],
- commit_timer_ref = undefined },
+ State = #dqstate { sequences = Sequences, pending_commits = dict:new() },
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -222,8 +217,6 @@ handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
handle_call({purge, Q}, _From, State) ->
{ok, Count, State1} = internal_purge(Q, State),
reply(Count, State1);
-handle_call(filesync, _From, State) ->
- reply(ok, sync(State));
handle_call({delete_queue, Q}, From, State) ->
gen_server2:reply(From, ok),
{ok, State1} = internal_delete_queue(Q, State),
@@ -275,13 +268,12 @@ handle_cast({prefetch, Q, From}, State) ->
internal_fetch_attributes(Q, ignore_delivery, State1);
false -> ok
end,
- noreply(State1).
+ noreply(State1);
+handle_cast({finalise_commit, TxId}, State) ->
+ noreply(finalise_commit(TxId, State)).
handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State};
-handle_info(timeout, State) ->
- %% must have commit_timer set, so timeout was 0, and we're not hibernating
- noreply(sync(State)).
+ {stop, Reason, State}.
terminate(_Reason, State) ->
State1 = shutdown(State),
@@ -291,10 +283,9 @@ terminate(_Reason, State) ->
shutdown(State = #dqstate { sequences = undefined }) ->
State;
shutdown(State = #dqstate { sequences = Sequences }) ->
- State1 = stop_commit_timer(State),
ok = rabbit_msg_store:stop(),
ets:delete(Sequences),
- State1 #dqstate { sequences = undefined }.
+ State #dqstate { sequences = undefined }.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -304,28 +295,10 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
noreply(State) ->
- noreply1(State).
-
-noreply1(State) ->
- {State1, Timeout} = next_state(State),
- {noreply, State1, Timeout}.
+ {noreply, State, hibernate}.
reply(Reply, State) ->
- reply1(Reply, State).
-
-reply1(Reply, State) ->
- {State1, Timeout} = next_state(State),
- {reply, Reply, State1, Timeout}.
-
-next_state(State = #dqstate { on_sync_txns = [],
- commit_timer_ref = undefined }) ->
- {State, hibernate};
-next_state(State = #dqstate { commit_timer_ref = undefined }) ->
- {start_commit_timer(State), 0};
-next_state(State = #dqstate { on_sync_txns = [] }) ->
- {stop_commit_timer(State), hibernate};
-next_state(State) ->
- {State, 0}.
+ {reply, Reply, State, hibernate}.
form_filename(Name) ->
filename:join(base_directory(), Name).
@@ -339,25 +312,6 @@ sequence_lookup(Sequences, Q) ->
[{_, ReadSeqId, WriteSeqId}] -> {ReadSeqId, WriteSeqId}
end.
-start_commit_timer(State = #dqstate { commit_timer_ref = undefined }) ->
- {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []),
- State #dqstate { commit_timer_ref = TRef }.
-
-stop_commit_timer(State = #dqstate { commit_timer_ref = undefined }) ->
- State;
-stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) ->
- {ok, cancel} = timer:cancel(TRef),
- State #dqstate { commit_timer_ref = undefined }.
-
-sync(State = #dqstate { on_sync_txns = Txns }) ->
- ok = rabbit_msg_store:sync(),
- case Txns of
- [] -> State;
- _ -> lists:foldl(fun internal_do_tx_commit/2,
- State #dqstate { on_sync_txns = [] },
- lists:reverse(Txns))
- end.
-
%%----------------------------------------------------------------------------
%% internal functions
%%----------------------------------------------------------------------------
@@ -404,10 +358,9 @@ maybe_advance(pop_queue, Sequences, Q, ReadSeqId, WriteSeqId) ->
true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
ok.
-internal_foldl(Q, Fun, Init, State) ->
- State1 = #dqstate { sequences = Sequences } = sync(State),
+internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) ->
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
- internal_foldl(Q, WriteSeqId, Fun, State1, Init, ReadSeqId).
+ internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId).
internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) ->
{ok, Acc, State};
@@ -438,44 +391,57 @@ internal_tx_publish(Message = #basic_message { guid = MsgId,
{ok, State}.
internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
- State = #dqstate { on_sync_txns = Txns }) ->
- TxnDetails = {Q, PubMsgIds, AckSeqIds, From},
- case rabbit_msg_store:needs_sync(
- [MsgId || {MsgId, _IsDelivered, _IsPersistent} <- PubMsgIds]) of
- true -> Txns1 = [TxnDetails | Txns],
- State #dqstate { on_sync_txns = Txns1 };
- false -> internal_do_tx_commit(TxnDetails, State)
+ State = #dqstate { pending_commits = PendingCommits }) ->
+ ok = rabbit_msg_store:sync([MsgId || {MsgId, _, _} <- PubMsgIds],
+ fun () -> finalise_commit({Q, From}) end),
+ PendingCommits1 = dict:store(Q, {PubMsgIds, AckSeqIds, From},
+ PendingCommits),
+ State #dqstate { pending_commits = PendingCommits1 }.
+
+finalise_commit({Q, From},
+ State = #dqstate { sequences = Sequences,
+ pending_commits = PendingCommits }) ->
+ case dict:find(Q, PendingCommits) of
+ {ok, {PubMsgIds, AckSeqIds, From}} ->
+ {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q),
+ WriteSeqId =
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ {ok, WriteSeqId1} =
+ lists:foldl(
+ fun ({MsgId, IsDelivered, IsPersistent},
+ {ok, SeqId}) ->
+ {mnesia:write(
+ rabbit_disk_queue,
+ #dq_msg_loc {
+ queue_and_seq_id = {Q, SeqId},
+ msg_id = MsgId,
+ is_delivered = IsDelivered,
+ is_persistent = IsPersistent
+ }, write),
+ SeqId + 1}
+ end, {ok, InitWriteSeqId}, PubMsgIds),
+ WriteSeqId1
+ end),
+ {ok, State1} = remove_messages(Q, AckSeqIds, State),
+ true = case PubMsgIds of
+ [] -> true;
+ _ -> ets:insert(Sequences,
+ {Q, InitReadSeqId, WriteSeqId})
+ end,
+ gen_server2:reply(From, ok),
+ State1 # dqstate { pending_commits =
+ dict:erase(Q, PendingCommits) };
+ {ok, _} ->
+ %% sync notification for a deleted queue which has since
+ %% been recreated
+ State;
+ error ->
+ %% sync notification for a deleted queue
+ State
end.
-internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From},
- State = #dqstate { sequences = Sequences }) ->
- {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q),
- WriteSeqId =
- rabbit_misc:execute_mnesia_transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- {ok, WriteSeqId1} =
- lists:foldl(
- fun ({MsgId, IsDelivered, IsPersistent}, {ok, SeqId}) ->
- {mnesia:write(
- rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id = {Q, SeqId},
- msg_id = MsgId,
- is_delivered = IsDelivered,
- is_persistent = IsPersistent
- }, write),
- SeqId + 1}
- end, {ok, InitWriteSeqId}, PubMsgIds),
- WriteSeqId1
- end),
- {ok, State1} = remove_messages(Q, AckSeqIds, State),
- true = case PubMsgIds of
- [] -> true;
- _ -> ets:insert(Sequences, {Q, InitReadSeqId, WriteSeqId})
- end,
- gen_server2:reply(From, ok),
- State1.
-
internal_publish(Q, Message = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
IsDelivered, State) ->
@@ -588,12 +554,23 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
{ok, WriteSeqId - ReadSeqId, State1}
end.
-internal_delete_queue(Q, State) ->
- State1 = sync(State),
+internal_delete_queue(Q,
+ State = #dqstate { pending_commits = PendingCommits }) ->
+ %% remove pending commits
+ State1 = case dict:find(Q, PendingCommits) of
+ {ok, {PubMsgIds, _, _}} ->
+ ok = rabbit_msg_store:remove(
+ [MsgId || {MsgId, _, _} <- PubMsgIds]),
+ State # dqstate { pending_commits =
+ dict:erase(Q, PendingCommits) };
+ error ->
+ State
+ end,
+ %% remove everything undelivered
{ok, _Count, State2 = #dqstate { sequences = Sequences }} =
- internal_purge(Q, State1), %% remove everything undelivered
+ internal_purge(Q, State1),
true = ets:delete(Sequences, Q),
- %% now remove everything already delivered
+ %% remove everything already delivered
Objs = mnesia:dirty_match_object(
rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id = {Q, '_'}, _ = '_' }),
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index aa779e61ae..f973de5dde 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,7 +34,9 @@
-behaviour(gen_server2).
-export([start_link/3, write/2, read/1, contains/1, remove/1, release/1,
- needs_sync/1, sync/0, stop/0]).
+ sync/2, stop/0]).
+
+-export([sync/0]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -43,6 +45,7 @@
-define(MAX_READ_FILE_HANDLES, 256).
-define(FILE_SIZE_LIMIT, (256*1024*1024)).
+-define(SYNC_INTERVAL, 5). %% milliseconds
%%----------------------------------------------------------------------------
@@ -61,8 +64,7 @@
-spec(contains/1 :: (msg_id()) -> boolean()).
-spec(remove/1 :: ([msg_id()]) -> 'ok').
-spec(release/1 :: ([msg_id()]) -> 'ok').
--spec(needs_sync/1 :: ([msg_id()]) -> boolean()).
--spec(sync/0 :: () -> 'ok').
+-spec(sync/2 :: ([msg_id()], thunk(any())) -> 'ok').
-spec(stop/0 :: () -> 'ok').
-endif.
@@ -81,6 +83,8 @@
file_size_limit, %% how big can our files get?
read_file_handle_cache, %% file handle cache for reading
last_sync_offset, %% current_offset at the last time we sync'd
+ on_sync, %% pending sync requests
+ sync_timer_ref, %% TRef for our interval timer
message_cache %% ets message cache
}).
@@ -232,9 +236,9 @@ read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity).
contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity).
remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
-needs_sync(MsgIds) -> gen_server2:call(?SERVER, {needs_sync, MsgIds}, infinity).
-sync() -> gen_server2:call(?SERVER, sync, infinity).
+sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}).
stop() -> gen_server2:call(?SERVER, stop, infinity).
+sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal
%%----------------------------------------------------------------------------
%% gen_server callbacks
@@ -262,6 +266,8 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
file_size_limit = ?FILE_SIZE_LIMIT,
read_file_handle_cache = HandleCache,
last_sync_offset = 0,
+ on_sync = [],
+ sync_timer_ref = undefined,
message_cache = MessageCache
},
@@ -330,21 +336,6 @@ handle_call({contains, MsgId}, _From, State) ->
#msg_location {} -> true
end, State);
-handle_call({needs_sync, _MsgIds}, _From,
- State = #msstate { current_dirty = false }) ->
- reply(false, State);
-handle_call({needs_sync, MsgIds}, _From,
- State = #msstate { current_file = CurFile,
- last_sync_offset = SyncOffset }) ->
- reply(lists:any(fun (MsgId) ->
- #msg_location { file = File, offset = Offset } =
- index_lookup(MsgId, State),
- File =:= CurFile andalso Offset >= SyncOffset
- end, MsgIds), State);
-
-handle_call(sync, _From, State) ->
- reply(ok, sync(State));
-
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
@@ -403,10 +394,32 @@ handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) ->
handle_cast({release, MsgIds}, State) ->
lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
- noreply(State).
+ noreply(State);
-handle_info(_Info, State) ->
- noreply(State).
+handle_cast({sync, _MsgIds, K},
+ State = #msstate { current_dirty = false }) ->
+ K(),
+ noreply(State);
+
+handle_cast({sync, MsgIds, K},
+ State = #msstate { current_file = CurFile,
+ last_sync_offset = SyncOffset,
+ on_sync = Syncs }) ->
+ case lists:any(fun (MsgId) ->
+ #msg_location { file = File, offset = Offset } =
+ index_lookup(MsgId, State),
+ File =:= CurFile andalso Offset >= SyncOffset
+ end, MsgIds) of
+ false -> K(),
+ noreply(State);
+ true -> noreply(State #msstate { on_sync = [K | Syncs] })
+ end;
+
+handle_cast(sync, State) ->
+ noreply(sync(State)).
+
+handle_info(timeout, State) ->
+ noreply(sync(State)).
terminate(_Reason, State = #msstate { msg_locations = MsgLocations,
file_summary = FileSummary,
@@ -434,9 +447,32 @@ code_change(_OldVsn, State, _Extra) ->
%% general helper functions
%%----------------------------------------------------------------------------
-noreply(State) -> {noreply, State}.
+noreply(State) ->
+ {State1, Timeout} = next_state(State),
+ {noreply, State1, Timeout}.
+
+reply(Reply, State) ->
+ {State1, Timeout} = next_state(State),
+ {reply, Reply, State1, Timeout}.
-reply(Reply, State) -> {reply, Reply, State}.
+next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) ->
+ {State, infinity};
+next_state(State = #msstate { sync_timer_ref = undefined }) ->
+ {start_sync_timer(State), 0};
+next_state(State = #msstate { on_sync = [] }) ->
+ {stop_sync_timer(State), infinity};
+next_state(State) ->
+ {State, 0}.
+
+start_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, []),
+ State #msstate { sync_timer_ref = TRef }.
+
+stop_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
+ State;
+stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #msstate { sync_timer_ref = undefined }.
form_filename(Dir, Name) -> filename:join(Dir, Name).
@@ -465,9 +501,14 @@ truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
sync(State = #msstate { current_dirty = false }) ->
State;
sync(State = #msstate { current_file_handle = CurHdl,
- current_offset = CurOffset }) ->
+ current_offset = CurOffset,
+ on_sync = Syncs }) ->
+ State1 = stop_sync_timer(State),
ok = file:sync(CurHdl),
- State #msstate { current_dirty = false, last_sync_offset = CurOffset }.
+ lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
+ State1 #msstate { current_dirty = false,
+ last_sync_offset = CurOffset,
+ on_sync = [] }.
with_read_handle_at(File, Offset, Fun,
State = #msstate { dir = Dir,