summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl250
1 files changed, 131 insertions, 119 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index adc3f74286..829b03aa18 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -87,9 +87,7 @@
{ dir,
segments,
journal_handle,
- dirty_count,
- last_seg_a,
- last_seg_b
+ dirty_count
}).
-record(segment,
@@ -108,10 +106,19 @@
-ifdef(use_specs).
-type(hdl() :: ('undefined' | any())).
+-type(segment() :: ('undefined' |
+ #segment { pubs :: non_neg_integer(),
+ acks :: non_neg_integer(),
+ handle :: hdl(),
+ journal_entries :: dict(),
+ path :: file_path(),
+ num :: non_neg_integer()
+ })).
-type(msg_id() :: binary()).
-type(seq_id() :: integer()).
+-type(seg_dict() :: {dict(), [segment()], file_path()}).
-type(qistate() :: #qistate { dir :: file_path(),
- segments :: dict(),
+ segments :: seg_dict(),
journal_handle :: hdl(),
dirty_count :: integer()
}).
@@ -159,11 +166,13 @@ init(Name) ->
%% We know the journal is empty here, so we don't need to combine
%% with the journal, and we don't need to worry about messages
%% that have been acked.
- State3 =
+ State3 = #qistate { segments = Segments } =
lists:foldl(
- fun (Seg, StateN) ->
- {SegDict, _PubCount, _AckCount, StateN1} =
- load_segment(Seg, false, StateN),
+ fun (Seg, StateN = #qistate { segments = SegmentsN }) ->
+ Segment = segment_find(Seg, SegmentsN),
+ {SegDict, _PubCount, _AckCount, Segment1} =
+ load_segment(false, Segment),
+ SegmentsN1 = segment_store(Segment1, SegmentsN),
dict:fold(
fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack},
StateM) ->
@@ -183,14 +192,14 @@ init(Name) ->
SeqId, ack,
add_to_journal(SeqId, del, StateM))
end
- end, StateN1, SegDict)
+ end, StateN #qistate { segments = SegmentsN1 }, SegDict)
end, State2, AllSegs),
%% 4. Go through all segments and calculate the number of unacked
%% messages we have.
Count = lists:foldl(
fun (Seg, CountAcc) ->
#segment { pubs = PubCount, acks = AckCount } =
- find_segment(Seg, State3),
+ segment_find(Seg, Segments),
CountAcc + PubCount - AckCount
end, 0, AllSegs),
{Count, State3}.
@@ -240,40 +249,40 @@ sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
flush_journal(State = #qistate { dirty_count = 0 }) ->
State;
-flush_journal(State) ->
- State1 = #qistate { segments = Segments } = get_all_segments(State),
- State2 =
- dict:fold(
+flush_journal(State = #qistate { segments = Segments }) ->
+ Segments1 =
+ segment_fold(
fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount,
- acks = AckCount } = Segment, StateN) ->
+ acks = AckCount } = Segment, SegmentsN) ->
case PubCount > 0 andalso PubCount == AckCount of
true ->
- ok = delete_segment(Segment),
- StateN;
+ segment_erase(delete_segment(Segment), SegmentsN);
false ->
case 0 == dict:size(JEntries) of
true ->
- store_segment(Segment, StateN);
+ SegmentsN;
false ->
{Hdl, Segment1} = get_segment_handle(Segment),
dict:fold(fun write_entry_to_segment/3,
Hdl, JEntries),
ok = file_handle_cache:sync(Hdl),
- store_segment(
+ segment_store(
Segment1 #segment { journal_entries =
- dict:new() }, StateN)
+ dict:new() }, SegmentsN)
end
end
- end, State1 #qistate { segments = dict:new() }, Segments),
- {JournalHdl, State3} = get_journal_handle(State2),
+ end, Segments, Segments),
+ {JournalHdl, State1} =
+ get_journal_handle(State #qistate { segments = Segments1 }),
ok = file_handle_cache:clear(JournalHdl),
- State3 #qistate { dirty_count = 0 }.
+ State1 #qistate { dirty_count = 0 }.
-read_segment_entries(InitSeqId, State) ->
+read_segment_entries(InitSeqId, State = #qistate { segments = Segments }) ->
{Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
- {SegDict, _PubCount, _AckCount, State1} =
- load_segment(Seg, false, State),
- #segment { journal_entries = JEntries } = find_segment(Seg, State1),
+ Segment = segment_find(Seg, Segments),
+ {SegDict, _PubCount, _AckCount,
+ Segment1 = #segment { journal_entries = JEntries }} =
+ load_segment(false, Segment),
SegDict1 = journal_plus_segment(JEntries, SegDict),
%% deliberately sort the list desc, because foldl will reverse it
RelSeqs = rev_sort(dict:fetch_keys(SegDict1)),
@@ -283,7 +292,7 @@ read_segment_entries(InitSeqId, State) ->
[ {MsgId, reconstruct_seq_id(Seg, RelSeq),
IsPersistent, IsDelivered == del} | Acc ]
end, [], RelSeqs),
- State1}.
+ State #qistate { segments = segment_store(Segment1, Segments) }}.
next_segment_boundary(SeqId) ->
{Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
@@ -385,8 +394,7 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount })
maybe_flush_journal(State) ->
State.
-all_segment_nums(State = #qistate { dir = Dir }) ->
- #qistate { segments = Segments } = get_all_segments(State),
+all_segment_nums(#qistate { dir = Dir, segments = Segments }) ->
sets:to_list(
lists:foldl(
fun (SegName, Set) ->
@@ -394,7 +402,7 @@ all_segment_nums(State = #qistate { dir = Dir }) ->
list_to_integer(
lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end,
SegName)), Set)
- end, sets:from_list(dict:fetch_keys(Segments)),
+ end, sets:from_list(segment_fetch_keys(Segments)),
filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir))).
blank_state(QueueName) ->
@@ -402,11 +410,9 @@ blank_state(QueueName) ->
Dir = filename:join(queues_dir(), StrName),
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
#qistate { dir = Dir,
- segments = dict:new(),
+ segments = segment_new(Dir),
journal_handle = undefined,
- dirty_count = 0,
- last_seg_a = undefined,
- last_seg_b = undefined
+ dirty_count = 0
}.
rev_sort(List) ->
@@ -422,11 +428,11 @@ seg_num_to_path(Dir, Seg) ->
SegName = integer_to_list(Seg),
filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION).
-delete_segment(#segment { handle = undefined }) ->
- ok;
-delete_segment(#segment { handle = Hdl }) ->
+delete_segment(Segment = #segment { handle = undefined }) ->
+ Segment;
+delete_segment(Segment = #segment { handle = Hdl }) ->
ok = file_handle_cache:delete(Hdl),
- ok.
+ Segment #segment { handle = undefined }.
detect_clean_shutdown(Dir) ->
case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
@@ -465,11 +471,11 @@ get_segment_handle(Segment = #segment { handle = undefined, path = Path }) ->
get_segment_handle(Segment = #segment { handle = Hdl }) ->
{Hdl, Segment}.
-find_segment(Seg, #qistate { last_seg_a = #segment { num = Seg } = Segment }) ->
- Segment;
-find_segment(Seg, #qistate { last_seg_b = #segment { num = Seg } = Segment }) ->
- Segment;
-find_segment(Seg, #qistate { segments = Segments, dir = Dir }) ->
+segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_], _Dir}) ->
+ Segment; %% 1 or (2, matches head)
+segment_find(Seg, {_Segments, [_, Segment = #segment { num = Seg }], _Dir}) ->
+ Segment; %% 2, matches tail
+segment_find(Seg, {Segments, _, Dir}) -> %% no match
case dict:find(Seg, Segments) of
{ok, Segment = #segment{}} -> Segment;
error -> #segment { pubs = 0,
@@ -481,46 +487,52 @@ find_segment(Seg, #qistate { segments = Segments, dir = Dir }) ->
}
end.
-store_segment(Segment = #segment { num = Seg }, State =
- #qistate { last_seg_a = #segment { num = Seg }}) ->
- State #qistate { last_seg_a = Segment };
-store_segment(Segment = #segment { num = Seg }, State =
- #qistate { last_seg_b = #segment { num = Seg }}) ->
- State #qistate { last_seg_b = Segment };
-store_segment(Segment, State =
- #qistate { last_seg_a = LastSegA, last_seg_b = LastSegB }) ->
- case LastSegA of
- undefined ->
- State #qistate { last_seg_a = Segment };
- _ ->
- case LastSegB of
- undefined ->
- State #qistate { last_seg_b = Segment };
- _ ->
- State1 = #qistate { segments = Segments } =
- State #qistate { last_seg_a = LastSegB,
- last_seg_b = Segment },
- State1 #qistate {
- segments = return_segment_to_dict(LastSegA, Segments) }
- end
- end.
-
-get_all_segments(State = #qistate { last_seg_a = undefined,
- last_seg_b = undefined }) ->
- State;
-get_all_segments(State = #qistate { segments = Segments,
- last_seg_a = LastSegA,
- last_seg_b = LastSegB }) ->
- State #qistate { last_seg_a = undefined,
- last_seg_b = undefined,
- segments = return_segment_to_dict(
- LastSegB,
- return_segment_to_dict(LastSegA, Segments)) }.
-
-return_segment_to_dict(undefined, Segments) ->
- Segments;
-return_segment_to_dict(Segment = #segment { num = Seg }, Segments) ->
- dict:store(Seg, Segment, Segments).
+segment_store(Segment = #segment { num = Seg }, %% 1 or (2, matches head)
+ {Segments, [#segment { num = Seg } | Tail], Dir}) ->
+ {Segments, [Segment | Tail], Dir};
+segment_store(Segment = #segment { num = Seg }, %% 2, matches tail
+ {Segments, [SegmentA, #segment { num = Seg }], Dir}) ->
+ {Segments, [SegmentA, Segment], Dir};
+segment_store(Segment = #segment { num = Seg },
+ {Segments, [], Dir}) ->
+ {dict:erase(Seg, Segments), [Segment], Dir};
+segment_store(Segment = #segment { num = Seg },
+ {Segments, [SegmentA], Dir}) ->
+ {dict:erase(Seg, Segments), [Segment, SegmentA], Dir};
+segment_store(Segment = #segment { num = Seg },
+ {Segments, [SegmentA, SegmentB], Dir}) ->
+ {dict:store(SegmentB#segment.num, SegmentB, dict:erase(Seg, Segments)),
+ [Segment, SegmentA], Dir}.
+
+segment_fold(Fun, Acc, {Segments, [], _Dir}) ->
+ dict:fold(Fun, Acc, Segments);
+segment_fold(Fun, Acc, {Segments, CachedSegments, _Dir}) ->
+ Acc1 = lists:foldl(fun (Segment = #segment { num = Num }, AccN) ->
+ Fun(Num, Segment, AccN)
+ end, Acc, CachedSegments),
+ dict:fold(Fun, Acc1, Segments).
+
+segment_map(Fun, {Segments, CachedSegments, Dir}) ->
+ {dict:map(Fun, Segments),
+ lists:map(fun (Segment = #segment { num = Num }) -> Fun(Num, Segment) end,
+ CachedSegments), Dir}.
+
+segment_fetch_keys({Segments, CachedSegments, _Dir}) ->
+ lists:map(fun (Segment) -> Segment#segment.num end, CachedSegments) ++
+ dict:fetch_keys(Segments).
+
+segment_erase(#segment { handle = undefined, num = Num },
+ {Segments, [#segment { num = Num } | Rest], Dir}) ->
+ {Segments, Rest, Dir}; %% 1 or (2, matches head)
+segment_erase(#segment { handle = undefined, num = Num },
+ {Segments, [Head, #segment { num = Num }], Dir}) ->
+ {Segments, [Head], Dir}; %% 2, matches tail
+segment_erase(#segment { handle = undefined, num = Num },
+ {Segments, CachedSegments, Dir}) ->
+ {dict:erase(Num, Segments), CachedSegments, Dir}.
+
+segment_new(Dir) ->
+ {dict:new(), [], Dir}.
get_journal_handle(State =
#qistate { journal_handle = undefined, dir = Dir }) ->
@@ -562,13 +574,12 @@ write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) ->
terminate(StoreShutdown, State =
#qistate { journal_handle = JournalHdl,
- dir = Dir }) ->
- State1 = #qistate { segments = Segments } = get_all_segments(State),
+ dir = Dir, segments = Segments }) ->
ok = case JournalHdl of
undefined -> ok;
_ -> file_handle_cache:close(JournalHdl)
end,
- ok = dict:fold(
+ ok = segment_fold(
fun (_Seg, #segment { handle = undefined }, ok) ->
ok;
(_Seg, #segment { handle = Hdl }, ok) ->
@@ -578,7 +589,7 @@ terminate(StoreShutdown, State =
true -> store_clean_shutdown(Dir);
false -> ok
end,
- State1 #qistate { journal_handle = undefined, segments = dict:new() }.
+ State #qistate { journal_handle = undefined, segments = segment_new(Dir) }.
%%----------------------------------------------------------------------------
%% Majors
@@ -591,22 +602,21 @@ terminate(StoreShutdown, State =
%% number of unacked msgs is PubCount - AckCount. If KeepAcks is
%% false, then dict:size(SegDict) == PubCount - AckCount. If KeepAcks
%% is true, then dict:size(SegDict) == PubCount.
-load_segment(Seg, KeepAcks, State) ->
- Segment = #segment { path = Path, handle = SegHdl } =
- find_segment(Seg, State),
+load_segment(KeepAcks,
+ Segment = #segment { path = Path, handle = SegHdl }) ->
SegmentExists = case SegHdl of
undefined -> filelib:is_file(Path);
_ -> true
end,
case SegmentExists of
false ->
- {dict:new(), 0, 0, State};
+ {dict:new(), 0, 0, Segment};
true ->
{Hdl, Segment1} = get_segment_handle(Segment),
{ok, 0} = file_handle_cache:position(Hdl, bof),
{SegDict, PubCount, AckCount} =
load_segment_entries(KeepAcks, Hdl, dict:new(), 0, 0),
- {SegDict, PubCount, AckCount, store_segment(Segment1, State)}
+ {SegDict, PubCount, AckCount, Segment1}
end.
load_segment_entries(KeepAcks, Hdl, SegDict, PubCount, AckCount) ->
@@ -653,29 +663,29 @@ deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict) ->
load_journal(State) ->
{JournalHdl, State1} = get_journal_handle(State),
{ok, 0} = file_handle_cache:position(JournalHdl, 0),
- State2 = #qistate { segments = Segments } =
- get_all_segments(load_journal_entries(State1)),
- dict:fold(
- fun (Seg, #segment { journal_entries = JEntries,
- pubs = PubCountInJournal,
- acks = AckCountInJournal }, StateN) ->
- %% We want to keep acks in so that we can remove them if
- %% duplicates are in the journal. The counts here are
- %% purely from the segment itself.
- {SegDict, PubCountInSeg, AckCountInSeg, StateN1} =
- load_segment(Seg, true, StateN),
- %% Removed counts here are the number of pubs and acks
- %% that are duplicates - i.e. found in both the segment
- %% and journal.
- {JEntries1, PubsRemoved, AcksRemoved} =
- journal_minus_segment(JEntries, SegDict),
- Segment1 = find_segment(Seg, StateN1),
- PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved,
- AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved,
- store_segment(Segment1 #segment { journal_entries = JEntries1,
- pubs = PubCount1,
- acks = AckCount1 }, StateN1)
- end, State2, Segments).
+ State2 = #qistate { segments = Segments } = load_journal_entries(State1),
+ Segments1 =
+ segment_map(
+ fun (_Seg, Segment = #segment { journal_entries = JEntries,
+ pubs = PubCountInJournal,
+ acks = AckCountInJournal }) ->
+ %% We want to keep acks in so that we can remove
+ %% them if duplicates are in the journal. The counts
+ %% here are purely from the segment itself.
+ {SegDict, PubCountInSeg, AckCountInSeg, Segment1} =
+ load_segment(true, Segment),
+ %% Removed counts here are the number of pubs and
+ %% acks that are duplicates - i.e. found in both the
+ %% segment and journal.
+ {JEntries1, PubsRemoved, AcksRemoved} =
+ journal_minus_segment(JEntries, SegDict),
+ PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved,
+ AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved,
+ Segment1 #segment { journal_entries = JEntries1,
+ pubs = PubCount1,
+ acks = AckCount1 }
+ end, Segments),
+ State2 #qistate { segments = Segments1 }.
load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
case file_handle_cache:read(Hdl, ?SEQ_BYTES) of
@@ -707,11 +717,12 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
_ErrOrEoF -> State
end.
-add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount }) ->
+add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount,
+ segments = Segments }) ->
{Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
Segment = #segment { journal_entries = SegJDict,
pubs = PubCount, acks = AckCount } =
- find_segment(Seg, State),
+ segment_find(Seg, Segments),
SegJDict1 = add_to_journal(RelSeq, Action, SegJDict),
Segment1 = Segment #segment { journal_entries = SegJDict1 },
Segment2 =
@@ -720,7 +731,8 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount }) ->
ack -> Segment1 #segment { acks = AckCount + 1 };
{_MsgId, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 }
end,
- store_segment(Segment2, State #qistate { dirty_count = DCount + 1 });
+ State #qistate { dirty_count = DCount + 1,
+ segments = segment_store(Segment2, Segments) };
%% This is a more relaxed version of deliver_or_ack_msg because we can
%% have dels or acks in the journal without the corresponding