summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl51
1 files changed, 31 insertions, 20 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 1e2226bb02..c6076635f6 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -85,7 +85,7 @@
file_size_limit, %% how big can our files get?
read_file_handles, %% file handles for reading (LRU)
read_file_handles_limit, %% how many file handles can we open?
- on_sync_functions, %% list of functions to run on sync (reversed)
+ on_sync_froms, %% list of commiters to run on sync (reversed)
timer_ref %% TRef for our interval timer
}).
@@ -395,7 +395,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
file_size_limit = FileSizeLimit,
read_file_handles = {dict:new(), gb_trees:empty()},
read_file_handles_limit = ReadFileHandlesLimit,
- on_sync_functions = [],
+ on_sync_froms = [],
timer_ref = TRef
},
{ok, State1 = #dqstate { current_file_name = CurrentName,
@@ -427,8 +427,12 @@ handle_call({phantom_deliver, Q}, _From, State) ->
reply(Result, State1);
handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
PubMsgSeqIds = zip_with_tail(PubMsgIds, {duplicate, next}),
- {ok, State1} = internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, State),
- noreply(State1);
+ {Reply, State1} =
+ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, State),
+ case Reply of
+ true -> reply(ok, State1);
+ false -> noreply(State1)
+ end;
handle_call({purge, Q}, _From, State) ->
{ok, Count, State1} = internal_purge(Q, State),
reply(Count, State1);
@@ -703,16 +707,20 @@ sequence_lookup(Sequences, Q) ->
{ReadSeqId, WriteSeqId, Length}
end.
+sync_current_file_handle(State = #dqstate { current_dirty = false,
+ on_sync_froms = [] }) ->
+ State;
sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl,
current_dirty = IsDirty,
- on_sync_functions = Funcs
+ on_sync_froms = Froms
}) ->
ok = case IsDirty of
true -> file:sync(CurHdl);
false -> ok
end,
- lists:map(fun (Fun) -> Fun() end, lists:reverse(Funcs)),
- State #dqstate { current_dirty = false, on_sync_functions = [] }.
+ lists:map(fun (From) -> gen_server2:reply(From, ok) end,
+ lists:reverse(Froms)),
+ State #dqstate { current_dirty = false, on_sync_froms = [] }.
%% ---- INTERNAL RAW FUNCTIONS ----
@@ -858,7 +866,9 @@ internal_tx_publish(MsgId, MsgBody,
%% can call this with PubMsgSeqIds as zip(PubMsgIds, duplicate(N, next))
internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From,
State = #dqstate { sequences = Sequences,
- on_sync_functions = SyncFuncs
+ current_file_name = CurFile,
+ current_dirty = IsDirty,
+ on_sync_froms = SyncFroms
}) ->
{PubList, PubAcc, ReadSeqId, Length} =
case PubMsgSeqIds of
@@ -871,7 +881,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From,
{ zip_with_tail(PubMsgSeqIds, {last, {next, next}}),
InitWriteSeqId, InitReadSeqId1, InitLength}
end,
- {atomic, {WriteSeqId, State1}} =
+ {atomic, {InCurFile, WriteSeqId, State1}} =
mnesia:transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
@@ -880,11 +890,11 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From,
%% it's been published, which is clearly
%% nonsense. I.e. in commit, do not do things in an
%% order which _could_not_ have happened.
- WriteSeqId1 =
+ {InCurFile1, WriteSeqId1} =
lists:foldl(
fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}},
- ExpectedSeqId) ->
- [{MsgId, _RefCount, _File, _Offset,
+ {InCurFileAcc, ExpectedSeqId}) ->
+ [{MsgId, _RefCount, File, _Offset,
_TotalSize}] = dets_ets_lookup(State, MsgId),
SeqId1 = adjust_last_msg_seq_id(
Q, ExpectedSeqId, SeqId, write),
@@ -899,21 +909,22 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From,
next_seq_id = NextSeqId1
},
write),
- NextSeqId1
- end, PubAcc, PubList),
+ {InCurFileAcc orelse File =:= CurFile,
+ NextSeqId1}
+ end, {false, PubAcc}, PubList),
{ok, State2} = remove_messages(Q, AckSeqIds, txn, State),
- {WriteSeqId1, State2}
+ {InCurFile1, WriteSeqId1, State2}
end),
true = case PubList of
[] -> true;
_ -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId,
Length + erlang:length(PubList)})
end,
- {ok,
- State1 #dqstate { on_sync_functions = [fun() ->
- gen_server2:reply(From, ok)
- end | SyncFuncs]}
- }.
+ if IsDirty andalso InCurFile ->
+ {false, State1 #dqstate { on_sync_froms = [From | SyncFroms] }};
+ true ->
+ {true, State1}
+ end.
%% SeqId can be 'next'
internal_publish(Q, MsgId, SeqId, MsgBody, IsDelivered, State) ->