diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-30 18:00:26 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-30 18:00:26 +0000 |
| commit | 4a1b5e5279127cbcf75880b4ce145dd4aefae050 (patch) | |
| tree | 4adabf8121eae20403a82f371743de206aee027e /src | |
| parent | 6c2709a85d3a1a16ebc1da263ceef42b2e5a54cb (diff) | |
| download | rabbitmq-server-git-4a1b5e5279127cbcf75880b4ce145dd4aefae050.tar.gz | |
Well, the transformation is done. Hilariously it works, first time. However, whilst all tests pass, I suspect there may still be faults, eg in the counting of entries in the journal etc. It still needs further checking...
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 364 |
1 files changed, 206 insertions, 158 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 62c6af5384..acda163654 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -77,9 +77,12 @@ -define(CLEAN_FILENAME, "clean.dot"). -define(MAX_ACK_JOURNAL_ENTRY_COUNT, 32768). --define(ACK_JOURNAL_FILENAME, "ack_journal.jif"). +-define(ACK_JOURNAL_FILENAME, "journal.jif"). + +-define(DEL_BIT, 0). +-define(ACK_BIT, 1). -define(SEQ_BYTES, 8). --define(SEQ_BITS, (?SEQ_BYTES * 8)). +-define(SEQ_BITS, ((?SEQ_BYTES * 8) - 1)). -define(SEGMENT_EXTENSION, ".idx"). -define(REL_SEQ_BITS, 14). @@ -112,11 +115,11 @@ -record(qistate, { dir, seg_num_handles, - journal_ack_count, + journal_count, journal_ack_dict, + journal_del_dict, seg_ack_counts, - publish_handle, - deliver_handle + publish_handle }). -include("rabbit.hrl"). @@ -132,11 +135,11 @@ {non_neg_integer(), hdl(), non_neg_integer()})). -type(qistate() :: #qistate { dir :: file_path(), seg_num_handles :: dict(), - journal_ack_count :: integer(), + journal_count :: integer(), journal_ack_dict :: dict(), + journal_del_dict :: dict(), seg_ack_counts :: dict(), - publish_handle :: hdl_and_count(), - deliver_handle :: hdl_and_count() + publish_handle :: hdl_and_count() }). -spec(init/1 :: (queue_name()) -> {non_neg_integer(), qistate()}). @@ -193,29 +196,17 @@ write_published(MsgId, SeqId, IsPersistent, State) RelSeq:?REL_SEQ_BITS, MsgId/binary>>), State1. -write_delivered(SeqId, State) -> - {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - {Hdl, State1} = get_del_handle(SegNum, State), - ok = file_handle_cache:append( - Hdl, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>), - State1. +write_delivered(SeqId, State = #qistate { journal_del_dict = JDelDict }) -> + {JDelDict1, State1} = + write_to_journal([<<?DEL_BIT:1, SeqId:?SEQ_BITS>>], + [SeqId], JDelDict, State), + maybe_full_flush(State1 #qistate { journal_del_dict = JDelDict1 }). -write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict, - journal_ack_count = JAckCount }) -> - {Hdl, State1} = get_journal_handle(State), - {JAckDict1, JAckCount1} = - lists:foldl( - fun (SeqId, {JAckDict2, JAckCount2}) -> - ok = file_handle_cache:append(Hdl, <<SeqId:?SEQ_BITS>>), - {add_ack_to_ack_dict(SeqId, JAckDict2), JAckCount2 + 1} - end, {JAckDict, JAckCount}, SeqIds), - State2 = State1 #qistate { journal_ack_dict = JAckDict1, - journal_ack_count = JAckCount1 }, - case JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT of - true -> full_flush_journal(State2); - false -> State2 - end. +write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict }) -> + {JAckDict1, State1} = write_to_journal([<<?ACK_BIT:1, SeqId:?SEQ_BITS>> || + SeqId <- SeqIds], + SeqIds, JAckDict, State), + maybe_full_flush(State1 #qistate { journal_ack_dict = JAckDict1 }). sync_seq_ids(SeqIds, SyncAckJournal, State) -> State1 = case SyncAckJournal of @@ -237,37 +228,44 @@ sync_seq_ids(SeqIds, SyncAckJournal, State) -> StateM end, State1, SegNumsSet). -can_flush_journal(#qistate { journal_ack_count = 0 }) -> +can_flush_journal(#qistate { journal_count = 0 }) -> false; can_flush_journal(_) -> true. -flush_journal(State = #qistate { journal_ack_count = 0 }) -> +flush_journal(State = #qistate { journal_count = 0 }) -> State; flush_journal(State = #qistate { journal_ack_dict = JAckDict, - journal_ack_count = JAckCount }) -> - [SegNum|_] = dict:fetch_keys(JAckDict), - Acks = dict:fetch(SegNum, JAckDict), - State1 = append_acks_to_segment(SegNum, Acks, State), - JAckCount1 = JAckCount - length(Acks), - State2 = State1 #qistate { journal_ack_dict = dict:erase(SegNum, JAckDict), - journal_ack_count = JAckCount1 }, + journal_del_dict = JDelDict, + journal_count = JCount }) -> + SegNum = case dict:fetch_keys(JAckDict) of + [] -> hd(dict:fetch_keys(JDelDict)); + [N|_] -> N + end, + Dels = seg_entries_from_dict(SegNum, JDelDict), + Acks = seg_entries_from_dict(SegNum, JAckDict), + State1 = append_dels_to_segment(SegNum, Dels, State), + State2 = append_acks_to_segment(SegNum, Acks, State1), + JCount1 = JCount - length(Dels) - length(Acks), + State3 = State2 #qistate { journal_del_dict = dict:erase(SegNum, JDelDict), + journal_ack_dict = dict:erase(SegNum, JAckDict), + journal_count = JCount1 }, if - JAckCount1 == 0 -> - {Hdl, State3} = get_journal_handle(State2), + JCount1 == 0 -> + {Hdl, State4} = get_journal_handle(State3), ok = file_handle_cache:position(Hdl, bof), ok = file_handle_cache:truncate(Hdl), ok = file_handle_cache:sync(Hdl), - State3; - JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT -> - flush_journal(State2); + State4; + JCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT -> + flush_journal(State3); true -> - State2 + State3 end. read_segment_entries(InitSeqId, State) -> {SegNum, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), - {SDict, _PubCount, _DelCount, _AckCount, _HighRelSeq, State1} = + {SDict, _PubCount, _AckCount, _HighRelSeq, State1} = load_segment(SegNum, State), %% deliberately sort the list desc, because foldl will reverse it RelSeqs = rev_sort(dict:fetch_keys(SDict)), @@ -303,7 +301,7 @@ find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) -> case SegNums of [] -> {0, State}; _ -> SegNum2 = lists:last(SegNums), - {_SDict, PubCount, _DelCount, _AckCount, HighRelSeq, State2} = + {_SDict, PubCount, _AckCount, HighRelSeq, State2} = load_segment(SegNum2, State), NextSeqId1 = reconstruct_seq_id(SegNum2, HighRelSeq), NextSeqId2 = @@ -355,6 +353,23 @@ start_msg_store(DurableQueues) -> %% Minor Helpers %%---------------------------------------------------------------------------- +write_to_journal(BinList, SeqIds, Dict, + State = #qistate { journal_count = JCount }) -> + {Hdl, State1} = get_journal_handle(State), + ok = file_handle_cache:append(Hdl, BinList), + {Dict1, JCount1} = + lists:foldl( + fun (SeqId, {Dict2, JCount2}) -> + {add_seqid_to_dict(SeqId, Dict2), JCount2 + 1} + end, {Dict, JCount}, SeqIds), + {Dict1, State1 #qistate { journal_count = JCount1 }}. + +maybe_full_flush(State = #qistate { journal_count = JCount }) -> + case JCount > ?MAX_ACK_JOURNAL_ENTRY_COUNT of + true -> full_flush_journal(State); + false -> State + end. + full_flush_journal(State) -> case can_flush_journal(State) of true -> State1 = flush_journal(State), @@ -388,11 +403,6 @@ get_pub_handle(SegNum, State = #qistate { publish_handle = PubHandle }) -> get_counted_handle(SegNum, State, PubHandle), {Hdl, State1 #qistate { publish_handle = PubHandle1 }}. -get_del_handle(SegNum, State = #qistate { deliver_handle = DelHandle }) -> - {State1, DelHandle1 = {_SegNum, Hdl, _Count}} = - get_counted_handle(SegNum, State, DelHandle), - {Hdl, State1 #qistate { deliver_handle = DelHandle1 }}. - get_counted_handle(SegNum, State, undefined) -> {Hdl, State1} = get_seg_handle(SegNum, State), {State1, {SegNum, Hdl, 1}}; @@ -455,9 +465,12 @@ delete_queue_directory(Dir) -> [ filename:join(Dir, Entry) || Entry <- Entries ]), ok = file:del_dir(Dir). -add_ack_to_ack_dict(SeqId, ADict) -> +add_seqid_to_dict(SeqId, Dict) -> {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict). + add_seqid_to_dict(SegNum, RelSeq, Dict). + +add_seqid_to_dict(SegNum, RelSeq, Dict) -> + dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], Dict). all_segment_nums(Dir) -> lists:sort( @@ -471,11 +484,11 @@ blank_state(QueueName) -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")), #qistate { dir = Dir, seg_num_handles = dict:new(), - journal_ack_count = 0, + journal_count = 0, journal_ack_dict = dict:new(), + journal_del_dict = dict:new(), seg_ack_counts = dict:new(), - publish_handle = undefined, - deliver_handle = undefined + publish_handle = undefined }. detect_clean_shutdown(Dir) -> @@ -489,6 +502,12 @@ store_clean_shutdown(Dir) -> [write, raw, binary], [{write_buffer, unbuffered}]), ok = file_handle_cache:close(Hdl). + +seg_entries_from_dict(SegNum, Dict) -> + case dict:find(SegNum, Dict) of + {ok, Entries} -> Entries; + error -> [] + end. %%---------------------------------------------------------------------------- @@ -500,7 +519,7 @@ queue_index_walker([]) -> queue_index_walker([QueueName|QueueNames]) -> State = blank_state(QueueName), {Hdl, State1} = get_journal_handle(State), - JAckDict = load_journal(Hdl, dict:new()), + {_JDelDict, JAckDict} = load_journal(Hdl, dict:new(), dict:new()), State2 = #qistate { dir = Dir } = close_handle(journal, State1 #qistate { journal_ack_dict = JAckDict }), SegNums = all_segment_nums(Dir), @@ -510,7 +529,7 @@ queue_index_walker({[], State, QueueNames}) -> _State = terminate(State), queue_index_walker(QueueNames); queue_index_walker({[SegNum | SegNums], State, QueueNames}) -> - {SDict, _PubCount, _DelCount, _AckCount, _HighRelSeq, State1} = + {SDict, _PubCount, _AckCount, _HighRelSeq, State1} = load_segment(SegNum, State), queue_index_walker({dict:to_list(SDict), State1, SegNums, QueueNames}); @@ -534,31 +553,18 @@ read_and_prune_segments(State = #qistate { dir = Dir }) -> {TotalMsgCount, State1} = lists:foldl( fun (SegNum, {TotalMsgCount1, StateN = - #qistate { publish_handle = PublishHandle, - deliver_handle = DeliverHandle }}) -> - {SDict, PubCount, DelCount, AckCount, _HighRelSeq, StateM} = + #qistate { publish_handle = PublishHandle }}) -> + {SDict, PubCount, AckCount, _HighRelSeq, StateM} = load_segment(SegNum, StateN), - {TransientMsgsAcks, StateL = - #qistate { seg_ack_counts = AckCounts, - journal_ack_dict = JAckDict }} = + StateL = #qistate { seg_ack_counts = AckCounts } = drop_and_deliver(SegNum, SDict, CleanShutdown, StateM), - %% ignore TransientMsgsAcks in AckCounts and - %% JAckDict1 because the TransientMsgsAcks fall - %% through into scatter_journal at which point the - %% AckCounts and TotalMsgCount will be correctly - %% adjusted. TotalMsgCount2 = TotalMsgCount1 + dict:size(SDict), AckCounts1 = case AckCount of 0 -> AckCounts; N -> dict:store(SegNum, N, AckCounts) end, - JAckDict1 = - case TransientMsgsAcks of - [] -> JAckDict; - _ -> dict:store(SegNum, TransientMsgsAcks, JAckDict) - end, - %% In each of the following, there should only be - %% one segment that matches the 3rd case. All other + %% In the following, there should only be max one + %% segment that matches the 3rd case. All other %% segments should either be full or empty. There %% could be no partial segments. PublishHandle1 = case PubCount of @@ -567,90 +573,117 @@ read_and_prune_segments(State = #qistate { dir = Dir }) -> _ when PublishHandle == undefined -> {SegNum, undefined, PubCount} end, - DeliverHandle1 = case DelCount of - ?SEGMENT_ENTRIES_COUNT -> DeliverHandle; - 0 -> DeliverHandle; - _ when DeliverHandle == undefined -> - {SegNum, undefined, DelCount} - end, {TotalMsgCount2, StateL #qistate { seg_ack_counts = AckCounts1, - journal_ack_dict = JAckDict1, - publish_handle = PublishHandle1, - deliver_handle = DeliverHandle1 }} + publish_handle = PublishHandle1 }} end, {0, State}, SegNums), {TotalMsgCount, State1}. scatter_journal(TotalMsgCount, State = #qistate { dir = Dir }) -> - {Hdl, State1 = #qistate { journal_ack_dict = JAckDict }} = + {Hdl, State1 = #qistate { journal_del_dict = JDelDict, + journal_ack_dict = JAckDict }} = get_journal_handle(State), - %% ADict may well contain duplicates. However, this is ok, due to - %% the use of sets in replay_journal_acks_to_segment - ADict = load_journal(Hdl, JAckDict), + %% ADict and DDict may well contain duplicates. However, this is + %% ok, because we use sets to eliminate dups before writing to + %% segments + {ADict, DDict} = load_journal(Hdl, JAckDict, JDelDict), State2 = close_handle(journal, State1), - {TotalMsgCount1, State3} = - dict:fold(fun replay_journal_acks_to_segment/3, - {TotalMsgCount, + {TotalMsgCount1, ADict1, State3} = + dict:fold(fun replay_journal_to_segment/3, + {TotalMsgCount, ADict, %% supply empty dict so that when %% replay_journal_acks_to_segment loads segments, %% it gets all msgs, and ignores anything we've %% found in the journal. - State2 #qistate { journal_ack_dict = dict:new() }}, ADict), + State2 #qistate { journal_del_dict = dict:new(), + journal_ack_dict = dict:new() }}, DDict), + {TotalMsgCount2, State4} = + dict:fold(fun replay_journal_acks_to_segment/3, + {TotalMsgCount1, State3}, ADict1), JournalPath = filename:join(Dir, ?ACK_JOURNAL_FILENAME), ok = file:delete(JournalPath), - {TotalMsgCount1, State3}. + {TotalMsgCount2, State4}. -load_journal(Hdl, ADict) -> +load_journal(Hdl, ADict, DDict) -> case file_handle_cache:read(Hdl, ?SEQ_BYTES) of - {ok, <<SeqId:?SEQ_BITS>>} -> - load_journal(Hdl, add_ack_to_ack_dict(SeqId, ADict)); - _ErrOrEoF -> ADict + {ok, <<?DEL_BIT:1, SeqId:?SEQ_BITS>>} -> + load_journal(Hdl, ADict, add_seqid_to_dict(SeqId, DDict)); + {ok, <<?ACK_BIT:1, SeqId:?SEQ_BITS>>} -> + load_journal(Hdl, add_seqid_to_dict(SeqId, ADict), DDict); + _ErrOrEoF -> {ADict, DDict} end. -replay_journal_acks_to_segment(_, [], Acc) -> - Acc; +replay_journal_to_segment(_SegNum, [], {TotalMsgCount, ADict, State}) -> + {TotalMsgCount, ADict, State}; +replay_journal_to_segment(SegNum, Dels, {TotalMsgCount, ADict, State}) -> + {SDict, _PubCount, _AckCount, _HighRelSeq, State1} = + load_segment(SegNum, State), + ValidDels = sets:to_list( + sets:filter( + fun (RelSeq) -> + case dict:find(RelSeq, SDict) of + {ok, {_MsgId, false, _IsPersistent}} -> true; + _ -> false + end + end, sets:from_list(Dels))), + State2 = append_dels_to_segment(SegNum, ValidDels, State1), + Acks = seg_entries_from_dict(SegNum, ADict), + case Acks of + [] -> {TotalMsgCount, ADict, State2}; + _ -> + ADict1 = dict:erase(SegNum, ADict), + {Count, State3} = filter_acks_and_append_to_segment(SegNum, SDict, + Acks, State2), + {TotalMsgCount - Count, ADict1, State3} + end. + +replay_journal_acks_to_segment(_SegNum, [], {TotalMsgCount, State}) -> + {TotalMsgCount, State}; replay_journal_acks_to_segment(SegNum, Acks, {TotalMsgCount, State}) -> - {SDict, _PubCount, _DelCount, _AckCount, _HighRelSeq, State1} = + {SDict, _PubCount, _AckCount, _HighRelSeq, State1} = load_segment(SegNum, State), + {Count, State2} = + filter_acks_and_append_to_segment(SegNum, SDict, Acks, State1), + {TotalMsgCount - Count, State2}. + +filter_acks_and_append_to_segment(SegNum, SDict, Acks, State) -> ValidRelSeqIds = dict:fetch_keys(SDict), ValidAcks = sets:to_list(sets:intersection(sets:from_list(ValidRelSeqIds), sets:from_list(Acks))), - %% ValidAcks will not contain any duplicates at this point. - {TotalMsgCount - length(ValidAcks), - append_acks_to_segment(SegNum, ValidAcks, State1)}. + {length(ValidAcks), append_acks_to_segment(SegNum, ValidAcks, State)}. -drop_and_deliver(SegNum, SDict, CleanShutdown, State) -> - {AckMe, DeliverMe} = +drop_and_deliver(SegNum, SDict, CleanShutdown, + State = #qistate { journal_del_dict = JDelDict, + journal_ack_dict = JAckDict }) -> + {JDelDict1, JAckDict1} = dict:fold( - fun (RelSeq, {MsgId, IsDelivered, true}, {AckMeAcc, DeliverMeAcc}) -> + fun (RelSeq, {MsgId, IsDelivered, true}, {JDelDict2, JAckDict2}) -> %% msg is persistent, keep only if the msg_store has it case {IsDelivered, rabbit_msg_store:contains(MsgId)} of {false, true} when not CleanShutdown -> %% not delivered, but dirty shutdown => mark delivered - {AckMeAcc, [RelSeq | DeliverMeAcc]}; + {add_seqid_to_dict(SegNum, RelSeq, JDelDict2), + JAckDict2}; {_, true} -> - {AckMeAcc, DeliverMeAcc}; + {JDelDict2, JAckDict2}; {true, false} -> - {[RelSeq | AckMeAcc], DeliverMeAcc}; + {JDelDict2, + add_seqid_to_dict(SegNum, RelSeq, JAckDict2)}; {false, false} -> - {[RelSeq | AckMeAcc], [RelSeq | DeliverMeAcc]} + {add_seqid_to_dict(SegNum, RelSeq, JDelDict2), + add_seqid_to_dict(SegNum, RelSeq, JAckDict2)} end; - (RelSeq, {_MsgId, false, false}, {AckMeAcc, DeliverMeAcc}) -> + (RelSeq, {_MsgId, false, false}, {JDelDict2, JAckDict2}) -> %% not persistent and not delivered => deliver and ack it - {[RelSeq | AckMeAcc], [RelSeq | DeliverMeAcc]}; - (RelSeq, {_MsgId, true, false}, {AckMeAcc, DeliverMeAcc}) -> + {add_seqid_to_dict(SegNum, RelSeq, JDelDict2), + add_seqid_to_dict(SegNum, RelSeq, JAckDict2)}; + (RelSeq, {_MsgId, true, false}, {JDelDict2, JAckDict2}) -> %% not persistent but delivered => ack it - {[RelSeq | AckMeAcc], DeliverMeAcc} - end, {[], []}, SDict), - {Hdl, State1} = get_seg_handle(SegNum, State), - ok = case DeliverMe of - [] -> ok; - _ -> file_handle_cache:append( - Hdl, - [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ]) - end, - {AckMe, State1}. + {JDelDict2, + add_seqid_to_dict(SegNum, RelSeq, JAckDict2)} + end, {JDelDict, JAckDict}, SDict), + State #qistate { journal_del_dict = JDelDict1, + journal_ack_dict = JAckDict1 }. %%---------------------------------------------------------------------------- @@ -664,35 +697,44 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls, error -> filelib:is_file(seg_num_to_path(Dir, SegNum)) end, case SegmentExists of - false -> {dict:new(), 0, 0, 0, 0, State}; + false -> {dict:new(), 0, 0, 0, State}; true -> - {Hdl, State1 = #qistate { journal_ack_dict = JAckDict }} = + {Hdl, State1 = #qistate { journal_del_dict = JDelDict, + journal_ack_dict = JAckDict }} = get_seg_handle(SegNum, State), ok = file_handle_cache:position(Hdl, bof), - {SDict, PubCount, DelCount, AckCount, HighRelSeq} = - load_segment_entries(Hdl, dict:new(), 0, 0, 0, 0), - RelSeqs = case dict:find(SegNum, JAckDict) of - {ok, RelSeqs1} -> RelSeqs1; - error -> [] - end, + {SDict, PubCount, AckCount, HighRelSeq} = + load_segment_entries(Hdl, dict:new(), 0, 0, 0), + %% delete ack'd msgs first {SDict1, AckCount1} = lists:foldl(fun (RelSeq, {SDict2, AckCount2}) -> {dict:erase(RelSeq, SDict2), AckCount2 + 1} - end, {SDict, AckCount}, RelSeqs), - {SDict1, PubCount, DelCount, AckCount1, HighRelSeq, State1} + end, {SDict, AckCount}, + seg_entries_from_dict(SegNum, JAckDict)), + %% ensure remaining msgs are delivered as necessary + SDict3 = + lists:foldl( + fun (RelSeq, SDict4) -> + case dict:find(RelSeq, SDict4) of + {ok, {MsgId, false, IsPersistent}} -> + dict:store(RelSeq, {MsgId, true, IsPersistent}, + SDict4); + _ -> SDict4 + end + end, SDict1, seg_entries_from_dict(SegNum, JDelDict)), + + {SDict3, PubCount, AckCount1, HighRelSeq, State1} end. -load_segment_entries(Hdl, SDict, PubCount, DelCount, AckCount, HighRelSeq) -> +load_segment_entries(Hdl, SDict, PubCount, AckCount, HighRelSeq) -> case file_handle_cache:read(Hdl, 1) of {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} -> {ok, LSB} = file_handle_cache:read( Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, - {SDict1, DelCount1, AckCount1} = - deliver_or_ack_msg(SDict, DelCount, AckCount, RelSeq), - load_segment_entries( - Hdl, SDict1, PubCount, DelCount1, AckCount1, HighRelSeq); + {SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq), + load_segment_entries(Hdl, SDict1, PubCount, AckCount1, HighRelSeq); {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} -> %% because we specify /binary, and binaries are complete @@ -704,23 +746,21 @@ load_segment_entries(Hdl, SDict, PubCount, DelCount, AckCount, HighRelSeq) -> HighRelSeq1 = lists:max([RelSeq, HighRelSeq]), load_segment_entries( Hdl, dict:store(RelSeq, {MsgId, false, 1 == IsPersistentNum}, - SDict), - PubCount + 1, DelCount, AckCount, HighRelSeq1); - _ErrOrEoF -> {SDict, PubCount, DelCount, AckCount, HighRelSeq} + SDict), PubCount + 1, AckCount, HighRelSeq1); + _ErrOrEoF -> {SDict, PubCount, AckCount, HighRelSeq} end. -deliver_or_ack_msg(SDict, DelCount, AckCount, RelSeq) -> +deliver_or_ack_msg(SDict, AckCount, RelSeq) -> case dict:find(RelSeq, SDict) of {ok, {MsgId, false, IsPersistent}} -> - {dict:store(RelSeq, {MsgId, true, IsPersistent}, SDict), - DelCount + 1, AckCount}; + {dict:store(RelSeq, {MsgId, true, IsPersistent}, SDict), AckCount}; {ok, {_MsgId, true, _IsPersistent}} -> - {dict:erase(RelSeq, SDict), DelCount, AckCount + 1} + {dict:erase(RelSeq, SDict), AckCount + 1} end. %%---------------------------------------------------------------------------- -%% Appending Acks to Segments +%% Appending Acks or Dels to Segments %%---------------------------------------------------------------------------- append_acks_to_segment(SegNum, Acks, @@ -749,13 +789,21 @@ append_acks_to_segment(SegNum, AckCount, Acks, State = #qistate { dir = Dir }) {?SEGMENT_ENTRIES_COUNT, State1}; append_acks_to_segment(SegNum, AckCount, Acks, State) when length(Acks) + AckCount < ?SEGMENT_ENTRIES_COUNT -> - {Hdl, State1} = get_seg_handle(SegNum, State), - {ok, AckCount1} = - lists:foldl( - fun (RelSeq, {ok, AckCount2}) -> - {file_handle_cache:append( - Hdl, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>), AckCount2 + 1} - end, {ok, AckCount}, Acks), + {Count, Hdl, State1} = append_to_segment(SegNum, Acks, State), ok = file_handle_cache:sync(Hdl), - {AckCount1, State1}. + {AckCount + Count, State1}. + +append_dels_to_segment(SegNum, Dels, State) -> + {_Count, _Hdl, State1} = append_to_segment(SegNum, Dels, State), + State1. + +append_to_segment(SegNum, AcksOrDels, State) -> + {Hdl, State1} = get_seg_handle(SegNum, State), + {Count, List} = + lists:foldl(fun (RelSeq, {Count1, Acc}) -> + {Count1 + 1, + [<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>> | Acc]} + end, {0, []}, AcksOrDels), + ok = file_handle_cache:append(Hdl, List), + {Count, Hdl, State1}. |
