diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 51 |
1 files changed, 31 insertions, 20 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 1e2226bb02..c6076635f6 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -85,7 +85,7 @@ file_size_limit, %% how big can our files get? read_file_handles, %% file handles for reading (LRU) read_file_handles_limit, %% how many file handles can we open? - on_sync_functions, %% list of functions to run on sync (reversed) + on_sync_froms, %% list of commiters to run on sync (reversed) timer_ref %% TRef for our interval timer }). @@ -395,7 +395,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> file_size_limit = FileSizeLimit, read_file_handles = {dict:new(), gb_trees:empty()}, read_file_handles_limit = ReadFileHandlesLimit, - on_sync_functions = [], + on_sync_froms = [], timer_ref = TRef }, {ok, State1 = #dqstate { current_file_name = CurrentName, @@ -427,8 +427,12 @@ handle_call({phantom_deliver, Q}, _From, State) -> reply(Result, State1); handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) -> PubMsgSeqIds = zip_with_tail(PubMsgIds, {duplicate, next}), - {ok, State1} = internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, State), - noreply(State1); + {Reply, State1} = + internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, State), + case Reply of + true -> reply(ok, State1); + false -> noreply(State1) + end; handle_call({purge, Q}, _From, State) -> {ok, Count, State1} = internal_purge(Q, State), reply(Count, State1); @@ -703,16 +707,20 @@ sequence_lookup(Sequences, Q) -> {ReadSeqId, WriteSeqId, Length} end. +sync_current_file_handle(State = #dqstate { current_dirty = false, + on_sync_froms = [] }) -> + State; sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl, current_dirty = IsDirty, - on_sync_functions = Funcs + on_sync_froms = Froms }) -> ok = case IsDirty of true -> file:sync(CurHdl); false -> ok end, - lists:map(fun (Fun) -> Fun() end, lists:reverse(Funcs)), - State #dqstate { current_dirty = false, on_sync_functions = [] }. + lists:map(fun (From) -> gen_server2:reply(From, ok) end, + lists:reverse(Froms)), + State #dqstate { current_dirty = false, on_sync_froms = [] }. %% ---- INTERNAL RAW FUNCTIONS ---- @@ -858,7 +866,9 @@ internal_tx_publish(MsgId, MsgBody, %% can call this with PubMsgSeqIds as zip(PubMsgIds, duplicate(N, next)) internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, State = #dqstate { sequences = Sequences, - on_sync_functions = SyncFuncs + current_file_name = CurFile, + current_dirty = IsDirty, + on_sync_froms = SyncFroms }) -> {PubList, PubAcc, ReadSeqId, Length} = case PubMsgSeqIds of @@ -871,7 +881,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, { zip_with_tail(PubMsgSeqIds, {last, {next, next}}), InitWriteSeqId, InitReadSeqId1, InitLength} end, - {atomic, {WriteSeqId, State1}} = + {atomic, {InCurFile, WriteSeqId, State1}} = mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), @@ -880,11 +890,11 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, %% it's been published, which is clearly %% nonsense. I.e. in commit, do not do things in an %% order which _could_not_ have happened. - WriteSeqId1 = + {InCurFile1, WriteSeqId1} = lists:foldl( fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}}, - ExpectedSeqId) -> - [{MsgId, _RefCount, _File, _Offset, + {InCurFileAcc, ExpectedSeqId}) -> + [{MsgId, _RefCount, File, _Offset, _TotalSize}] = dets_ets_lookup(State, MsgId), SeqId1 = adjust_last_msg_seq_id( Q, ExpectedSeqId, SeqId, write), @@ -899,21 +909,22 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, next_seq_id = NextSeqId1 }, write), - NextSeqId1 - end, PubAcc, PubList), + {InCurFileAcc orelse File =:= CurFile, + NextSeqId1} + end, {false, PubAcc}, PubList), {ok, State2} = remove_messages(Q, AckSeqIds, txn, State), - {WriteSeqId1, State2} + {InCurFile1, WriteSeqId1, State2} end), true = case PubList of [] -> true; _ -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId, Length + erlang:length(PubList)}) end, - {ok, - State1 #dqstate { on_sync_functions = [fun() -> - gen_server2:reply(From, ok) - end | SyncFuncs]} - }. + if IsDirty andalso InCurFile -> + {false, State1 #dqstate { on_sync_froms = [From | SyncFroms] }}; + true -> + {true, State1} + end. %% SeqId can be 'next' internal_publish(Q, MsgId, SeqId, MsgBody, IsDelivered, State) -> |
