diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-12-02 18:04:53 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-12-02 18:04:53 +0000 |
| commit | 25225ff599df43c6c496bb1f38732412257cf0a5 (patch) | |
| tree | be0bd95755318485ee9f58487deb3679af389b62 | |
| parent | 1ce1bdb2011928da4d8d12881ca51e624445ea97 (diff) | |
| download | rabbitmq-server-git-25225ff599df43c6c496bb1f38732412257cf0a5.tar.gz | |
New qi in place, tested and debugged. It works. It's not quite as fast as before because of use of nested dicts. This can be solved by using sets for publishes, deliveries and acks within each segment - we don't actually need what the dict provides and sets will go much faster. That change should be fairly straight forward to do - the code will get a little longer, but not much more complex.
| -rw-r--r-- | src/rabbit_queue_index.erl | 1186 | ||||
| -rw-r--r-- | src/rabbit_queue_index3.erl | 850 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 2 |
4 files changed, 594 insertions, 1460 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 91ecd66925..2b4ec1a473 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -32,54 +32,28 @@ -module(rabbit_queue_index). -export([init/1, terminate/1, terminate_and_erase/1, write_published/4, - write_delivered/2, write_acks/2, sync_seq_ids/3, flush_journal/1, + write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1, read_segment_entries/2, next_segment_boundary/1, segment_size/0, find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]). -%%---------------------------------------------------------------------------- -%% The queue disk index -%% -%% The queue disk index operates over a journal, and a number of -%% segment files. Each segment is the same size, both in max number of -%% entries, and max file size, owing to fixed sized records. -%% -%% Publishes are written directly to the segment files. The segment is -%% found by dividing the sequence id by the the max number of entries -%% per segment. Only the relative sequence within the segment is -%% recorded as the sequence id within a segment file (i.e. sequence id -%% modulo max number of entries per segment). This is keeps entries -%% as small as possible. Publishes are only ever going to be received -%% in contiguous ascending order. -%% -%% Acks and deliveries are written to a bounded journal and are also -%% held in memory, each in a dict with the segment as the key. Again, -%% the records are fixed size: the entire sequence id is written and -%% is limited to a 63-bit unsigned integer. The remaining bit -%% indicates whether the journal entry is for a delivery or an -%% ack. When the journal gets too big, or flush_journal is called, the -%% journal is (possibly incrementally) flushed out to the segment -%% files. As acks and delivery notes can be received in any order -%% (this is not obvious for deliveries, but consider what happens when -%% eg msgs are *re*queued - you'll publish and then mark the msgs -%% delivered immediately, which may be out of order), this journal -%% reduces seeking, and batches writes to the segment files, keeping -%% performance high. -%% -%% On startup, the journal is read along with all the segment files, -%% and the journal is fully flushed out to the segment files. Care is -%% taken to ensure that no message can be delivered or ack'd twice. -%% -%%---------------------------------------------------------------------------- - -define(CLEAN_FILENAME, "clean.dot"). +%%---------------------------------------------------------------------------- +%% ---- Journal details ---- + -define(MAX_JOURNAL_ENTRY_COUNT, 32768). -define(JOURNAL_FILENAME, "journal.jif"). --define(DEL_BIT, 0). --define(ACK_BIT, 1). +-define(PUB_PERSIST_JPREFIX, 2#00). +-define(PUB_TRANS_JPREFIX, 2#01). +-define(DEL_JPREFIX, 2#10). +-define(ACK_JPREFIX, 2#11). +-define(JPREFIX_BITS, 2). -define(SEQ_BYTES, 8). --define(SEQ_BITS, ((?SEQ_BYTES * 8) - 1)). +-define(SEQ_BITS, ((?SEQ_BYTES * 8) - ?JPREFIX_BITS)). + +%% ---- Segment details ---- + -define(SEGMENT_EXTENSION, ".idx"). -define(REL_SEQ_BITS, 14). @@ -111,13 +85,18 @@ -record(qistate, { dir, - seg_num_handles, - journal_count, - journal_ack_dict, - journal_del_dict, - seg_ack_counts, - publish_handle, - partial_segments + segments, + journal_handle, + dirty_count + }). + +-record(segment, + { pubs, + acks, + handle, + journal_entries, + path, + num }). -include("rabbit.hrl"). @@ -129,16 +108,10 @@ -type(hdl() :: ('undefined' | any())). -type(msg_id() :: binary()). -type(seq_id() :: integer()). --type(hdl_and_count() :: ('undefined' | - {non_neg_integer(), hdl(), non_neg_integer()})). --type(qistate() :: #qistate { dir :: file_path(), - seg_num_handles :: dict(), - journal_count :: integer(), - journal_ack_dict :: dict(), - journal_del_dict :: dict(), - seg_ack_counts :: dict(), - publish_handle :: hdl_and_count(), - partial_segments :: dict() +-type(qistate() :: #qistate { dir :: file_path(), + segments :: dict(), + journal_handle :: hdl(), + dirty_count :: integer() }). -spec(init/1 :: (queue_name()) -> {non_neg_integer(), qistate()}). @@ -148,7 +121,7 @@ -> qistate()). -spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()). -spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()). --spec(sync_seq_ids/3 :: ([seq_id()], boolean(), qistate()) -> qistate()). +-spec(sync_seq_ids/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush_journal/1 :: (qistate()) -> qistate()). -spec(read_segment_entries/2 :: (seq_id(), qistate()) -> {[{msg_id(), seq_id(), boolean(), boolean()}], qistate()}). @@ -167,16 +140,61 @@ init(Name) -> State = blank_state(Name), - {TotalMsgCount, State1} = read_and_prune_segments(State), - scatter_journal(TotalMsgCount, State1). - -terminate(State = #qistate { seg_num_handles = SegHdls }) -> - case 0 == dict:size(SegHdls) of - true -> State; - false -> State1 = #qistate { dir = Dir } = close_all_handles(State), - store_clean_shutdown(Dir), - State1 #qistate { publish_handle = undefined } - end. + %% 1. Load the journal completely. This will also load segments + %% which have entries in the journal and remove duplicates. + %% The counts will correctly reflect the combination of the + %% segment and the journal. + State1 = load_journal(State), + %% 2. Flush the journal. This makes life easier for everyone, as + %% it means there won't be any publishes in the journal alone. + State2 = #qistate { dir = Dir } = flush_journal(State1), + %% 3. Load each segment in turn and filter out messages that are + %% not in the msg_store, by adding acks to the journal. These + %% acks only go to the RAM journal as it doesn't matter if we + %% lose them. Also mark delivered if not clean shutdown. + AllSegs = all_segment_nums(State2), + CleanShutdown = detect_clean_shutdown(Dir), + %% We know the journal is empty here, so we don't need to combine + %% with the journal, and we don't need to worry about messages + %% that have been acked. + State3 = + lists:foldl( + fun (Seg, StateN) -> + {SegDict, _PubCount, _AckCount, StateN1} = + load_segment(Seg, false, StateN), + dict:fold( + fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack}, + StateM) -> + SeqId = reconstruct_seq_id(Seg, RelSeq), + InMsgStore = rabbit_msg_store:contains(MsgId), + case {InMsgStore, CleanShutdown} of + {true, true} -> + StateM; + {true, false} when Del == del -> + StateM; + {true, false} -> + add_to_journal(SeqId, del, StateM); + {false, _} when Del == del -> + add_to_journal(SeqId, ack, StateM); + {false, _} -> + add_to_journal( + SeqId, ack, + add_to_journal(SeqId, del, StateM)) + end + end, StateN1, SegDict) + end, State2, AllSegs), + %% 4. Go through all segments and calculate the number of unacked + %% messages we have. + Count = lists:foldl( + fun (Seg, CountAcc) -> + #segment { pubs = PubCount, acks = AckCount } = + find_segment(Seg, State3), + CountAcc + PubCount - AckCount + end, 0, AllSegs), + {Count, State3}. + +terminate(State) -> + terminate(true, State). terminate_and_erase(State) -> State1 = terminate(State), @@ -186,123 +204,114 @@ terminate_and_erase(State) -> write_published(MsgId, SeqId, IsPersistent, State) when is_binary(MsgId) -> ?MSG_ID_BYTES = size(MsgId), - {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - {Hdl, State1} = get_pub_handle(SegNum, State), - ok = file_handle_cache:append(Hdl, - <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS, MsgId/binary>>), - State1. + {JournalHdl, State1} = get_journal_handle(State), + ok = file_handle_cache:append(JournalHdl, + [<<(case IsPersistent of + true -> ?PUB_PERSIST_JPREFIX; + false -> ?PUB_TRANS_JPREFIX + end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, + MsgId]), + maybe_flush_journal(add_to_journal(SeqId, {MsgId, IsPersistent}, State1)). + +write_delivered(SeqId, State) -> + {JournalHdl, State1} = get_journal_handle(State), + ok = file_handle_cache:append(JournalHdl, + <<?DEL_JPREFIX:?JPREFIX_BITS, + SeqId:?SEQ_BITS>>), + maybe_flush_journal(add_to_journal(SeqId, del, State1)). + +write_acks(SeqIds, State) -> + {JournalHdl, State1} = get_journal_handle(State), + ok = file_handle_cache:append(JournalHdl, + [<<?ACK_JPREFIX:?JPREFIX_BITS, + SeqId:?SEQ_BITS>> || SeqId <- SeqIds]), + State2 = lists:foldl(fun (SeqId, StateN) -> + add_to_journal(SeqId, ack, StateN) + end, State1, SeqIds), + maybe_flush_journal(State2). + +sync_seq_ids(_SeqIds, State = #qistate { journal_handle = undefined }) -> + State; +sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> + ok = file_handle_cache:sync(JournalHdl), + State. -write_delivered(SeqId, State = #qistate { journal_del_dict = JDelDict }) -> - {JDelDict1, State1} = - write_to_journal([<<?DEL_BIT:1, SeqId:?SEQ_BITS>>], - [SeqId], JDelDict, State), - maybe_flush(State1 #qistate { journal_del_dict = JDelDict1 }). - -write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict }) -> - {JAckDict1, State1} = - write_to_journal([<<?ACK_BIT:1, SeqId:?SEQ_BITS>> || SeqId <- SeqIds], - SeqIds, JAckDict, State), - maybe_flush(State1 #qistate { journal_ack_dict = JAckDict1 }). - -sync_seq_ids(SeqIds, SyncAckJournal, State) -> - State1 = case SyncAckJournal of - true -> {Hdl, State2} = get_journal_handle(State), - ok = file_handle_cache:sync(Hdl), - State2; - false -> State - end, - SegNumsSet = - lists:foldl( - fun (SeqId, Set) -> - {SegNum, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - sets:add_element(SegNum, Set) - end, sets:new(), SeqIds), - sets:fold( - fun (SegNum, StateN) -> - {Hdl1, StateM} = get_seg_handle(SegNum, StateN), - ok = file_handle_cache:sync(Hdl1), - StateM - end, State1, SegNumsSet). - -flush_journal(State = #qistate { journal_count = 0 }) -> +flush_journal(State = #qistate { dirty_count = 0 }) -> State; -flush_journal(State = #qistate { journal_ack_dict = JAckDict, - journal_del_dict = JDelDict, - journal_count = JCount }) -> - SegNum = case dict:fetch_keys(JAckDict) of - [] -> hd(dict:fetch_keys(JDelDict)); - [N|_] -> N - end, - Dels = seg_entries_from_dict(SegNum, JDelDict), - Acks = seg_entries_from_dict(SegNum, JAckDict), - State1 = append_dels_to_segment(SegNum, Dels, State), - State2 = append_acks_to_segment(SegNum, Acks, State1), - JCount1 = JCount - length(Dels) - length(Acks), - State3 = State2 #qistate { journal_del_dict = dict:erase(SegNum, JDelDict), - journal_ack_dict = dict:erase(SegNum, JAckDict), - journal_count = JCount1 }, - case JCount1 of - 0 -> {Hdl, State4} = get_journal_handle(State3), - {ok, 0} = file_handle_cache:position(Hdl, bof), - ok = file_handle_cache:truncate(Hdl), - ok = file_handle_cache:sync(Hdl), - State4; - _ -> flush_journal(State3) - end. +flush_journal(State = #qistate { segments = Segments }) -> + State1 = + dict:fold( + fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount, + acks = AckCount } = Segment, StateN) -> + case PubCount > 0 andalso PubCount == AckCount of + true -> + ok = delete_segment(Segment), + StateN; + false -> + case 0 == dict:size(JEntries) of + true -> + store_segment(Segment, StateN); + false -> + {Hdl, Segment1} = get_segment_handle(Segment), + dict:fold(fun write_entry_to_segment/3, + Hdl, JEntries), + ok = file_handle_cache:sync(Hdl), + store_segment( + Segment1 #segment { journal_entries = + dict:new() }, StateN) + end + end + end, State #qistate { segments = dict:new() }, Segments), + {JournalHdl, State2} = get_journal_handle(State1), + {ok, 0} = file_handle_cache:position(JournalHdl, bof), + ok = file_handle_cache:truncate(JournalHdl), + ok = file_handle_cache:sync(JournalHdl), + State2 #qistate { dirty_count = 0 }. read_segment_entries(InitSeqId, State) -> - {SegNum, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), - {SDict, _PubCount, _AckCount, _HighRelSeq, State1} = - load_segment(SegNum, State), + {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), + {SegDict, _PubCount, _AckCount, State1} = + load_segment(Seg, false, State), + #segment { journal_entries = JEntries } = find_segment(Seg, State1), + SegDict1 = journal_plus_segment(JEntries, SegDict), %% deliberately sort the list desc, because foldl will reverse it - RelSeqs = rev_sort(dict:fetch_keys(SDict)), + RelSeqs = rev_sort(dict:fetch_keys(SegDict1)), {lists:foldl(fun (RelSeq, Acc) -> - {MsgId, IsDelivered, IsPersistent} = - dict:fetch(RelSeq, SDict), - [ {MsgId, reconstruct_seq_id(SegNum, RelSeq), - IsPersistent, IsDelivered} | Acc] + {{MsgId, IsPersistent}, IsDelivered, no_ack} = + dict:fetch(RelSeq, SegDict1), + [ {MsgId, reconstruct_seq_id(Seg, RelSeq), + IsPersistent, IsDelivered == del} | Acc ] end, [], RelSeqs), State1}. next_segment_boundary(SeqId) -> - {SegNum, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - reconstruct_seq_id(SegNum + 1, 0). + {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), + reconstruct_seq_id(Seg + 1, 0). segment_size() -> ?SEGMENT_ENTRY_COUNT. -find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) -> - SegNums = all_segment_nums(Dir), +find_lowest_seq_id_seg_and_next_seq_id(State) -> + SegNums = all_segment_nums(State), %% We don't want the lowest seq_id, merely the seq_id of the start %% of the lowest segment. That seq_id may not actually exist, but %% that's fine. The important thing is that the segment exists and %% the seq_id reported is on a segment boundary. + %% We also don't really care about the max seq_id. Just start the + %% next segment: it makes life much easier. + %% SegNums is sorted, ascending. - LowSeqIdSeg = + {LowSeqIdSeg, NextSeqId} = case SegNums of - [] -> 0; - [MinSegNum|_] -> reconstruct_seq_id(MinSegNum, 0) + [] -> {0, 0}; + [MinSeg|_] -> {reconstruct_seq_id(MinSeg, 0), + reconstruct_seq_id(1 + lists:last(SegNums), 0)} end, - {NextSeqId, State1} = - case SegNums of - [] -> {0, State}; - _ -> MaxSegNum = lists:last(SegNums), - {_SDict, PubCount, _AckCount, HighRelSeq, State2} = - load_segment(MaxSegNum, State), - NextSeqId1 = reconstruct_seq_id(MaxSegNum, HighRelSeq), - NextSeqId2 = case PubCount of - 0 -> NextSeqId1; - _ -> NextSeqId1 + 1 - end, - {NextSeqId2, State2} - end, - {LowSeqIdSeg, NextSeqId, State1}. + {LowSeqIdSeg, NextSeqId, State}. start_msg_store(DurableQueues) -> - DurableDict = + DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue #amqqueue.name), Queue #amqqueue.name} || Queue <- DurableQueues ]), QueuesDir = queues_dir(), @@ -337,175 +346,84 @@ start_msg_store(DurableQueues) -> ok. %%---------------------------------------------------------------------------- -%% Minor Helpers +%% Msg Store Startup Delta Function %%---------------------------------------------------------------------------- -maybe_flush(State = #qistate { journal_count = JCount }) - when JCount > ?MAX_JOURNAL_ENTRY_COUNT -> - flush_journal(State); -maybe_flush(State) -> - State. - -write_to_journal(BinList, SeqIds, Dict, - State = #qistate { journal_count = JCount }) -> - {Hdl, State1} = get_journal_handle(State), - ok = file_handle_cache:append(Hdl, BinList), - {Dict1, JCount1} = - lists:foldl( - fun (SeqId, {Dict2, JCount2}) -> - {add_seqid_to_dict(SeqId, Dict2), JCount2 + 1} - end, {Dict, JCount}, SeqIds), - {Dict1, State1 #qistate { journal_count = JCount1 }}. - -queue_name_to_dir_name(Name = #resource { kind = queue }) -> - Bin = term_to_binary(Name), - Size = 8*size(Bin), - <<Num:Size>> = Bin, - lists:flatten(io_lib:format("~.36B", [Num])). - -queues_dir() -> - filename:join(rabbit_mnesia:dir(), "queues"). +queue_index_walker([]) -> + finished; +queue_index_walker([QueueName|QueueNames]) -> + State = blank_state(QueueName), + State1 = load_journal(State), + SegNums = all_segment_nums(State1), + queue_index_walker({SegNums, State1, QueueNames}); -rev_sort(List) -> - lists:sort(fun (A, B) -> B < A end, List). +queue_index_walker({[], State, QueueNames}) -> + _State = terminate(false, State), + queue_index_walker(QueueNames); +queue_index_walker({[Seg | SegNums], State, QueueNames}) -> + SeqId = reconstruct_seq_id(Seg, 0), + {Messages, State1} = read_segment_entries(SeqId, State), + queue_index_walker({Messages, State1, SegNums, QueueNames}); -get_journal_handle(State = #qistate { dir = Dir, seg_num_handles = SegHdls }) -> - case dict:find(journal, SegHdls) of - {ok, Hdl} -> - {Hdl, State}; - error -> - Path = filename:join(Dir, ?JOURNAL_FILENAME), - Mode = [raw, binary, write, read, read_ahead], - new_handle(journal, Path, Mode, State) +queue_index_walker({[], State, SegNums, QueueNames}) -> + queue_index_walker({SegNums, State, QueueNames}); +queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs], + State, SegNums, QueueNames}) -> + case IsPersistent of + true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}}; + false -> queue_index_walker({Msgs, State, SegNums, QueueNames}) end. -get_pub_handle(SegNum, State = #qistate { publish_handle = PubHandle }) -> - {State1, PubHandle1 = {_SegNum, Hdl, _Count}} = - get_counted_handle(SegNum, State, PubHandle), - {Hdl, State1 #qistate { publish_handle = PubHandle1 }}. - -get_counted_handle(SegNum, State, undefined) -> - get_counted_handle(SegNum, State, {SegNum, undefined, 0}); -get_counted_handle(SegNum, State = #qistate { partial_segments = Partials }, - {SegNum, undefined, Count}) -> - {Hdl, State1} = get_seg_handle(SegNum, State), - {CountExtra, Partials1} = - case dict:find(SegNum, Partials) of - {ok, CountExtra1} -> {CountExtra1, dict:erase(SegNum, Partials)}; - error -> {0, Partials} - end, - Count1 = Count + 1 + CountExtra, - {State1 #qistate { partial_segments = Partials1 }, {SegNum, Hdl, Count1}}; -get_counted_handle(SegNum, State, {SegNum, Hdl, Count}) - when Count < ?SEGMENT_ENTRY_COUNT -> - {State, {SegNum, Hdl, Count + 1}}; -get_counted_handle(SegNumA, State, {SegNumB, Hdl, ?SEGMENT_ENTRY_COUNT}) - when SegNumA == SegNumB + 1 -> - ok = file_handle_cache:append_write_buffer(Hdl), - get_counted_handle(SegNumA, State, undefined); -get_counted_handle(SegNumA, State = #qistate { partial_segments = Partials, - seg_ack_counts = AckCounts }, - {SegNumB, _Hdl, Count}) -> - %% don't flush here because it's possible SegNumB has been deleted - State1 = - case dict:find(SegNumB, AckCounts) of - {ok, Count} -> - %% #acks == #pubs, and we're moving to different - %% segment, so delete. - delete_segment(SegNumB, State); - _ -> - State #qistate { - partial_segments = dict:store(SegNumB, Count, Partials) } - end, - get_counted_handle(SegNumA, State1, undefined). +%%---------------------------------------------------------------------------- +%% Minors +%%---------------------------------------------------------------------------- -get_seg_handle(SegNum, State = #qistate { dir = Dir, seg_num_handles = SegHdls }) -> - case dict:find(SegNum, SegHdls) of - {ok, Hdl} -> - {Hdl, State}; - error -> - new_handle(SegNum, seg_num_to_path(Dir, SegNum), - [binary, raw, read, write, - {read_ahead, ?SEGMENT_TOTAL_SIZE}], - State) - end. +maybe_flush_journal(State = #qistate { dirty_count = DCount }) + when DCount > ?MAX_JOURNAL_ENTRY_COUNT -> + flush_journal(State); +maybe_flush_journal(State) -> + State. -delete_segment(SegNum, State = #qistate { dir = Dir, - seg_ack_counts = AckCounts, - partial_segments = Partials }) -> - State1 = close_handle(SegNum, State), - ok = case file:delete(seg_num_to_path(Dir, SegNum)) of - ok -> ok; - {error, enoent} -> ok - end, - State1 #qistate {seg_ack_counts = dict:erase(SegNum, AckCounts), - partial_segments = dict:erase(SegNum, Partials) }. - -new_handle(Key, Path, Mode, State = #qistate { seg_num_handles = SegHdls }) -> - {ok, Hdl} = file_handle_cache:open(Path, Mode, [{write_buffer, infinity}]), - {Hdl, State #qistate { seg_num_handles = dict:store(Key, Hdl, SegHdls) }}. - -close_handle(Key, State = #qistate { seg_num_handles = SegHdls }) -> - case dict:find(Key, SegHdls) of - {ok, Hdl} -> - ok = file_handle_cache:close(Hdl), - State #qistate { seg_num_handles = dict:erase(Key, SegHdls) }; - error -> - State - end. +all_segment_nums(#qistate { segments = Segments, dir = Dir }) -> + sets:to_list( + lists:foldl( + fun (SegName, Set) -> + sets:add_element( + list_to_integer( + lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, + SegName)), Set) + end, sets:from_list(dict:fetch_keys(Segments)), + filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir))). -close_all_handles(State = #qistate { seg_num_handles = SegHdls }) -> - ok = dict:fold(fun (_Key, Hdl, ok) -> - file_handle_cache:close(Hdl) - end, ok, SegHdls), - State #qistate { seg_num_handles = dict:new() }. +blank_state(QueueName) -> + StrName = queue_name_to_dir_name(QueueName), + Dir = filename:join(queues_dir(), StrName), + ok = filelib:ensure_dir(filename:join(Dir, "nothing")), + #qistate { dir = Dir, + segments = dict:new(), + journal_handle = undefined, + dirty_count = 0 + }. -bool_to_int(true ) -> 1; -bool_to_int(false) -> 0. +rev_sort(List) -> + lists:sort(fun (A, B) -> B < A end, List). seq_id_to_seg_and_rel_seq_id(SeqId) -> { SeqId div ?SEGMENT_ENTRY_COUNT, SeqId rem ?SEGMENT_ENTRY_COUNT }. -reconstruct_seq_id(SegNum, RelSeq) -> - (SegNum * ?SEGMENT_ENTRY_COUNT) + RelSeq. +reconstruct_seq_id(Seg, RelSeq) -> + (Seg * ?SEGMENT_ENTRY_COUNT) + RelSeq. -seg_num_to_path(Dir, SegNum) -> - SegName = integer_to_list(SegNum), +seg_num_to_path(Dir, Seg) -> + SegName = integer_to_list(Seg), filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION). -delete_queue_directory(Dir) -> - {ok, Entries} = file:list_dir(Dir), - ok = lists:foldl(fun (Entry, ok) -> - file:delete(filename:join(Dir, Entry)) - end, ok, Entries), - ok = file:del_dir(Dir). - -add_seqid_to_dict(SeqId, Dict) -> - {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - add_seqid_to_dict(SegNum, RelSeq, Dict). - -add_seqid_to_dict(SegNum, RelSeq, Dict) -> - dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], Dict). - -all_segment_nums(Dir) -> - lists:sort( - [list_to_integer( - lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName)) - || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]). - -blank_state(QueueName) -> - StrName = queue_name_to_dir_name(QueueName), - Dir = filename:join(queues_dir(), StrName), - ok = filelib:ensure_dir(filename:join(Dir, "nothing")), - #qistate { dir = Dir, - seg_num_handles = dict:new(), - journal_count = 0, - journal_ack_dict = dict:new(), - journal_del_dict = dict:new(), - seg_ack_counts = dict:new(), - publish_handle = undefined, - partial_segments = dict:new() - }. +delete_segment(#segment { handle = undefined }) -> + ok; +delete_segment(#segment { handle = Hdl, path = Path }) -> + ok = file_handle_cache:close(Hdl), + ok = file:delete(Path), + ok. detect_clean_shutdown(Dir) -> case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of @@ -519,258 +437,143 @@ store_clean_shutdown(Dir) -> [{write_buffer, unbuffered}]), ok = file_handle_cache:close(Hdl). -seg_entries_from_dict(SegNum, Dict) -> - case dict:find(SegNum, Dict) of - {ok, Entries} -> Entries; - error -> [] - end. - - -%%---------------------------------------------------------------------------- -%% Msg Store Startup Delta Function -%%---------------------------------------------------------------------------- - -queue_index_walker([]) -> - finished; -queue_index_walker([QueueName|QueueNames]) -> - State = blank_state(QueueName), - {Hdl, State1} = get_journal_handle(State), - {JAckDict, _JDelDict} = load_journal(Hdl, dict:new(), dict:new()), - State2 = #qistate { dir = Dir } = - close_handle(journal, State1 #qistate { journal_ack_dict = JAckDict }), - SegNums = all_segment_nums(Dir), - queue_index_walker({SegNums, State2, QueueNames}); - -queue_index_walker({[], State, QueueNames}) -> - _State = terminate(State), - queue_index_walker(QueueNames); -queue_index_walker({[SegNum | SegNums], State, QueueNames}) -> - {SDict, _PubCount, _AckCount, _HighRelSeq, State1} = - load_segment(SegNum, State), - queue_index_walker({dict:to_list(SDict), State1, SegNums, QueueNames}); - -queue_index_walker({[], State, SegNums, QueueNames}) -> - queue_index_walker({SegNums, State, QueueNames}); -queue_index_walker({[{_RelSeq, {MsgId, _IsDelivered, IsPersistent}} | Msgs], - State, SegNums, QueueNames}) -> - case IsPersistent of - true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}}; - false -> queue_index_walker({Msgs, State, SegNums, QueueNames}) - end. +queue_name_to_dir_name(Name = #resource { kind = queue }) -> + Bin = term_to_binary(Name), + Size = 8*size(Bin), + <<Num:Size>> = Bin, + lists:flatten(io_lib:format("~.36B", [Num])). +queues_dir() -> + filename:join(rabbit_mnesia:dir(), "queues"). -%%---------------------------------------------------------------------------- -%% Startup Functions -%%---------------------------------------------------------------------------- +delete_queue_directory(Dir) -> + {ok, Entries} = file:list_dir(Dir), + ok = lists:foldl(fun (Entry, ok) -> + file:delete(filename:join(Dir, Entry)) + end, ok, Entries), + ok = file:del_dir(Dir). -read_and_prune_segments(State = #qistate { dir = Dir }) -> - SegNums = all_segment_nums(Dir), - CleanShutdown = detect_clean_shutdown(Dir), - {TotalMsgCount, State1} = - lists:foldl( - fun (SegNum, {TotalMsgCount1, StateN = - #qistate { publish_handle = PublishHandle, - partial_segments = Partials }}) -> - {SDict, PubCount, AckCount, _HighRelSeq, StateM} = - load_segment(SegNum, StateN), - StateL = #qistate { seg_ack_counts = AckCounts } = - drop_and_deliver(SegNum, SDict, CleanShutdown, StateM), - %% ignore the effect of drop_and_deliver on - %% TotalMsgCount and AckCounts, as drop_and_deliver - %% will add to the journal dicts, which will then - %% effect TotalMsgCount when we scatter the journal - TotalMsgCount2 = TotalMsgCount1 + dict:size(SDict), - AckCounts1 = case AckCount of - 0 -> AckCounts; - N -> dict:store(SegNum, N, AckCounts) - end, - %% In the following, whilst there may be several - %% partial segments, we only remember the last - %% one. All other partial segments get added into - %% the partial_segments dict - {PublishHandle1, Partials1} = - case PubCount of - ?SEGMENT_ENTRY_COUNT -> - {PublishHandle, Partials}; - 0 -> - {PublishHandle, Partials}; - _ -> - {{SegNum, undefined, PubCount}, - case PublishHandle of - undefined -> - Partials; - {SegNumOld, undefined, PubCountOld} -> - dict:store(SegNumOld, PubCountOld, - Partials) - end} - end, - {TotalMsgCount2, - StateL #qistate { seg_ack_counts = AckCounts1, - publish_handle = PublishHandle1, - partial_segments = Partials1 }} - end, {0, State}, SegNums), - {TotalMsgCount, State1}. - -scatter_journal(TotalMsgCount, State = #qistate { dir = Dir }) -> - {Hdl, State1 = #qistate { journal_del_dict = JDelDict, - journal_ack_dict = JAckDict }} = - get_journal_handle(State), - %% ADict and DDict may well contain duplicates. However, this is - %% ok, because we use sets to eliminate dups before writing to - %% segments - {ADict, DDict} = load_journal(Hdl, JAckDict, JDelDict), - State2 = close_handle(journal, State1), - {TotalMsgCount1, ADict1, State3} = - dict:fold(fun replay_journal_to_segment/3, - {TotalMsgCount, ADict, - %% supply empty dicts so that when - %% replay_journal_to_segment loads segments, it - %% gets all msgs, and ignores anything we've found - %% in the journal. - State2 #qistate { journal_del_dict = dict:new(), - journal_ack_dict = dict:new() }}, DDict), - %% replay for segments which only had acks, and no deliveries - {TotalMsgCount2, State4} = - dict:fold(fun replay_journal_acks_to_segment/3, - {TotalMsgCount1, State3}, ADict1), - JournalPath = filename:join(Dir, ?JOURNAL_FILENAME), - ok = file:delete(JournalPath), - {TotalMsgCount2, State4}. - -load_journal(Hdl, ADict, DDict) -> - case file_handle_cache:read(Hdl, ?SEQ_BYTES) of - {ok, <<?DEL_BIT:1, SeqId:?SEQ_BITS>>} -> - load_journal(Hdl, ADict, add_seqid_to_dict(SeqId, DDict)); - {ok, <<?ACK_BIT:1, SeqId:?SEQ_BITS>>} -> - load_journal(Hdl, add_seqid_to_dict(SeqId, ADict), DDict); - _ErrOrEoF -> - {ADict, DDict} +get_segment_handle(Segment = #segment { handle = undefined, path = Path }) -> + {ok, Hdl} = file_handle_cache:open(Path, + [binary, raw, read, write, + {read_ahead, ?SEGMENT_TOTAL_SIZE}], + [{write_buffer, infinity}]), + {Hdl, Segment #segment { handle = Hdl }}; +get_segment_handle(Segment = #segment { handle = Hdl }) -> + {Hdl, Segment}. + +find_segment(Seg, #qistate { segments = Segments, dir = Dir }) -> + case dict:find(Seg, Segments) of + {ok, Segment = #segment{}} -> Segment; + error -> #segment { pubs = 0, + acks = 0, + handle = undefined, + journal_entries = dict:new(), + path = seg_num_to_path(Dir, Seg), + num = Seg + } end. -replay_journal_to_segment(_SegNum, [], {TotalMsgCount, ADict, State}) -> - {TotalMsgCount, ADict, State}; -replay_journal_to_segment(SegNum, Dels, {TotalMsgCount, ADict, State}) -> - {SDict, _PubCount, _AckCount, _HighRelSeq, State1} = - load_segment(SegNum, State), - ValidDels = sets:to_list( - sets:filter( - fun (RelSeq) -> - case dict:find(RelSeq, SDict) of - {ok, {_MsgId, false, _IsPersistent}} -> true; - _ -> false - end - end, sets:from_list(Dels))), - State2 = append_dels_to_segment(SegNum, ValidDels, State1), - Acks = seg_entries_from_dict(SegNum, ADict), - case Acks of - [] -> {TotalMsgCount, ADict, State2}; - _ -> ADict1 = dict:erase(SegNum, ADict), - {Count, State3} = - filter_acks_and_append_to_segment(SegNum, SDict, - Acks, State2), - {TotalMsgCount - Count, ADict1, State3} - end. +store_segment(Segment = #segment { num = Seg }, + State = #qistate { segments = Segments }) -> + State #qistate { segments = dict:store(Seg, Segment, Segments) }. + +get_journal_handle(State = + #qistate { journal_handle = undefined, dir = Dir }) -> + Path = filename:join(Dir, ?JOURNAL_FILENAME), + {ok, Hdl} = file_handle_cache:open(Path, + [binary, raw, read, write, + {read_ahead, ?SEGMENT_TOTAL_SIZE}], + [{write_buffer, infinity}]), + {Hdl, State #qistate { journal_handle = Hdl }}; +get_journal_handle(State = #qistate { journal_handle = Hdl }) -> + {Hdl, State}. -replay_journal_acks_to_segment(_SegNum, [], {TotalMsgCount, State}) -> - {TotalMsgCount, State}; -replay_journal_acks_to_segment(SegNum, Acks, {TotalMsgCount, State}) -> - {SDict, _PubCount, _AckCount, _HighRelSeq, State1} = - load_segment(SegNum, State), - {Count, State2} = - filter_acks_and_append_to_segment(SegNum, SDict, Acks, State1), - {TotalMsgCount - Count, State2}. - -filter_acks_and_append_to_segment(SegNum, SDict, Acks, State) -> - ValidRelSeqIds = dict:fetch_keys(SDict), - ValidAcks = sets:to_list(sets:intersection(sets:from_list(ValidRelSeqIds), - sets:from_list(Acks))), - {length(ValidAcks), append_acks_to_segment(SegNum, ValidAcks, State)}. - -drop_and_deliver(SegNum, SDict, CleanShutdown, - State = #qistate { journal_del_dict = JDelDict, - journal_ack_dict = JAckDict }) -> - {JDelDict1, JAckDict1} = - dict:fold( - fun (RelSeq, {MsgId, IsDelivered, true}, {JDelDict2, JAckDict2}) -> - %% msg is persistent, keep only if the msg_store has it - case {IsDelivered, rabbit_msg_store:contains(MsgId)} of - {false, true} when not CleanShutdown -> - %% not delivered, but dirty shutdown => mark delivered - {add_seqid_to_dict(SegNum, RelSeq, JDelDict2), - JAckDict2}; - {_, true} -> - {JDelDict2, JAckDict2}; - {true, false} -> - {JDelDict2, - add_seqid_to_dict(SegNum, RelSeq, JAckDict2)}; - {false, false} -> - {add_seqid_to_dict(SegNum, RelSeq, JDelDict2), - add_seqid_to_dict(SegNum, RelSeq, JAckDict2)} - end; - (RelSeq, {_MsgId, false, false}, {JDelDict2, JAckDict2}) -> - %% not persistent and not delivered => deliver and ack it - {add_seqid_to_dict(SegNum, RelSeq, JDelDict2), - add_seqid_to_dict(SegNum, RelSeq, JAckDict2)}; - (RelSeq, {_MsgId, true, false}, {JDelDict2, JAckDict2}) -> - %% not persistent but delivered => ack it - {JDelDict2, - add_seqid_to_dict(SegNum, RelSeq, JAckDict2)} - end, {JDelDict, JAckDict}, SDict), - State #qistate { journal_del_dict = JDelDict1, - journal_ack_dict = JAckDict1 }. +bool_to_int(true ) -> 1; +bool_to_int(false) -> 0. +write_entry_to_segment(_RelSeq, {{_MsgId, _IsPersistent}, del, ack}, Hdl) -> + Hdl; +write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) -> + ok = case Publish of + no_pub -> + ok; + {MsgId, IsPersistent} -> + file_handle_cache:append( + Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS>>, MsgId]) + end, + ok = case {Del, Ack} of + {no_del, no_ack} -> ok; + _ -> Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>, + Data = case {Del, Ack} of + {del, ack} -> [Binary, Binary]; + _ -> Binary + end, + file_handle_cache:append(Hdl, Data) + end, + Hdl. + +terminate(StoreShutdown, State = + #qistate { segments = Segments, journal_handle = JournalHdl, + dir = Dir }) -> + ok = case JournalHdl of + undefined -> ok; + _ -> file_handle_cache:close(JournalHdl) + end, + ok = dict:fold( + fun (_Seg, #segment { handle = undefined }, ok) -> + ok; + (_Seg, #segment { handle = Hdl }, ok) -> + file_handle_cache:close(Hdl) + end, ok, Segments), + case StoreShutdown of + true -> store_clean_shutdown(Dir); + false -> ok + end, + State #qistate { journal_handle = undefined, segments = dict:new() }. %%---------------------------------------------------------------------------- -%% Loading Segments +%% Majors %%---------------------------------------------------------------------------- -load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls, - dir = Dir }) -> - SegmentExists = case dict:find(SegNum, SegHdls) of - {ok, _} -> true; - error -> filelib:is_file(seg_num_to_path(Dir, SegNum)) +%% Loading segments + +%% Does not do any combining with the journal at all. The PubCount +%% that comes back is the number of publishes in the segment. The +%% number of unacked msgs is PubCount - AckCount. If KeepAcks is +%% false, then dict:size(SegDict) == PubCount - AckCount. If KeepAcks +%% is true, then dict:size(SegDict) == PubCount. +load_segment(Seg, KeepAcks, State) -> + Segment = #segment { path = Path, handle = SegHdl } = + find_segment(Seg, State), + SegmentExists = case SegHdl of + undefined -> filelib:is_file(Path); + _ -> true end, case SegmentExists of false -> - {dict:new(), 0, 0, 0, State}; + {dict:new(), 0, 0, State}; true -> - {Hdl, State1 = #qistate { journal_del_dict = JDelDict, - journal_ack_dict = JAckDict }} = - get_seg_handle(SegNum, State), + {Hdl, Segment1} = get_segment_handle(Segment), {ok, 0} = file_handle_cache:position(Hdl, bof), - {SDict, PubCount, AckCount, HighRelSeq} = - load_segment_entries(Hdl, dict:new(), 0, 0, 0), - %% delete ack'd msgs first - {SDict1, AckCount1} = - lists:foldl(fun (RelSeq, {SDict2, AckCount2}) -> - {dict:erase(RelSeq, SDict2), AckCount2 + 1} - end, {SDict, AckCount}, - seg_entries_from_dict(SegNum, JAckDict)), - %% ensure remaining msgs are delivered as necessary - SDict3 = - lists:foldl( - fun (RelSeq, SDict4) -> - case dict:find(RelSeq, SDict4) of - {ok, {MsgId, false, IsPersistent}} -> - dict:store(RelSeq, - {MsgId, true, IsPersistent}, - SDict4); - _ -> - SDict4 - end - end, SDict1, seg_entries_from_dict(SegNum, JDelDict)), - {SDict3, PubCount, AckCount1, HighRelSeq, State1} + {SegDict, PubCount, AckCount} = + load_segment_entries(KeepAcks, Hdl, dict:new(), 0, 0), + {SegDict, PubCount, AckCount, store_segment(Segment1, State)} end. -load_segment_entries(Hdl, SDict, PubCount, AckCount, HighRelSeq) -> +load_segment_entries(KeepAcks, Hdl, SegDict, PubCount, AckCount) -> case file_handle_cache:read(Hdl, 1) of {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} -> {ok, LSB} = file_handle_cache:read( Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, - {SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq), - load_segment_entries(Hdl, SDict1, PubCount, AckCount1, HighRelSeq); + {AckCount1, SegDict1} = + deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict), + load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount, AckCount1); {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} -> %% because we specify /binary, and binaries are complete @@ -779,71 +582,252 @@ load_segment_entries(Hdl, SDict, PubCount, AckCount, HighRelSeq) -> file_handle_cache:read( Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, - HighRelSeq1 = lists:max([RelSeq, HighRelSeq]), - load_segment_entries( - Hdl, dict:store(RelSeq, {MsgId, false, 1 == IsPersistentNum}, - SDict), PubCount + 1, AckCount, HighRelSeq1); + SegDict1 = + dict:store(RelSeq, + {{MsgId, 1 == IsPersistentNum}, no_del, no_ack}, + SegDict), + load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount+1, AckCount); _ErrOrEoF -> - {SDict, PubCount, AckCount, HighRelSeq} + {SegDict, PubCount, AckCount} end. -deliver_or_ack_msg(SDict, AckCount, RelSeq) -> - case dict:find(RelSeq, SDict) of - {ok, {MsgId, false, IsPersistent}} -> - {dict:store(RelSeq, {MsgId, true, IsPersistent}, SDict), AckCount}; - {ok, {_MsgId, true, _IsPersistent}} -> - {dict:erase(RelSeq, SDict), AckCount + 1} +deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict) -> + case dict:find(RelSeq, SegDict) of + {ok, {PubRecord, no_del, no_ack}} -> + {AckCount, dict:store(RelSeq, {PubRecord, del, no_ack}, SegDict)}; + {ok, {PubRecord, del, no_ack}} when KeepAcks -> + {AckCount + 1, dict:store(RelSeq, {PubRecord, del, ack}, SegDict)}; + {ok, {_PubRecord, del, no_ack}} -> + {AckCount + 1, dict:erase(RelSeq, SegDict)} end. +%% Loading Journal. This isn't idempotent and will mess up the counts +%% if you call it more than once on the same state. Assumes the counts +%% are 0 to start with. + +load_journal(State) -> + {JournalHdl, State1} = get_journal_handle(State), + {ok, 0} = file_handle_cache:position(JournalHdl, 0), + State2 = #qistate { segments = Segments } = load_journal_entries(State1), + dict:fold( + fun (Seg, #segment { journal_entries = JEntries, + pubs = PubCountInJournal, + acks = AckCountInJournal }, StateN) -> + %% We want to keep acks in so that we can remove them if + %% duplicates are in the journal. The counts here are + %% purely from the segment itself. + {SegDict, PubCountInSeg, AckCountInSeg, StateN1} = + load_segment(Seg, true, StateN), + %% Removed counts here are the number of pubs and acks + %% that are duplicates - i.e. found in both the segment + %% and journal. + {JEntries1, PubsRemoved, AcksRemoved} = + journal_minus_segment(JEntries, SegDict), + Segment1 = find_segment(Seg, StateN1), + PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved, + AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved, + store_segment(Segment1 #segment { journal_entries = JEntries1, + pubs = PubCount1, + acks = AckCount1 }, StateN1) + end, State2, Segments). + +load_journal_entries(State = #qistate { journal_handle = Hdl }) -> + case file_handle_cache:read(Hdl, ?SEQ_BYTES) of + {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} -> + case Prefix of + ?DEL_JPREFIX -> + load_journal_entries(add_to_journal(SeqId, del, State)); + ?ACK_JPREFIX -> + load_journal_entries(add_to_journal(SeqId, ack, State)); + _ -> + case file_handle_cache:read(Hdl, ?MSG_ID_BYTES) of + {ok, <<MsgIdNum:?MSG_ID_BITS>>} -> + %% work around for binary data + %% fragmentation. See + %% rabbit_msg_file:read_next/2 + <<MsgId:?MSG_ID_BYTES/binary>> = + <<MsgIdNum:?MSG_ID_BITS>>, + Publish = {MsgId, + case Prefix of + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end}, + load_journal_entries( + add_to_journal(SeqId, Publish, State)); + _ErrOrEoF -> %% err, we've lost at least a publish + State + end + end; + _ErrOrEoF -> State + end. -%%---------------------------------------------------------------------------- -%% Appending Acks or Dels to Segments -%%---------------------------------------------------------------------------- - -append_acks_to_segment(SegNum, Acks, - State = #qistate { seg_ack_counts = AckCounts, - partial_segments = Partials }) -> - AckCount = case dict:find(SegNum, AckCounts) of - {ok, AckCount1} -> AckCount1; - error -> 0 - end, - AckTarget = case dict:find(SegNum, Partials) of - {ok, PubCount} -> PubCount; - error -> ?SEGMENT_ENTRY_COUNT - end, - AckCount2 = AckCount + length(Acks), - append_acks_to_segment(SegNum, AckCount2, Acks, AckTarget, State). - -append_acks_to_segment(SegNum, AckCount, _Acks, AckCount, State = - #qistate { publish_handle = PubHdl }) -> - PubHdl1 = case PubHdl of - %% If we're adjusting the pubhdl here then there - %% will be no entry in partials, thus the target ack - %% count must be SEGMENT_ENTRY_COUNT - {SegNum, Hdl, AckCount = ?SEGMENT_ENTRY_COUNT} - when Hdl /= undefined -> - {SegNum + 1, undefined, 0}; - _ -> - PubHdl - end, - delete_segment(SegNum, State #qistate { publish_handle = PubHdl1 }); -append_acks_to_segment(_SegNum, _AckCount, [], _AckTarget, State) -> - State; -append_acks_to_segment(SegNum, AckCount, Acks, AckTarget, State = - #qistate { seg_ack_counts = AckCounts }) - when AckCount < AckTarget -> - {Hdl, State1} = append_to_segment(SegNum, Acks, State), - ok = file_handle_cache:sync(Hdl), - State1 #qistate { seg_ack_counts = - dict:store(SegNum, AckCount, AckCounts) }. - -append_dels_to_segment(SegNum, Dels, State) -> - {_Hdl, State1} = append_to_segment(SegNum, Dels, State), - State1. +add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount }) -> + {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), + Segment = #segment { journal_entries = SegJDict, + pubs = PubCount, acks = AckCount } = + find_segment(Seg, State), + SegJDict1 = add_to_journal(RelSeq, Action, SegJDict), + Segment1 = Segment #segment { journal_entries = SegJDict1 }, + Segment2 = + case Action of + del -> Segment1; + ack -> Segment1 #segment { acks = AckCount + 1 }; + {_MsgId, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 } + end, + store_segment(Segment2, State #qistate { dirty_count = DCount + 1 }); + +%% This is a more relaxed version of deliver_or_ack_msg because we can +%% have dels or acks in the journal without the corresponding +%% pub. Also, always want to keep acks. Things must occur in the right +%% order though. +add_to_journal(RelSeq, Action, SegJDict) -> + case dict:find(RelSeq, SegJDict) of + {ok, {PubRecord, no_del, no_ack}} when Action == del -> + dict:store(RelSeq, {PubRecord, del, no_ack}, SegJDict); + {ok, {PubRecord, DelRecord, no_ack}} when Action == ack -> + dict:store(RelSeq, {PubRecord, DelRecord, ack}, SegJDict); + error when Action == del -> + dict:store(RelSeq, {no_pub, del, no_ack}, SegJDict); + error when Action == ack -> + dict:store(RelSeq, {no_pub, no_del, ack}, SegJDict); + error -> + {_MsgId, _IsPersistent} = Action, %% ASSERTION + dict:store(RelSeq, {Action, no_del, no_ack}, SegJDict) + end. -append_to_segment(SegNum, AcksOrDels, State) -> - {Hdl, State1} = get_seg_handle(SegNum, State), - ok = file_handle_cache:append( - Hdl, [<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>> || RelSeq <- AcksOrDels ]), - {Hdl, State1}. +%% Combine what we have just read from a segment file with what we're +%% holding for that segment in memory. There must be no +%% duplicates. Used when providing segment entries to the variable +%% queue. +journal_plus_segment(JEntries, SegDict) -> + dict:fold(fun (RelSeq, JObj, SegDictOut) -> + SegEntry = case dict:find(RelSeq, SegDictOut) of + error -> not_found; + {ok, SObj = {_, _, _}} -> SObj + end, + journal_plus_segment(JObj, SegEntry, RelSeq, SegDictOut) + end, SegDict, JEntries). + +%% Here, the OutDict is the SegDict which we may be adding to (for +%% items only in the journal), modifying (bits in both), or erasing +%% from (ack in journal, not segment). +journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack}, + not_found, + RelSeq, OutDict) -> + dict:store(RelSeq, Obj, OutDict); +journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack}, + not_found, + RelSeq, OutDict) -> + dict:store(RelSeq, Obj, OutDict); +journal_plus_segment({{_MsgId, _IsPersistent}, del, ack}, + not_found, + RelSeq, OutDict) -> + dict:erase(RelSeq, OutDict); + +journal_plus_segment({no_pub, del, no_ack}, + {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, + RelSeq, OutDict) -> + dict:store(RelSeq, {PubRecord, del, no_ack}, OutDict); + +journal_plus_segment({no_pub, del, ack}, + {{_MsgId, _IsPersistent}, no_del, no_ack}, + RelSeq, OutDict) -> + dict:erase(RelSeq, OutDict); +journal_plus_segment({no_pub, no_del, ack}, + {{_MsgId, _IsPersistent}, del, no_ack}, + RelSeq, OutDict) -> + dict:erase(RelSeq, OutDict). + + +%% Remove from the journal entries for a segment, items that are +%% duplicates of entries found in the segment itself. Used on start up +%% to clean up the journal. +journal_minus_segment(JEntries, SegDict) -> + dict:fold(fun (RelSeq, JObj, {JEntriesOut, PubsRemoved, AcksRemoved}) -> + SegEntry = case dict:find(RelSeq, SegDict) of + error -> not_found; + {ok, SObj = {_, _, _}} -> SObj + end, + journal_minus_segment(JObj, SegEntry, RelSeq, JEntriesOut, + PubsRemoved, AcksRemoved) + end, {dict:new(), 0, 0}, JEntries). + +%% Here, the OutDict is a fresh journal that we're filling with valid +%% entries. PubsRemoved and AcksRemoved only get increased when the a +%% publish or ack is in both the journal and the segment. + +%% Both the same. Must be at least the publish +journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, no_ack}, + _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {OutDict, PubsRemoved + 1, AcksRemoved}; +journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, ack}, + _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {OutDict, PubsRemoved + 1, AcksRemoved + 1}; + +%% Just publish in journal +journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack}, + not_found, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; + +%% Just deliver in journal +journal_minus_segment(Obj = {no_pub, del, no_ack}, + {{_MsgId, _IsPersistent}, no_del, no_ack}, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; +journal_minus_segment({no_pub, del, no_ack}, + {{_MsgId, _IsPersistent}, del, no_ack}, + _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {OutDict, PubsRemoved, AcksRemoved}; + +%% Just ack in journal +journal_minus_segment(Obj = {no_pub, no_del, ack}, + {{_MsgId, _IsPersistent}, del, no_ack}, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; +journal_minus_segment({no_pub, no_del, ack}, + {{_MsgId, _IsPersistent}, del, ack}, + _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {OutDict, PubsRemoved, AcksRemoved}; + +%% Publish and deliver in journal +journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack}, + not_found, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; +journal_minus_segment({PubRecord, del, no_ack}, + {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, {no_pub, del, no_ack}, OutDict), + PubsRemoved + 1, AcksRemoved}; + +%% Deliver and ack in journal +journal_minus_segment(Obj = {no_pub, del, ack}, + {{_MsgId, _IsPersistent}, no_del, no_ack}, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; +journal_minus_segment({no_pub, del, ack}, + {{_MsgId, _IsPersistent}, del, no_ack}, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, {no_pub, no_del, ack}, OutDict), + PubsRemoved, AcksRemoved}; +journal_minus_segment({no_pub, del, ack}, + {{_MsgId, _IsPersistent}, del, ack}, + _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {OutDict, PubsRemoved, AcksRemoved + 1}; + +%% Publish, deliver and ack in journal +journal_minus_segment({{_MsgId, _IsPersistent}, del, ack}, + not_found, + _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {OutDict, PubsRemoved, AcksRemoved}; +journal_minus_segment({PubRecord, del, ack}, + {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, {no_pub, del, ack}, OutDict), + PubsRemoved + 1, AcksRemoved}; +journal_minus_segment({PubRecord, del, ack}, + {PubRecord = {_MsgId, _IsPersistent}, del, no_ack}, + RelSeq, OutDict, PubsRemoved, AcksRemoved) -> + {dict:store(RelSeq, {no_pub, no_del, ack}, OutDict), + PubsRemoved + 1, AcksRemoved}. diff --git a/src/rabbit_queue_index3.erl b/src/rabbit_queue_index3.erl deleted file mode 100644 index 43a210d950..0000000000 --- a/src/rabbit_queue_index3.erl +++ /dev/null @@ -1,850 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_queue_index3). - --export([init/1, terminate/1, terminate_and_erase/1, write_published/4, - write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1, - read_segment_entries/2, next_segment_boundary/1, segment_size/0, - find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]). - --define(CLEAN_FILENAME, "clean.dot"). - -%%---------------------------------------------------------------------------- -%% ---- Journal details ---- - --define(MAX_JOURNAL_ENTRY_COUNT, 32768). --define(JOURNAL_FILENAME, "journal.jif"). - --define(PUB_PERSIST_JPREFIX, 00). --define(PUB_TRANS_JPREFIX, 01). --define(DEL_JPREFIX, 10). --define(ACK_JPREFIX, 11). --define(JPREFIX_BITS, 2). --define(SEQ_BYTES, 8). --define(SEQ_BITS, ((?SEQ_BYTES * 8) - ?JPREFIX_BITS)). - -%% ---- Segment details ---- - --define(SEGMENT_EXTENSION, ".idx"). - --define(REL_SEQ_BITS, 14). --define(REL_SEQ_BITS_BYTE_ALIGNED, (?REL_SEQ_BITS + 8 - (?REL_SEQ_BITS rem 8))). --define(SEGMENT_ENTRY_COUNT, 16384). %% trunc(math:pow(2,?REL_SEQ_BITS))). - -%% seq only is binary 00 followed by 14 bits of rel seq id -%% (range: 0 - 16383) --define(REL_SEQ_ONLY_PREFIX, 00). --define(REL_SEQ_ONLY_PREFIX_BITS, 2). --define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2). - -%% publish record is binary 1 followed by a bit for is_persistent, -%% then 14 bits of rel seq id, and 128 bits of md5sum msg id --define(PUBLISH_PREFIX, 1). --define(PUBLISH_PREFIX_BITS, 1). - --define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes --define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)). -%% 16 bytes for md5sum + 2 for seq, bits and prefix --define(PUBLISH_RECORD_LENGTH_BYTES, ?MSG_ID_BYTES + 2). - -%% 1 publish, 1 deliver, 1 ack per msg --define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * - (?PUBLISH_RECORD_LENGTH_BYTES + - (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES))). - -%%---------------------------------------------------------------------------- - --record(qistate, - { dir, - segments, - journal_handle, - dirty_count - }). - --record(segment, - { pubs, - acks, - handle, - journal_entries, - path, - num - }). - --include("rabbit.hrl"). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(hdl() :: ('undefined' | any())). --type(msg_id() :: binary()). --type(seq_id() :: integer()). --type(qistate() :: #qistate { dir :: file_path(), - segments :: dict(), - journal_handle :: hdl(), - dirty_count :: integer() - }). - --spec(init/1 :: (queue_name()) -> {non_neg_integer(), qistate()}). --spec(terminate/1 :: (qistate()) -> qistate()). --spec(terminate_and_erase/1 :: (qistate()) -> qistate()). --spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate()) - -> qistate()). --spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()). --spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()). --spec(sync_seq_ids/2 :: ([seq_id()], qistate()) -> qistate()). --spec(flush_journal/1 :: (qistate()) -> qistate()). --spec(read_segment_entries/2 :: (seq_id(), qistate()) -> - {[{msg_id(), seq_id(), boolean(), boolean()}], qistate()}). --spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). --spec(segment_size/0 :: () -> non_neg_integer()). --spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) -> - {non_neg_integer(), non_neg_integer(), qistate()}). --spec(start_msg_store/1 :: ([amqqueue()]) -> 'ok'). - --endif. - - -%%---------------------------------------------------------------------------- -%% Public API -%%---------------------------------------------------------------------------- - -init(Name) -> - State = blank_state(Name), - %% 1. Load the journal completely. This will also load segments - %% which have entries in the journal and remove duplicates. - %% The counts will correctly reflect the combination of the - %% segment and the journal. - State1 = load_journal(State), - %% 2. Flush the journal. This makes life easier for everyone, as - %% it means there won't be any publishes in the journal alone. - State2 = #qistate { dir = Dir } = flush_journal(State1), - %% 3. Load each segment in turn and filter out messages that are - %% not in the msg_store, by adding acks to the journal. These - %% acks only go to the RAM journal as it doesn't matter if we - %% lose them. Also mark delivered if not clean shutdown. - AllSegs = all_segment_nums(Dir), - CleanShutdown = detect_clean_shutdown(Dir), - %% We know the journal is empty here, so we don't need to combine - %% with the journal, and we don't need to worry about messages - %% that have been acked. - State3 = - lists:foldl( - fun (Seg, StateN) -> - {SegDict, _PubCount, _AckCount, StateN1} = - load_segment(Seg, false, StateN), - dict:fold( - fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack}, - StateM) -> - SeqId = reconstruct_seq_id(Seg, RelSeq), - InMsgStore = rabbit_msg_store:contains(MsgId), - case {InMsgStore, CleanShutdown} of - {true, true} -> - StateM; - {true, false} when Del == del -> - StateM; - {true, false} -> - add_to_journal(SeqId, del, StateM); - {false, _} when Del == del -> - add_to_journal(SeqId, ack, StateM); - {false, _} -> - add_to_journal( - SeqId, ack, - add_to_journal(SeqId, del, StateM)) - end - end, StateN1, SegDict) - end, State2, AllSegs), - %% 4. Go through all segments and calculate the number of unacked - %% messages we have. - Count = lists:foldl( - fun (Seg, CountAcc) -> - #segment { pubs = PubCount, acks = AckCount } = - find_segment(Seg, State3), - CountAcc + PubCount - AckCount - end, 0, AllSegs), - {Count, State3}. - -terminate(State) -> - terminate(true, State). - -terminate_and_erase(State) -> - State1 = terminate(State), - ok = delete_queue_directory(State1 #qistate.dir), - State1. - -write_published(MsgId, SeqId, IsPersistent, State) - when is_binary(MsgId) -> - ?MSG_ID_BYTES = size(MsgId), - {JournalHdl, State1} = get_journal_handle(State), - ok = file_handle_cache:append(JournalHdl, - [<<(case IsPersistent of - true -> ?PUB_PERSIST_JPREFIX; - false -> ?PUB_TRANS_JPREFIX - end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, - MsgId]), - maybe_flush_journal(add_to_journal(SeqId, {MsgId, IsPersistent}, State1)). - -write_delivered(SeqId, State) -> - {JournalHdl, State1} = get_journal_handle(State), - ok = file_handle_cache:append(JournalHdl, - <<?DEL_JPREFIX:?JPREFIX_BITS, - SeqId:?SEQ_BITS>>), - maybe_flush_journal(add_to_journal(SeqId, del, State1)). - -write_acks(SeqIds, State) -> - {SeqIds1, State1} = remove_pubs_dels_from_journal(SeqIds, State), - case SeqIds1 of - [] -> - State; - _ -> - {JournalHdl, State2} = get_journal_handle(State1), - ok = file_handle_cache:append(JournalHdl, - [<<?ACK_JPREFIX:?JPREFIX_BITS, - SeqId:?SEQ_BITS>> - || SeqId <- SeqIds1]), - State3 = lists:foldl(fun (SeqId, StateN) -> - add_to_journal(SeqId, ack, StateN) - end, State2, SeqIds1), - maybe_flush_journal(State3) - end. - -sync_seq_ids(_SeqIds, State = #qistate { journal_handle = undefined }) -> - State; -sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> - ok = file_handle_cache:sync(JournalHdl), - State. - -flush_journal(State = #qistate { dirty_count = 0 }) -> - State; -flush_journal(State = #qistate { segments = Segments }) -> - State1 = - dict:fold( - fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount, - acks = AckCount } = Segment, StateN) -> - case dict:is_empty(JEntries) of - true -> store_segment(Segment, StateN); - false when AckCount == PubCount -> - ok = delete_segment(Segment); - false -> - {Hdl, Segment1} = get_segment_handle(Segment), - dict:fold(fun write_entry_to_segment/3, - Hdl, JEntries), - ok = file_handle_cache:sync(Hdl), - store_segment( - Segment1 #segment { journal_entries = dict:new() }, - StateN) - end - end, State, Segments), - {JournalHdl, State2} = get_journal_handle(State1), - {ok, 0} = file_handle_cache:position(JournalHdl, bof), - ok = file_handle_cache:truncate(JournalHdl), - ok = file_handle_cache:sync(JournalHdl), - State2 #qistate { dirty_count = 0 }. - -read_segment_entries(InitSeqId, State) -> - {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), - {SegDict, _PubCount, _AckCount, State1} = - load_segment(Seg, false, State), - #segment { journal_entries = JEntries } = find_segment(Seg, State1), - SegDict1 = journal_plus_segment(JEntries, SegDict), - %% deliberately sort the list desc, because foldl will reverse it - RelSeqs = rev_sort(dict:fetch_keys(SegDict1)), - {lists:foldl(fun (RelSeq, Acc) -> - {{MsgId, IsPersistent}, IsDelivered, no_ack} = - dict:fetch(RelSeq, SegDict1), - [ {MsgId, reconstruct_seq_id(Seg, RelSeq), - IsPersistent, IsDelivered} | Acc ] - end, [], RelSeqs), - State1}. - -next_segment_boundary(SeqId) -> - {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - reconstruct_seq_id(Seg + 1, 0). - -segment_size() -> - ?SEGMENT_ENTRY_COUNT. - -find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) -> - SegNums = all_segment_nums(Dir), - %% We don't want the lowest seq_id, merely the seq_id of the start - %% of the lowest segment. That seq_id may not actually exist, but - %% that's fine. The important thing is that the segment exists and - %% the seq_id reported is on a segment boundary. - - %% We also don't really care about the max seq_id. Just start the - %% next segment: it makes life much easier. - - %% SegNums is sorted, ascending. - {LowSeqIdSeg, NextSeqId} = - case SegNums of - [] -> {0, 0}; - [MinSeg|_] -> {reconstruct_seq_id(MinSeg, 0), - reconstruct_seq_id(lists:last(SegNums), 0)} - end, - {LowSeqIdSeg, NextSeqId, State}. - -start_msg_store(DurableQueues) -> - DurableDict = - dict:from_list([ {queue_name_to_dir_name(Queue #amqqueue.name), - Queue #amqqueue.name} || Queue <- DurableQueues ]), - QueuesDir = queues_dir(), - Directories = case file:list_dir(QueuesDir) of - {ok, Entries} -> - [ Entry || Entry <- Entries, - filelib:is_dir( - filename:join(QueuesDir, Entry)) ]; - {error, enoent} -> - [] - end, - DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)), - {DurableQueueNames, TransientDirs} = - lists:foldl( - fun (QueueDir, {DurableAcc, TransientAcc}) -> - case sets:is_element(QueueDir, DurableDirectories) of - true -> - {[dict:fetch(QueueDir, DurableDict) | DurableAcc], - TransientAcc}; - false -> - {DurableAcc, [QueueDir | TransientAcc]} - end - end, {[], []}, Directories), - MsgStoreDir = filename:join(rabbit_mnesia:dir(), "msg_store"), - ok = rabbit:start_child(rabbit_msg_store, [MsgStoreDir, - fun queue_index_walker/1, - DurableQueueNames]), - lists:foreach(fun (DirName) -> - Dir = filename:join(queues_dir(), DirName), - ok = delete_queue_directory(Dir) - end, TransientDirs), - ok. - -%%---------------------------------------------------------------------------- -%% Msg Store Startup Delta Function -%%---------------------------------------------------------------------------- - -queue_index_walker([]) -> - finished; -queue_index_walker([QueueName|QueueNames]) -> - State = #qistate { dir = Dir } = blank_state(QueueName), - State1 = #qistate { journal_handle = JHdl } = load_journal(State), - ok = file_handle_cache:close(JHdl), - SegNums = all_segment_nums(Dir), - queue_index_walker({SegNums, State1, QueueNames}); - -queue_index_walker({[], State, QueueNames}) -> - _State = terminate(false, State), - queue_index_walker(QueueNames); -queue_index_walker({[Seg | SegNums], State, QueueNames}) -> - SeqId = reconstruct_seq_id(Seg, 0), - {Messages, State1} = read_segment_entries(SeqId, State), - queue_index_walker({Messages, State1, SegNums, QueueNames}); - -queue_index_walker({[], State, SegNums, QueueNames}) -> - queue_index_walker({SegNums, State, QueueNames}); -queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs], - State, SegNums, QueueNames}) -> - case IsPersistent of - true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}}; - false -> queue_index_walker({Msgs, State, SegNums, QueueNames}) - end. - -%%---------------------------------------------------------------------------- -%% Minors -%%---------------------------------------------------------------------------- - -maybe_flush_journal(State = #qistate { dirty_count = DCount }) - when DCount > ?MAX_JOURNAL_ENTRY_COUNT -> - flush_journal(State); -maybe_flush_journal(State) -> - State. - -all_segment_nums(Dir) -> - lists:sort( - [list_to_integer( - lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName)) - || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]). - -blank_state(QueueName) -> - StrName = queue_name_to_dir_name(QueueName), - Dir = filename:join(queues_dir(), StrName), - ok = filelib:ensure_dir(filename:join(Dir, "nothing")), - #qistate { dir = Dir, - segments = dict:new(), - journal_handle = undefined, - dirty_count = 0 - }. - -rev_sort(List) -> - lists:sort(fun (A, B) -> B < A end, List). - -seq_id_to_seg_and_rel_seq_id(SeqId) -> - { SeqId div ?SEGMENT_ENTRY_COUNT, SeqId rem ?SEGMENT_ENTRY_COUNT }. - -reconstruct_seq_id(Seg, RelSeq) -> - (Seg * ?SEGMENT_ENTRY_COUNT) + RelSeq. - -seg_num_to_path(Dir, Seg) -> - SegName = integer_to_list(Seg), - filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION). - -delete_segment(#segment { handle = undefined }) -> - ok; -delete_segment(#segment { handle = Hdl, path = Path }) -> - ok = file_handle_cache:close(Hdl), - ok = file:delete(Path), - ok. - -detect_clean_shutdown(Dir) -> - case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of - ok -> true; - {error, enoent} -> false - end. - -store_clean_shutdown(Dir) -> - {ok, Hdl} = file_handle_cache:open(filename:join(Dir, ?CLEAN_FILENAME), - [write, raw, binary], - [{write_buffer, unbuffered}]), - ok = file_handle_cache:close(Hdl). - -queue_name_to_dir_name(Name = #resource { kind = queue }) -> - Bin = term_to_binary(Name), - Size = 8*size(Bin), - <<Num:Size>> = Bin, - lists:flatten(io_lib:format("~.36B", [Num])). - -queues_dir() -> - filename:join(rabbit_mnesia:dir(), "queues"). - -delete_queue_directory(Dir) -> - {ok, Entries} = file:list_dir(Dir), - ok = lists:foldl(fun (Entry, ok) -> - file:delete(filename:join(Dir, Entry)) - end, ok, Entries), - ok = file:del_dir(Dir). - -get_segment_handle(Segment = #segment { handle = undefined, path = Path }) -> - {ok, Hdl} = file_handle_cache:open(Path, - [binary, raw, read, write, - {read_ahead, ?SEGMENT_TOTAL_SIZE}], - [{write_buffer, infinity}]), - {Hdl, Segment #segment { handle = Hdl }}; -get_segment_handle(Segment = #segment { handle = Hdl }) -> - {Hdl, Segment}. - -find_segment(Seg, #qistate { segments = Segments, dir = Dir }) -> - case dict:find(Seg, Segments) of - {ok, Segment = #segment{}} -> Segment; - error -> #segment { pubs = 0, - acks = 0, - handle = undefined, - journal_entries = dict:new(), - path = seg_num_to_path(Dir, Seg), - num = Seg - } - end. - -store_segment(Segment = #segment { num = Seg }, - State = #qistate { segments = Segments }) -> - State #qistate { segments = dict:store(Seg, Segment, Segments) }. - -get_journal_handle(State = - #qistate { journal_handle = undefined, dir = Dir }) -> - Path = filename:join(Dir, ?JOURNAL_FILENAME), - {ok, Hdl} = file_handle_cache:open(Path, - [binary, raw, read, write, - {read_ahead, ?SEGMENT_TOTAL_SIZE}], - [{write_buffer, infinity}]), - {Hdl, State #qistate { journal_handle = Hdl }}; -get_journal_handle(State = #qistate { journal_handle = Hdl }) -> - {Hdl, State}. - -bool_to_int(true ) -> 1; -bool_to_int(false) -> 0. - -write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) -> - ok = case Publish of - no_pub -> - ok; - {MsgId, IsPersistent} -> - file_handle_cache:append( - Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS>>, MsgId]) - end, - ok = case {Del, Ack} of - {no_del, no_ack} -> ok; - _ -> Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>, - Data = case {Del, Ack} of - {del, ack} -> [Binary, Binary]; - _ -> Binary - end, - file_handle_cache:append(Hdl, Data) - end, - Hdl. - -terminate(StoreShutdown, State = - #qistate { segments = Segments, journal_handle = JournalHdl, - dir = Dir }) -> - ok = case JournalHdl of - undefined -> ok; - _ -> file_handle_cache:close(JournalHdl) - end, - ok = dict:fold( - fun (_Seg, #segment { handle = undefined }, ok) -> - ok; - (_Seg, #segment { handle = Hdl }, ok) -> - file_handle_cache:close(Hdl) - end, ok, Segments), - case StoreShutdown of - true -> store_clean_shutdown(Dir); - false -> ok - end, - State #qistate { journal_handle = undefined, segments = dict:new() }. - -remove_pubs_dels_from_journal(SeqIds, State) -> - lists:foldl( - fun (SeqId, {SeqIdsAcc, StateN}) -> - {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - Segment = #segment { journal_entries = JEntries, - acks = AckCount } = - find_segment(Seg, StateN), - case dict:find(RelSeq, JEntries) of - {ok, {{_MsgId, _IsPersistent}, del, no_ack}} -> - StateN1 = - store_segment( - Segment #segment { journal_entries = - dict:erase(RelSeq, JEntries), - acks = AckCount + 1 }, - StateN), - {SeqIdsAcc, StateN1}; - _ -> - {[SeqId | SeqIdsAcc], StateN} - end - end, {[], State}, SeqIds). - -%%---------------------------------------------------------------------------- -%% Majors -%%---------------------------------------------------------------------------- - -%% Loading segments - -%% Does not do any combining with the journal at all. The PubCount -%% that comes back is the number of publishes in the segment. The -%% number of unacked msgs is PubCount - AckCount. If KeepAcks is -%% false, then dict:size(SegDict) == PubCount - AckCount. If KeepAcks -%% is true, then dict:size(SegDict) == PubCount. -load_segment(Seg, KeepAcks, State) -> - Segment = #segment { path = Path, handle = SegHdl } = - find_segment(Seg, State), - SegmentExists = case SegHdl of - undefined -> filelib:is_file(Path); - _ -> true - end, - case SegmentExists of - false -> - {dict:new(), 0, 0, State}; - true -> - {Hdl, Segment1} = get_segment_handle(Segment), - {ok, 0} = file_handle_cache:position(Hdl, bof), - {SegDict, PubCount, AckCount} = - load_segment_entries(KeepAcks, Hdl, dict:new(), 0, 0), - {SegDict, PubCount, AckCount, store_segment(Segment1, State)} - end. - -load_segment_entries(KeepAcks, Hdl, SegDict, PubCount, AckCount) -> - case file_handle_cache:read(Hdl, 1) of - {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} -> - {ok, LSB} = file_handle_cache:read( - Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1), - <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, - {AckCount1, SegDict1} = - deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict), - load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount, AckCount1); - {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} -> - %% because we specify /binary, and binaries are complete - %% bytes, the size spec is in bytes, not bits. - {ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>} = - file_handle_cache:read( - Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1), - <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, - SegDict1 = - dict:store(RelSeq, - {{MsgId, 1 == IsPersistentNum}, no_del, no_ack}, - SegDict), - load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount+1, AckCount); - _ErrOrEoF -> - {SegDict, PubCount, AckCount} - end. - -deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict) -> - case dict:find(RelSeq, SegDict) of - {ok, {PubRecord, no_del, no_ack}} -> - {AckCount, dict:store(RelSeq, {PubRecord, del, no_ack}, SegDict)}; - {ok, {PubRecord, del, no_ack}} when KeepAcks -> - {AckCount + 1, dict:store(RelSeq, {PubRecord, del, ack}, SegDict)}; - {ok, {_PubRecord, del, no_ack}} when KeepAcks -> - {AckCount + 1, dict:erase(RelSeq, SegDict)} - end. - -%% Loading Journal. This isn't idempotent and will mess up the counts -%% if you call it more than once on the same state. Assumes the counts -%% are 0 to start with. - -load_journal(State) -> - {JournalHdl, State1} = get_journal_handle(State), - {ok, 0} = file_handle_cache:position(JournalHdl, 0), - State1 = #qistate { segments = Segments } = load_journal_entries(State), - dict:fold( - fun (Seg, #segment { journal_entries = JEntries, - pubs = PubCountInJournal, - acks = AckCountInJournal }, StateN) -> - %% We want to keep acks in so that we can remove them if - %% duplicates are in the journal. The counts here are - %% purely from the segment itself. - {SegDict, PubCountInSeg, AckCountInSeg, StateN1} = - load_segment(Seg, true, StateN), - %% Removed counts here are the number of pubs and acks - %% that are duplicates - i.e. found in both the segment - %% and journal. - {JEntries1, PubsRemoved, AcksRemoved} = - journal_minus_segment(JEntries, SegDict), - {Segment1, StateN2} = find_segment(Seg, StateN1), - PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved, - AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved, - store_segment(Segment1 #segment { journal_entries = JEntries1, - pubs = PubCount1, - acks = AckCount1 }, StateN2) - end, State1, Segments). - -load_journal_entries(State = #qistate { journal_handle = Hdl }) -> - case file_handle_cache:read(Hdl, ?SEQ_BYTES) of - {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} -> - case Prefix of - ?DEL_JPREFIX -> - load_journal_entries(add_to_journal(SeqId, del, State)); - ?ACK_JPREFIX -> - load_journal_entries(add_to_journal(SeqId, ack, State)); - _ -> - case file_handle_cache:read(Hdl, ?MSG_ID_BYTES) of - {ok, <<MsgIdNum:?MSG_ID_BITS>>} -> - %% work around for binary data - %% fragmentation. See - %% rabbit_msg_file:read_next/2 - <<MsgId:?MSG_ID_BYTES/binary>> = - <<MsgIdNum:?MSG_ID_BITS>>, - Publish = {MsgId, - case Prefix of - ?PUB_PERSIST_JPREFIX -> true; - ?PUB_TRANS_JPREFIX -> false - end}, - load_journal_entries( - add_to_journal(SeqId, Publish, State)); - _ErrOrEoF -> %% err, we've lost at least a publish - State - end - end; - _ErrOrEoF -> State - end. - -add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount }) -> - {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - Segment = #segment { journal_entries = SegJDict, - pubs = PubCount, acks = AckCount } = - find_segment(Seg, State), - SegJDict1 = add_to_journal(RelSeq, Action, SegJDict), - Segment1 = Segment #segment { journal_entries = SegJDict1 }, - Segment2 = - case Action of - del -> Segment1; - ack -> Segment1 #segment { acks = AckCount + 1 }; - {_MsgId, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 } - end, - store_segment(Segment2, State #qistate { dirty_count = DCount + 1 }); - -%% This is a more relaxed version of deliver_or_ack_msg because we can -%% have dels or acks in the journal without the corresponding -%% pub. Also, always want to keep acks. Things must occur in the right -%% order though. -add_to_journal(RelSeq, Action, SegJDict) -> - case dict:find(RelSeq, SegJDict) of - {ok, {PubRecord, no_del, no_ack}} when Action == del -> - dict:store(RelSeq, {PubRecord, del, no_ack}, SegJDict); - {ok, {PubRecord, DelRecord, no_ack}} when Action == ack -> - dict:store(RelSeq, {PubRecord, DelRecord, ack}, SegJDict); - error when Action == del -> - dict:store(RelSeq, {no_pub, del, no_ack}, SegJDict); - error when Action == ack -> - dict:store(RelSeq, {no_pub, no_del, ack}, SegJDict); - error -> - {_MsgId, _IsPersistent} = Action, %% ASSERTION - dict:store(RelSeq, {Action, no_del, no_ack}, SegJDict) - end. - -%% Combine what we have just read from a segment file with what we're -%% holding for that segment in memory. There must be no -%% duplicates. Used when providing segment entries to the variable -%% queue. -journal_plus_segment(JEntries, SegDict) -> - dict:fold(fun (RelSeq, JObj, SegDictOut) -> - SegEntry = case dict:find(RelSeq, SegDictOut) of - error -> not_found; - {ok, SObj = {_, _, _}} -> SObj - end, - journal_plus_segment(JObj, SegEntry, RelSeq, SegDictOut) - end, SegDict, JEntries). - -%% Here, the OutDict is the SegDict which we may be adding to (for -%% items only in the journal), modifying (bits in both), or erasing -%% from (ack in journal, not segment). -journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack}, - not_found, - RelSeq, OutDict) -> - dict:store(RelSeq, Obj, OutDict); -journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack}, - not_found, - RelSeq, OutDict) -> - dict:store(RelSeq, Obj, OutDict); -journal_plus_segment({{_MsgId, _IsPersistent}, del, ack}, - not_found, - RelSeq, OutDict) -> - dict:erase(RelSeq, OutDict); - -journal_plus_segment({no_pub, del, no_ack}, - {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, - RelSeq, OutDict) -> - dict:store(RelSeq, {PubRecord, del, no_ack}, OutDict); - -journal_plus_segment({no_pub, del, ack}, - {{_MsgId, _IsPersistent}, no_del, no_ack}, - RelSeq, OutDict) -> - dict:erase(RelSeq, OutDict); -journal_plus_segment({no_pub, no_del, ack}, - {{_MsgId, _IsPersistent}, del, no_ack}, - RelSeq, OutDict) -> - dict:erase(RelSeq, OutDict). - - -%% Remove from the journal entries for a segment, items that are -%% duplicates of entries found in the segment itself. Used on start up -%% to clean up the journal. -journal_minus_segment(JEntries, SegDict) -> - dict:fold(fun (RelSeq, JObj, {JEntriesOut, PubsRemoved, AcksRemoved}) -> - SegEntry = case dict:find(RelSeq, SegDict) of - error -> not_found; - {ok, SObj = {_, _, _}} -> SObj - end, - journal_minus_segment(JObj, SegEntry, RelSeq, JEntriesOut, - PubsRemoved, AcksRemoved) - end, {dict:new(), 0, 0}, JEntries). - -%% Here, the OutDict is a fresh journal that we're filling with valid -%% entries. PubsRemoved and AcksRemoved only get increased when the a -%% publish or ack is in both the journal and the segment. - -%% Both the same. Must be at least the publish -journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, no_ack}, - _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {OutDict, PubsRemoved + 1, AcksRemoved}; -journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, ack}, - _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {OutDict, PubsRemoved + 1, AcksRemoved + 1}; - -%% Just publish in journal -journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack}, - not_found, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; - -%% Just deliver in journal -journal_minus_segment(Obj = {no_pub, del, no_ack}, - {{_MsgId, _IsPersistent}, no_del, no_ack}, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; -journal_minus_segment({no_pub, del, no_ack}, - {{_MsgId, _IsPersistent}, del, no_ack}, - _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {OutDict, PubsRemoved, AcksRemoved}; - -%% Just ack in journal -journal_minus_segment(Obj = {no_pub, no_del, ack}, - {{_MsgId, _IsPersistent}, del, no_ack}, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; -journal_minus_segment({no_pub, no_del, ack}, - {{_MsgId, _IsPersistent}, del, ack}, - _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {OutDict, PubsRemoved, AcksRemoved}; - -%% Publish and deliver in journal -journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack}, - not_found, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; -journal_minus_segment({PubRecord, del, no_ack}, - {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, {no_pub, del, no_ack}, OutDict), - PubsRemoved + 1, AcksRemoved}; - -%% Deliver and ack in journal -journal_minus_segment(Obj = {no_pub, del, ack}, - {{_MsgId, _IsPersistent}, no_del, no_ack}, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved}; -journal_minus_segment({no_pub, del, ack}, - {{_MsgId, _IsPersistent}, del, no_ack}, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, {no_pub, no_del, ack}, OutDict), - PubsRemoved, AcksRemoved}; -journal_minus_segment({no_pub, del, ack}, - {{_MsgId, _IsPersistent}, del, ack}, - _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {OutDict, PubsRemoved, AcksRemoved + 1}; - -%% Publish, deliver and ack in journal -journal_minus_segment({{_MsgId, _IsPersistent}, del, ack}, - not_found, - _RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {OutDict, PubsRemoved, AcksRemoved}; -journal_minus_segment({PubRecord, del, ack}, - {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, {no_pub, del, ack}, OutDict), - PubsRemoved + 1, AcksRemoved}; -journal_minus_segment({PubRecord, del, ack}, - {PubRecord = {_MsgId, _IsPersistent}, del, no_ack}, - RelSeq, OutDict, PubsRemoved, AcksRemoved) -> - {dict:store(RelSeq, {no_pub, no_del, ack}, OutDict), - PubsRemoved + 1, AcksRemoved}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f84ba70adc..dc81ea18b9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1078,6 +1078,8 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> ko. test_queue_index() -> + SegmentSize = rabbit_queue_index:segment_size(), + TwoSegs = SegmentSize + SegmentSize, stop_msg_store(), ok = empty_test_queue(), SeqIdsA = lists:seq(0,9999), @@ -1086,7 +1088,7 @@ test_queue_index() -> {0, 0, Qi1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0), {Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1), - {0, 10000, Qi3} = + {0, SegSize, Qi3} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi2), {ReadA, Qi4} = rabbit_queue_index:read_segment_entries(0, Qi3), ok = verify_read_with_published(false, false, ReadA, @@ -1097,10 +1099,10 @@ test_queue_index() -> ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]), %% should get length back as 0, as all the msgs were transient {0, Qi6} = rabbit_queue_index:init(test_queue()), - {0, 10000, Qi7} = + {0, SegSize, Qi7} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6), {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), - {0, 20000, Qi9} = + {0, TwoSegs, Qi9} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi8), {ReadB, Qi10} = rabbit_queue_index:read_segment_entries(0, Qi9), ok = verify_read_with_published(false, true, ReadB, @@ -1111,7 +1113,7 @@ test_queue_index() -> %% should get length back as 10000 LenB = length(SeqIdsB), {LenB, Qi12} = rabbit_queue_index:init(test_queue()), - {0, 20000, Qi13} = + {0, TwoSegs, Qi13} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi12), Qi14 = queue_index_deliver(SeqIdsB, Qi13), {ReadC, Qi15} = rabbit_queue_index:read_segment_entries(0, Qi14), @@ -1119,10 +1121,8 @@ test_queue_index() -> lists:reverse(SeqIdsMsgIdsB)), Qi16 = rabbit_queue_index:write_acks(SeqIdsB, Qi15), Qi17 = queue_index_flush_journal(Qi16), - %% the entire first segment will have gone as they were firstly - %% transient, and secondly ack'd - SegmentSize = rabbit_queue_index:segment_size(), - {SegmentSize, 20000, Qi18} = + %% Everything will have gone now because #pubs == #acks + {0, 0, Qi18} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17), _Qi19 = rabbit_queue_index:terminate(Qi18), ok = stop_msg_store(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0a5909a055..f2d45700f0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -460,7 +460,7 @@ tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms} }) -> {SeqIdsAcc1, StateN1} end, {[], State1}, lists:flatten(lists:reverse(SPubs))), IndexState1 = - rabbit_queue_index:sync_seq_ids(PubSeqIds, [] /= SAcks, IndexState), + rabbit_queue_index:sync_seq_ids(PubSeqIds, IndexState), [ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ], State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }. |
