summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-01-21 11:39:06 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-01-21 11:39:06 +0000
commit0068d17d865761cadedf4de278664f78cccccff7 (patch)
treebb5ac7e77a8a3858b538503bf4464f7fc5402564
parentb29eab1cfe77086e7c5dd5af84c226b7ef05d43a (diff)
downloadrabbitmq-server-git-0068d17d865761cadedf4de278664f78cccccff7.tar.gz
Fix a couple of silly bugs: specify the journal prefix lengths for ack and del, and correctly assemble the segment backwards.
-rw-r--r--src/rabbit_queue_index.erl40
1 files changed, 22 insertions, 18 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index b56a990791..676cb6e043 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -759,10 +759,12 @@ recover_journal(State) ->
end, Segments),
State1 #qistate { segments = Segments1 }.
-parse_journal_entries(<<?DEL_JPREFIX, SeqId:?SEQ_BITS, Rest/binary>>, State) ->
+parse_journal_entries(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>, State) ->
parse_journal_entries(Rest, add_to_journal(SeqId, del, State));
-parse_journal_entries(<<?ACK_JPREFIX, SeqId:?SEQ_BITS, Rest/binary>>, State) ->
+parse_journal_entries(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>, State) ->
parse_journal_entries(Rest, add_to_journal(SeqId, ack, State));
parse_journal_entries(<<0:?JPREFIX_BITS, 0:?SEQ_BITS,
0:?PUB_RECORD_SIZE_BYTES/unit:8, _/binary>>, State) ->
@@ -883,25 +885,27 @@ segments_new() ->
entry_to_segment(_RelSeq, {?PUB, del, ack}, Buf) ->
Buf;
entry_to_segment(RelSeq, {Pub, Del, Ack}, Buf) ->
- Buf1 = case Pub of
- no_pub ->
+ %% NB: we are assembling the segment in reverse order here, so
+ %% del/ack comes first.
+ Buf1 = case {Del, Ack} of
+ {no_del, no_ack} ->
Buf;
- {IsPersistent, Bin, MsgBin} ->
- [[<<?PUB_PREFIX:?PUB_PREFIX_BITS,
- (bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS, Bin/binary,
- (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | Buf]
+ _ ->
+ Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>>,
+ case {Del, Ack} of
+ {del, ack} -> [[Binary, Binary] | Buf];
+ _ -> [Binary | Buf]
+ end
end,
- case {Del, Ack} of
- {no_del, no_ack} ->
+ case Pub of
+ no_pub ->
Buf1;
- _ ->
- Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>,
- case {Del, Ack} of
- {del, ack} -> [[Binary, Binary] | Buf];
- _ -> [Binary | Buf]
- end
+ {IsPersistent, Bin, MsgBin} ->
+ [[<<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS, Bin/binary,
+ (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | Buf1]
end.
read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},