summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl15
1 files changed, 11 insertions, 4 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 2e3ff89a3d..81617b8b6c 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -697,16 +697,20 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
}) ->
{PubList, PubAcc, ReadSeqId, Length} =
case PubMsgSeqIds of
- [] -> {[], undefined, undefined};
- [_|PubMsgSeqIdsTail] ->
+ [] -> {[], undefined, undefined, undefined};
+ [{_, FirstSeqIdTo}|PubMsgSeqIdsTail] ->
{InitReadSeqId, InitWriteSeqId, InitLength} =
case ets:lookup(Sequences, Q) of
[] -> {0,0,0};
[{Q, ReadSeqId2, WriteSeqId2, Length2}] ->
{ReadSeqId2, WriteSeqId2, Length2}
end,
+ InitReadSeqId2 = if InitReadSeqId == InitWriteSeqId andalso FirstSeqIdTo > InitWriteSeqId ->
+ FirstSeqIdTo;
+ true -> InitReadSeqId
+ end,
{ lists:zip(PubMsgSeqIds, (PubMsgSeqIdsTail ++ [{next, next}])),
- InitWriteSeqId, InitReadSeqId, InitLength}
+ InitWriteSeqId, InitReadSeqId2, InitLength}
end,
{atomic, {Sync, WriteSeqId, State2}} =
mnesia:transaction(
@@ -762,12 +766,15 @@ internal_publish(Q, MsgId, SeqId, MsgBody, State) ->
end,
WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId, dirty),
WriteSeqId3Next = WriteSeqId3 + 1,
- true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId3Next, Length + 1}),
+ ReadSeqId3 = if ReadSeqId == WriteSeqId andalso WriteSeqId3 > WriteSeqId -> WriteSeqId3;
+ true -> ReadSeqId
+ end,
ok = mnesia:dirty_write(rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3},
msg_id = MsgId,
next_seq_id = WriteSeqId3Next,
is_delivered = false}),
+ true = ets:insert(Sequences, {Q, ReadSeqId3, WriteSeqId3Next, Length + 1}),
{ok, State1}.
internal_tx_cancel(MsgIds, State) ->