diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2019-09-07 21:17:32 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-09-07 21:17:32 +0300 |
| commit | dd46dd395145e43535b9dd74a66b77e34173fca7 (patch) | |
| tree | 916fd656a97696818778f3a1ad6ceaf81135346f /src | |
| parent | 5e8d4f1ee624b368f04ba78fdd08a028e6fd98c6 (diff) | |
| parent | 04801bd7f43cf735de0f853827c79bba15c69c34 (diff) | |
| download | rabbitmq-server-git-dd46dd395145e43535b9dd74a66b77e34173fca7.tar.gz | |
Merge pull request #2100 from tomyouyou/masterv3.8.0-rc.1
force to flush segments when recovering a message with last non-CleanShutdown.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 20 |
1 files changed, 11 insertions, 9 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index e4f31ab36f..07238c4a13 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -640,7 +640,8 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> {{Segment = #segment { unacked = UnackedCount }, Dirty}, UnackedBytes} = recover_segment(ContainsCheckFun, CleanShutdown, - segment_find_or_new(Seg, Dir, Segments2)), + segment_find_or_new(Seg, Dir, Segments2), + State1#qistate.max_journal_entries), {segment_store(Segment, Segments2), CountAcc + UnackedCount, BytesAcc + UnackedBytes, DirtyCount + Dirty} @@ -664,7 +665,7 @@ terminate(State = #qistate { journal_handle = JournalHdl, segments = undefined }}. recover_segment(ContainsCheckFun, CleanShutdown, - Segment = #segment { journal_entries = JEntries }) -> + Segment = #segment { journal_entries = JEntries }, MaxJournal) -> {SegEntries, UnackedCount} = load_segment(false, Segment), {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), @@ -673,7 +674,7 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegmentAndDirtyCount, Bytes}) -> {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin), {recover_message(ContainsCheckFun(MsgOrId), CleanShutdown, - Del, RelSeq, SegmentAndDirtyCount), + Del, RelSeq, SegmentAndDirtyCount, MaxJournal), Bytes + case IsPersistent of true -> MsgProps#message_properties.size; false -> 0 @@ -682,15 +683,16 @@ recover_segment(ContainsCheckFun, CleanShutdown, {{Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, 0}, SegEntries1). -recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount) -> +recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount, _MaxJournal) -> SegmentAndDirtyCount; -recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount) -> +recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount, _MaxJournal) -> SegmentAndDirtyCount; -recover_message( true, false, no_del, RelSeq, {Segment, DirtyCount}) -> - {add_to_journal(RelSeq, del, Segment), DirtyCount + 1}; -recover_message(false, _, del, RelSeq, {Segment, DirtyCount}) -> +recover_message( true, false, no_del, RelSeq, {Segment, _DirtyCount}, MaxJournal) -> + %% force to flush the segment + {add_to_journal(RelSeq, del, Segment), MaxJournal + 1}; +recover_message(false, _, del, RelSeq, {Segment, DirtyCount}, _MaxJournal) -> {add_to_journal(RelSeq, ack, Segment), DirtyCount + 1}; -recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) -> +recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}, _MaxJournal) -> {add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)), DirtyCount + 2}. |
