summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Matthew Sackman <matthew@lshift.net> 2009-07-19 16:47:14 +0100
committer: Matthew Sackman <matthew@lshift.net> 2009-07-19 16:47:14 +0100
commit: e3503c1672da06391ff10a301ed751931fd89407 (patch)
tree: 625d7c6a7141b26040a2ff5a9ae28aa58a2c52af
parent: 1c3e228929a4828e8d731b63db05d31b4d66107d (diff)
download: rabbitmq-server-git-e3503c1672da06391ff10a301ed751931fd89407.tar.gz
Fixed the commit bug. Really, this should probably be in bug20470, but I really didn't want to have to deal with merging, and the other information about this bug is in the above comments in 20980, so it's in here.
Now, on commit, we test to see whether we need to sync the current file. If so, we just store all the txn details in the state, to be dealt with later when the sync happens. If not, we do the commit there and then, and reply immediately. Interestingly, performance is actually better now than it was (see details in bug20470): e.g., the one-in-one-out at altitude test has further reduced fsyncs from 21 to 6 and now completes in 2.1 seconds, not 3.6 (altitude of 1000, then 5000 @ one in, one out, then 1000 drain). All tests pass. We now guarantee that the messages will be fsync'd to disk _before_ anything is done to mnesia, in all cases of a tx_commit.
-rw-r--r-- src/rabbit_disk_queue.erl 99
1 files changed, 48 insertions, 51 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 6bd3e04657..0e43c38779 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -92,7 +92,7 @@
file_size_limit, %% how big can our files get?
read_file_handles, %% file handles for reading (LRU)
read_file_handles_limit, %% how many file handles can we open?
- on_sync_froms, %% list of commiters to run on sync (reversed)
+ on_sync_txns, %% list of commiters to run on sync (reversed)
commit_timer_ref, %% TRef for our interval timer
last_sync_offset, %% current_offset at the last time we sync'd
message_cache, %% ets message cache
@@ -423,7 +423,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
file_size_limit = FileSizeLimit,
read_file_handles = {dict:new(), gb_trees:empty()},
read_file_handles_limit = ReadFileHandlesLimit,
- on_sync_froms = [],
+ on_sync_txns = [],
commit_timer_ref = undefined,
last_sync_offset = 0,
message_cache = ets:new(?CACHE_ETS_NAME,
@@ -464,12 +464,9 @@ handle_call({phantom_deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, false, State),
reply(Result, State1);
handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
- {Reply, State1} =
+ State1 =
internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, State),
- case Reply of
- true -> reply(ok, State1);
- false -> noreply(State1)
- end;
+ noreply(State1);
handle_call({purge, Q}, _From, State) ->
{ok, Count, State1} = internal_purge(Q, State),
reply(Count, State1);
@@ -683,12 +680,12 @@ noreply(NewState) ->
noreply(NewState, MinPri) ->
noreply1(start_memory_timer(NewState), MinPri).
-noreply1(NewState = #dqstate { on_sync_froms = [],
+noreply1(NewState = #dqstate { on_sync_txns = [],
commit_timer_ref = undefined }, MinPri) ->
{noreply, NewState, binary, MinPri};
noreply1(NewState = #dqstate { commit_timer_ref = undefined }, MinPri) ->
{noreply, start_commit_timer(NewState), 0, MinPri};
-noreply1(NewState = #dqstate { on_sync_froms = [] }, MinPri) ->
+noreply1(NewState = #dqstate { on_sync_txns = [] }, MinPri) ->
{noreply, stop_commit_timer(NewState), binary, MinPri};
noreply1(NewState, MinPri) ->
{noreply, NewState, 0, MinPri}.
@@ -699,12 +696,12 @@ reply(Reply, NewState) ->
reply(Reply, NewState, MinPri) ->
reply1(Reply, start_memory_timer(NewState), MinPri).
-reply1(Reply, NewState = #dqstate { on_sync_froms = [],
+reply1(Reply, NewState = #dqstate { on_sync_txns = [],
commit_timer_ref = undefined }, MinPri) ->
{reply, Reply, NewState, binary, MinPri};
reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }, MinPri) ->
{reply, Reply, start_commit_timer(NewState), 0, MinPri};
-reply1(Reply, NewState = #dqstate { on_sync_froms = [] }, MinPri) ->
+reply1(Reply, NewState = #dqstate { on_sync_txns = [] }, MinPri) ->
{reply, Reply, stop_commit_timer(NewState), binary, MinPri};
reply1(Reply, NewState, MinPri) ->
{reply, Reply, NewState, 0, MinPri}.
@@ -818,12 +815,12 @@ stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) ->
State #dqstate { commit_timer_ref = undefined }.
sync_current_file_handle(State = #dqstate { current_dirty = false,
- on_sync_froms = [] }) ->
+ on_sync_txns = [] }) ->
State;
sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl,
current_dirty = IsDirty,
current_offset = CurOffset,
- on_sync_froms = Froms,
+ on_sync_txns = Txns,
last_sync_offset = SyncOffset
}) ->
SyncOffset1 = case IsDirty of
@@ -831,10 +828,9 @@ sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl,
CurOffset;
false -> SyncOffset
end,
- lists:map(fun (From) -> gen_server2:reply(From, ok) end,
- lists:reverse(Froms)),
- State #dqstate { current_dirty = false, on_sync_froms = [],
- last_sync_offset = SyncOffset1 }.
+ State1 = lists:foldl(fun internal_do_tx_commit/2, State, lists:reverse(Txns)),
+ State1 #dqstate { current_dirty = false, on_sync_txns = [],
+ last_sync_offset = SyncOffset1 }.
msg_to_bin(Msg = #basic_message { content = Content }) ->
ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
@@ -1078,53 +1074,54 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
end.
internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
- State = #dqstate { sequences = Sequences,
- current_file_name = CurFile,
+ State = #dqstate { current_file_name = CurFile,
current_dirty = IsDirty,
- on_sync_froms = SyncFroms,
+ on_sync_txns = Txns,
last_sync_offset = SyncOffset
}) ->
+ NeedsSync = IsDirty andalso
+ lists:any(fun (MsgId) ->
+ [{MsgId, _RefCount, File, Offset,
+ _TotalSize, _IsPersistent}] =
+ dets_ets_lookup(State, MsgId),
+ File =:= CurFile andalso Offset >= SyncOffset
+ end, PubMsgIds),
+ TxnDetails = {Q, PubMsgIds, AckSeqIds, From},
+ case NeedsSync of
+ true ->
+ Txns1 = [TxnDetails | Txns],
+ State #dqstate { on_sync_txns = Txns1 };
+ false ->
+ internal_do_tx_commit(TxnDetails, State)
+ end.
+
+internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From},
+ State = #dqstate { sequences = Sequences }) ->
{InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q),
- WriteSeqId = InitWriteSeqId + erlang:length(PubMsgIds),
- {atomic, {InCurFile, WriteSeqId, State1}} =
+ {atomic, {WriteSeqId, State1}} =
mnesia:transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
- %% must deal with publishes first, if we didn't
- %% then we could end up acking a message before
- %% it's been published, which is clearly
- %% nonsense. I.e. in commit, do not do things in an
- %% order which _could_not_ have happened.
- {InCurFile1, WriteSeqId1} =
+ {ok, WriteSeqId1} =
lists:foldl(
- fun (MsgId, {InCurFileAcc, SeqId}) ->
- [{MsgId, _RefCount, File, Offset,
- _TotalSize, _IsPersistent}] =
- dets_ets_lookup(State, MsgId),
- ok = mnesia:write(
- rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id =
- {Q, SeqId},
- msg_id = MsgId,
- is_delivered = false
- },
- write),
- {InCurFileAcc orelse (File =:= CurFile andalso
- Offset >= SyncOffset),
- SeqId + 1}
- end, {false, InitWriteSeqId}, PubMsgIds),
- {ok, State2} = remove_messages(Q, AckSeqIds, txn, State),
- {InCurFile1, WriteSeqId1, State2}
+ fun (MsgId, {ok, SeqId}) ->
+ {mnesia:write(
+ rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id = {Q, SeqId},
+ msg_id = MsgId,
+ is_delivered = false
+ }, write),
+ SeqId + 1}
+ end, {ok, InitWriteSeqId}, PubMsgIds),
+ {ok, State2} = remove_messages(Q, AckSeqIds, txn, State),
+ {WriteSeqId1, State2}
end),
true = case PubMsgIds of
[] -> true;
_ -> ets:insert(Sequences, {Q, InitReadSeqId, WriteSeqId})
end,
- if IsDirty andalso InCurFile ->
- {false, State1 #dqstate { on_sync_froms = [From | SyncFroms] }};
- true ->
- {true, State1}
- end.
+ gen_server2:reply(From, ok),
+ State1.
internal_publish(Q, Message = #basic_message { guid = MsgId },
IsDelivered, State) ->