summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_queue_index.erl20
1 files changed, 11 insertions, 9 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index e4f31ab36f..07238c4a13 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -640,7 +640,8 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
{{Segment = #segment { unacked = UnackedCount }, Dirty},
UnackedBytes} =
recover_segment(ContainsCheckFun, CleanShutdown,
- segment_find_or_new(Seg, Dir, Segments2)),
+ segment_find_or_new(Seg, Dir, Segments2),
+ State1#qistate.max_journal_entries),
{segment_store(Segment, Segments2),
CountAcc + UnackedCount,
BytesAcc + UnackedBytes, DirtyCount + Dirty}
@@ -664,7 +665,7 @@ terminate(State = #qistate { journal_handle = JournalHdl,
segments = undefined }}.
recover_segment(ContainsCheckFun, CleanShutdown,
- Segment = #segment { journal_entries = JEntries }) ->
+ Segment = #segment { journal_entries = JEntries }, MaxJournal) ->
{SegEntries, UnackedCount} = load_segment(false, Segment),
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
@@ -673,7 +674,7 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegmentAndDirtyCount, Bytes}) ->
{MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
{recover_message(ContainsCheckFun(MsgOrId), CleanShutdown,
- Del, RelSeq, SegmentAndDirtyCount),
+ Del, RelSeq, SegmentAndDirtyCount, MaxJournal),
Bytes + case IsPersistent of
true -> MsgProps#message_properties.size;
false -> 0
@@ -682,15 +683,16 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{{Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, 0},
SegEntries1).
-recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount) ->
+recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount, _MaxJournal) ->
SegmentAndDirtyCount;
-recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount) ->
+recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount, _MaxJournal) ->
SegmentAndDirtyCount;
-recover_message( true, false, no_del, RelSeq, {Segment, DirtyCount}) ->
- {add_to_journal(RelSeq, del, Segment), DirtyCount + 1};
-recover_message(false, _, del, RelSeq, {Segment, DirtyCount}) ->
+recover_message( true, false, no_del, RelSeq, {Segment, _DirtyCount}, MaxJournal) ->
+ %% force to flush the segment
+ {add_to_journal(RelSeq, del, Segment), MaxJournal + 1};
+recover_message(false, _, del, RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
{add_to_journal(RelSeq, ack, Segment), DirtyCount + 1};
-recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) ->
+recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
{add_to_journal(RelSeq, ack,
add_to_journal(RelSeq, del, Segment)),
DirtyCount + 2}.