summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_queue_index3.erl182
1 files changed, 175 insertions, 7 deletions
diff --git a/src/rabbit_queue_index3.erl b/src/rabbit_queue_index3.erl
index eeb38dd28a..01a7c748eb 100644
--- a/src/rabbit_queue_index3.erl
+++ b/src/rabbit_queue_index3.erl
@@ -31,9 +31,14 @@
-module(rabbit_queue_index3).
+-export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
+ write_delivered/2, write_acks/2, sync_seq_ids/3, flush_journal/1,
+ read_segment_entries/2, next_segment_boundary/1, segment_size/0,
+ find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]).
-define(CLEAN_FILENAME, "clean.dot").
+%%----------------------------------------------------------------------------
%% ---- Journal details ----
-define(MAX_JOURNAL_ENTRY_COUNT, 32768).
@@ -98,6 +103,61 @@
%%----------------------------------------------------------------------------
+%% Recovers the on-disk index of queue Name. Returns {Count, State}
+%% where Count is the sum of (pubs - acks) over every segment file
+%% found in the queue's directory after recovery.
+init(Name) ->
+    %% 1. Read the whole journal. Segments that have entries in the
+    %%    journal get loaded too and duplicates are dropped, so the
+    %%    counts reflect segment and journal combined.
+    Journalled = load_journal(blank_state(Name)),
+    %% 2. Flush the journal, so that no publish lives in the journal
+    %%    alone.
+    Flushed = #qistate { dir = Dir } = flush_journal(Journalled),
+    %% 3. Walk every segment and ack (in the RAM journal only - losing
+    %%    these acks is harmless) each message the msg_store does not
+    %%    hold. After an unclean shutdown, surviving messages are
+    %%    additionally marked as delivered. The journal is empty at
+    %%    this point, so there is nothing to combine with and no
+    %%    already-acked message to worry about.
+    AllSegs = all_segment_nums(Dir),
+    CleanShutdown = detect_clean_shutdown(Dir),
+    Recovered =
+        lists:foldl(
+          fun (Seg, StateN) ->
+                  init_recover_segment(Seg, CleanShutdown, StateN)
+          end, Flushed, AllSegs),
+    %% 4. Add up the unacked messages of all segments.
+    Count = lists:sum([init_unacked_count(Seg, Recovered) || Seg <- AllSegs]),
+    {Count, Recovered}.
+
+%% Loads segment Seg (dropping acked entries) and reconciles each of
+%% its remaining entries with the msg_store.
+init_recover_segment(Seg, CleanShutdown, State) ->
+    {SegDict, _PubCount, _AckCount, State1} = load_segment(Seg, false, State),
+    dict:fold(
+      fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack}, StateN) ->
+              SeqId = reconstruct_seq_id(Seg, RelSeq),
+              InMsgStore = rabbit_msg_store:contains(MsgId),
+              init_recover_entry(InMsgStore, CleanShutdown, Del, SeqId, StateN)
+      end, State1, SegDict).
+
+%% Decides which journal entries (if any) a single recovered message
+%% needs, based on whether the msg_store still has it, whether the
+%% shutdown was clean, and whether it was already delivered.
+init_recover_entry(InMsgStore, CleanShutdown, Del, SeqId, State) ->
+    case {InMsgStore, CleanShutdown, Del} of
+        {true, true, _} ->
+            State;
+        {true, false, del} ->
+            State;
+        {true, false, _} ->
+            add_to_journal(SeqId, del, State);
+        {false, _, del} ->
+            add_to_journal(SeqId, ack, State);
+        {false, _, _} ->
+            %% an ack must be preceded by its delivery
+            Delivered = add_to_journal(SeqId, del, State),
+            add_to_journal(SeqId, ack, Delivered)
+    end.
+
+%% Number of published-but-unacked messages recorded for segment Seg.
+init_unacked_count(Seg, State) ->
+    #segment { pubs = PubCount, acks = AckCount } = find_segment(Seg, State),
+    PubCount - AckCount.
+
+
terminate(State = #qistate { segments = Segments, journal_handle = JournalHdl,
dir = Dir }) ->
ok = case JournalHdl of
@@ -118,21 +178,100 @@ terminate_and_erase(State) ->
ok = delete_queue_directory(State1 #qistate.dir),
State1.
+%% Writes the in-RAM journal entries of every segment out to the
+%% segment files and returns the updated state.
+%%
+%% A state whose dirty_count is 0 is returned untouched. Otherwise, for
+%% each segment held in the state:
+%%   - no journal entries: the segment is stored back unchanged;
+%%   - journal entries, and acks == pubs: the segment is deleted and
+%%     dropped from the state;
+%%   - journal entries otherwise: they are appended to the segment
+%%     file, the file is synced, and the segment's journal is emptied.
+%% Once every segment has been processed no journal entries remain in
+%% RAM, so dirty_count is reset to 0.
+%% NOTE(review): the on-disk journal file is not touched here - confirm
+%% that truncating it is handled elsewhere.
+flush_journal(State = #qistate { dirty_count = 0 }) ->
+    State;
+flush_journal(State = #qistate { segments = Segments }) ->
+    State1 =
+        dict:fold(
+          fun (Seg, #segment { journal_entries = JEntries, pubs = PubCount,
+                               acks = AckCount } = Segment, StateN) ->
+                  case dict:is_empty(JEntries) of
+                      true ->
+                          store_segment(Segment, StateN);
+                      false when AckCount == PubCount ->
+                          ok = delete_segment(Segment),
+                          %% The fold accumulator must remain the state
+                          %% (not the 'ok' of the match above), and the
+                          %% deleted segment must not linger in it.
+                          StateN #qistate {
+                            segments =
+                                dict:erase(Seg, StateN #qistate.segments) };
+                      false ->
+                          {Hdl, Segment1} = get_segment_handle(Segment),
+                          Hdl = dict:fold(fun write_entry_to_segment/3,
+                                          Hdl, JEntries),
+                          ok = file_handle_cache:sync(Hdl),
+                          store_segment(
+                            Segment1 #segment { journal_entries = dict:new() },
+                            StateN)
+                  end
+          end, State, Segments),
+    State1 #qistate { dirty_count = 0 }.
+
+%% Returns {Entries, State1} for the segment that starts at InitSeqId,
+%% which must lie exactly on a segment boundary (anything else fails
+%% the match on the first line). Entries is the list of unacked
+%% messages of that segment, journal included, as
+%% {MsgId, SeqId, IsPersistent, IsDelivered} tuples in ascending
+%% SeqId order.
+read_segment_entries(InitSeqId, State) ->
+    {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
+    {OnDisk, _PubCount, _AckCount, State1} = load_segment(Seg, false, State),
+    #segment { journal_entries = Journal } = find_segment(Seg, State1),
+    Combined = journal_plus_segment(Journal, OnDisk),
+    Entries =
+        [begin
+             {{MsgId, IsPersistent}, IsDelivered, no_ack} =
+                 dict:fetch(RelSeq, Combined),
+             {MsgId, reconstruct_seq_id(Seg, RelSeq),
+              IsPersistent, IsDelivered}
+         end || RelSeq <- lists:sort(dict:fetch_keys(Combined))],
+    {Entries, State1}.
+
+%% Returns the first seq id of the segment that follows the one
+%% containing SeqId.
+next_segment_boundary(SeqId) ->
+    {CurrentSeg, _} = seq_id_to_seg_and_rel_seq_id(SeqId),
+    NextSeg = CurrentSeg + 1,
+    reconstruct_seq_id(NextSeg, 0).
+
+segment_size() -> % Exposes ?SEGMENT_ENTRY_COUNT, the divisor that maps a seq id onto a segment.
+ ?SEGMENT_ENTRY_COUNT.
+
+%% Returns {LowSeqIdSeg, NextSeqId, State}:
+%%   LowSeqIdSeg - the seq id at the start of the lowest segment found
+%%                 on disk;
+%%   NextSeqId   - the seq id at the start of the segment following
+%%                 the highest segment found on disk.
+%% Both are 0 when the queue directory holds no segment files. State
+%% is returned unchanged.
+find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) ->
+    SegNums = all_segment_nums(Dir),
+    %% We don't want the lowest seq_id, merely the seq_id of the start
+    %% of the lowest segment. That seq_id may not actually exist, but
+    %% that's fine. The important thing is that the segment exists and
+    %% the seq_id reported is on a segment boundary.
+
+    %% We also don't really care about the max seq_id. Just start the
+    %% next segment: it makes life much easier. Note this has to be
+    %% the segment *after* the last one on disk - handing out the
+    %% start of the last existing segment would reuse seq ids that
+    %% segment may already contain.
+
+    %% SegNums is sorted, ascending.
+    {LowSeqIdSeg, NextSeqId} =
+        case SegNums of
+            []         -> {0, 0};
+            [MinSeg|_] -> {reconstruct_seq_id(MinSeg, 0),
+                           reconstruct_seq_id(1 + lists:last(SegNums), 0)}
+        end,
+    {LowSeqIdSeg, NextSeqId, State}.
+
%%----------------------------------------------------------------------------
%% Minors
%%----------------------------------------------------------------------------
+%% Lists, in ascending order, the segment numbers of all files in Dir
+%% whose name ends in ?SEGMENT_EXTENSION. The number is taken from the
+%% leading run of decimal digits of each file name.
+all_segment_nums(Dir) ->
+    IsDigit = fun (C) -> C >= $0 andalso C =< $9 end,
+    SegFiles = filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir),
+    SegNums = lists:map(
+                fun (SegFile) ->
+                        list_to_integer(lists:takewhile(IsDigit, SegFile))
+                end, SegFiles),
+    lists:sort(SegNums).
+
+%% Builds an empty index state for QueueName: no segments, no journal
+%% handle, nothing dirty. The queue's directory is derived from the
+%% queue name and the path leading to it is created if necessary.
+blank_state(QueueName) ->
+    DirName = queue_name_to_dir_name(QueueName),
+    QueueDir = filename:join(queues_dir(), DirName),
+    %% ensure_dir/1 only creates the parents of the path it is given,
+    %% hence the dummy file name.
+    Placeholder = filename:join(QueueDir, "nothing"),
+    ok = filelib:ensure_dir(Placeholder),
+    #qistate { journal_handle = undefined,
+               dirty_count    = 0,
+               segments       = dict:new(),
+               dir            = QueueDir }.
+
%% Sorts Items into descending term order.
rev_sort(Items) ->
    Descending = fun (Left, Right) -> Left > Right end,
    lists:sort(Descending, Items).
%% Splits SeqId into {Seg, RelSeq}: the number of the segment it
%% belongs to and its offset within that segment.
seq_id_to_seg_and_rel_seq_id(SeqId) ->
    Seg = SeqId div ?SEGMENT_ENTRY_COUNT,
    RelSeq = SeqId rem ?SEGMENT_ENTRY_COUNT,
    {Seg, RelSeq}.
-reconstruct_seq_id(SegNum, RelSeq) ->
- (SegNum * ?SEGMENT_ENTRY_COUNT) + RelSeq.
+%% Inverse of seq_id_to_seg_and_rel_seq_id/1: rebuilds the seq id from
+%% a segment number and an offset within that segment.
+reconstruct_seq_id(Seg, RelSeq) ->
+    SegStart = Seg * ?SEGMENT_ENTRY_COUNT,
+    SegStart + RelSeq.
-seg_num_to_path(Dir, SegNum) ->
- SegName = integer_to_list(SegNum),
+seg_num_to_path(Dir, Seg) -> % Path of segment number Seg inside Dir.
+ SegName = integer_to_list(Seg), % the file name is the decimal segment number...
filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION). % ...followed by ?SEGMENT_EXTENSION
delete_segment(#segment { handle = undefined }) ->
@@ -206,13 +345,42 @@ get_journal_handle(State =
get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
{Hdl, State}.
+%% Maps a boolean onto the single bit stored in a segment entry:
+%% false -> 0, true -> 1.
+bool_to_int(false) -> 0;
+bool_to_int(true)  -> 1.
+
+%% Appends the segment-file records for one journal entry to Hdl and
+%% returns Hdl, which makes this usable as a dict:fold/3 callback over
+%% a segment's journal entries.
+%%
+%% A publish becomes one record made of the publish prefix, the
+%% persistence bit and RelSeq, followed by the MsgId. A delivery or an
+%% ack on its own becomes one rel-seq-only record; a delivery together
+%% with an ack becomes two identical rel-seq-only records.
+write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) ->
+    ok = case Publish of
+             {MsgId, IsPersistent} ->
+                 PersistentBit = bool_to_int(IsPersistent),
+                 file_handle_cache:append(
+                   Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
+                           PersistentBit:1,
+                           RelSeq:?REL_SEQ_BITS>>, MsgId]);
+             no_pub ->
+                 ok
+         end,
+    %% Built lazily, so nothing is constructed for an entry that
+    %% carries neither a delivery nor an ack.
+    RelSeqOnly = fun () ->
+                         <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+                           RelSeq:?REL_SEQ_BITS>>
+                 end,
+    ok = case {Del, Ack} of
+             {no_del, no_ack} ->
+                 ok;
+             {del, ack} ->
+                 Record = RelSeqOnly(),
+                 file_handle_cache:append(Hdl, [Record, Record]);
+             _ ->
+                 file_handle_cache:append(Hdl, RelSeqOnly())
+         end,
+    Hdl.
+
%%----------------------------------------------------------------------------
%% Majors
%%----------------------------------------------------------------------------
%% Loading segments
-%% Does not do any combining with the journal at all
+%% Does not do any combining with the journal at all. The PubCount
+%% that comes back is the number of publishes in the segment. The
+%% number of unacked msgs is PubCount - AckCount. If KeepAcks is
+%% false, then dict:size(SegDict) == PubCount - AckCount. If KeepAcks
+%% is true, then dict:size(SegDict) == PubCount.
load_segment(Seg, KeepAcks, State) ->
Segment = #segment { path = Path, handle = SegHdl } =
find_segment(Seg, State),
@@ -328,7 +496,7 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
_ErrOrEoF -> State
end.
-add_to_journal(SeqId, Action, State = #qistate {}) ->
+add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount }) ->
{Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
Segment = #segment { journal_entries = SegJDict,
pubs = PubCount, acks = AckCount } =
@@ -341,7 +509,7 @@ add_to_journal(SeqId, Action, State = #qistate {}) ->
ack -> Segment1 #segment { acks = AckCount + 1 };
{_MsgId, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 }
end,
- store_segment(Segment2, State);
+ store_segment(Segment2, State #qistate { dirty_count = DCount + 1 });
%% This is a more relaxed version of deliver_or_ack_msg because we can
%% have dels or acks in the journal without the corresponding