diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 99 |
1 files changed, 48 insertions, 51 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 6bd3e04657..0e43c38779 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -92,7 +92,7 @@ file_size_limit, %% how big can our files get? read_file_handles, %% file handles for reading (LRU) read_file_handles_limit, %% how many file handles can we open? - on_sync_froms, %% list of commiters to run on sync (reversed) + on_sync_txns, %% list of commiters to run on sync (reversed) commit_timer_ref, %% TRef for our interval timer last_sync_offset, %% current_offset at the last time we sync'd message_cache, %% ets message cache @@ -423,7 +423,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> file_size_limit = FileSizeLimit, read_file_handles = {dict:new(), gb_trees:empty()}, read_file_handles_limit = ReadFileHandlesLimit, - on_sync_froms = [], + on_sync_txns = [], commit_timer_ref = undefined, last_sync_offset = 0, message_cache = ets:new(?CACHE_ETS_NAME, @@ -464,12 +464,9 @@ handle_call({phantom_deliver, Q}, _From, State) -> {ok, Result, State1} = internal_deliver(Q, false, State), reply(Result, State1); handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) -> - {Reply, State1} = + State1 = internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, State), - case Reply of - true -> reply(ok, State1); - false -> noreply(State1) - end; + noreply(State1); handle_call({purge, Q}, _From, State) -> {ok, Count, State1} = internal_purge(Q, State), reply(Count, State1); @@ -683,12 +680,12 @@ noreply(NewState) -> noreply(NewState, MinPri) -> noreply1(start_memory_timer(NewState), MinPri). -noreply1(NewState = #dqstate { on_sync_froms = [], +noreply1(NewState = #dqstate { on_sync_txns = [], commit_timer_ref = undefined }, MinPri) -> {noreply, NewState, binary, MinPri}; noreply1(NewState = #dqstate { commit_timer_ref = undefined }, MinPri) -> {noreply, start_commit_timer(NewState), 0, MinPri}; -noreply1(NewState = #dqstate { on_sync_froms = [] }, MinPri) -> +noreply1(NewState = #dqstate { on_sync_txns = [] }, MinPri) -> {noreply, stop_commit_timer(NewState), binary, MinPri}; noreply1(NewState, MinPri) -> {noreply, NewState, 0, MinPri}. @@ -699,12 +696,12 @@ reply(Reply, NewState) -> reply(Reply, NewState, MinPri) -> reply1(Reply, start_memory_timer(NewState), MinPri). -reply1(Reply, NewState = #dqstate { on_sync_froms = [], +reply1(Reply, NewState = #dqstate { on_sync_txns = [], commit_timer_ref = undefined }, MinPri) -> {reply, Reply, NewState, binary, MinPri}; reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }, MinPri) -> {reply, Reply, start_commit_timer(NewState), 0, MinPri}; -reply1(Reply, NewState = #dqstate { on_sync_froms = [] }, MinPri) -> +reply1(Reply, NewState = #dqstate { on_sync_txns = [] }, MinPri) -> {reply, Reply, stop_commit_timer(NewState), binary, MinPri}; reply1(Reply, NewState, MinPri) -> {reply, Reply, NewState, 0, MinPri}. @@ -818,12 +815,12 @@ stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) -> State #dqstate { commit_timer_ref = undefined }. sync_current_file_handle(State = #dqstate { current_dirty = false, - on_sync_froms = [] }) -> + on_sync_txns = [] }) -> State; sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl, current_dirty = IsDirty, current_offset = CurOffset, - on_sync_froms = Froms, + on_sync_txns = Txns, last_sync_offset = SyncOffset }) -> SyncOffset1 = case IsDirty of @@ -831,10 +828,9 @@ sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl, CurOffset; false -> SyncOffset end, - lists:map(fun (From) -> gen_server2:reply(From, ok) end, - lists:reverse(Froms)), - State #dqstate { current_dirty = false, on_sync_froms = [], - last_sync_offset = SyncOffset1 }. + State1 = lists:foldl(fun internal_do_tx_commit/2, State, lists:reverse(Txns)), + State1 #dqstate { current_dirty = false, on_sync_txns = [], + last_sync_offset = SyncOffset1 }. msg_to_bin(Msg = #basic_message { content = Content }) -> ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), @@ -1078,53 +1074,54 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, end. internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, - State = #dqstate { sequences = Sequences, - current_file_name = CurFile, + State = #dqstate { current_file_name = CurFile, current_dirty = IsDirty, - on_sync_froms = SyncFroms, + on_sync_txns = Txns, last_sync_offset = SyncOffset }) -> + NeedsSync = IsDirty andalso + lists:any(fun (MsgId) -> + [{MsgId, _RefCount, File, Offset, + _TotalSize, _IsPersistent}] = + dets_ets_lookup(State, MsgId), + File =:= CurFile andalso Offset >= SyncOffset + end, PubMsgIds), + TxnDetails = {Q, PubMsgIds, AckSeqIds, From}, + case NeedsSync of + true -> + Txns1 = [TxnDetails | Txns], + State #dqstate { on_sync_txns = Txns1 }; + false -> + internal_do_tx_commit(TxnDetails, State) + end. + +internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From}, + State = #dqstate { sequences = Sequences }) -> {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q), - WriteSeqId = InitWriteSeqId + erlang:length(PubMsgIds), - {atomic, {InCurFile, WriteSeqId, State1}} = + {atomic, {WriteSeqId, State1}} = mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - %% must deal with publishes first, if we didn't - %% then we could end up acking a message before - %% it's been published, which is clearly - %% nonsense. I.e. in commit, do not do things in an - %% order which _could_not_ have happened. - {InCurFile1, WriteSeqId1} = + {ok, WriteSeqId1} = lists:foldl( - fun (MsgId, {InCurFileAcc, SeqId}) -> - [{MsgId, _RefCount, File, Offset, - _TotalSize, _IsPersistent}] = - dets_ets_lookup(State, MsgId), - ok = mnesia:write( - rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = - {Q, SeqId}, - msg_id = MsgId, - is_delivered = false - }, - write), - {InCurFileAcc orelse (File =:= CurFile andalso - Offset >= SyncOffset), - SeqId + 1} - end, {false, InitWriteSeqId}, PubMsgIds), - {ok, State2} = remove_messages(Q, AckSeqIds, txn, State), - {InCurFile1, WriteSeqId1, State2} + fun (MsgId, {ok, SeqId}) -> + {mnesia:write( + rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id = {Q, SeqId}, + msg_id = MsgId, + is_delivered = false + }, write), + SeqId + 1} + end, {ok, InitWriteSeqId}, PubMsgIds), + {ok, State2} = remove_messages(Q, AckSeqIds, txn, State), + {WriteSeqId1, State2} end), true = case PubMsgIds of [] -> true; _ -> ets:insert(Sequences, {Q, InitReadSeqId, WriteSeqId}) end, - if IsDirty andalso InCurFile -> - {false, State1 #dqstate { on_sync_froms = [From | SyncFroms] }}; - true -> - {true, State1} - end. + gen_server2:reply(From, ok), + State1. internal_publish(Q, Message = #basic_message { guid = MsgId }, IsDelivered, State) -> |
