summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl99
1 files changed, 48 insertions, 51 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 6bd3e04657..0e43c38779 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -92,7 +92,7 @@
file_size_limit, %% how big can our files get?
read_file_handles, %% file handles for reading (LRU)
read_file_handles_limit, %% how many file handles can we open?
- on_sync_froms, %% list of commiters to run on sync (reversed)
+ on_sync_txns, %% list of commiters to run on sync (reversed)
commit_timer_ref, %% TRef for our interval timer
last_sync_offset, %% current_offset at the last time we sync'd
message_cache, %% ets message cache
@@ -423,7 +423,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
file_size_limit = FileSizeLimit,
read_file_handles = {dict:new(), gb_trees:empty()},
read_file_handles_limit = ReadFileHandlesLimit,
- on_sync_froms = [],
+ on_sync_txns = [],
commit_timer_ref = undefined,
last_sync_offset = 0,
message_cache = ets:new(?CACHE_ETS_NAME,
@@ -464,12 +464,9 @@ handle_call({phantom_deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, false, State),
reply(Result, State1);
handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
- {Reply, State1} =
+ State1 =
internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, State),
- case Reply of
- true -> reply(ok, State1);
- false -> noreply(State1)
- end;
+ noreply(State1);
handle_call({purge, Q}, _From, State) ->
{ok, Count, State1} = internal_purge(Q, State),
reply(Count, State1);
@@ -683,12 +680,12 @@ noreply(NewState) ->
noreply(NewState, MinPri) ->
noreply1(start_memory_timer(NewState), MinPri).
-noreply1(NewState = #dqstate { on_sync_froms = [],
+noreply1(NewState = #dqstate { on_sync_txns = [],
commit_timer_ref = undefined }, MinPri) ->
{noreply, NewState, binary, MinPri};
noreply1(NewState = #dqstate { commit_timer_ref = undefined }, MinPri) ->
{noreply, start_commit_timer(NewState), 0, MinPri};
-noreply1(NewState = #dqstate { on_sync_froms = [] }, MinPri) ->
+noreply1(NewState = #dqstate { on_sync_txns = [] }, MinPri) ->
{noreply, stop_commit_timer(NewState), binary, MinPri};
noreply1(NewState, MinPri) ->
{noreply, NewState, 0, MinPri}.
@@ -699,12 +696,12 @@ reply(Reply, NewState) ->
reply(Reply, NewState, MinPri) ->
reply1(Reply, start_memory_timer(NewState), MinPri).
-reply1(Reply, NewState = #dqstate { on_sync_froms = [],
+reply1(Reply, NewState = #dqstate { on_sync_txns = [],
commit_timer_ref = undefined }, MinPri) ->
{reply, Reply, NewState, binary, MinPri};
reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }, MinPri) ->
{reply, Reply, start_commit_timer(NewState), 0, MinPri};
-reply1(Reply, NewState = #dqstate { on_sync_froms = [] }, MinPri) ->
+reply1(Reply, NewState = #dqstate { on_sync_txns = [] }, MinPri) ->
{reply, Reply, stop_commit_timer(NewState), binary, MinPri};
reply1(Reply, NewState, MinPri) ->
{reply, Reply, NewState, 0, MinPri}.
@@ -818,12 +815,12 @@ stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) ->
State #dqstate { commit_timer_ref = undefined }.
sync_current_file_handle(State = #dqstate { current_dirty = false,
- on_sync_froms = [] }) ->
+ on_sync_txns = [] }) ->
State;
sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl,
current_dirty = IsDirty,
current_offset = CurOffset,
- on_sync_froms = Froms,
+ on_sync_txns = Txns,
last_sync_offset = SyncOffset
}) ->
SyncOffset1 = case IsDirty of
@@ -831,10 +828,9 @@ sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl,
CurOffset;
false -> SyncOffset
end,
- lists:map(fun (From) -> gen_server2:reply(From, ok) end,
- lists:reverse(Froms)),
- State #dqstate { current_dirty = false, on_sync_froms = [],
- last_sync_offset = SyncOffset1 }.
+ State1 = lists:foldl(fun internal_do_tx_commit/2, State, lists:reverse(Txns)),
+ State1 #dqstate { current_dirty = false, on_sync_txns = [],
+ last_sync_offset = SyncOffset1 }.
msg_to_bin(Msg = #basic_message { content = Content }) ->
ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
@@ -1078,53 +1074,54 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
end.
internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
- State = #dqstate { sequences = Sequences,
- current_file_name = CurFile,
+ State = #dqstate { current_file_name = CurFile,
current_dirty = IsDirty,
- on_sync_froms = SyncFroms,
+ on_sync_txns = Txns,
last_sync_offset = SyncOffset
}) ->
+ NeedsSync = IsDirty andalso
+ lists:any(fun (MsgId) ->
+ [{MsgId, _RefCount, File, Offset,
+ _TotalSize, _IsPersistent}] =
+ dets_ets_lookup(State, MsgId),
+ File =:= CurFile andalso Offset >= SyncOffset
+ end, PubMsgIds),
+ TxnDetails = {Q, PubMsgIds, AckSeqIds, From},
+ case NeedsSync of
+ true ->
+ Txns1 = [TxnDetails | Txns],
+ State #dqstate { on_sync_txns = Txns1 };
+ false ->
+ internal_do_tx_commit(TxnDetails, State)
+ end.
+
+internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From},
+ State = #dqstate { sequences = Sequences }) ->
{InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q),
- WriteSeqId = InitWriteSeqId + erlang:length(PubMsgIds),
- {atomic, {InCurFile, WriteSeqId, State1}} =
+ {atomic, {WriteSeqId, State1}} =
mnesia:transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
- %% must deal with publishes first, if we didn't
- %% then we could end up acking a message before
- %% it's been published, which is clearly
- %% nonsense. I.e. in commit, do not do things in an
- %% order which _could_not_ have happened.
- {InCurFile1, WriteSeqId1} =
+ {ok, WriteSeqId1} =
lists:foldl(
- fun (MsgId, {InCurFileAcc, SeqId}) ->
- [{MsgId, _RefCount, File, Offset,
- _TotalSize, _IsPersistent}] =
- dets_ets_lookup(State, MsgId),
- ok = mnesia:write(
- rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id =
- {Q, SeqId},
- msg_id = MsgId,
- is_delivered = false
- },
- write),
- {InCurFileAcc orelse (File =:= CurFile andalso
- Offset >= SyncOffset),
- SeqId + 1}
- end, {false, InitWriteSeqId}, PubMsgIds),
- {ok, State2} = remove_messages(Q, AckSeqIds, txn, State),
- {InCurFile1, WriteSeqId1, State2}
+ fun (MsgId, {ok, SeqId}) ->
+ {mnesia:write(
+ rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id = {Q, SeqId},
+ msg_id = MsgId,
+ is_delivered = false
+ }, write),
+ SeqId + 1}
+ end, {ok, InitWriteSeqId}, PubMsgIds),
+ {ok, State2} = remove_messages(Q, AckSeqIds, txn, State),
+ {WriteSeqId1, State2}
end),
true = case PubMsgIds of
[] -> true;
_ -> ets:insert(Sequences, {Q, InitReadSeqId, WriteSeqId})
end,
- if IsDirty andalso InCurFile ->
- {false, State1 #dqstate { on_sync_froms = [From | SyncFroms] }};
- true ->
- {true, State1}
- end.
+ gen_server2:reply(From, ok),
+ State1.
internal_publish(Q, Message = #basic_message { guid = MsgId },
IsDelivered, State) ->