summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2019-09-07 21:17:32 +0300
committerGitHub <noreply@github.com>2019-09-07 21:17:32 +0300
commitdd46dd395145e43535b9dd74a66b77e34173fca7 (patch)
tree916fd656a97696818778f3a1ad6ceaf81135346f /src
parent5e8d4f1ee624b368f04ba78fdd08a028e6fd98c6 (diff)
parent04801bd7f43cf735de0f853827c79bba15c69c34 (diff)
downloadrabbitmq-server-git-dd46dd395145e43535b9dd74a66b77e34173fca7.tar.gz
Merge pull request #2100 from tomyouyou/masterv3.8.0-rc.1
force to flush segments when recovering a message with last non-CleanShutdown.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl20
1 files changed, 11 insertions, 9 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index e4f31ab36f..07238c4a13 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -640,7 +640,8 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
{{Segment = #segment { unacked = UnackedCount }, Dirty},
UnackedBytes} =
recover_segment(ContainsCheckFun, CleanShutdown,
- segment_find_or_new(Seg, Dir, Segments2)),
+ segment_find_or_new(Seg, Dir, Segments2),
+ State1#qistate.max_journal_entries),
{segment_store(Segment, Segments2),
CountAcc + UnackedCount,
BytesAcc + UnackedBytes, DirtyCount + Dirty}
@@ -664,7 +665,7 @@ terminate(State = #qistate { journal_handle = JournalHdl,
segments = undefined }}.
recover_segment(ContainsCheckFun, CleanShutdown,
- Segment = #segment { journal_entries = JEntries }) ->
+ Segment = #segment { journal_entries = JEntries }, MaxJournal) ->
{SegEntries, UnackedCount} = load_segment(false, Segment),
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
@@ -673,7 +674,7 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegmentAndDirtyCount, Bytes}) ->
{MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
{recover_message(ContainsCheckFun(MsgOrId), CleanShutdown,
- Del, RelSeq, SegmentAndDirtyCount),
+ Del, RelSeq, SegmentAndDirtyCount, MaxJournal),
Bytes + case IsPersistent of
true -> MsgProps#message_properties.size;
false -> 0
@@ -682,15 +683,16 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{{Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, 0},
SegEntries1).
-recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount) ->
+recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount, _MaxJournal) ->
SegmentAndDirtyCount;
-recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount) ->
+recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount, _MaxJournal) ->
SegmentAndDirtyCount;
-recover_message( true, false, no_del, RelSeq, {Segment, DirtyCount}) ->
- {add_to_journal(RelSeq, del, Segment), DirtyCount + 1};
-recover_message(false, _, del, RelSeq, {Segment, DirtyCount}) ->
+recover_message( true, false, no_del, RelSeq, {Segment, _DirtyCount}, MaxJournal) ->
+ %% force to flush the segment
+ {add_to_journal(RelSeq, del, Segment), MaxJournal + 1};
+recover_message(false, _, del, RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
{add_to_journal(RelSeq, ack, Segment), DirtyCount + 1};
-recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) ->
+recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
{add_to_journal(RelSeq, ack,
add_to_journal(RelSeq, del, Segment)),
DirtyCount + 2}.