diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-05-19 12:40:00 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-05-19 12:40:00 +0100 |
| commit | b366bc7f483cc54e53a7bf19a580d0b9c0868db3 (patch) | |
| tree | 2c0c6193479be4f3bfcea62894e7e59c41a4b10b /src | |
| parent | 84c899894fd06f0929220593cfcbf8458c5bd365 (diff) | |
| download | rabbitmq-server-git-b366bc7f483cc54e53a7bf19a580d0b9c0868db3.tar.gz | |
Drop PubCount and AckCount in favour of UnackedCount
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 169 |
1 files changed, 78 insertions, 91 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 956b379704..67bf9f5236 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -159,7 +159,7 @@ -record(qistate, { dir, segments, journal_handle, dirty_count }). --record(segment, { pubs, acks, handle, journal_entries, path, num }). +-record(segment, { unacked, handle, journal_entries, path, num }). -include("rabbit.hrl"). @@ -169,8 +169,7 @@ -type(hdl() :: ('undefined' | any())). -type(segment() :: ('undefined' | - #segment { pubs :: non_neg_integer(), - acks :: non_neg_integer(), + #segment { unacked :: non_neg_integer(), handle :: hdl(), journal_entries :: array(), path :: file_path(), @@ -233,13 +232,12 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) -> %% number of unacked messages. lists:foldl( fun (Seg, {Segments2, CountAcc}) -> - Segment = #segment { pubs = PubCount, - acks = AckCount } = + Segment = #segment { unacked = UnackedCount } = recover_segment( ContainsCheckFun, CleanShutdown, segment_find_or_new(Seg, Dir, Segments2)), {segment_store(Segment, Segments2), - CountAcc + PubCount - AckCount} + CountAcc + UnackedCount} end, {Segments, 0}, all_segment_nums(State1)); true -> %% At this stage, we will only know about files that @@ -253,11 +251,11 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) -> fun (Seg, SegmentsN) -> case {segment_find(Seg, SegmentsN), dict:find(Seg, SegmentDictTerms)} of - {error, {ok, {PubCount, AckCount}}} -> + {error, {ok, UnackedCount}} -> Segment = segment_new(Seg, Dir), segment_store( - Segment #segment { pubs = PubCount, - acks = AckCount }, + Segment #segment { + unacked = UnackedCount }, SegmentsN); _ -> SegmentsN @@ -350,9 +348,9 @@ read(Start, End, State = #qistate { segments = Segments, false -> ?SEGMENT_ENTRY_COUNT end, Segment = segment_find_or_new(StartSeg, Dir, Segments), - {SegEntries, _PubCount, _AckCount, Segment1} = load_segment(false, Segment), + {SegEntries, _UnackedCount, Segment1} = load_segment(false, Segment), #segment { journal_entries = JEntries } = Segment1, - {SegEntries1, _PubCountDelta, _AckCountDelta} = + {SegEntries1, _UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), {array:sparse_foldr( fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) @@ -458,13 +456,12 @@ terminate(StoreShutdown, Terms, State = end, SegTerms = segment_fold( fun (Seg, #segment { handle = Hdl, - pubs = PubCount, - acks = AckCount }, SegTermsAcc) -> + unacked = UnackedCount }, SegTermsAcc) -> ok = case Hdl of undefined -> ok; _ -> file_handle_cache:close(Hdl) end, - [{Seg, {PubCount, AckCount}} | SegTermsAcc] + [{Seg, UnackedCount} | SegTermsAcc] end, [], Segments), case StoreShutdown of true -> store_clean_shutdown([{segments, SegTerms} | Terms], Dir); @@ -473,17 +470,16 @@ terminate(StoreShutdown, Terms, State = State #qistate { journal_handle = undefined, segments = undefined }. recover_segment(ContainsCheckFun, CleanShutdown, Segment) -> - {SegEntries, PubCount, AckCount, Segment1} = load_segment(false, Segment), + {SegEntries, UnackedCount, Segment1} = load_segment(false, Segment), #segment { journal_entries = JEntries } = Segment1, - {SegEntries1, PubCountDelta, AckCountDelta} = + {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment2) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, Del, RelSeq, Segment2) end, - Segment1 #segment { pubs = PubCount + PubCountDelta, - acks = AckCount + AckCountDelta}, + Segment1 #segment { unacked = UnackedCount + UnackedCountDelta }, SegEntries1). recover_message( true, true, _Del, _RelSeq, Segment) -> @@ -559,14 +555,13 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount, add_to_journal(RelSeq, Action, Segment = #segment { journal_entries = JEntries, - pubs = PubCount, - acks = AckCount }) -> + unacked = UnackedCount }) -> Segment1 = Segment #segment { journal_entries = add_to_journal(RelSeq, Action, JEntries) }, case Action of del -> Segment1; - ack -> Segment1 #segment { acks = AckCount + 1 }; - ?PUB -> Segment1 #segment { pubs = PubCount + 1 } + ack -> Segment1 #segment { unacked = UnackedCount - 1 }; + ?PUB -> Segment1 #segment { unacked = UnackedCount + 1 } end; add_to_journal(RelSeq, Action, JEntries) -> @@ -596,14 +591,14 @@ flush_journal(State = #qistate { segments = Segments }) -> Segments1 = segment_fold( fun (_Seg, #segment { journal_entries = JEntries, - pubs = PubCount, - acks = AckCount } = Segment, SegmentsN) -> - case PubCount =:= AckCount of - true -> ok = delete_segment(Segment), - SegmentsN; - false -> segment_store( - append_journal_to_segment(Segment, JEntries), - SegmentsN) + unacked = UnackedCount } = Segment, + SegmentsN) -> + case UnackedCount of + 0 -> ok = delete_segment(Segment), + SegmentsN; + _ -> segment_store( + append_journal_to_segment(Segment, JEntries), + SegmentsN) end end, segments_new(), Segments), {JournalHdl, State1} = @@ -641,23 +636,21 @@ load_journal(State) -> Segments1 = segment_map( fun (_Seg, Segment = #segment { journal_entries = JEntries, - pubs = PubCountInJournal, - acks = AckCountInJournal }) -> + unacked = UnackedCountInJournal }) -> %% We want to keep ack'd entries in so that we can %% remove them if duplicates are in the journal. The %% counts here are purely from the segment itself. - {SegEntries, PubCountInSeg, AckCountInSeg, Segment1} = + {SegEntries, UnackedCountInSeg, Segment1} = load_segment(true, Segment), %% Removed counts here are the number of pubs and %% acks that are duplicates - i.e. found in both the %% segment and journal. - {JEntries1, PubsRemoved, AcksRemoved} = + {JEntries1, UnackedCountDuplicates} = journal_minus_segment(JEntries, SegEntries), - PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved, - AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved, Segment1 #segment { journal_entries = JEntries1, - pubs = PubCount1, - acks = AckCount1 } + unacked = UnackedCountInJournal + + UnackedCountInSeg - + UnackedCountDuplicates } end, Segments), State2 #qistate { segments = Segments1 }. @@ -731,8 +724,7 @@ get_segment_handle(Segment = #segment { handle = Hdl }) -> {Hdl, Segment}. segment_new(Seg, Dir) -> - #segment { pubs = 0, - acks = 0, + #segment { unacked = 0, handle = undefined, journal_entries = array_new(), path = seg_num_to_path(Dir, Seg), @@ -824,15 +816,15 @@ load_segment(KeepAcked, Segment = #segment { path = Path, handle = SegHdl }) -> _ -> true end, case SegmentExists of - false -> {array_new(), 0, 0, Segment}; + false -> {array_new(), 0, Segment}; true -> {Hdl, Segment1} = get_segment_handle(Segment), {ok, 0} = file_handle_cache:position(Hdl, bof), - {SegEntries, PubCount, AckCount} = - load_segment_entries(KeepAcked, Hdl, array_new(), 0, 0), - {SegEntries, PubCount, AckCount, Segment1} + {SegEntries, UnackedCount} = + load_segment_entries(KeepAcked, Hdl, array_new(), 0), + {SegEntries, UnackedCount, Segment1} end. -load_segment_entries(KeepAcked, Hdl, SegEntries, PubCount, AckCount) -> +load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> @@ -842,22 +834,22 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, PubCount, AckCount) -> Obj = {{Guid, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), load_segment_entries(KeepAcked, Hdl, SegEntries1, - PubCount + 1, AckCount); + UnackedCount + 1); {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>} -> - {AckCountDelta, SegEntries1} = + {UnackedCountDelta, SegEntries1} = case array:get(RelSeq, SegEntries) of {Pub, no_del, no_ack} -> - {0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)}; + { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)}; {Pub, del, no_ack} when KeepAcked -> - {1, array:set(RelSeq, {Pub, del, ack}, SegEntries)}; + {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)}; {_Pub, del, no_ack} -> - {1, array:reset(RelSeq, SegEntries)} + {-1, array:reset(RelSeq, SegEntries)} end, load_segment_entries(KeepAcked, Hdl, SegEntries1, - PubCount, AckCount + AckCountDelta); + UnackedCount + UnackedCountDelta); _ErrOrEoF -> - {SegEntries, PubCount, AckCount} + {SegEntries, UnackedCount} end. array_new() -> @@ -874,104 +866,99 @@ bool_to_int(false) -> 0. %% holding for that segment in memory. There must be no duplicates. segment_plus_journal(SegEntries, JEntries) -> array:sparse_foldl( - fun (RelSeq, JObj, {SegEntriesOut, PubsAdded, AcksAdded}) -> + fun (RelSeq, JObj, {SegEntriesOut, AdditionalUnacked}) -> SegEntry = array:get(RelSeq, SegEntriesOut), - {Obj, PubsAddedDelta, AcksAddedDelta} = + {Obj, AdditionalUnackedDelta} = segment_plus_journal1(SegEntry, JObj), {case Obj of undefined -> array:reset(RelSeq, SegEntriesOut); _ -> array:set(RelSeq, Obj, SegEntriesOut) end, - PubsAdded + PubsAddedDelta, - AcksAdded + AcksAddedDelta} - end, {SegEntries, 0, 0}, JEntries). + AdditionalUnacked + AdditionalUnackedDelta} + end, {SegEntries, 0}, JEntries). -%% Here, the result is a triple with the first element containing the +%% Here, the result is a tuple with the first element containing the %% item which we may be adding to (for items only in the journal), %% modifying in (bits in both), or, when returning 'undefined', %% erasing from (ack in journal, not segment) the segment array. The -%% other two elements of the triple are the deltas for PubsAdded and -%% AcksAdded - these get increased when a publish or ack is found in -%% the journal. +%% other element of the tuple is the delta for AdditionalUnacked. segment_plus_journal1(undefined, {?PUB, no_del, no_ack} = Obj) -> - {Obj, 1, 0}; + {Obj, 1}; segment_plus_journal1(undefined, {?PUB, del, no_ack} = Obj) -> - {Obj, 1, 0}; + {Obj, 1}; segment_plus_journal1(undefined, {?PUB, del, ack}) -> - {undefined, 1, 1}; + {undefined, 0}; segment_plus_journal1({?PUB = Pub, no_del, no_ack}, {no_pub, del, no_ack}) -> - {{Pub, del, no_ack}, 0, 0}; + {{Pub, del, no_ack}, 0}; segment_plus_journal1({?PUB, no_del, no_ack}, {no_pub, del, ack}) -> - {undefined, 0, 1}; + {undefined, -1}; segment_plus_journal1({?PUB, del, no_ack}, {no_pub, no_del, ack}) -> - {undefined, 0, 1}. + {undefined, -1}. %% Remove from the journal entries for a segment, items that are %% duplicates of entries found in the segment itself. Used on start up %% to clean up the journal. journal_minus_segment(JEntries, SegEntries) -> array:sparse_foldl( - fun (RelSeq, JObj, {JEntriesOut, PubsRemoved, AcksRemoved}) -> + fun (RelSeq, JObj, {JEntriesOut, UnackedRemoved}) -> SegEntry = array:get(RelSeq, SegEntries), - {Obj, PubsRemovedDelta, AcksRemovedDelta} = + {Obj, UnackedRemovedDelta} = journal_minus_segment1(JObj, SegEntry), {case Obj of keep -> JEntriesOut; undefined -> array:reset(RelSeq, JEntriesOut); _ -> array:set(RelSeq, Obj, JEntriesOut) end, - PubsRemoved + PubsRemovedDelta, - AcksRemoved + AcksRemovedDelta} - end, {JEntries, 0, 0}, JEntries). + UnackedRemoved + UnackedRemovedDelta} + end, {JEntries, 0}, JEntries). -%% Here, the result is a triple with the first element containing the +%% Here, the result is a tuple with the first element containing the %% item we are adding to or modifying in the (initially fresh) journal %% array. If the item is 'undefined' we leave the journal array -%% alone. The other two elements of the triple are the deltas for -%% PubsRemoved and AcksRemoved - these only get increased when a -%% publish or ack is in both the journal and the segment. +%% alone. The other element of the tuple is the deltas for +%% UnackedRemoved. %% Both the same. Must be at least the publish journal_minus_segment1({?PUB, _Del, no_ack} = Obj, Obj) -> - {undefined, 1, 0}; + {undefined, 1}; journal_minus_segment1({?PUB, _Del, ack} = Obj, Obj) -> - {undefined, 1, 1}; + {undefined, 0}; %% Just publish in journal journal_minus_segment1({?PUB, no_del, no_ack}, undefined) -> - {keep, 0, 0}; + {keep, 0}; %% Publish and deliver in journal journal_minus_segment1({?PUB, del, no_ack}, undefined) -> - {keep, 0, 0}; + {keep, 0}; journal_minus_segment1({?PUB = Pub, del, no_ack}, {Pub, no_del, no_ack}) -> - {{no_pub, del, no_ack}, 1, 0}; + {{no_pub, del, no_ack}, 1}; %% Publish, deliver and ack in journal journal_minus_segment1({?PUB, del, ack}, undefined) -> - {keep, 0, 0}; + {keep, 0}; journal_minus_segment1({?PUB = Pub, del, ack}, {Pub, no_del, no_ack}) -> - {{no_pub, del, ack}, 1, 0}; + {{no_pub, del, ack}, 1}; journal_minus_segment1({?PUB = Pub, del, ack}, {Pub, del, no_ack}) -> - {{no_pub, no_del, ack}, 1, 0}; + {{no_pub, no_del, ack}, 1}; %% Just deliver in journal journal_minus_segment1({no_pub, del, no_ack}, {?PUB, no_del, no_ack}) -> - {keep, 0, 0}; + {keep, 0}; journal_minus_segment1({no_pub, del, no_ack}, {?PUB, del, no_ack}) -> - {undefined, 0, 0}; + {undefined, 0}; %% Just ack in journal journal_minus_segment1({no_pub, no_del, ack}, {?PUB, del, no_ack}) -> - {keep, 0, 0}; + {keep, 0}; journal_minus_segment1({no_pub, no_del, ack}, {?PUB, del, ack}) -> - {undefined, 0, 1}; + {undefined, -1}; %% Deliver and ack in journal journal_minus_segment1({no_pub, del, ack}, {?PUB, no_del, no_ack}) -> - {keep, 0, 0}; + {keep, 0}; journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) -> - {{no_pub, no_del, ack}, 0, 0}; + {{no_pub, no_del, ack}, 0}; journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) -> - {undefined, 0, 1}. + {undefined, -1}. |
