diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2015-01-21 11:39:06 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2015-01-21 11:39:06 +0000 |
| commit | 0068d17d865761cadedf4de278664f78cccccff7 (patch) | |
| tree | bb5ac7e77a8a3858b538503bf4464f7fc5402564 | |
| parent | b29eab1cfe77086e7c5dd5af84c226b7ef05d43a (diff) | |
| download | rabbitmq-server-git-0068d17d865761cadedf4de278664f78cccccff7.tar.gz | |
Fix a couple of silly bugs: specify the journal prefix lengths for ack and del, and correctly assemble the segment backwards.
| -rw-r--r-- | src/rabbit_queue_index.erl | 40 |
1 files changed, 22 insertions, 18 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index b56a990791..676cb6e043 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -759,10 +759,12 @@ recover_journal(State) -> end, Segments), State1 #qistate { segments = Segments1 }. -parse_journal_entries(<<?DEL_JPREFIX, SeqId:?SEQ_BITS, Rest/binary>>, State) -> +parse_journal_entries(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>, State) -> parse_journal_entries(Rest, add_to_journal(SeqId, del, State)); -parse_journal_entries(<<?ACK_JPREFIX, SeqId:?SEQ_BITS, Rest/binary>>, State) -> +parse_journal_entries(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>, State) -> parse_journal_entries(Rest, add_to_journal(SeqId, ack, State)); parse_journal_entries(<<0:?JPREFIX_BITS, 0:?SEQ_BITS, 0:?PUB_RECORD_SIZE_BYTES/unit:8, _/binary>>, State) -> @@ -883,25 +885,27 @@ segments_new() -> entry_to_segment(_RelSeq, {?PUB, del, ack}, Buf) -> Buf; entry_to_segment(RelSeq, {Pub, Del, Ack}, Buf) -> - Buf1 = case Pub of - no_pub -> + %% NB: we are assembling the segment in reverse order here, so + %% del/ack comes first. + Buf1 = case {Del, Ack} of + {no_del, no_ack} -> Buf; - {IsPersistent, Bin, MsgBin} -> - [[<<?PUB_PREFIX:?PUB_PREFIX_BITS, - (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS, Bin/binary, - (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | Buf] + _ -> + Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>, + case {Del, Ack} of + {del, ack} -> [[Binary, Binary] | Buf]; + _ -> [Binary | Buf] + end end, - case {Del, Ack} of - {no_del, no_ack} -> + case Pub of + no_pub -> Buf1; - _ -> - Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>, - case {Del, Ack} of - {del, ack} -> [[Binary, Binary] | Buf]; - _ -> [Binary | Buf] - end + {IsPersistent, Bin, MsgBin} -> + [[<<?PUB_PREFIX:?PUB_PREFIX_BITS, + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS, Bin/binary, + (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | Buf1] end. read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, |
