diff options
| -rw-r--r-- | src/rabbit_queue_index.erl | 1186 | ||||
| -rw-r--r-- | src/rabbit_queue_index3.erl | 850 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 2 |
4 files changed, 594 insertions, 1460 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 91ecd66925..2b4ec1a473 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -32,54 +32,28 @@ -module(rabbit_queue_index). -export([init/1, terminate/1, terminate_and_erase/1, write_published/4, - write_delivered/2, write_acks/2, sync_seq_ids/3, flush_journal/1, + write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1, read_segment_entries/2, next_segment_boundary/1, segment_size/0, find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]). -%%---------------------------------------------------------------------------- -%% The queue disk index -%% -%% The queue disk index operates over a journal, and a number of -%% segment files. Each segment is the same size, both in max number of -%% entries, and max file size, owing to fixed sized records. -%% -%% Publishes are written directly to the segment files. The segment is -%% found by dividing the sequence id by the the max number of entries -%% per segment. Only the relative sequence within the segment is -%% recorded as the sequence id within a segment file (i.e. sequence id -%% modulo max number of entries per segment). This is keeps entries -%% as small as possible. Publishes are only ever going to be received -%% in contiguous ascending order. -%% -%% Acks and deliveries are written to a bounded journal and are also -%% held in memory, each in a dict with the segment as the key. Again, -%% the records are fixed size: the entire sequence id is written and -%% is limited to a 63-bit unsigned integer. The remaining bit -%% indicates whether the journal entry is for a delivery or an -%% ack. When the journal gets too big, or flush_journal is called, the -%% journal is (possibly incrementally) flushed out to the segment -%% files. 
As acks and delivery notes can be received in any order -%% (this is not obvious for deliveries, but consider what happens when -%% eg msgs are *re*queued - you'll publish and then mark the msgs -%% delivered immediately, which may be out of order), this journal -%% reduces seeking, and batches writes to the segment files, keeping -%% performance high. -%% -%% On startup, the journal is read along with all the segment files, -%% and the journal is fully flushed out to the segment files. Care is -%% taken to ensure that no message can be delivered or ack'd twice. -%% -%%---------------------------------------------------------------------------- - -define(CLEAN_FILENAME, "clean.dot"). +%%---------------------------------------------------------------------------- +%% ---- Journal details ---- + -define(MAX_JOURNAL_ENTRY_COUNT, 32768). -define(JOURNAL_FILENAME, "journal.jif"). --define(DEL_BIT, 0). --define(ACK_BIT, 1). +-define(PUB_PERSIST_JPREFIX, 2#00). +-define(PUB_TRANS_JPREFIX, 2#01). +-define(DEL_JPREFIX, 2#10). +-define(ACK_JPREFIX, 2#11). +-define(JPREFIX_BITS, 2). -define(SEQ_BYTES, 8). --define(SEQ_BITS, ((?SEQ_BYTES * 8) - 1)). +-define(SEQ_BITS, ((?SEQ_BYTES * 8) - ?JPREFIX_BITS)). + +%% ---- Segment details ---- + -define(SEGMENT_EXTENSION, ".idx"). -define(REL_SEQ_BITS, 14). @@ -111,13 +85,18 @@ -record(qistate, { dir, - seg_num_handles, - journal_count, - journal_ack_dict, - journal_del_dict, - seg_ack_counts, - publish_handle, - partial_segments + segments, + journal_handle, + dirty_count + }). + +-record(segment, + { pubs, + acks, + handle, + journal_entries, + path, + num }). -include("rabbit.hrl"). @@ -129,16 +108,10 @@ -type(hdl() :: ('undefined' | any())). -type(msg_id() :: binary()). -type(seq_id() :: integer()). --type(hdl_and_count() :: ('undefined' | - {non_neg_integer(), hdl(), non_neg_integer()})). 
--type(qistate() :: #qistate { dir :: file_path(), - seg_num_handles :: dict(), - journal_count :: integer(), - journal_ack_dict :: dict(), - journal_del_dict :: dict(), - seg_ack_counts :: dict(), - publish_handle :: hdl_and_count(), - partial_segments :: dict() +-type(qistate() :: #qistate { dir :: file_path(), + segments :: dict(), + journal_handle :: hdl(), + dirty_count :: integer() }). -spec(init/1 :: (queue_name()) -> {non_neg_integer(), qistate()}). @@ -148,7 +121,7 @@ -> qistate()). -spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()). -spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()). --spec(sync_seq_ids/3 :: ([seq_id()], boolean(), qistate()) -> qistate()). +-spec(sync_seq_ids/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush_journal/1 :: (qistate()) -> qistate()). -spec(read_segment_entries/2 :: (seq_id(), qistate()) -> {[{msg_id(), seq_id(), boolean(), boolean()}], qistate()}). @@ -167,16 +140,61 @@ init(Name) -> State = blank_state(Name), - {TotalMsgCount, State1} = read_and_prune_segments(State), - scatter_journal(TotalMsgCount, State1). - -terminate(State = #qistate { seg_num_handles = SegHdls }) -> - case 0 == dict:size(SegHdls) of - true -> State; - false -> State1 = #qistate { dir = Dir } = close_all_handles(State), - store_clean_shutdown(Dir), - State1 #qistate { publish_handle = undefined } - end. + %% 1. Load the journal completely. This will also load segments + %% which have entries in the journal and remove duplicates. + %% The counts will correctly reflect the combination of the + %% segment and the journal. + State1 = load_journal(State), + %% 2. Flush the journal. This makes life easier for everyone, as + %% it means there won't be any publishes in the journal alone. + State2 = #qistate { dir = Dir } = flush_journal(State1), + %% 3. Load each segment in turn and filter out messages that are + %% not in the msg_store, by adding acks to the journal. 
These + %% acks only go to the RAM journal as it doesn't matter if we + %% lose them. Also mark delivered if not clean shutdown. + AllSegs = all_segment_nums(State2), + CleanShutdown = detect_clean_shutdown(Dir), + %% We know the journal is empty here, so we don't need to combine + %% with the journal, and we don't need to worry about messages + %% that have been acked. + State3 = + lists:foldl( + fun (Seg, StateN) -> + {SegDict, _PubCount, _AckCount, StateN1} = + load_segment(Seg, false, StateN), + dict:fold( + fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack}, + StateM) -> + SeqId = reconstruct_seq_id(Seg, RelSeq), + InMsgStore = rabbit_msg_store:contains(MsgId), + case {InMsgStore, CleanShutdown} of + {true, true} -> + StateM; + {true, false} when Del == del -> + StateM; + {true, false} -> + add_to_journal(SeqId, del, StateM); + {false, _} when Del == del -> + add_to_journal(SeqId, ack, StateM); + {false, _} -> + add_to_journal( + SeqId, ack, + add_to_journal(SeqId, del, StateM)) + end + end, StateN1, SegDict) + end, State2, AllSegs), + %% 4. Go through all segments and calculate the number of unacked + %% messages we have. + Count = lists:foldl( + fun (Seg, CountAcc) -> + #segment { pubs = PubCount, acks = AckCount } = + find_segment(Seg, State3), + CountAcc + PubCount - AckCount + end, 0, AllSegs), + {Count, State3}. + +terminate(State) -> + terminate(true, State). terminate_and_erase(State) -> State1 = terminate(State), @@ -186,123 +204,114 @@ terminate_and_erase(State) -> write_published(MsgId, SeqId, IsPersistent, State) when is_binary(MsgId) -> ?MSG_ID_BYTES = size(MsgId), - {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - {Hdl, State1} = get_pub_handle(SegNum, State), - ok = file_handle_cache:append(Hdl, - <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS, MsgId/binary>>), - State1. 
+ {JournalHdl, State1} = get_journal_handle(State), + ok = file_handle_cache:append(JournalHdl, + [<<(case IsPersistent of + true -> ?PUB_PERSIST_JPREFIX; + false -> ?PUB_TRANS_JPREFIX + end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, + MsgId]), + maybe_flush_journal(add_to_journal(SeqId, {MsgId, IsPersistent}, State1)). + +write_delivered(SeqId, State) -> + {JournalHdl, State1} = get_journal_handle(State), + ok = file_handle_cache:append(JournalHdl, + <<?DEL_JPREFIX:?JPREFIX_BITS, + SeqId:?SEQ_BITS>>), + maybe_flush_journal(add_to_journal(SeqId, del, State1)). + +write_acks(SeqIds, State) -> + {JournalHdl, State1} = get_journal_handle(State), + ok = file_handle_cache:append(JournalHdl, + [<<?ACK_JPREFIX:?JPREFIX_BITS, + SeqId:?SEQ_BITS>> || SeqId <- SeqIds]), + State2 = lists:foldl(fun (SeqId, StateN) -> + add_to_journal(SeqId, ack, StateN) + end, State1, SeqIds), + maybe_flush_journal(State2). + +sync_seq_ids(_SeqIds, State = #qistate { journal_handle = undefined }) -> + State; +sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> + ok = file_handle_cache:sync(JournalHdl), + State. -write_delivered(SeqId, State = #qistate { journal_del_dict = JDelDict }) -> - {JDelDict1, State1} = - write_to_journal([<<?DEL_BIT:1, SeqId:?SEQ_BITS>>], - [SeqId], JDelDict, State), - maybe_flush(State1 #qistate { journal_del_dict = JDelDict1 }). - -write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict }) -> - {JAckDict1, State1} = - write_to_journal([<<?ACK_BIT:1, SeqId:?SEQ_BITS>> || SeqId <- SeqIds], - SeqIds, JAckDict, State), - maybe_flush(State1 #qistate { journal_ack_dict = JAckDict1 }). 
- -sync_seq_ids(SeqIds, SyncAckJournal, State) -> - State1 = case SyncAckJournal of - true -> {Hdl, State2} = get_journal_handle(State), - ok = file_handle_cache:sync(Hdl), - State2; - false -> State - end, - SegNumsSet = - lists:foldl( - fun (SeqId, Set) -> - {SegNum, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - sets:add_element(SegNum, Set) - end, sets:new(), SeqIds), - sets:fold( - fun (SegNum, StateN) -> - {Hdl1, StateM} = get_seg_handle(SegNum, StateN), - ok = file_handle_cache:sync(Hdl1), - StateM - end, State1, SegNumsSet). - -flush_journal(State = #qistate { journal_count = 0 }) -> +flush_journal(State = #qistate { dirty_count = 0 }) -> State; -flush_journal(State = #qistate { journal_ack_dict = JAckDict, - journal_del_dict = JDelDict, - journal_count = JCount }) -> - SegNum = case dict:fetch_keys(JAckDict) of - [] -> hd(dict:fetch_keys(JDelDict)); - [N|_] -> N - end, - Dels = seg_entries_from_dict(SegNum, JDelDict), - Acks = seg_entries_from_dict(SegNum, JAckDict), - State1 = append_dels_to_segment(SegNum, Dels, State), - State2 = append_acks_to_segment(SegNum, Acks, State1), - JCount1 = JCount - length(Dels) - length(Acks), - State3 = State2 #qistate { journal_del_dict = dict:erase(SegNum, JDelDict), - journal_ack_dict = dict:erase(SegNum, JAckDict), - journal_count = JCount1 }, - case JCount1 of - 0 -> {Hdl, State4} = get_journal_handle(State3), - {ok, 0} = file_handle_cache:position(Hdl, bof), - ok = file_handle_cache:truncate(Hdl), - ok = file_handle_cache:sync(Hdl), - State4; - _ -> flush_journal(State3) - end. 
+flush_journal(State = #qistate { segments = Segments }) -> + State1 = + dict:fold( + fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount, + acks = AckCount } = Segment, StateN) -> + case PubCount > 0 andalso PubCount == AckCount of + true -> + ok = delete_segment(Segment), + StateN; + false -> + case 0 == dict:size(JEntries) of + true -> + store_segment(Segment, StateN); + false -> + {Hdl, Segment1} = get_segment_handle(Segment), + dict:fold(fun write_entry_to_segment/3, + Hdl, JEntries), + ok = file_handle_cache:sync(Hdl), + store_segment( + Segment1 #segment { journal_entries = + dict:new() }, StateN) + end + end + end, State #qistate { segments = dict:new() }, Segments), + {JournalHdl, State2} = get_journal_handle(State1), + {ok, 0} = file_handle_cache:position(JournalHdl, bof), + ok = file_handle_cache:truncate(JournalHdl), + ok = file_handle_cache:sync(JournalHdl), + State2 #qistate { dirty_count = 0 }. read_segment_entries(InitSeqId, State) -> - {SegNum, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), - {SDict, _PubCount, _AckCount, _HighRelSeq, State1} = - load_segment(SegNum, State), + {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), + {SegDict, _PubCount, _AckCount, State1} = + load_segment(Seg, false, State), + #segment { journal_entries = JEntries } = find_segment(Seg, State1), + SegDict1 = journal_plus_segment(JEntries, SegDict), %% deliberately sort the list desc, because foldl will reverse it - RelSeqs = rev_sort(dict:fetch_keys(SDict)), + RelSeqs = rev_sort(dict:fetch_keys(SegDict1)), {lists:foldl(fun (RelSeq, Acc) -> - {MsgId, IsDelivered, IsPersistent} = - dict:fetch(RelSeq, SDict), - [ {MsgId, reconstruct_seq_id(SegNum, RelSeq), - IsPersistent, IsDelivered} | Acc] + {{MsgId, IsPersistent}, IsDelivered, no_ack} = + dict:fetch(RelSeq, SegDict1), + [ {MsgId, reconstruct_seq_id(Seg, RelSeq), + IsPersistent, IsDelivered == del} | Acc ] end, [], RelSeqs), State1}. 
next_segment_boundary(SeqId) -> - {SegNum, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - reconstruct_seq_id(SegNum + 1, 0). + {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), + reconstruct_seq_id(Seg + 1, 0). segment_size() -> ?SEGMENT_ENTRY_COUNT. -find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) -> - SegNums = all_segment_nums(Dir), +find_lowest_seq_id_seg_and_next_seq_id(State) -> + SegNums = all_segment_nums(State), %% We don't want the lowest seq_id, merely the seq_id of the start %% of the lowest segment. That seq_id may not actually exist, but %% that's fine. The important thing is that the segment exists and %% the seq_id reported is on a segment boundary. + %% We also don't really care about the max seq_id. Just start the + %% next segment: it makes life much easier. + %% SegNums is sorted, ascending. - LowSeqIdSeg = + {LowSeqIdSeg, NextSeqId} = case SegNums of - [] -> 0; - [MinSegNum|_] -> reconstruct_seq_id(MinSegNum, 0) + [] -> {0, 0}; + [MinSeg|_] -> {reconstruct_seq_id(MinSeg, 0), + reconstruct_seq_id(1 + lists:last(SegNums), 0)} end, - {NextSeqId, State1} = - case SegNums of - [] -> {0, State}; - _ -> MaxSegNum = lists:last(SegNums), - {_SDict, PubCount, _AckCount, HighRelSeq, State2} = - load_segment(MaxSegNum, State), - NextSeqId1 = reconstruct_seq_id(MaxSegNum, HighRelSeq), - NextSeqId2 = case PubCount of - 0 -> NextSeqId1; - _ -> NextSeqId1 + 1 - end, - {NextSeqId2, State2} - end, - {LowSeqIdSeg, NextSeqId, State1}. + {LowSeqIdSeg, NextSeqId, State}. start_msg_store(DurableQueues) -> - DurableDict = + DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue #amqqueue.name), Queue #amqqueue.name} || Queue <- DurableQueues ]), QueuesDir = queues_dir(), @@ -337,175 +346,84 @@ start_msg_store(DurableQueues) -> ok. 
%%---------------------------------------------------------------------------- -%% Minor Helpers +%% Msg Store Startup Delta Function %%---------------------------------------------------------------------------- -maybe_flush(State = #qistate { journal_count = JCount }) - when JCount > ?MAX_JOURNAL_ENTRY_COUNT -> - flush_journal(State); -maybe_flush(State) -> - State. - -write_to_journal(BinList, SeqIds, Dict, - State = #qistate { journal_count = JCount }) -> - {Hdl, State1} = get_journal_handle(State), - ok = file_handle_cache:append(Hdl, BinList), - {Dict1, JCount1} = - lists:foldl( - fun (SeqId, {Dict2, JCount2}) -> - {add_seqid_to_dict(SeqId, Dict2), JCount2 + 1} - end, {Dict, JCount}, SeqIds), - {Dict1, State1 #qistate { journal_count = JCount1 }}. - -queue_name_to_dir_name(Name = #resource { kind = queue }) -> - Bin = term_to_binary(Name), - Size = 8*size(Bin), - <<Num:Size>> = Bin, - lists:flatten(io_lib:format("~.36B", [Num])). - -queues_dir() -> - filename:join(rabbit_mnesia:dir(), "queues"). +queue_index_walker([]) -> + finished; +queue_index_walker([QueueName|QueueNames]) -> + State = blank_state(QueueName), + State1 = load_journal(State), + SegNums = all_segment_nums(State1), + queue_index_walker({SegNums, State1, QueueNames}); -rev_sort(List) -> - lists:sort(fun (A, B) -> B < A end, List). 
+queue_index_walker({[], State, QueueNames}) -> + _State = terminate(false, State), + queue_index_walker(QueueNames); +queue_index_walker({[Seg | SegNums], State, QueueNames}) -> + SeqId = reconstruct_seq_id(Seg, 0), + {Messages, State1} = read_segment_entries(SeqId, State), + queue_index_walker({Messages, State1, SegNums, QueueNames}); -get_journal_handle(State = #qistate { dir = Dir, seg_num_handles = SegHdls }) -> - case dict:find(journal, SegHdls) of - {ok, Hdl} -> - {Hdl, State}; - error -> - Path = filename:join(Dir, ?JOURNAL_FILENAME), - Mode = [raw, binary, write, read, read_ahead], - new_handle(journal, Path, Mode, State) +queue_index_walker({[], State, SegNums, QueueNames}) -> + queue_index_walker({SegNums, State, QueueNames}); +queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs], + State, SegNums, QueueNames}) -> + case IsPersistent of + true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}}; + false -> queue_index_walker({Msgs, State, SegNums, QueueNames}) end. -get_pub_handle(SegNum, State = #qistate { publish_handle = PubHandle }) -> - {State1, PubHandle1 = {_SegNum, Hdl, _Count}} = - get_counted_handle(SegNum, State, PubHandle), - {Hdl, State1 #qistate { publish_handle = PubHandle1 }}. 
- -get_counted_handle(SegNum, State, undefined) -> - get_counted_handle(SegNum, State, {SegNum, undefined, 0}); -get_counted_handle(SegNum, State = #qistate { partial_segments = Partials }, - {SegNum, undefined, Count}) -> - {Hdl, State1} = get_seg_handle(SegNum, State), - {CountExtra, Partials1} = - case dict:find(SegNum, Partials) of - {ok, CountExtra1} -> {CountExtra1, dict:erase(SegNum, Partials)}; - error -> {0, Partials} - end, - Count1 = Count + 1 + CountExtra, - {State1 #qistate { partial_segments = Partials1 }, {SegNum, Hdl, Count1}}; -get_counted_handle(SegNum, State, {SegNum, Hdl, Count}) - when Count < ?SEGMENT_ENTRY_COUNT -> - {State, {SegNum, Hdl, Count + 1}}; -get_counted_handle(SegNumA, State, {SegNumB, Hdl, ?SEGMENT_ENTRY_COUNT}) - when SegNumA == SegNumB + 1 -> - ok = file_handle_cache:append_write_buffer(Hdl), - get_counted_handle(SegNumA, State, undefined); -get_counted_handle(SegNumA, State = #qistate { partial_segments = Partials, - seg_ack_counts = AckCounts }, - {SegNumB, _Hdl, Count}) -> - %% don't flush here because it's possible SegNumB has been deleted - State1 = - case dict:find(SegNumB, AckCounts) of - {ok, Count} -> - %% #acks == #pubs, and we're moving to different - %% segment, so delete. - delete_segment(SegNumB, State); - _ -> - State #qistate { - partial_segments = dict:store(SegNumB, Count, Partials) } - end, - get_counted_handle(SegNumA, State1, undefined). +%%---------------------------------------------------------------------------- +%% Minors +%%---------------------------------------------------------------------------- -get_seg_handle(SegNum, State = #qistate { dir = Dir, seg_num_handles = SegHdls }) -> - case dict:find(SegNum, SegHdls) of - {ok, Hdl} -> - {Hdl, State}; - error -> - new_handle(SegNum, seg_num_to_path(Dir, SegNum), - [binary, raw, read, write, - {read_ahead, ?SEGMENT_TOTAL_SIZE}], - State) - end. 
+maybe_flush_journal(State = #qistate { dirty_count = DCount }) + when DCount > ?MAX_JOURNAL_ENTRY_COUNT -> + flush_journal(State); +maybe_flush_journal(State) -> + State. -delete_segment(SegNum, State = #qistate { dir = Dir, - seg_ack_counts = AckCounts, - partial_segments = Partials }) -> - State1 = close_handle(SegNum, State), - ok = case file:delete(seg_num_to_path(Dir, SegNum)) of - ok -> ok; - {error, enoent} -> ok - end, - State1 #qistate {seg_ack_counts = dict:erase(SegNum, AckCounts), - partial_segments = dict:erase(SegNum, Partials) }. - -new_handle(Key, Path, Mode, State = #qistate { seg_num_handles = SegHdls }) -> - {ok, Hdl} = file_handle_cache:open(Path, Mode, [{write_buffer, infinity}]), - {Hdl, State #qistate { seg_num_handles = dict:store(Key, Hdl, SegHdls) }}. - -close_handle(Key, State = #qistate { seg_num_handles = SegHdls }) -> - case dict:find(Key, SegHdls) of - {ok, Hdl} -> - ok = file_handle_cache:close(Hdl), - State #qistate { seg_num_handles = dict:erase(Key, SegHdls) }; - error -> - State - end. +all_segment_nums(#qistate { segments = Segments, dir = Dir }) -> + sets:to_list( + lists:foldl( + fun (SegName, Set) -> + sets:add_element( + list_to_integer( + lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, + SegName)), Set) + end, sets:from_list(dict:fetch_keys(Segments)), + filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir))). -close_all_handles(State = #qistate { seg_num_handles = SegHdls }) -> - ok = dict:fold(fun (_Key, Hdl, ok) -> - file_handle_cache:close(Hdl) - end, ok, SegHdls), - State #qistate { seg_num_handles = dict:new() }. +blank_state(QueueName) -> + StrName = queue_name_to_dir_name(QueueName), + Dir = filename:join(queues_dir(), StrName), + ok = filelib:ensure_dir(filename:join(Dir, "nothing")), + #qistate { dir = Dir, + segments = dict:new(), + journal_handle = undefined, + dirty_count = 0 + }. -bool_to_int(true ) -> 1; -bool_to_int(false) -> 0. +rev_sort(List) -> + lists:sort(fun (A, B) -> B < A end, List). 
seq_id_to_seg_and_rel_seq_id(SeqId) -> { SeqId div ?SEGMENT_ENTRY_COUNT, SeqId rem ?SEGMENT_ENTRY_COUNT }. -reconstruct_seq_id(SegNum, RelSeq) -> - (SegNum * ?SEGMENT_ENTRY_COUNT) + RelSeq. +reconstruct_seq_id(Seg, RelSeq) -> + (Seg * ?SEGMENT_ENTRY_COUNT) + RelSeq. -seg_num_to_path(Dir, SegNum) -> - SegName = integer_to_list(SegNum), +seg_num_to_path(Dir, Seg) -> + SegName = integer_to_list(Seg), filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION). -delete_queue_directory(Dir) -> - {ok, Entries} = file:list_dir(Dir), - ok = lists:foldl(fun (Entry, ok) -> - file:delete(filename:join(Dir, Entry)) - end, ok, Entries), - ok = file:del_dir(Dir). - -add_seqid_to_dict(SeqId, Dict) -> - {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - add_seqid_to_dict(SegNum, RelSeq, Dict). - -add_seqid_to_dict(SegNum, RelSeq, Dict) -> - dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], Dict). - -all_segment_nums(Dir) -> - lists:sort( - [list_to_integer( - lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName)) - || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]). - -blank_state(QueueName) -> - StrName = queue_name_to_dir_name(QueueName), - Dir = filename:join(queues_dir(), StrName), - ok = filelib:ensure_dir(filename:join(Dir, "nothing")), - #qistate { dir = Dir, - seg_num_handles = dict:new(), - journal_count = 0, - journal_ack_dict = dict:new(), - journal_del_dict = dict:new(), - seg_ack_counts = dict:new(), - publish_handle = undefined, - partial_segments = dict:new() - }. +delete_segment(#segment { handle = undefined }) -> + ok; +delete_segment(#segment { handle = Hdl, path = Path }) -> + ok = file_handle_cache:close(Hdl), + ok = file:delete(Path), + ok. detect_clean_shutdown(Dir) -> case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of @@ -519,258 +437,143 @@ store_clean_shutdown(Dir) -> [{write_buffer, unbuffered}]), ok = file_handle_cache:close(Hdl). 
-seg_entries_from_dict(SegNum, Dict) -> - case dict:find(SegNum, Dict) of - {ok, Entries} -> Entries; - error -> [] - end. - - -%%---------------------------------------------------------------------------- -%% Msg Store Startup Delta Function -%%---------------------------------------------------------------------------- - -queue_index_walker([]) -> - finished; -queue_index_walker([QueueName|QueueNames]) -> - State = blank_state(QueueName), - {Hdl, State1} = get_journal_handle(State), - {JAckDict, _JDelDict} = load_journal(Hdl, dict:new(), dict:new()), - State2 = #qistate { dir = Dir } = - close_handle(journal, State1 #qistate { journal_ack_dict = JAckDict }), - SegNums = all_segment_nums(Dir), - queue_index_walker({SegNums, State2, QueueNames}); - -queue_index_walker({[], State, QueueNames}) -> - _State = terminate(State), - queue_index_walker(QueueNames); -queue_index_walker({[SegNum | SegNums], State, QueueNames}) -> - {SDict, _PubCount, _AckCount, _HighRelSeq, State1} = - load_segment(SegNum, State), - queue_index_walker({dict:to_list(SDict), State1, SegNums, QueueNames}); - -queue_index_walker({[], State, SegNums, QueueNames}) -> - queue_index_walker({SegNums, State, QueueNames}); -queue_index_walker({[{_RelSeq, {MsgId, _IsDelivered, IsPersistent}} | Msgs], - State, SegNums, QueueNames}) -> - case IsPersistent of - true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}}; - false -> queue_index_walker({Msgs, State, SegNums, QueueNames}) - end. +queue_name_to_dir_name(Name = #resource { kind = queue }) -> + Bin = term_to_binary(Name), + Size = 8*size(Bin), + <<Num:Size>> = Bin, + lists:flatten(io_lib:format("~.36B", [Num])). +queues_dir() -> + filename:join(rabbit_mnesia:dir(), "queues"). 
-%%---------------------------------------------------------------------------- -%% Startup Functions -%%---------------------------------------------------------------------------- +delete_queue_directory(Dir) -> + {ok, Entries} = file:list_dir(Dir), + ok = lists:foldl(fun (Entry, ok) -> + file:delete(filename:join(Dir, Entry)) + end, ok, Entries), + ok = file:del_dir(Dir). -read_and_prune_segments(State = #qistate { dir = Dir }) -> - SegNums = all_segment_nums(Dir), - CleanShutdown = detect_clean_shutdown(Dir), - {TotalMsgCount, State1} = - lists:foldl( - fun (SegNum, {TotalMsgCount1, StateN = - #qistate { publish_handle = PublishHandle, - partial_segments = Partials }}) -> - {SDict, PubCount, AckCount, _HighRelSeq, StateM} = - load_segment(SegNum, StateN), - StateL = #qistate { seg_ack_counts = AckCounts } = - drop_and_deliver(SegNum, SDict, CleanShutdown, StateM), - %% ignore the effect of drop_and_deliver on - %% TotalMsgCount and AckCounts, as drop_and_deliver - %% will add to the journal dicts, which will then - %% effect TotalMsgCount when we scatter the journal - TotalMsgCount2 = TotalMsgCount1 + dict:size(SDict), - AckCounts1 = case AckCount of - 0 -> AckCounts; - N -> dict:store(SegNum, N, AckCounts) - end, - %% In the following, whilst there may be several - %% partial segments, we only remember the last - %% one. All other partial segments get added into - %% the partial_segments dict - {PublishHandle1, Partials1} = - case PubCount of - ?SEGMENT_ENTRY_COUNT -> - {PublishHandle, Partials}; - 0 -> - {PublishHandle, Partials}; - _ -> - {{SegNum, undefined, PubCount}, - case PublishHandle of - undefined -> - Partials; - {SegNumOld, undefined, PubCountOld} -> - dict:store(SegNumOld, PubCountOld, - Partials) - end} - end, - {TotalMsgCount2, - StateL #qistate { seg_ack_counts = AckCounts1, - publish_handle = PublishHandle1, - partial_segments = Partials1 }} - end, {0, State}, SegNums), - {TotalMsgCount, State1}. 
- -scatter_journal(TotalMsgCount, State = #qistate { dir = Dir }) -> - {Hdl, State1 = #qistate { journal_del_dict = JDelDict, - journal_ack_dict = JAckDict }} = - get_journal_handle(State), - %% ADict and DDict may well contain duplicates. However, this is - %% ok, because we use sets to eliminate dups before writing to - %% segments - {ADict, DDict} = load_journal(Hdl, JAckDict, JDelDict), - State2 = close_handle(journal, State1), - {TotalMsgCount1, ADict1, State3} = - dict:fold(fun replay_journal_to_segment/3, - {TotalMsgCount, ADict, - %% supply empty dicts so that when - %% replay_journal_to_segment loads segments, it - %% gets all msgs, and ignores anything we've found - %% in the journal. - State2 #qistate { journal_del_dict = dict:new(), - journal_ack_dict = dict:new() }}, DDict), - %% replay for segments which only had acks, and no deliveries - {TotalMsgCount2, State4} = - dict:fold(fun replay_journal_acks_to_segment/3, - {TotalMsgCount1, State3}, ADict1), - JournalPath = filename:join(Dir, ?JOURNAL_FILENAME), - ok = file:delete(JournalPath), - {TotalMsgCount2, State4}. - -load_journal(Hdl, ADict, DDict) -> - case file_handle_cache:read(Hdl, ?SEQ_BYTES) of - {ok, <<?DEL_BIT:1, SeqId:?SEQ_BITS>>} -> - load_journal(Hdl, ADict, add_seqid_to_dict(SeqId, DDict)); - {ok, <<?ACK_BIT:1, SeqId:?SEQ_BITS>>} -> - load_journal(Hdl, add_seqid_to_dict(SeqId, ADict), DDict); - _ErrOrEoF -> - {ADict, DDict} +get_segment_handle(Segment = #segment { handle = undefined, path = Path }) -> + {ok, Hdl} = file_handle_cache:open(Path, + [binary, raw, read, write, + {read_ahead, ?SEGMENT_TOTAL_SIZE}], + [{write_buffer, infinity}]), + {Hdl, Segment #segment { handle = Hdl }}; +get_segment_handle(Segment = #segment { handle = Hdl }) -> + {Hdl, Segment}. 
+ +find_segment(Seg, #qistate { segments = Segments, dir = Dir }) -> + case dict:find(Seg, Segments) of + {ok, Segment = #segment{}} -> Segment; + error -> #segment { pubs = 0, + acks = 0, + handle = undefined, + journal_entries = dict:new(), + path = seg_num_to_path(Dir, Seg), + num = Seg + } end. -replay_journal_to_segment(_SegNum, [], {TotalMsgCount, ADict, State}) -> - {TotalMsgCount, ADict, State}; -replay_journal_to_segment(SegNum, Dels, {TotalMsgCount, ADict, State}) -> - {SDict, _PubCount, _AckCount, _HighRelSeq, State1} = - load_segment(SegNum, State), - ValidDels = sets:to_list( - sets:filter( - fun (RelSeq) -> - case dict:find(RelSeq, SDict) of - {ok, {_MsgId, false, _IsPersistent}} -> true; - _ -> false - end - end, sets:from_list(Dels))), - State2 = append_dels_to_segment(SegNum, ValidDels, State1), - Acks = seg_entries_from_dict(SegNum, ADict), - case Acks of - [] -> {TotalMsgCount, ADict, State2}; - _ -> ADict1 = dict:erase(SegNum, ADict), - {Count, State3} = - filter_acks_and_append_to_segment(SegNum, SDict, - Acks, State2), - {TotalMsgCount - Count, ADict1, State3} - end. +store_segment(Segment = #segment { num = Seg }, + State = #qistate { segments = Segments }) -> + State #qistate { segments = dict:store(Seg, Segment, Segments) }. + +get_journal_handle(State = + #qistate { journal_handle = undefined, dir = Dir }) -> + Path = filename:join(Dir, ?JOURNAL_FILENAME), + {ok, Hdl} = file_handle_cache:open(Path, + [binary, raw, read, write, + {read_ahead, ?SEGMENT_TOTAL_SIZE}], + [{write_buffer, infinity}]), + {Hdl, State #qistate { journal_handle = Hdl }}; +get_journal_handle(State = #qistate { journal_handle = Hdl }) -> + {Hdl, State}. 
-replay_journal_acks_to_segment(_SegNum, [], {TotalMsgCount, State}) -> - {TotalMsgCount, State}; -replay_journal_acks_to_segment(SegNum, Acks, {TotalMsgCount, State}) -> - {SDict, _PubCount, _AckCount, _HighRelSeq, State1} = - load_segment(SegNum, State), - {Count, State2} = - filter_acks_and_append_to_segment(SegNum, SDict, Acks, State1), - {TotalMsgCount - Count, State2}. - -filter_acks_and_append_to_segment(SegNum, SDict, Acks, State) -> - ValidRelSeqIds = dict:fetch_keys(SDict), - ValidAcks = sets:to_list(sets:intersection(sets:from_list(ValidRelSeqIds), - sets:from_list(Acks))), - {length(ValidAcks), append_acks_to_segment(SegNum, ValidAcks, State)}. - -drop_and_deliver(SegNum, SDict, CleanShutdown, - State = #qistate { journal_del_dict = JDelDict, - journal_ack_dict = JAckDict }) -> - {JDelDict1, JAckDict1} = - dict:fold( - fun (RelSeq, {MsgId, IsDelivered, true}, {JDelDict2, JAckDict2}) -> - %% msg is persistent, keep only if the msg_store has it - case {IsDelivered, rabbit_msg_store:contains(MsgId)} of - {false, true} when not CleanShutdown -> - %% not delivered, but dirty shutdown => mark delivered - {add_seqid_to_dict(SegNum, RelSeq, JDelDict2), - JAckDict2}; - {_, true} -> - {JDelDict2, JAckDict2}; - {true, false} -> - {JDelDict2, - add_seqid_to_dict(SegNum, RelSeq, JAckDict2)}; - {false, false} -> - {add_seqid_to_dict(SegNum, RelSeq, JDelDict2), - add_seqid_to_dict(SegNum, RelSeq, JAckDict2)} - end; - (RelSeq, {_MsgId, false, false}, {JDelDict2, JAckDict2}) -> - %% not persistent and not delivered => deliver and ack it - {add_seqid_to_dict(SegNum, RelSeq, JDelDict2), - add_seqid_to_dict(SegNum, RelSeq, JAckDict2)}; - (RelSeq, {_MsgId, true, false}, {JDelDict2, JAckDict2}) -> - %% not persistent but delivered => ack it - {JDelDict2, - add_seqid_to_dict(SegNum, RelSeq, JAckDict2)} - end, {JDelDict, JAckDict}, SDict), - State #qistate { journal_del_dict = JDelDict1, - journal_ack_dict = JAckDict1 }. +bool_to_int(true ) -> 1; +bool_to_int(false) -> 0. 
+write_entry_to_segment(_RelSeq, {{_MsgId, _IsPersistent}, del, ack}, Hdl) -> + Hdl; +write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) -> + ok = case Publish of + no_pub -> + ok; + {MsgId, IsPersistent} -> + file_handle_cache:append( + Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS>>, MsgId]) + end, + ok = case {Del, Ack} of + {no_del, no_ack} -> ok; + _ -> Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>, + Data = case {Del, Ack} of + {del, ack} -> [Binary, Binary]; + _ -> Binary + end, + file_handle_cache:append(Hdl, Data) + end, + Hdl. + +terminate(StoreShutdown, State = + #qistate { segments = Segments, journal_handle = JournalHdl, + dir = Dir }) -> + ok = case JournalHdl of + undefined -> ok; + _ -> file_handle_cache:close(JournalHdl) + end, + ok = dict:fold( + fun (_Seg, #segment { handle = undefined }, ok) -> + ok; + (_Seg, #segment { handle = Hdl }, ok) -> + file_handle_cache:close(Hdl) + end, ok, Segments), + case StoreShutdown of + true -> store_clean_shutdown(Dir); + false -> ok + end, + State #qistate { journal_handle = undefined, segments = dict:new() }. %%---------------------------------------------------------------------------- -%% Loading Segments +%% Majors %%---------------------------------------------------------------------------- -load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls, - dir = Dir }) -> - SegmentExists = case dict:find(SegNum, SegHdls) of - {ok, _} -> true; - error -> filelib:is_file(seg_num_to_path(Dir, SegNum)) +%% Loading segments + +%% Does not do any combining with the journal at all. The PubCount +%% that comes back is the number of publishes in the segment. The +%% number of unacked msgs is PubCount - AckCount. If KeepAcks is +%% false, then dict:size(SegDict) == PubCount - AckCount. If KeepAcks +%% is true, then dict:size(SegDict) == PubCount. 
+load_segment(Seg, KeepAcks, State) -> + Segment = #segment { path = Path, handle = SegHdl } = + find_segment(Seg, State), + SegmentExists = case SegHdl of + undefined -> filelib:is_file(Path); + _ -> true end, case SegmentExists of false -> - {dict:new(), 0, 0, 0, State}; + {dict:new(), 0, 0, State}; true -> - {Hdl, State1 = #qistate { journal_del_dict = JDelDict, - journal_ack_dict = JAckDict }} = - get_seg_handle(SegNum, State), + {Hdl, Segment1} = get_segment_handle(Segment), {ok, 0} = file_handle_cache:position(Hdl, bof), - {SDict, PubCount, AckCount, HighRelSeq} = - load_segment_entries(Hdl, dict:new(), 0, 0, 0), - %% delete ack'd msgs first - {SDict1, AckCount1} = - lists:foldl(fun (RelSeq, {SDict2, AckCount2}) -> - {dict:erase(RelSeq, SDict2), AckCount2 + 1} - end, {SDict, AckCount}, - seg_entries_from_dict(SegNum, JAckDict)), - %% ensure remaining msgs are delivered as necessary - SDict3 = - lists:foldl( - fun (RelSeq, SDict4) -> - case dict:find(RelSeq, SDict4) of - {ok, {MsgId, false, IsPersistent}} -> - dict:store(RelSeq, - {MsgId, true, IsPersistent}, - SDict4); - _ -> - SDict4 - end - end, SDict1, seg_entries_from_dict(SegNum, JDelDict)), - {SDict3, PubCount, AckCount1, HighRelSeq, State1} + {SegDict, PubCount, AckCount} = + load_segment_entries(KeepAcks, Hdl, dict:new(), 0, 0), + {SegDict, PubCount, AckCount, store_segment(Segment1, State)} end. 
-load_segment_entries(Hdl, SDict, PubCount, AckCount, HighRelSeq) -> +load_segment_entries(KeepAcks, Hdl, SegDict, PubCount, AckCount) -> case file_handle_cache:read(Hdl, 1) of {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} -> {ok, LSB} = file_handle_cache:read( Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, - {SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq), - load_segment_entries(Hdl, SDict1, PubCount, AckCount1, HighRelSeq); + {AckCount1, SegDict1} = + deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict), + load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount, AckCount1); {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} -> %% because we specify /binary, and binaries are complete @@ -779,71 +582,252 @@ load_segment_entries(Hdl, SDict, PubCount, AckCount, HighRelSeq) -> file_handle_cache:read( Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, - HighRelSeq1 = lists:max([RelSeq, HighRelSeq]), - load_segment_entries( - Hdl, dict:store(RelSeq, {MsgId, false, 1 == IsPersistentNum}, - SDict), PubCount + 1, AckCount, HighRelSeq1); + SegDict1 = + dict:store(RelSeq, + {{MsgId, 1 == IsPersistentNum}, no_del, no_ack}, + SegDict), + load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount+1, AckCount); _ErrOrEoF -> - {SDict, PubCount, AckCount, HighRelSeq} + {SegDict, PubCount, AckCount} end. 
-deliver_or_ack_msg(SDict, AckCount, RelSeq) -> - case dict:find(RelSeq, SDict) of - {ok, {MsgId, false, IsPersistent}} -> - {dict:store(RelSeq, {MsgId, true, IsPersistent}, SDict), AckCount}; - {ok, {_MsgId, true, _IsPersistent}} -> - {dict:erase(RelSeq, SDict), AckCount + 1} +deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict) -> + case dict:find(RelSeq, SegDict) of + {ok, {PubRecord, no_del, no_ack}} -> + {AckCount, dict:store(RelSeq, {PubRecord, del, no_ack}, SegDict)}; + {ok, {PubRecord, del, no_ack}} when KeepAcks -> + {AckCount + 1, dict:store(RelSeq, {PubRecord, del, ack}, SegDict)}; + {ok, {_PubRecord, del, no_ack}} -> + {AckCount + 1, dict:erase(RelSeq, SegDict)} end. +%% Loading Journal. This isn't idempotent and will mess up the counts +%% if you call it more than once on the same state. Assumes the counts +%% are 0 to start with. + +load_journal(State) -> + {JournalHdl, State1} = get_journal_handle(State), + {ok, 0} = file_handle_cache:position(JournalHdl, 0), + State2 = #qistate { segments = Segments } = load_journal_entries(State1), + dict:fold( + fun (Seg, #segment { journal_entries = JEntries, + pubs = PubCountInJournal, + acks = AckCountInJournal }, StateN) -> + %% We want to keep acks in so that we can remove them if + %% duplicates are in the journal. The counts here are + %% purely from the segment itself. + {SegDict, PubCountInSeg, AckCountInSeg, StateN1} = + load_segment(Seg, true, StateN), + %% Removed counts here are the number of pubs and acks + %% that are duplicates - i.e. found in both the segment + %% and journal. + {JEntries1, PubsRemoved, AcksRemoved} = + journal_minus_segment(JEntries, SegDict), + Segment1 = find_segment(Seg, StateN1), + PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved, + AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved, + store_segment(Segment1 #segment { journal_entries = JEntries1, + pubs = PubCount1, + acks = AckCount1 }, StateN1) + end, State2, Segments). 
+ +load_journal_entries(State = #qistate { journal_handle = Hdl }) -> + case file_handle_cache:read(Hdl, ?SEQ_BYTES) of + {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} -> + case Prefix of + ?DEL_JPREFIX -> + load_journal_entries(add_to_journal(SeqId, del, State)); + ?ACK_JPREFIX -> + load_journal_entries(add_to_journal(SeqId, ack, State)); + _ -> + case file_handle_cache:read(Hdl, ?MSG_ID_BYTES) of + {ok, <<MsgIdNum:?MSG_ID_BITS>>} -> + %% work around for binary data + %% fragmentation. See + %% rabbit_msg_file:read_next/2 + <<MsgId:?MSG_ID_BYTES/binary>> = + <<MsgIdNum:?MSG_ID_BITS>>, + Publish = {MsgId, + case Prefix of + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end}, + load_journal_entries( + add_to_journal(SeqId, Publish, State)); + _ErrOrEoF -> %% err, we've lost at least a publish + State + end + end; + _ErrOrEoF -> State + end. -%%---------------------------------------------------------------------------- -%% Appending Acks or Dels to Segments -%%---------------------------------------------------------------------------- - -append_acks_to_segment(SegNum, Acks, - State = #qistate { seg_ack_counts = AckCounts, - partial_segments = Partials }) -> - AckCount = case dict:find(SegNum, AckCounts) of - {ok, AckCount1} -> AckCount1; - error -> 0 - end, - AckTarget = case dict:find(SegNum, Partials) of - {ok, PubCount} -> PubCount; - error -> ?SEGMENT_ENTRY_COUNT - end, - AckCount2 = AckCount + length(Acks), - append_acks_to_segment(SegNum, AckCount2, Acks, AckTarget, State). 
- -append_acks_to_segment(SegNum, AckCount, _Acks, AckCount, State = - #qistate { publish_handle = PubHdl }) -> - PubHdl1 = case PubHdl of - %% If we're adjusting the pubhdl here then there - %% will be no entry in partials, thus the target ack - %% count must be SEGMENT_ENTRY_COUNT - {SegNum, Hdl, AckCount = ?SEGMENT_ENTRY_COUNT} - when Hdl /= undefined -> - {SegNum + 1, undefined, 0}; - _ -> - PubHdl - end, - delete_segment(SegNum, State #qistate { publish_handle = PubHdl1 }); -append_acks_to_segment(_SegNum, _AckCount, [], _AckTarget, State) -> - State; -append_acks_to_segment(SegNum, AckCount, Acks, AckTarget, State = - #qistate { seg_ack_counts = AckCounts }) - when AckCount < AckTarget -> - {Hdl, State1} = append_to_segment(SegNum, Acks, State), - ok = file_handle_cache:sync(Hdl), - State1 #qistate { seg_ack_counts = - dict:store(SegNum, AckCount, AckCounts) }. - -append_dels_to_segment(SegNum, Dels, State) -> - {_Hdl, State1} = append_to_segment(SegNum, Dels, State), - State1. +add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount }) -> + {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), + Segment = #segment { journal_entries = SegJDict, + pubs = PubCount, acks = AckCount } = + find_segment(Seg, State), + SegJDict1 = add_to_journal(RelSeq, Action, SegJDict), + Segment1 = Segment #segment { journal_entries = SegJDict1 }, + Segment2 = + case Action of + del -> Segment1; + ack -> Segment1 #segment { acks = AckCount + 1 }; + {_MsgId, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 } + end, + store_segment(Segment2, State #qistate { dirty_count = DCount + 1 }); + +%% This is a more relaxed version of deliver_or_ack_msg because we can +%% have dels or acks in the journal without the corresponding +%% pub. Also, always want to keep acks. Things must occur in the right +%% order though. 
+add_to_journal(RelSeq, Action, SegJDict) -> + case dict:find(RelSeq, SegJDict) of + {ok, {PubRecord, no_del, no_ack}} when Action == del -> + dict:store(RelSeq, {PubRecord, del, no_ack}, SegJDict); + {ok, {PubRecord, DelRecord, no_ack}} when Action == ack -> + dict:store(RelSeq, {PubRecord, DelRecord, ack}, SegJDict); + error when Action == del -> + dict:store(RelSeq, {no_pub, del, no_ack}, SegJDict); + error when Action == ack -> + dict:store(RelSeq, {no_pub, no_del, ack}, SegJDict); + error -> + {_MsgId, _IsPersistent} = Action, %% ASSERTION + dict:store(RelSeq, {Action, no_del, no_ack}, SegJDict) + end. -append_to_segment(SegNum, AcksOrDels, State) -> - {Hdl, State1} = get_seg_handle(SegNum, State), - ok = file_handle_cache:append( - Hdl, [<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>> || RelSeq <- AcksOrDels ]), - {Hdl, State1}. +%% Combine what we have just read from a segment file with what we're +%% holding for that segment in memory. There must be no +%% duplicates. Used when providing segment entries to the variable +%% queue. +journal_plus_segment(JEntries, SegDict) -> + dict:fold(fun (RelSeq, JObj, SegDictOut) -> + SegEntry = case dict:find(RelSeq, SegDictOut) of + error -> not_found; + {ok, SObj = {_, _, _}} -> SObj + end, + journal_plus_segment(JObj, SegEntry, RelSeq, SegDictOut) + end, SegDict, JEntries). + +%% Here, the OutDict is the SegDict which we may be adding to (for +%% items only in the journal), modifying (bits in both), or erasing +%% from (ack in journal, not segment). 
+journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack}, + not_found, + RelSeq, OutDict) -> + dict:store(RelSeq, Obj, OutDict); +journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack}, + not_found, + RelSeq, OutDict) -> + dict:store(RelSeq, Obj, OutDict); +journal_plus_segment({{_MsgId, _IsPersistent}, del, ack}, + not_found, + RelSeq, OutDict) -> + dict:erase(RelSeq, OutDict); + +journal_plus_segment({no_pub, del, no_ack}, + {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, + RelSeq, OutDict) -> + dict:store(RelSeq, {PubRecord, del, no_ack}, OutDict); + +journal_plus_segment({no_pub, del, ack}, + {{_MsgId, _IsPersistent}, no_del, no_ack}, + RelSeq, OutDict) -> + dict:erase(RelSeq, OutDict); +journal_plus_segment({no_pub, no_del, ack}, + {{_MsgId, _IsPersistent}, del, no_ack}, + RelSeq, OutDict) -> + dict:erase(RelSeq, OutDict). + + +%% Remove from the journal entries for a segment, items that are +%% duplicates of entries found in the segment itself. Used on start up +%% to clean up the journal. +journal_minus_segment(JEntries, SegDict) -> + dict:fold(fun (RelSeq, JObj, {JEntriesOut, PubsRemoved, AcksRemoved}) -> + SegEntry = case dict:find(RelSeq, SegDict) of + error -> not_found; + {ok, SObj = {_, _, _}} -> SObj + end, + journal_minus_segment(JObj, SegEntry, RelSeq, JEntriesOut, + PubsRemoved, AcksRemoved) + end, {dict:new(), 0, 0}, JEntries). + +%% Here, the OutDict is a fresh journal that we're filling with valid +%% entries. PubsRemoved and AcksRemoved only get increased when the a +%% publish or ack is in both the journal and the segment. + +%% Both the same. 
Must be at least the publish +journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, no_ack}, + _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {OutDict, PubsRemoved + 1, AcksRemoved}; +journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, ack}, + _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {OutDict, PubsRemoved + 1, AcksRemoved + 1}; + +%% Just publish in journal +journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack}, + not_found, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; + +%% Just deliver in journal +journal_minus_segment(Obj = {no_pub, del, no_ack}, + {{_MsgId, _IsPersistent}, no_del, no_ack}, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; +journal_minus_segment({no_pub, del, no_ack}, + {{_MsgId, _IsPersistent}, del, no_ack}, + _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {OutDict, PubsRemoved, AcksRemoved}; + +%% Just ack in journal +journal_minus_segment(Obj = {no_pub, no_del, ack}, + {{_MsgId, _IsPersistent}, del, no_ack}, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; +journal_minus_segment({no_pub, no_del, ack}, + {{_MsgId, _IsPersistent}, del, ack}, + _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {OutDict, PubsRemoved, AcksRemoved}; + +%% Publish and deliver in journal +journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack}, + not_found, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; +journal_minus_segment({PubRecord, del, no_ack}, + {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, {no_pub, del, no_ack}, OutDict), + PubsRemoved + 1, AcksRemoved}; + +%% Deliver and ack in journal +journal_minus_segment(Obj = {no_pub, del, ack}, + {{_MsgId, _IsPersistent}, 
no_del, no_ack}, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; +journal_minus_segment({no_pub, del, ack}, + {{_MsgId, _IsPersistent}, del, no_ack}, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, {no_pub, no_del, ack}, OutDict), + PubsRemoved, AcksRemoved}; +journal_minus_segment({no_pub, del, ack}, + {{_MsgId, _IsPersistent}, del, ack}, + _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {OutDict, PubsRemoved, AcksRemoved + 1}; + +%% Publish, deliver and ack in journal +journal_minus_segment({{_MsgId, _IsPersistent}, del, ack}, + not_found, + _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {OutDict, PubsRemoved, AcksRemoved}; +journal_minus_segment({PubRecord, del, ack}, + {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, {no_pub, del, ack}, OutDict), + PubsRemoved + 1, AcksRemoved}; +journal_minus_segment({PubRecord, del, ack}, + {PubRecord = {_MsgId, _IsPersistent}, del, no_ack}, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, {no_pub, no_del, ack}, OutDict), + PubsRemoved + 1, AcksRemoved}. diff --git a/src/rabbit_queue_index3.erl b/src/rabbit_queue_index3.erl deleted file mode 100644 index 43a210d950..0000000000 --- a/src/rabbit_queue_index3.erl +++ /dev/null @@ -1,850 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. 
-%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_queue_index3). - --export([init/1, terminate/1, terminate_and_erase/1, write_published/4, - write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1, - read_segment_entries/2, next_segment_boundary/1, segment_size/0, - find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]). - --define(CLEAN_FILENAME, "clean.dot"). - -%%---------------------------------------------------------------------------- -%% ---- Journal details ---- - --define(MAX_JOURNAL_ENTRY_COUNT, 32768). --define(JOURNAL_FILENAME, "journal.jif"). - --define(PUB_PERSIST_JPREFIX, 00). --define(PUB_TRANS_JPREFIX, 01). --define(DEL_JPREFIX, 10). --define(ACK_JPREFIX, 11). --define(JPREFIX_BITS, 2). --define(SEQ_BYTES, 8). --define(SEQ_BITS, ((?SEQ_BYTES * 8) - ?JPREFIX_BITS)). - -%% ---- Segment details ---- - --define(SEGMENT_EXTENSION, ".idx"). - --define(REL_SEQ_BITS, 14). --define(REL_SEQ_BITS_BYTE_ALIGNED, (?REL_SEQ_BITS + 8 - (?REL_SEQ_BITS rem 8))). --define(SEGMENT_ENTRY_COUNT, 16384). %% trunc(math:pow(2,?REL_SEQ_BITS))). - -%% seq only is binary 00 followed by 14 bits of rel seq id -%% (range: 0 - 16383) --define(REL_SEQ_ONLY_PREFIX, 00). --define(REL_SEQ_ONLY_PREFIX_BITS, 2). 
--define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2). - -%% publish record is binary 1 followed by a bit for is_persistent, -%% then 14 bits of rel seq id, and 128 bits of md5sum msg id --define(PUBLISH_PREFIX, 1). --define(PUBLISH_PREFIX_BITS, 1). - --define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes --define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)). -%% 16 bytes for md5sum + 2 for seq, bits and prefix --define(PUBLISH_RECORD_LENGTH_BYTES, ?MSG_ID_BYTES + 2). - -%% 1 publish, 1 deliver, 1 ack per msg --define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * - (?PUBLISH_RECORD_LENGTH_BYTES + - (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES))). - -%%---------------------------------------------------------------------------- - --record(qistate, - { dir, - segments, - journal_handle, - dirty_count - }). - --record(segment, - { pubs, - acks, - handle, - journal_entries, - path, - num - }). - --include("rabbit.hrl"). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(hdl() :: ('undefined' | any())). --type(msg_id() :: binary()). --type(seq_id() :: integer()). --type(qistate() :: #qistate { dir :: file_path(), - segments :: dict(), - journal_handle :: hdl(), - dirty_count :: integer() - }). - --spec(init/1 :: (queue_name()) -> {non_neg_integer(), qistate()}). --spec(terminate/1 :: (qistate()) -> qistate()). --spec(terminate_and_erase/1 :: (qistate()) -> qistate()). --spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate()) - -> qistate()). --spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()). --spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()). --spec(sync_seq_ids/2 :: ([seq_id()], qistate()) -> qistate()). --spec(flush_journal/1 :: (qistate()) -> qistate()). --spec(read_segment_entries/2 :: (seq_id(), qistate()) -> - {[{msg_id(), seq_id(), boolean(), boolean()}], qistate()}). --spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). --spec(segment_size/0 :: () -> non_neg_integer()). 
--spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) -> - {non_neg_integer(), non_neg_integer(), qistate()}). --spec(start_msg_store/1 :: ([amqqueue()]) -> 'ok'). - --endif. - - -%%---------------------------------------------------------------------------- -%% Public API -%%---------------------------------------------------------------------------- - -init(Name) -> - State = blank_state(Name), - %% 1. Load the journal completely. This will also load segments - %% which have entries in the journal and remove duplicates. - %% The counts will correctly reflect the combination of the - %% segment and the journal. - State1 = load_journal(State), - %% 2. Flush the journal. This makes life easier for everyone, as - %% it means there won't be any publishes in the journal alone. - State2 = #qistate { dir = Dir } = flush_journal(State1), - %% 3. Load each segment in turn and filter out messages that are - %% not in the msg_store, by adding acks to the journal. These - %% acks only go to the RAM journal as it doesn't matter if we - %% lose them. Also mark delivered if not clean shutdown. - AllSegs = all_segment_nums(Dir), - CleanShutdown = detect_clean_shutdown(Dir), - %% We know the journal is empty here, so we don't need to combine - %% with the journal, and we don't need to worry about messages - %% that have been acked. 
- State3 = - lists:foldl( - fun (Seg, StateN) -> - {SegDict, _PubCount, _AckCount, StateN1} = - load_segment(Seg, false, StateN), - dict:fold( - fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack}, - StateM) -> - SeqId = reconstruct_seq_id(Seg, RelSeq), - InMsgStore = rabbit_msg_store:contains(MsgId), - case {InMsgStore, CleanShutdown} of - {true, true} -> - StateM; - {true, false} when Del == del -> - StateM; - {true, false} -> - add_to_journal(SeqId, del, StateM); - {false, _} when Del == del -> - add_to_journal(SeqId, ack, StateM); - {false, _} -> - add_to_journal( - SeqId, ack, - add_to_journal(SeqId, del, StateM)) - end - end, StateN1, SegDict) - end, State2, AllSegs), - %% 4. Go through all segments and calculate the number of unacked - %% messages we have. - Count = lists:foldl( - fun (Seg, CountAcc) -> - #segment { pubs = PubCount, acks = AckCount } = - find_segment(Seg, State3), - CountAcc + PubCount - AckCount - end, 0, AllSegs), - {Count, State3}. - -terminate(State) -> - terminate(true, State). - -terminate_and_erase(State) -> - State1 = terminate(State), - ok = delete_queue_directory(State1 #qistate.dir), - State1. - -write_published(MsgId, SeqId, IsPersistent, State) - when is_binary(MsgId) -> - ?MSG_ID_BYTES = size(MsgId), - {JournalHdl, State1} = get_journal_handle(State), - ok = file_handle_cache:append(JournalHdl, - [<<(case IsPersistent of - true -> ?PUB_PERSIST_JPREFIX; - false -> ?PUB_TRANS_JPREFIX - end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, - MsgId]), - maybe_flush_journal(add_to_journal(SeqId, {MsgId, IsPersistent}, State1)). - -write_delivered(SeqId, State) -> - {JournalHdl, State1} = get_journal_handle(State), - ok = file_handle_cache:append(JournalHdl, - <<?DEL_JPREFIX:?JPREFIX_BITS, - SeqId:?SEQ_BITS>>), - maybe_flush_journal(add_to_journal(SeqId, del, State1)). 
- -write_acks(SeqIds, State) -> - {SeqIds1, State1} = remove_pubs_dels_from_journal(SeqIds, State), - case SeqIds1 of - [] -> - State; - _ -> - {JournalHdl, State2} = get_journal_handle(State1), - ok = file_handle_cache:append(JournalHdl, - [<<?ACK_JPREFIX:?JPREFIX_BITS, - SeqId:?SEQ_BITS>> - || SeqId <- SeqIds1]), - State3 = lists:foldl(fun (SeqId, StateN) -> - add_to_journal(SeqId, ack, StateN) - end, State2, SeqIds1), - maybe_flush_journal(State3) - end. - -sync_seq_ids(_SeqIds, State = #qistate { journal_handle = undefined }) -> - State; -sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> - ok = file_handle_cache:sync(JournalHdl), - State. - -flush_journal(State = #qistate { dirty_count = 0 }) -> - State; -flush_journal(State = #qistate { segments = Segments }) -> - State1 = - dict:fold( - fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount, - acks = AckCount } = Segment, StateN) -> - case dict:is_empty(JEntries) of - true -> store_segment(Segment, StateN); - false when AckCount == PubCount -> - ok = delete_segment(Segment); - false -> - {Hdl, Segment1} = get_segment_handle(Segment), - dict:fold(fun write_entry_to_segment/3, - Hdl, JEntries), - ok = file_handle_cache:sync(Hdl), - store_segment( - Segment1 #segment { journal_entries = dict:new() }, - StateN) - end - end, State, Segments), - {JournalHdl, State2} = get_journal_handle(State1), - {ok, 0} = file_handle_cache:position(JournalHdl, bof), - ok = file_handle_cache:truncate(JournalHdl), - ok = file_handle_cache:sync(JournalHdl), - State2 #qistate { dirty_count = 0 }. 
- -read_segment_entries(InitSeqId, State) -> - {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), - {SegDict, _PubCount, _AckCount, State1} = - load_segment(Seg, false, State), - #segment { journal_entries = JEntries } = find_segment(Seg, State1), - SegDict1 = journal_plus_segment(JEntries, SegDict), - %% deliberately sort the list desc, because foldl will reverse it - RelSeqs = rev_sort(dict:fetch_keys(SegDict1)), - {lists:foldl(fun (RelSeq, Acc) -> - {{MsgId, IsPersistent}, IsDelivered, no_ack} = - dict:fetch(RelSeq, SegDict1), - [ {MsgId, reconstruct_seq_id(Seg, RelSeq), - IsPersistent, IsDelivered} | Acc ] - end, [], RelSeqs), - State1}. - -next_segment_boundary(SeqId) -> - {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - reconstruct_seq_id(Seg + 1, 0). - -segment_size() -> - ?SEGMENT_ENTRY_COUNT. - -find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) -> - SegNums = all_segment_nums(Dir), - %% We don't want the lowest seq_id, merely the seq_id of the start - %% of the lowest segment. That seq_id may not actually exist, but - %% that's fine. The important thing is that the segment exists and - %% the seq_id reported is on a segment boundary. - - %% We also don't really care about the max seq_id. Just start the - %% next segment: it makes life much easier. - - %% SegNums is sorted, ascending. - {LowSeqIdSeg, NextSeqId} = - case SegNums of - [] -> {0, 0}; - [MinSeg|_] -> {reconstruct_seq_id(MinSeg, 0), - reconstruct_seq_id(lists:last(SegNums), 0)} - end, - {LowSeqIdSeg, NextSeqId, State}. 
- -start_msg_store(DurableQueues) -> - DurableDict = - dict:from_list([ {queue_name_to_dir_name(Queue #amqqueue.name), - Queue #amqqueue.name} || Queue <- DurableQueues ]), - QueuesDir = queues_dir(), - Directories = case file:list_dir(QueuesDir) of - {ok, Entries} -> - [ Entry || Entry <- Entries, - filelib:is_dir( - filename:join(QueuesDir, Entry)) ]; - {error, enoent} -> - [] - end, - DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)), - {DurableQueueNames, TransientDirs} = - lists:foldl( - fun (QueueDir, {DurableAcc, TransientAcc}) -> - case sets:is_element(QueueDir, DurableDirectories) of - true -> - {[dict:fetch(QueueDir, DurableDict) | DurableAcc], - TransientAcc}; - false -> - {DurableAcc, [QueueDir | TransientAcc]} - end - end, {[], []}, Directories), - MsgStoreDir = filename:join(rabbit_mnesia:dir(), "msg_store"), - ok = rabbit:start_child(rabbit_msg_store, [MsgStoreDir, - fun queue_index_walker/1, - DurableQueueNames]), - lists:foreach(fun (DirName) -> - Dir = filename:join(queues_dir(), DirName), - ok = delete_queue_directory(Dir) - end, TransientDirs), - ok. 
- -%%---------------------------------------------------------------------------- -%% Msg Store Startup Delta Function -%%---------------------------------------------------------------------------- - -queue_index_walker([]) -> - finished; -queue_index_walker([QueueName|QueueNames]) -> - State = #qistate { dir = Dir } = blank_state(QueueName), - State1 = #qistate { journal_handle = JHdl } = load_journal(State), - ok = file_handle_cache:close(JHdl), - SegNums = all_segment_nums(Dir), - queue_index_walker({SegNums, State1, QueueNames}); - -queue_index_walker({[], State, QueueNames}) -> - _State = terminate(false, State), - queue_index_walker(QueueNames); -queue_index_walker({[Seg | SegNums], State, QueueNames}) -> - SeqId = reconstruct_seq_id(Seg, 0), - {Messages, State1} = read_segment_entries(SeqId, State), - queue_index_walker({Messages, State1, SegNums, QueueNames}); - -queue_index_walker({[], State, SegNums, QueueNames}) -> - queue_index_walker({SegNums, State, QueueNames}); -queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs], - State, SegNums, QueueNames}) -> - case IsPersistent of - true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}}; - false -> queue_index_walker({Msgs, State, SegNums, QueueNames}) - end. - -%%---------------------------------------------------------------------------- -%% Minors -%%---------------------------------------------------------------------------- - -maybe_flush_journal(State = #qistate { dirty_count = DCount }) - when DCount > ?MAX_JOURNAL_ENTRY_COUNT -> - flush_journal(State); -maybe_flush_journal(State) -> - State. - -all_segment_nums(Dir) -> - lists:sort( - [list_to_integer( - lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName)) - || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]). 
- -blank_state(QueueName) -> - StrName = queue_name_to_dir_name(QueueName), - Dir = filename:join(queues_dir(), StrName), - ok = filelib:ensure_dir(filename:join(Dir, "nothing")), - #qistate { dir = Dir, - segments = dict:new(), - journal_handle = undefined, - dirty_count = 0 - }. - -rev_sort(List) -> - lists:sort(fun (A, B) -> B < A end, List). - -seq_id_to_seg_and_rel_seq_id(SeqId) -> - { SeqId div ?SEGMENT_ENTRY_COUNT, SeqId rem ?SEGMENT_ENTRY_COUNT }. - -reconstruct_seq_id(Seg, RelSeq) -> - (Seg * ?SEGMENT_ENTRY_COUNT) + RelSeq. - -seg_num_to_path(Dir, Seg) -> - SegName = integer_to_list(Seg), - filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION). - -delete_segment(#segment { handle = undefined }) -> - ok; -delete_segment(#segment { handle = Hdl, path = Path }) -> - ok = file_handle_cache:close(Hdl), - ok = file:delete(Path), - ok. - -detect_clean_shutdown(Dir) -> - case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of - ok -> true; - {error, enoent} -> false - end. - -store_clean_shutdown(Dir) -> - {ok, Hdl} = file_handle_cache:open(filename:join(Dir, ?CLEAN_FILENAME), - [write, raw, binary], - [{write_buffer, unbuffered}]), - ok = file_handle_cache:close(Hdl). - -queue_name_to_dir_name(Name = #resource { kind = queue }) -> - Bin = term_to_binary(Name), - Size = 8*size(Bin), - <<Num:Size>> = Bin, - lists:flatten(io_lib:format("~.36B", [Num])). - -queues_dir() -> - filename:join(rabbit_mnesia:dir(), "queues"). - -delete_queue_directory(Dir) -> - {ok, Entries} = file:list_dir(Dir), - ok = lists:foldl(fun (Entry, ok) -> - file:delete(filename:join(Dir, Entry)) - end, ok, Entries), - ok = file:del_dir(Dir). - -get_segment_handle(Segment = #segment { handle = undefined, path = Path }) -> - {ok, Hdl} = file_handle_cache:open(Path, - [binary, raw, read, write, - {read_ahead, ?SEGMENT_TOTAL_SIZE}], - [{write_buffer, infinity}]), - {Hdl, Segment #segment { handle = Hdl }}; -get_segment_handle(Segment = #segment { handle = Hdl }) -> - {Hdl, Segment}. 
%% Look up segment number Seg in the state. When it is not yet known a
%% fresh, empty segment record is returned (it is NOT stored in the
%% state; use store_segment/2 for that).
find_segment(Seg, #qistate { segments = Segments, dir = Dir }) ->
    case dict:find(Seg, Segments) of
        {ok, Known = #segment{}} ->
            Known;
        error ->
            #segment { pubs            = 0,
                       acks            = 0,
                       handle          = undefined,
                       journal_entries = dict:new(),
                       path            = seg_num_to_path(Dir, Seg),
                       num             = Seg }
    end.

%% Insert or replace Segment in the state, keyed by its own number.
store_segment(Segment = #segment { num = Seg },
              State = #qistate { segments = Segments }) ->
    Segments1 = dict:store(Seg, Segment, Segments),
    State #qistate { segments = Segments1 }.

%% Return {Handle, State}, opening the journal file on first use and
%% remembering the handle in the returned state.
get_journal_handle(State = #qistate { journal_handle = undefined,
                                      dir            = Dir }) ->
    JournalPath = filename:join(Dir, ?JOURNAL_FILENAME),
    {ok, JournalHdl} =
        file_handle_cache:open(JournalPath,
                               [binary, raw, read, write,
                                {read_ahead, ?SEGMENT_TOTAL_SIZE}],
                               [{write_buffer, infinity}]),
    {JournalHdl, State #qistate { journal_handle = JournalHdl }};
get_journal_handle(State = #qistate { journal_handle = JournalHdl }) ->
    {JournalHdl, State}.

bool_to_int(true ) -> 1;
bool_to_int(false) -> 0.

%% Append the on-disk records for one {Publish, Del, Ack} entry to the
%% segment file behind Hdl and return Hdl. A publish becomes one publish
%% record followed by the message id; a delivery and an ack are each one
%% "relative sequence only" record, so {del, ack} writes that record twice.
write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) ->
    ok = case Publish of
             no_pub ->
                 ok;
             {MsgId, IsPersistent} ->
                 PersistentBit = bool_to_int(IsPersistent),
                 PubRecord = <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
                               PersistentBit:1,
                               RelSeq:?REL_SEQ_BITS>>,
                 file_handle_cache:append(Hdl, [PubRecord, MsgId])
         end,
    ok = case {Del, Ack} of
             {no_del, no_ack} ->
                 ok;
             _ ->
                 RelSeqOnly =
                     <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
                       RelSeq:?REL_SEQ_BITS>>,
                 Payload = case {Del, Ack} of
                               {del, ack} -> [RelSeqOnly, RelSeqOnly];
                               _          -> RelSeqOnly
                           end,
                 file_handle_cache:append(Hdl, Payload)
         end,
    Hdl.
%% Close the journal handle and every open segment handle. When
%% StoreShutdown is true a clean-shutdown marker is written into the
%% queue directory. The returned state has no journal handle and no
%% segments.
terminate(StoreShutdown,
          State = #qistate { segments       = Segments,
                             journal_handle = JournalHdl,
                             dir            = Dir }) ->
    ok = case JournalHdl of
             undefined -> ok;
             _         -> file_handle_cache:close(JournalHdl)
         end,
    CloseSegment = fun (_Seg, #segment { handle = undefined }, ok) ->
                           ok;
                       (_Seg, #segment { handle = SegHdl }, ok) ->
                           file_handle_cache:close(SegHdl)
                   end,
    ok = dict:fold(CloseSegment, ok, Segments),
    case StoreShutdown of
        true  -> store_clean_shutdown(Dir);
        false -> ok
    end,
    State #qistate { journal_handle = undefined, segments = dict:new() }.

%% For each sequence id whose journal entry is exactly "published and
%% delivered, not acked", erase that entry and bump the segment's ack
%% count. Every other sequence id is collected instead. Returns
%% {RemainingSeqIds, State}; RemainingSeqIds is in reverse input order.
remove_pubs_dels_from_journal(SeqIds, State) ->
    Step =
        fun (SeqId, {Remaining, StateN}) ->
                {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
                Segment = #segment { journal_entries = JEntries,
                                     acks            = AckCount } =
                    find_segment(Seg, StateN),
                case dict:find(RelSeq, JEntries) of
                    {ok, {{_MsgId, _IsPersistent}, del, no_ack}} ->
                        Segment1 =
                            Segment #segment {
                              journal_entries = dict:erase(RelSeq, JEntries),
                              acks            = AckCount + 1 },
                        {Remaining, store_segment(Segment1, StateN)};
                    _ ->
                        {[SeqId | Remaining], StateN}
                end
        end,
    lists:foldl(Step, {[], State}, SeqIds).

%%----------------------------------------------------------------------------
%% Majors
%%----------------------------------------------------------------------------

%% Loading segments

%% Does not do any combining with the journal at all. The PubCount
%% that comes back is the number of publishes in the segment. The
%% number of unacked msgs is PubCount - AckCount. If KeepAcks is
%% false, then dict:size(SegDict) == PubCount - AckCount. If KeepAcks
%% is true, then dict:size(SegDict) == PubCount.
%% Read segment Seg from disk (journal not consulted). Returns
%% {SegDict, PubCount, AckCount, State}; a segment with neither an open
%% handle nor a file on disk yields an empty dict and zero counts.
load_segment(Seg, KeepAcks, State) ->
    Segment = #segment { path = SegPath, handle = SegHdl } =
        find_segment(Seg, State),
    %% An already-open handle means the file exists; only hit the
    %% filesystem when there is no handle.
    SegmentExists = (SegHdl =/= undefined) orelse filelib:is_file(SegPath),
    case SegmentExists of
        false ->
            {dict:new(), 0, 0, State};
        true ->
            {Hdl, Segment1} = get_segment_handle(Segment),
            {ok, 0} = file_handle_cache:position(Hdl, bof),
            {SegDict, PubCount, AckCount} =
                load_segment_entries(KeepAcks, Hdl, dict:new(), 0, 0),
            State1 = store_segment(Segment1, State),
            {SegDict, PubCount, AckCount, State1}
    end.

%% Read records from Hdl until a read fails or hits end of file. The
%% first byte of each record carries the record-type prefix together with
%% the most significant bits of the relative sequence number.
load_segment_entries(KeepAcks, Hdl, SegDict, PubCount, AckCount) ->
    case file_handle_cache:read(Hdl, 1) of
        {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
               MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} ->
            %% delivery-or-ack record: just the rest of the RelSeq follows
            {ok, LSB} = file_handle_cache:read(
                          Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1),
            <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
            {AckCount1, SegDict1} =
                deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict),
            load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount, AckCount1);
        {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
               IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} ->
            %% publish record. With /binary the size is counted in
            %% bytes, not bits, since binaries are whole bytes.
            {ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>} =
                file_handle_cache:read(
                  Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1),
            <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
            IsPersistent = (1 == IsPersistentNum),
            SegDict1 = dict:store(RelSeq,
                                  {{MsgId, IsPersistent}, no_del, no_ack},
                                  SegDict),
            load_segment_entries(KeepAcks, Hdl, SegDict1,
                                 PubCount + 1, AckCount);
        _ErrOrEoF ->
            {SegDict, PubCount, AckCount}
    end.
%% Apply one "relative sequence only" record read from a segment file to
%% the entry for RelSeq in SegDict. The first such record for a message
%% marks it delivered; the second is its ack. Returns
%% {AckCount1, SegDict1}.
%%
%% On ack, the entry is kept (marked `ack`) when KeepAcks is true and
%% erased when KeepAcks is false. The erasing clause was previously
%% guarded by `when KeepAcks` as well, which made it unreachable and made
%% every ack crash with case_clause when KeepAcks was false.
deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict) ->
    case dict:find(RelSeq, SegDict) of
        {ok, {PubRecord, no_del, no_ack}} ->
            {AckCount, dict:store(RelSeq, {PubRecord, del, no_ack}, SegDict)};
        {ok, {PubRecord, del, no_ack}} when KeepAcks ->
            {AckCount + 1, dict:store(RelSeq, {PubRecord, del, ack}, SegDict)};
        {ok, {_PubRecord, del, no_ack}} when not KeepAcks ->
            {AckCount + 1, dict:erase(RelSeq, SegDict)}
    end.

%% Loading Journal. This isn't idempotent and will mess up the counts
%% if you call it more than once on the same state. Assumes the counts
%% are 0 to start with.
%%
%% Reads the whole journal into the per-segment journal dicts, then, for
%% every segment the journal touched, loads the segment file and drops
%% journal entries that duplicate what the segment already holds, fixing
%% up the segment's pub and ack counts accordingly.
load_journal(State) ->
    {JournalHdl, State1} = get_journal_handle(State),
    {ok, 0} = file_handle_cache:position(JournalHdl, 0),
    %% Read using the state that carries the open journal handle, and bind
    %% the result to a fresh variable: the entries read change the state,
    %% so matching it against State1 would fail.
    State2 = #qistate { segments = Segments } = load_journal_entries(State1),
    dict:fold(
      fun (Seg, #segment { journal_entries = JEntries,
                           pubs            = PubCountInJournal,
                           acks            = AckCountInJournal }, StateN) ->
              %% We want to keep acks in so that we can remove them if
              %% duplicates are in the journal. The counts here are
              %% purely from the segment itself.
              {SegDict, PubCountInSeg, AckCountInSeg, StateN1} =
                  load_segment(Seg, true, StateN),
              %% Removed counts here are the number of pubs and acks
              %% that are duplicates - i.e. found in both the segment
              %% and journal.
              {JEntries1, PubsRemoved, AcksRemoved} =
                  journal_minus_segment(JEntries, SegDict),
              %% find_segment/2 returns the segment record itself. It is
              %% looked up again because load_segment/3 may have stored a
              %% newly opened file handle in it.
              Segment1 = find_segment(Seg, StateN1),
              PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved,
              AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved,
              store_segment(Segment1 #segment { journal_entries = JEntries1,
                                                pubs            = PubCount1,
                                                acks            = AckCount1 },
                            StateN1)
      end, State2, Segments).
%% Read journal records from the state's journal handle until a read
%% fails or hits end of file, folding each one into the in-memory
%% per-segment journal via add_to_journal/3.
load_journal_entries(State = #qistate { journal_handle = JournalHdl }) ->
    case file_handle_cache:read(JournalHdl, ?SEQ_BYTES) of
        {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} ->
            case Prefix of
                ?DEL_JPREFIX ->
                    load_journal_entries(add_to_journal(SeqId, del, State));
                ?ACK_JPREFIX ->
                    load_journal_entries(add_to_journal(SeqId, ack, State));
                _ ->
                    %% any other prefix is a publish, which is followed
                    %% by the message id
                    case file_handle_cache:read(JournalHdl, ?MSG_ID_BYTES) of
                        {ok, <<MsgIdNum:?MSG_ID_BITS>>} ->
                            %% work around for binary data
                            %% fragmentation. See
                            %% rabbit_msg_file:read_next/2
                            <<MsgId:?MSG_ID_BYTES/binary>> =
                                <<MsgIdNum:?MSG_ID_BITS>>,
                            IsPersistent =
                                case Prefix of
                                    ?PUB_PERSIST_JPREFIX -> true;
                                    ?PUB_TRANS_JPREFIX   -> false
                                end,
                            State1 = add_to_journal(
                                       SeqId, {MsgId, IsPersistent}, State),
                            load_journal_entries(State1);
                        _ErrOrEoF ->
                            %% err, we've lost at least a publish
                            State
                    end
            end;
        _ErrOrEoF ->
            State
    end.

%% add_to_journal/3 has two flavours, told apart by the third argument.
%%
%% Given a #qistate{}: record Action (del | ack | {MsgId, IsPersistent})
%% for the absolute SeqId in the owning segment's journal, bump that
%% segment's pub or ack count, and bump the state's dirty count.
add_to_journal(SeqId, Action, State = #qistate { dirty_count = Dirty }) ->
    {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
    Segment = #segment { journal_entries = JEntries,
                         pubs            = PubCount,
                         acks            = AckCount } =
        find_segment(Seg, State),
    JEntries1 = add_to_journal(RelSeq, Action, JEntries),
    Segment1 = Segment #segment { journal_entries = JEntries1 },
    Segment2 =
        case Action of
            del ->
                Segment1;
            ack ->
                Segment1 #segment { acks = AckCount + 1 };
            {_MsgId, _IsPersistent} ->
                Segment1 #segment { pubs = PubCount + 1 }
        end,
    store_segment(Segment2, State #qistate { dirty_count = Dirty + 1 });

%% This is a more relaxed version of deliver_or_ack_msg because we can
%% have dels or acks in the journal without the corresponding
%% pub. Also, always want to keep acks. Things must occur in the right
%% order though.
%%
%% Given a segment's journal dict: merge Action into the entry stored
%% under RelSeq and return the updated dict.
add_to_journal(RelSeq, Action, JEntries) ->
    case dict:find(RelSeq, JEntries) of
        {ok, {PubRecord, no_del, no_ack}} when Action == del ->
            dict:store(RelSeq, {PubRecord, del, no_ack}, JEntries);
        {ok, {PubRecord, DelRecord, no_ack}} when Action == ack ->
            dict:store(RelSeq, {PubRecord, DelRecord, ack}, JEntries);
        error when Action == del ->
            dict:store(RelSeq, {no_pub, del, no_ack}, JEntries);
        error when Action == ack ->
            dict:store(RelSeq, {no_pub, no_del, ack}, JEntries);
        error ->
            {_MsgId, _IsPersistent} = Action, %% ASSERTION
            dict:store(RelSeq, {Action, no_del, no_ack}, JEntries)
    end.

%% Combine what we have just read from a segment file with what we're
%% holding for that segment in memory. There must be no
%% duplicates. Used when providing segment entries to the variable
%% queue.
journal_plus_segment(JEntries, SegDict) ->
    Merge = fun (RelSeq, JObj, SegDictOut) ->
                    SegEntry = case dict:find(RelSeq, SegDictOut) of
                                   error                  -> not_found;
                                   {ok, SObj = {_, _, _}} -> SObj
                               end,
                    journal_plus_segment(JObj, SegEntry, RelSeq, SegDictOut)
            end,
    dict:fold(Merge, SegDict, JEntries).

%% Here, the OutDict is the SegDict which we may be adding to (for
%% items only in the journal), modifying (bits in both), or erasing
%% from (ack in journal, not segment).
%% journal_plus_segment(JournalEntry, SegmentEntryOrNotFound, RelSeq, OutDict)
%%
%% Entry is only in the journal: publish, optionally delivered -> add it.
journal_plus_segment(JObj = {{_MsgId, _IsPersistent}, no_del, no_ack},
                     not_found,
                     RelSeq, OutDict) ->
    dict:store(RelSeq, JObj, OutDict);
journal_plus_segment(JObj = {{_MsgId, _IsPersistent}, del, no_ack},
                     not_found,
                     RelSeq, OutDict) ->
    dict:store(RelSeq, JObj, OutDict);
%% Published, delivered and acked entirely within the journal: nothing
%% to keep.
journal_plus_segment({{_MsgId, _IsPersistent}, del, ack},
                     not_found,
                     RelSeq, OutDict) ->
    dict:erase(RelSeq, OutDict);

%% Publish is in the segment, delivery is in the journal: mark delivered.
journal_plus_segment({no_pub, del, no_ack},
                     {SegPub = {_MsgId, _IsPersistent}, no_del, no_ack},
                     RelSeq, OutDict) ->
    dict:store(RelSeq, {SegPub, del, no_ack}, OutDict);

%% Publish is in the segment and the ack is in the journal: drop it.
journal_plus_segment({no_pub, del, ack},
                     {{_MsgId, _IsPersistent}, no_del, no_ack},
                     RelSeq, OutDict) ->
    dict:erase(RelSeq, OutDict);
journal_plus_segment({no_pub, no_del, ack},
                     {{_MsgId, _IsPersistent}, del, no_ack},
                     RelSeq, OutDict) ->
    dict:erase(RelSeq, OutDict).


%% Remove from the journal entries for a segment, items that are
%% duplicates of entries found in the segment itself. Used on start up
%% to clean up the journal.
%%
%% Returns {JEntries1, PubsRemoved, AcksRemoved}.
journal_minus_segment(JEntries, SegDict) ->
    Prune = fun (RelSeq, JObj, {JEntriesOut, PubsRemoved, AcksRemoved}) ->
                    SegEntry = case dict:find(RelSeq, SegDict) of
                                   error                  -> not_found;
                                   {ok, SObj = {_, _, _}} -> SObj
                               end,
                    journal_minus_segment(JObj, SegEntry, RelSeq, JEntriesOut,
                                          PubsRemoved, AcksRemoved)
            end,
    dict:fold(Prune, {dict:new(), 0, 0}, JEntries).

%% Here, the OutDict is a fresh journal that we're filling with valid
%% entries. PubsRemoved and AcksRemoved only get increased when a
%% publish or ack is in both the journal and the segment.

%% Both the same.
Must be at least the publish -journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, no_ack}, - _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {OutDict, PubsRemoved + 1, AcksRemoved}; -journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, ack}, - _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {OutDict, PubsRemoved + 1, AcksRemoved + 1}; - -%% Just publish in journal -journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack}, - not_found, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; - -%% Just deliver in journal -journal_minus_segment(Obj = {no_pub, del, no_ack}, - {{_MsgId, _IsPersistent}, no_del, no_ack}, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; -journal_minus_segment({no_pub, del, no_ack}, - {{_MsgId, _IsPersistent}, del, no_ack}, - _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {OutDict, PubsRemoved, AcksRemoved}; - -%% Just ack in journal -journal_minus_segment(Obj = {no_pub, no_del, ack}, - {{_MsgId, _IsPersistent}, del, no_ack}, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; -journal_minus_segment({no_pub, no_del, ack}, - {{_MsgId, _IsPersistent}, del, ack}, - _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {OutDict, PubsRemoved, AcksRemoved}; - -%% Publish and deliver in journal -journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack}, - not_found, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; -journal_minus_segment({PubRecord, del, no_ack}, - {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, {no_pub, del, no_ack}, OutDict), - PubsRemoved + 1, AcksRemoved}; - -%% Deliver and ack in journal -journal_minus_segment(Obj = {no_pub, del, ack}, - {{_MsgId, _IsPersistent}, 
no_del, no_ack}, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; -journal_minus_segment({no_pub, del, ack}, - {{_MsgId, _IsPersistent}, del, no_ack}, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, {no_pub, no_del, ack}, OutDict), - PubsRemoved, AcksRemoved}; -journal_minus_segment({no_pub, del, ack}, - {{_MsgId, _IsPersistent}, del, ack}, - _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {OutDict, PubsRemoved, AcksRemoved + 1}; - -%% Publish, deliver and ack in journal -journal_minus_segment({{_MsgId, _IsPersistent}, del, ack}, - not_found, - _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {OutDict, PubsRemoved, AcksRemoved}; -journal_minus_segment({PubRecord, del, ack}, - {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, {no_pub, del, ack}, OutDict), - PubsRemoved + 1, AcksRemoved}; -journal_minus_segment({PubRecord, del, ack}, - {PubRecord = {_MsgId, _IsPersistent}, del, no_ack}, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, {no_pub, no_del, ack}, OutDict), - PubsRemoved + 1, AcksRemoved}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f84ba70adc..dc81ea18b9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1078,6 +1078,8 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> ko. 
test_queue_index() -> + SegmentSize = rabbit_queue_index:segment_size(), + TwoSegs = SegmentSize + SegmentSize, stop_msg_store(), ok = empty_test_queue(), SeqIdsA = lists:seq(0,9999), @@ -1086,7 +1088,7 @@ test_queue_index() -> {0, 0, Qi1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0), {Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1), - {0, 10000, Qi3} = + {0, SegSize, Qi3} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi2), {ReadA, Qi4} = rabbit_queue_index:read_segment_entries(0, Qi3), ok = verify_read_with_published(false, false, ReadA, @@ -1097,10 +1099,10 @@ test_queue_index() -> ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]), %% should get length back as 0, as all the msgs were transient {0, Qi6} = rabbit_queue_index:init(test_queue()), - {0, 10000, Qi7} = + {0, SegSize, Qi7} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6), {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), - {0, 20000, Qi9} = + {0, TwoSegs, Qi9} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi8), {ReadB, Qi10} = rabbit_queue_index:read_segment_entries(0, Qi9), ok = verify_read_with_published(false, true, ReadB, @@ -1111,7 +1113,7 @@ test_queue_index() -> %% should get length back as 10000 LenB = length(SeqIdsB), {LenB, Qi12} = rabbit_queue_index:init(test_queue()), - {0, 20000, Qi13} = + {0, TwoSegs, Qi13} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi12), Qi14 = queue_index_deliver(SeqIdsB, Qi13), {ReadC, Qi15} = rabbit_queue_index:read_segment_entries(0, Qi14), @@ -1119,10 +1121,8 @@ test_queue_index() -> lists:reverse(SeqIdsMsgIdsB)), Qi16 = rabbit_queue_index:write_acks(SeqIdsB, Qi15), Qi17 = queue_index_flush_journal(Qi16), - %% the entire first segment will have gone as they were firstly - %% transient, and secondly ack'd - SegmentSize = rabbit_queue_index:segment_size(), - {SegmentSize, 20000, Qi18} = + %% Everything will have gone now because #pubs == #acks + 
{0, 0, Qi18} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17), _Qi19 = rabbit_queue_index:terminate(Qi18), ok = stop_msg_store(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0a5909a055..f2d45700f0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -460,7 +460,7 @@ tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms} }) -> {SeqIdsAcc1, StateN1} end, {[], State1}, lists:flatten(lists:reverse(SPubs))), IndexState1 = - rabbit_queue_index:sync_seq_ids(PubSeqIds, [] /= SAcks, IndexState), + rabbit_queue_index:sync_seq_ids(PubSeqIds, IndexState), [ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ], State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }. |
