diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-10-08 03:09:49 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-10-08 03:09:49 +0100 |
| commit | bfb9afede0a4393a47664de78f42321e0b01ba76 (patch) | |
| tree | 8f6ff2443aeea16b4d0518d8a6321630042a7635 /src | |
| parent | 98a679a1e632dbeb768e8dc436d9bf7aac91e582 (diff) | |
| download | rabbitmq-server-git-bfb9afede0a4393a47664de78f42321e0b01ba76.tar.gz | |
make msg_store responsible for sync'ing
The API to the msg_store has changed: now instead of asking whether a
sync is needed for a set of msg ids, and subsequently requesting a
sync, we request a sync for a set of msg ids and supply a callback
that is invoked when that sync is done. That way the msg_store can
make its own decisions on when to sync, and less logic is required by
callers.
During queue deletion we must remove *all* queue messages from the
store, including those that are part of committed transactions for
which the disk_queue has not yet received the sync callback. To do
that we keep a record of these messages in a dict in the state. The
dict also ensures that we do not act on a sync callback involving a
queue which has since been deleted and perhaps recreated.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 183 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 95 |
2 files changed, 148 insertions, 130 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 02a8ed8c3c..893fae8e5c 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -44,8 +44,6 @@ prefetch/1 ]). --export([filesync/0]). - -export([stop/0, stop_and_obliterate/0]). %%---------------------------------------------------------------------------- @@ -63,16 +61,14 @@ is_persistent = true }). --define(SYNC_INTERVAL, 5). %% milliseconds -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). -define(SERVER, ?MODULE). -record(dqstate, - {sequences, %% next read and write for each q - on_sync_txns, %% list of commiters to run on sync (reversed) - commit_timer_ref %% TRef for our interval timer + { sequences, %% next read and write for each q + pending_commits %% dict of txns waiting for msg_store }). %%---------------------------------------------------------------------------- @@ -109,7 +105,6 @@ A, queue_name()) -> A). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). --spec(filesync/0 :: () -> 'ok'). -endif. @@ -173,8 +168,10 @@ stop() -> stop_and_obliterate() -> gen_server2:call(?SERVER, stop_vaporise, infinity). -filesync() -> - gen_server2:pcall(?SERVER, 9, filesync). +%% private + +finalise_commit(TxId) -> + gen_server2:cast(?SERVER, {finalise_commit, TxId}). %%---------------------------------------------------------------------------- %% gen_server behaviour @@ -203,9 +200,7 @@ init([]) -> Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]), ok = extract_sequence_numbers(Sequences), - State = #dqstate { sequences = Sequences, - on_sync_txns = [], - commit_timer_ref = undefined }, + State = #dqstate { sequences = Sequences, pending_commits = dict:new() }, {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -222,8 +217,6 @@ handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) -> handle_call({purge, Q}, _From, State) -> {ok, Count, State1} = internal_purge(Q, State), reply(Count, State1); -handle_call(filesync, _From, State) -> - reply(ok, sync(State)); handle_call({delete_queue, Q}, From, State) -> gen_server2:reply(From, ok), {ok, State1} = internal_delete_queue(Q, State), @@ -275,13 +268,12 @@ handle_cast({prefetch, Q, From}, State) -> internal_fetch_attributes(Q, ignore_delivery, State1); false -> ok end, - noreply(State1). + noreply(State1); +handle_cast({finalise_commit, TxId}, State) -> + noreply(finalise_commit(TxId, State)). handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}; -handle_info(timeout, State) -> - %% must have commit_timer set, so timeout was 0, and we're not hibernating - noreply(sync(State)). + {stop, Reason, State}. terminate(_Reason, State) -> State1 = shutdown(State), @@ -291,10 +283,9 @@ terminate(_Reason, State) -> shutdown(State = #dqstate { sequences = undefined }) -> State; shutdown(State = #dqstate { sequences = Sequences }) -> - State1 = stop_commit_timer(State), ok = rabbit_msg_store:stop(), ets:delete(Sequences), - State1 #dqstate { sequences = undefined }. + State #dqstate { sequences = undefined }. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -304,28 +295,10 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- noreply(State) -> - noreply1(State). - -noreply1(State) -> - {State1, Timeout} = next_state(State), - {noreply, State1, Timeout}. + {noreply, State, hibernate}. reply(Reply, State) -> - reply1(Reply, State). - -reply1(Reply, State) -> - {State1, Timeout} = next_state(State), - {reply, Reply, State1, Timeout}. - -next_state(State = #dqstate { on_sync_txns = [], - commit_timer_ref = undefined }) -> - {State, hibernate}; -next_state(State = #dqstate { commit_timer_ref = undefined }) -> - {start_commit_timer(State), 0}; -next_state(State = #dqstate { on_sync_txns = [] }) -> - {stop_commit_timer(State), hibernate}; -next_state(State) -> - {State, 0}. + {reply, Reply, State, hibernate}. form_filename(Name) -> filename:join(base_directory(), Name). @@ -339,25 +312,6 @@ sequence_lookup(Sequences, Q) -> [{_, ReadSeqId, WriteSeqId}] -> {ReadSeqId, WriteSeqId} end. -start_commit_timer(State = #dqstate { commit_timer_ref = undefined }) -> - {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []), - State #dqstate { commit_timer_ref = TRef }. - -stop_commit_timer(State = #dqstate { commit_timer_ref = undefined }) -> - State; -stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) -> - {ok, cancel} = timer:cancel(TRef), - State #dqstate { commit_timer_ref = undefined }. - -sync(State = #dqstate { on_sync_txns = Txns }) -> - ok = rabbit_msg_store:sync(), - case Txns of - [] -> State; - _ -> lists:foldl(fun internal_do_tx_commit/2, - State #dqstate { on_sync_txns = [] }, - lists:reverse(Txns)) - end. - %%---------------------------------------------------------------------------- %% internal functions %%---------------------------------------------------------------------------- @@ -404,10 +358,9 @@ maybe_advance(pop_queue, Sequences, Q, ReadSeqId, WriteSeqId) -> true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), ok. -internal_foldl(Q, Fun, Init, State) -> - State1 = #dqstate { sequences = Sequences } = sync(State), +internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) -> {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), - internal_foldl(Q, WriteSeqId, Fun, State1, Init, ReadSeqId). + internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId). internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) -> {ok, Acc, State}; @@ -438,44 +391,57 @@ internal_tx_publish(Message = #basic_message { guid = MsgId, {ok, State}. internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, - State = #dqstate { on_sync_txns = Txns }) -> - TxnDetails = {Q, PubMsgIds, AckSeqIds, From}, - case rabbit_msg_store:needs_sync( - [MsgId || {MsgId, _IsDelivered, _IsPersistent} <- PubMsgIds]) of - true -> Txns1 = [TxnDetails | Txns], - State #dqstate { on_sync_txns = Txns1 }; - false -> internal_do_tx_commit(TxnDetails, State) + State = #dqstate { pending_commits = PendingCommits }) -> + ok = rabbit_msg_store:sync([MsgId || {MsgId, _, _} <- PubMsgIds], + fun () -> finalise_commit({Q, From}) end), + PendingCommits1 = dict:store(Q, {PubMsgIds, AckSeqIds, From}, + PendingCommits), + State #dqstate { pending_commits = PendingCommits1 }. + +finalise_commit({Q, From}, + State = #dqstate { sequences = Sequences, + pending_commits = PendingCommits }) -> + case dict:find(Q, PendingCommits) of + {ok, {PubMsgIds, AckSeqIds, From}} -> + {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q), + WriteSeqId = + rabbit_misc:execute_mnesia_transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + {ok, WriteSeqId1} = + lists:foldl( + fun ({MsgId, IsDelivered, IsPersistent}, + {ok, SeqId}) -> + {mnesia:write( + rabbit_disk_queue, + #dq_msg_loc { + queue_and_seq_id = {Q, SeqId}, + msg_id = MsgId, + is_delivered = IsDelivered, + is_persistent = IsPersistent + }, write), + SeqId + 1} + end, {ok, InitWriteSeqId}, PubMsgIds), + WriteSeqId1 + end), + {ok, State1} = remove_messages(Q, AckSeqIds, State), + true = case PubMsgIds of + [] -> true; + _ -> ets:insert(Sequences, + {Q, InitReadSeqId, WriteSeqId}) + end, + gen_server2:reply(From, ok), + State1 # dqstate { pending_commits = + dict:erase(Q, PendingCommits) }; + {ok, _} -> + %% sync notification for a deleted queue which has since + %% been recreated + State; + error -> + %% sync notification for a deleted queue + State end. -internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From}, - State = #dqstate { sequences = Sequences }) -> - {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q), - WriteSeqId = - rabbit_misc:execute_mnesia_transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - {ok, WriteSeqId1} = - lists:foldl( - fun ({MsgId, IsDelivered, IsPersistent}, {ok, SeqId}) -> - {mnesia:write( - rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = {Q, SeqId}, - msg_id = MsgId, - is_delivered = IsDelivered, - is_persistent = IsPersistent - }, write), - SeqId + 1} - end, {ok, InitWriteSeqId}, PubMsgIds), - WriteSeqId1 - end), - {ok, State1} = remove_messages(Q, AckSeqIds, State), - true = case PubMsgIds of - [] -> true; - _ -> ets:insert(Sequences, {Q, InitReadSeqId, WriteSeqId}) - end, - gen_server2:reply(From, ok), - State1. - internal_publish(Q, Message = #basic_message { guid = MsgId, is_persistent = IsPersistent }, IsDelivered, State) -> @@ -588,12 +554,23 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) -> {ok, WriteSeqId - ReadSeqId, State1} end. -internal_delete_queue(Q, State) -> - State1 = sync(State), +internal_delete_queue(Q, + State = #dqstate { pending_commits = PendingCommits }) -> + %% remove pending commits + State1 = case dict:find(Q, PendingCommits) of + {ok, {PubMsgIds, _, _}} -> + ok = rabbit_msg_store:remove( + [MsgId || {MsgId, _, _} <- PubMsgIds]), + State # dqstate { pending_commits = + dict:erase(Q, PendingCommits) }; + error -> + State + end, + %% remove everything undelivered {ok, _Count, State2 = #dqstate { sequences = Sequences }} = - internal_purge(Q, State1), %% remove everything undelivered + internal_purge(Q, State1), true = ets:delete(Sequences, Q), - %% now remove everything already delivered + %% remove everything already delivered Objs = mnesia:dirty_match_object( rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, '_'}, _ = '_' }), diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index aa779e61ae..f973de5dde 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,7 +34,9 @@ -behaviour(gen_server2). -export([start_link/3, write/2, read/1, contains/1, remove/1, release/1, - needs_sync/1, sync/0, stop/0]). + sync/2, stop/0]). + +-export([sync/0]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -43,6 +45,7 @@ -define(MAX_READ_FILE_HANDLES, 256). -define(FILE_SIZE_LIMIT, (256*1024*1024)). +-define(SYNC_INTERVAL, 5). %% milliseconds %%---------------------------------------------------------------------------- @@ -61,8 +64,7 @@ -spec(contains/1 :: (msg_id()) -> boolean()). -spec(remove/1 :: ([msg_id()]) -> 'ok'). -spec(release/1 :: ([msg_id()]) -> 'ok'). --spec(needs_sync/1 :: ([msg_id()]) -> boolean()). --spec(sync/0 :: () -> 'ok'). +-spec(sync/2 :: ([msg_id()], thunk(any())) -> 'ok'). -spec(stop/0 :: () -> 'ok'). -endif. @@ -81,6 +83,8 @@ file_size_limit, %% how big can our files get? read_file_handle_cache, %% file handle cache for reading last_sync_offset, %% current_offset at the last time we sync'd + on_sync, %% pending sync requests + sync_timer_ref, %% TRef for our interval timer message_cache %% ets message cache }). @@ -232,9 +236,9 @@ read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity). contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). -needs_sync(MsgIds) -> gen_server2:call(?SERVER, {needs_sync, MsgIds}, infinity). -sync() -> gen_server2:call(?SERVER, sync, infinity). +sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}). stop() -> gen_server2:call(?SERVER, stop, infinity). +sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -262,6 +266,8 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> file_size_limit = ?FILE_SIZE_LIMIT, read_file_handle_cache = HandleCache, last_sync_offset = 0, + on_sync = [], + sync_timer_ref = undefined, message_cache = MessageCache }, @@ -330,21 +336,6 @@ handle_call({contains, MsgId}, _From, State) -> #msg_location {} -> true end, State); -handle_call({needs_sync, _MsgIds}, _From, - State = #msstate { current_dirty = false }) -> - reply(false, State); -handle_call({needs_sync, MsgIds}, _From, - State = #msstate { current_file = CurFile, - last_sync_offset = SyncOffset }) -> - reply(lists:any(fun (MsgId) -> - #msg_location { file = File, offset = Offset } = - index_lookup(MsgId, State), - File =:= CurFile andalso Offset >= SyncOffset - end, MsgIds), State); - -handle_call(sync, _From, State) -> - reply(ok, sync(State)); - handle_call(stop, _From, State) -> {stop, normal, ok, State}. @@ -403,10 +394,32 @@ handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) -> handle_cast({release, MsgIds}, State) -> lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), - noreply(State). + noreply(State); -handle_info(_Info, State) -> - noreply(State). +handle_cast({sync, _MsgIds, K}, + State = #msstate { current_dirty = false }) -> + K(), + noreply(State); + +handle_cast({sync, MsgIds, K}, + State = #msstate { current_file = CurFile, + last_sync_offset = SyncOffset, + on_sync = Syncs }) -> + case lists:any(fun (MsgId) -> + #msg_location { file = File, offset = Offset } = + index_lookup(MsgId, State), + File =:= CurFile andalso Offset >= SyncOffset + end, MsgIds) of + false -> K(), + noreply(State); + true -> noreply(State #msstate { on_sync = [K | Syncs] }) + end; + +handle_cast(sync, State) -> + noreply(sync(State)). + +handle_info(timeout, State) -> + noreply(sync(State)). terminate(_Reason, State = #msstate { msg_locations = MsgLocations, file_summary = FileSummary, @@ -434,9 +447,32 @@ code_change(_OldVsn, State, _Extra) -> %% general helper functions %%---------------------------------------------------------------------------- -noreply(State) -> {noreply, State}. +noreply(State) -> + {State1, Timeout} = next_state(State), + {noreply, State1, Timeout}. + +reply(Reply, State) -> + {State1, Timeout} = next_state(State), + {reply, Reply, State1, Timeout}. -reply(Reply, State) -> {reply, Reply, State}. +next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) -> + {State, infinity}; +next_state(State = #msstate { sync_timer_ref = undefined }) -> + {start_sync_timer(State), 0}; +next_state(State = #msstate { on_sync = [] }) -> + {stop_sync_timer(State), infinity}; +next_state(State) -> + {State, 0}. + +start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, []), + State #msstate { sync_timer_ref = TRef }. + +stop_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> + State; +stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #msstate { sync_timer_ref = undefined }. form_filename(Dir, Name) -> filename:join(Dir, Name). @@ -465,9 +501,14 @@ truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> sync(State = #msstate { current_dirty = false }) -> State; sync(State = #msstate { current_file_handle = CurHdl, - current_offset = CurOffset }) -> + current_offset = CurOffset, + on_sync = Syncs }) -> + State1 = stop_sync_timer(State), ok = file:sync(CurHdl), - State #msstate { current_dirty = false, last_sync_offset = CurOffset }. + lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), + State1 #msstate { current_dirty = false, + last_sync_offset = CurOffset, + on_sync = [] }. with_read_handle_at(File, Offset, Fun, State = #msstate { dir = Dir, |
