diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-05-22 18:26:30 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-05-22 18:26:30 +0100 |
| commit | 17cc6ac00585977c4e84fecd584fd89d024fc195 (patch) | |
| tree | cbae64009b9e4b6f00afaf4f007f8015419bf15b | |
| parent | 76403c059d1ec0c418fe35a0d1e8a10ce09420a9 (diff) | |
| download | rabbitmq-server-git-17cc6ac00585977c4e84fecd584fd89d024fc195.tar.gz | |
Yup, basically, same bug in three places. Fixed. It all works.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 15 |
1 files changed, 11 insertions, 4 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 2e3ff89a3d..81617b8b6c 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -697,16 +697,20 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, }) -> {PubList, PubAcc, ReadSeqId, Length} = case PubMsgSeqIds of - [] -> {[], undefined, undefined}; - [_|PubMsgSeqIdsTail] -> + [] -> {[], undefined, undefined, undefined}; + [{_, FirstSeqIdTo}|PubMsgSeqIdsTail] -> {InitReadSeqId, InitWriteSeqId, InitLength} = case ets:lookup(Sequences, Q) of [] -> {0,0,0}; [{Q, ReadSeqId2, WriteSeqId2, Length2}] -> {ReadSeqId2, WriteSeqId2, Length2} end, + InitReadSeqId2 = if InitReadSeqId == InitWriteSeqId andalso FirstSeqIdTo > InitWriteSeqId -> + FirstSeqIdTo; + true -> InitReadSeqId + end, { lists:zip(PubMsgSeqIds, (PubMsgSeqIdsTail ++ [{next, next}])), - InitWriteSeqId, InitReadSeqId, InitLength} + InitWriteSeqId, InitReadSeqId2, InitLength} end, {atomic, {Sync, WriteSeqId, State2}} = mnesia:transaction( @@ -762,12 +766,15 @@ internal_publish(Q, MsgId, SeqId, MsgBody, State) -> end, WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId, dirty), WriteSeqId3Next = WriteSeqId3 + 1, - true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId3Next, Length + 1}), + ReadSeqId3 = if ReadSeqId == WriteSeqId andalso WriteSeqId3 > WriteSeqId -> WriteSeqId3; + true -> ReadSeqId + end, ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3}, msg_id = MsgId, next_seq_id = WriteSeqId3Next, is_delivered = false}), + true = ets:insert(Sequences, {Q, ReadSeqId3, WriteSeqId3Next, Length + 1}), {ok, State1}. internal_tx_cancel(MsgIds, State) -> |
