summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl25
1 files changed, 14 insertions, 11 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 7710a0a226..c7ef117717 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -705,10 +705,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
[{Q, ReadSeqId2, WriteSeqId2, Length2}] ->
{ReadSeqId2, WriteSeqId2, Length2}
end,
- InitReadSeqId2 = if InitReadSeqId == InitWriteSeqId andalso FirstSeqIdTo > InitWriteSeqId andalso FirstSeqIdTo /= next ->
- FirstSeqIdTo;
- true -> InitReadSeqId
- end,
+ InitReadSeqId2 = determine_next_read_id(InitReadSeqId, InitWriteSeqId, FirstSeqIdTo),
{ lists:zip(PubMsgSeqIds, (PubMsgSeqIdsTail ++ [{next, next}])),
InitWriteSeqId, InitReadSeqId2, InitLength}
end,
@@ -764,11 +761,9 @@ internal_publish(Q, MsgId, SeqId, MsgBody, State) ->
[{Q, ReadSeqId2, WriteSeqId2, Length2}] ->
{ReadSeqId2, WriteSeqId2, Length2}
end,
+ ReadSeqId3 = determine_next_read_id(ReadSeqId, WriteSeqId, SeqId),
WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId, dirty),
WriteSeqId3Next = WriteSeqId3 + 1,
- ReadSeqId3 = if ReadSeqId == WriteSeqId andalso WriteSeqId3 > WriteSeqId -> WriteSeqId3;
- true -> ReadSeqId
- end,
ok = mnesia:dirty_write(rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3},
msg_id = MsgId,
@@ -783,6 +778,17 @@ internal_tx_cancel(MsgIds, State) ->
MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds), undefined)),
remove_messages(undefined, MsgSeqIds, false, State).
+determine_next_read_id(CurrentReadWrite, CurrentReadWrite, CurrentReadWrite) ->
+ CurrentReadWrite;
+determine_next_read_id(CurrentRead, _CurrentWrite, next) ->
+ CurrentRead;
+determine_next_read_id(CurrentReadWrite, CurrentReadWrite, NextWrite)
+ when NextWrite > CurrentReadWrite ->
+ NextWrite;
+determine_next_read_id(CurrentRead, CurrentWrite, NextWrite)
+ when NextWrite >= CurrentWrite ->
+ CurrentRead.
+
internal_requeue(_Q, [], State) ->
{ok, State};
internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|MsgSeqIdsTail],
@@ -811,10 +817,7 @@ internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|MsgSeqIdsTail],
%% the Q _must_ already exist
[{Q, ReadSeqId, WriteSeqId, Length}] = ets:lookup(Sequences, Q),
- ReadSeqId2 =
- if ReadSeqId == WriteSeqId andalso FirstSeqIdTo > WriteSeqId andalso FirstSeqIdTo /= next -> FirstSeqIdTo;
- true -> ReadSeqId
- end,
+ ReadSeqId2 = determine_next_read_id(ReadSeqId, WriteSeqId, FirstSeqIdTo),
MsgSeqIdsZipped = lists:zip(MsgSeqIds, MsgSeqIdsTail ++ [{next, next}]),
{atomic, WriteSeqId2} =
mnesia:transaction(