summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_queue_index.erl153
1 files changed, 78 insertions, 75 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 136ff82995..6345428e8d 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -116,7 +116,7 @@
})).
-type(msg_id() :: binary()).
-type(seq_id() :: integer()).
--type(seg_dict() :: {dict(), [segment()], file_path()}).
+-type(seg_dict() :: {dict(), [segment()]}).
-type(qistate() :: #qistate { dir :: file_path(),
segments :: seg_dict(),
journal_handle :: hdl(),
@@ -160,48 +160,45 @@ init(Name) ->
%% 3. Load each segment in turn and filter out messages that are
%% not in the msg_store, by adding acks to the journal. These
%% acks only go to the RAM journal as it doesn't matter if we
- %% lose them. Also mark delivered if not clean shutdown.
+ %% lose them. Also mark delivered if not clean shutdown. Also
+ %% find the number of unacked messages.
AllSegs = all_segment_nums(State2),
CleanShutdown = detect_clean_shutdown(Dir),
%% We know the journal is empty here, so we don't need to combine
%% with the journal, and we don't need to worry about messages
%% that have been acked.
- State3 = #qistate { segments = Segments } =
+ {State3 = #qistate { segments = Segments }, Count} =
lists:foldl(
- fun (Seg, StateN = #qistate { segments = SegmentsN }) ->
- Segment = segment_find(Seg, SegmentsN),
+ fun (Seg, {StateN = #qistate { segments = SegmentsN }, CountAcc}) ->
+ Segment = segment_find_or_new(Seg, Dir, SegmentsN),
{SegDict, _PubCount, _AckCount, Segment1} =
load_segment(false, Segment),
SegmentsN1 = segment_store(Segment1, SegmentsN),
- dict:fold(
- fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack},
- StateM) ->
- SeqId = reconstruct_seq_id(Seg, RelSeq),
- InMsgStore = rabbit_msg_store:contains(MsgId),
- case {InMsgStore, CleanShutdown} of
- {true, true} ->
- StateM;
- {true, false} when Del == del ->
- StateM;
- {true, false} ->
- add_to_journal(SeqId, del, StateM);
- {false, _} when Del == del ->
- add_to_journal(SeqId, ack, StateM);
- {false, _} ->
- add_to_journal(
- SeqId, ack,
- add_to_journal(SeqId, del, StateM))
- end
- end, StateN #qistate { segments = SegmentsN1 }, SegDict)
- end, State2, AllSegs),
- %% 4. Go through all segments and calculate the number of unacked
- %% messages we have.
- Count = lists:foldl(
- fun (Seg, CountAcc) ->
- #segment { pubs = PubCount, acks = AckCount } =
- segment_find(Seg, Segments),
- CountAcc + PubCount - AckCount
- end, 0, AllSegs),
+ StateN1 = #qistate { segments = SegmentsN2 } =
+ dict:fold(
+ fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack},
+ StateM) ->
+ SeqId = reconstruct_seq_id(Seg, RelSeq),
+ InMsgStore = rabbit_msg_store:contains(MsgId),
+ case {InMsgStore, CleanShutdown} of
+ {true, true} ->
+ StateM;
+ {true, false} when Del == del ->
+ StateM;
+ {true, false} ->
+ add_to_journal(SeqId, del, StateM);
+ {false, _} when Del == del ->
+ add_to_journal(SeqId, ack, StateM);
+ {false, _} ->
+ add_to_journal(
+ SeqId, ack,
+ add_to_journal(SeqId, del, StateM))
+ end
+ end, StateN #qistate { segments=SegmentsN1 }, SegDict),
+ {ok, #segment { pubs = PubCount, acks = AckCount }} =
+ segment_find(Seg, SegmentsN2),
+ {StateN1, CountAcc + PubCount - AckCount}
+ end, {State2, 0}, AllSegs),
{Count, State3}.
terminate(State) ->
@@ -249,7 +246,7 @@ sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
flush_journal(State = #qistate { dirty_count = 0 }) ->
State;
-flush_journal(State = #qistate { segments = Segments, dir = Dir }) ->
+flush_journal(State = #qistate { segments = Segments }) ->
Segments1 =
segment_fold(
fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount,
@@ -273,15 +270,16 @@ flush_journal(State = #qistate { segments = Segments, dir = Dir }) ->
end,
segment_store(Segment1, SegmentsN)
end
- end, segment_new(Dir), Segments),
+ end, segments_new(), Segments),
{JournalHdl, State1} =
get_journal_handle(State #qistate { segments = Segments1 }),
ok = file_handle_cache:clear(JournalHdl),
State1 #qistate { dirty_count = 0 }.
-read_segment_entries(InitSeqId, State = #qistate { segments = Segments }) ->
+read_segment_entries(InitSeqId, State = #qistate { segments = Segments,
+ dir = Dir }) ->
{Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
- Segment = segment_find(Seg, Segments),
+ Segment = segment_find_or_new(Seg, Dir, Segments),
{SegDict, _PubCount, _AckCount,
Segment1 = #segment { journal_entries = JEntries }} =
load_segment(false, Segment),
@@ -412,7 +410,7 @@ blank_state(QueueName) ->
Dir = filename:join(queues_dir(), StrName),
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
#qistate { dir = Dir,
- segments = segment_new(Dir),
+ segments = segments_new(),
journal_handle = undefined,
dirty_count = 0
}.
@@ -475,58 +473,62 @@ get_segment_handle(Segment = #segment { handle = undefined, path = Path }) ->
get_segment_handle(Segment = #segment { handle = Hdl }) ->
{Hdl, Segment}.
-segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_], _Dir}) ->
- Segment; %% 1 or (2, matches head)
-segment_find(Seg, {_Segments, [_, Segment = #segment { num = Seg }], _Dir}) ->
- Segment; %% 2, matches tail
-segment_find(Seg, {Segments, _, Dir}) -> %% no match
- case dict:find(Seg, Segments) of
- {ok, Segment = #segment{}} -> Segment;
- error -> #segment { pubs = 0,
- acks = 0,
- handle = undefined,
- journal_entries = journal_new(),
- path = seg_num_to_path(Dir, Seg),
- num = Seg
- }
+segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) ->
+ {ok, Segment}; %% 1 or (2, matches head)
+segment_find(Seg, {_Segments, [_, Segment = #segment { num = Seg }]}) ->
+ {ok, Segment}; %% 2, matches tail
+segment_find(Seg, {Segments, _}) -> %% no match
+ dict:find(Seg, Segments).
+
+segment_new(Seg, Dir) ->
+ #segment { pubs = 0,
+ acks = 0,
+ handle = undefined,
+ journal_entries = journal_new(),
+ path = seg_num_to_path(Dir, Seg),
+ num = Seg
+ }.
+
+segment_find_or_new(Seg, Dir, Segments) ->
+ case segment_find(Seg, Segments) of
+ error -> segment_new(Seg, Dir);
+ {ok, Segment} -> Segment
end.
segment_store(Segment = #segment { num = Seg }, %% 1 or (2, matches head)
- {Segments, [#segment { num = Seg } | Tail], Dir}) ->
- {Segments, [Segment | Tail], Dir};
+ {Segments, [#segment { num = Seg } | Tail]}) ->
+ {Segments, [Segment | Tail]};
segment_store(Segment = #segment { num = Seg }, %% 2, matches tail
- {Segments, [SegmentA, #segment { num = Seg }], Dir}) ->
- {Segments, [SegmentA, Segment], Dir};
-segment_store(Segment = #segment { num = Seg },
- {Segments, [], Dir}) ->
- {dict:erase(Seg, Segments), [Segment], Dir};
-segment_store(Segment = #segment { num = Seg },
- {Segments, [SegmentA], Dir}) ->
- {dict:erase(Seg, Segments), [Segment, SegmentA], Dir};
+ {Segments, [SegmentA, #segment { num = Seg }]}) ->
+ {Segments, [SegmentA, Segment]};
+segment_store(Segment = #segment { num = Seg }, {Segments, []}) ->
+ {dict:erase(Seg, Segments), [Segment]};
+segment_store(Segment = #segment { num = Seg }, {Segments, [SegmentA]}) ->
+ {dict:erase(Seg, Segments), [Segment, SegmentA]};
segment_store(Segment = #segment { num = Seg },
- {Segments, [SegmentA, SegmentB], Dir}) ->
+ {Segments, [SegmentA, SegmentB]}) ->
{dict:store(SegmentB#segment.num, SegmentB, dict:erase(Seg, Segments)),
- [Segment, SegmentA], Dir}.
+ [Segment, SegmentA]}.
-segment_fold(Fun, Acc, {Segments, [], _Dir}) ->
+segment_fold(Fun, Acc, {Segments, []}) ->
dict:fold(Fun, Acc, Segments);
-segment_fold(Fun, Acc, {Segments, CachedSegments, _Dir}) ->
+segment_fold(Fun, Acc, {Segments, CachedSegments}) ->
Acc1 = lists:foldl(fun (Segment = #segment { num = Num }, AccN) ->
Fun(Num, Segment, AccN)
end, Acc, CachedSegments),
dict:fold(Fun, Acc1, Segments).
-segment_map(Fun, {Segments, CachedSegments, Dir}) ->
+segment_map(Fun, {Segments, CachedSegments}) ->
{dict:map(Fun, Segments),
lists:map(fun (Segment = #segment { num = Num }) -> Fun(Num, Segment) end,
- CachedSegments), Dir}.
+ CachedSegments)}.
-segment_fetch_keys({Segments, CachedSegments, _Dir}) ->
+segment_fetch_keys({Segments, CachedSegments}) ->
lists:map(fun (Segment) -> Segment#segment.num end, CachedSegments) ++
dict:fetch_keys(Segments).
-segment_new(Dir) ->
- {dict:new(), [], Dir}.
+segments_new() ->
+ {dict:new(), []}.
get_journal_handle(State =
#qistate { journal_handle = undefined, dir = Dir }) ->
@@ -583,7 +585,7 @@ terminate(StoreShutdown, State =
true -> store_clean_shutdown(Dir);
false -> ok
end,
- State #qistate { journal_handle = undefined, segments = segment_new(Dir) }.
+ State #qistate { journal_handle = undefined, segments = segments_new() }.
%%----------------------------------------------------------------------------
%% Majors
@@ -712,11 +714,12 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
end.
add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount,
- segments = Segments }) ->
+ segments = Segments,
+ dir = Dir }) ->
{Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
Segment = #segment { journal_entries = SegJDict,
pubs = PubCount, acks = AckCount } =
- segment_find(Seg, Segments),
+ segment_find_or_new(Seg, Dir, Segments),
SegJDict1 = add_to_journal(RelSeq, Action, SegJDict),
Segment1 = Segment #segment { journal_entries = SegJDict1 },
Segment2 =