diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-12-04 13:02:00 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-12-04 13:02:00 +0000 |
| commit | bf4f2458f688d259179bae41a1e0fcac95553e39 (patch) | |
| tree | 7065240e3916c6b49dc83e77ef581cd893f7e139 /src | |
| parent | 8a435be43cc41ddc626eb04faa399f3e96227fbf (diff) | |
| download | rabbitmq-server-git-bf4f2458f688d259179bae41a1e0fcac95553e39.tar.gz | |
segments now stored in array, not dict
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 188 |
1 files changed, 93 insertions, 95 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index c95c803365..56cbee1e7c 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -172,11 +172,11 @@ init(Name) -> lists:foldl( fun (Seg, {Segments2, CountAcc, DCountAcc}) -> Segment = segment_find_or_new(Seg, Dir, Segments2), - {SegDict, _PubCount, _AckCount, Segment1} = + {SegEntries, _PubCount, _AckCount, Segment1} = load_segment(false, Segment), {Segment2 = #segment { pubs = PubCount, acks = AckCount }, DCountAcc1} = - dict:fold( + array:sparse_foldl( fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack}, {Segment3, DCountAcc2}) -> InMsgStore = rabbit_msg_store:contains(MsgId), @@ -198,7 +198,7 @@ init(Name) -> RelSeq, del, Segment3)), DCountAcc2 + 2} end - end, {Segment1, DCountAcc}, SegDict), + end, {Segment1, DCountAcc}, SegEntries), {segment_store(Segment2, Segments2), CountAcc + PubCount - AckCount, DCountAcc1} end, {Segments, 0, DCount}, AllSegs), @@ -283,18 +283,16 @@ read_segment_entries(InitSeqId, State = #qistate { segments = Segments, dir = Dir }) -> {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), Segment = segment_find_or_new(Seg, Dir, Segments), - {SegDict, _PubCount, _AckCount, + {SegEntries, _PubCount, _AckCount, Segment1 = #segment { journal_entries = JEntries }} = load_segment(false, Segment), - SegDict1 = journal_plus_segment(JEntries, SegDict), + SegEntries1 = journal_plus_segment(JEntries, SegEntries), %% deliberately sort the list desc, because foldl will reverse it - RelSeqs = rev_sort(dict:fetch_keys(SegDict1)), - {lists:foldl(fun (RelSeq, Acc) -> - {{MsgId, IsPersistent}, IsDelivered, no_ack} = - dict:fetch(RelSeq, SegDict1), - [ {MsgId, reconstruct_seq_id(Seg, RelSeq), - IsPersistent, IsDelivered == del} | Acc ] - end, [], RelSeqs), + {array:sparse_foldr( + fun (RelSeq, {{MsgId, IsPersistent}, IsDelivered, no_ack}, Acc) -> + [ {MsgId, reconstruct_seq_id(Seg, RelSeq), + IsPersistent, IsDelivered == del} | Acc ] + end, [], SegEntries1), State #qistate { segments = segment_store(Segment1, Segments) }}. next_segment_boundary(SeqId) -> @@ -421,9 +419,6 @@ blank_state(QueueName) -> journal_new() -> array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). -rev_sort(List) -> - lists:sort(fun (A, B) -> B < A end, List). - seq_id_to_seg_and_rel_seq_id(SeqId) -> { SeqId div ?SEGMENT_ENTRY_COUNT, SeqId rem ?SEGMENT_ENTRY_COUNT }. @@ -599,8 +594,9 @@ terminate(StoreShutdown, State = %% Does not do any combining with the journal at all. The PubCount %% that comes back is the number of publishes in the segment. The %% number of unacked msgs is PubCount - AckCount. If KeepAcks is -%% false, then dict:size(SegDict) == PubCount - AckCount. If KeepAcks -%% is true, then dict:size(SegDict) == PubCount. +%% false, then array:sparse_size(SegEntries) == PubCount - +%% AckCount. If KeepAcks is true, then array:sparse_size(SegEntries) +%% == PubCount. load_segment(KeepAcks, Segment = #segment { path = Path, handle = SegHdl }) -> SegmentExists = case SegHdl of @@ -613,21 +609,22 @@ load_segment(KeepAcks, true -> {Hdl, Segment1} = get_segment_handle(Segment), {ok, 0} = file_handle_cache:position(Hdl, bof), - {SegDict, PubCount, AckCount} = - load_segment_entries(KeepAcks, Hdl, dict:new(), 0, 0), - {SegDict, PubCount, AckCount, Segment1} + {SegEntries, PubCount, AckCount} = + load_segment_entries(KeepAcks, Hdl, journal_new(), 0, 0), + {SegEntries, PubCount, AckCount, Segment1} end. -load_segment_entries(KeepAcks, Hdl, SegDict, PubCount, AckCount) -> +load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) -> case file_handle_cache:read(Hdl, 1) of {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} -> {ok, LSB} = file_handle_cache:read( Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, - {AckCount1, SegDict1} = - deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict), - load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount, AckCount1); + {AckCount1, SegEntries1} = + deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegEntries), + load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount, + AckCount1); {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} -> %% because we specify /binary, and binaries are complete @@ -636,23 +633,24 @@ load_segment_entries(KeepAcks, Hdl, SegDict, PubCount, AckCount) -> file_handle_cache:read( Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, - SegDict1 = - dict:store(RelSeq, - {{MsgId, 1 == IsPersistentNum}, no_del, no_ack}, - SegDict), - load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount+1, AckCount); + SegEntries1 = + array:set(RelSeq, + {{MsgId, 1 == IsPersistentNum}, no_del, no_ack}, + SegEntries), + load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount + 1, + AckCount); _ErrOrEoF -> - {SegDict, PubCount, AckCount} + {SegEntries, PubCount, AckCount} end. -deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict) -> - case dict:find(RelSeq, SegDict) of - {ok, {PubRecord, no_del, no_ack}} -> - {AckCount, dict:store(RelSeq, {PubRecord, del, no_ack}, SegDict)}; - {ok, {PubRecord, del, no_ack}} when KeepAcks -> - {AckCount + 1, dict:store(RelSeq, {PubRecord, del, ack}, SegDict)}; - {ok, {_PubRecord, del, no_ack}} -> - {AckCount + 1, dict:erase(RelSeq, SegDict)} +deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegEntries) -> + case array:get(RelSeq, SegEntries) of + {PubRecord, no_del, no_ack} -> + {AckCount, array:set(RelSeq, {PubRecord, del, no_ack}, SegEntries)}; + {PubRecord, del, no_ack} when KeepAcks -> + {AckCount + 1, array:set(RelSeq, {PubRecord, del, ack}, SegEntries)}; + {_PubRecord, del, no_ack} -> + {AckCount + 1, array:reset(RelSeq, SegEntries)} end. %% Loading Journal. This isn't idempotent and will mess up the counts @@ -671,13 +669,13 @@ load_journal(State) -> %% We want to keep acks in so that we can remove %% them if duplicates are in the journal. The counts %% here are purely from the segment itself. - {SegDict, PubCountInSeg, AckCountInSeg, Segment1} = + {SegEntries, PubCountInSeg, AckCountInSeg, Segment1} = load_segment(true, Segment), %% Removed counts here are the number of pubs and %% acks that are duplicates - i.e. found in both the %% segment and journal. {JEntries1, PubsRemoved, AcksRemoved} = - journal_minus_segment(JEntries, SegDict), + journal_minus_segment(JEntries, SegEntries), PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved, AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved, Segment1 #segment { journal_entries = JEntries1, @@ -759,136 +757,136 @@ add_to_journal(RelSeq, Action, SegJArray) -> %% holding for that segment in memory. There must be no %% duplicates. Used when providing segment entries to the variable %% queue. -journal_plus_segment(JEntries, SegDict) -> +journal_plus_segment(JEntries, SegEntries) -> array:sparse_foldl( - fun (RelSeq, JObj, SegDictOut) -> - SegEntry = case dict:find(RelSeq, SegDictOut) of - error -> not_found; - {ok, SObj = {_, _, _}} -> SObj + fun (RelSeq, JObj, SegEntriesOut) -> + SegEntry = case array:get(RelSeq, SegEntriesOut) of + undefined -> not_found; + SObj = {_, _, _} -> SObj end, - journal_plus_segment(JObj, SegEntry, RelSeq, SegDictOut) - end, SegDict, JEntries). + journal_plus_segment(JObj, SegEntry, RelSeq, SegEntriesOut) + end, SegEntries, JEntries). -%% Here, the OutDict is the SegDict which we may be adding to (for +%% Here, the Out is the Seg Array which we may be adding to (for %% items only in the journal), modifying (bits in both), or erasing %% from (ack in journal, not segment). journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack}, not_found, - RelSeq, OutDict) -> - dict:store(RelSeq, Obj, OutDict); + RelSeq, Out) -> + array:set(RelSeq, Obj, Out); journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack}, not_found, - RelSeq, OutDict) -> - dict:store(RelSeq, Obj, OutDict); + RelSeq, Out) -> + array:set(RelSeq, Obj, Out); journal_plus_segment({{_MsgId, _IsPersistent}, del, ack}, not_found, - RelSeq, OutDict) -> - dict:erase(RelSeq, OutDict); + RelSeq, Out) -> + array:reset(RelSeq, Out); journal_plus_segment({no_pub, del, no_ack}, {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, - RelSeq, OutDict) -> - dict:store(RelSeq, {PubRecord, del, no_ack}, OutDict); + RelSeq, Out) -> + array:set(RelSeq, {PubRecord, del, no_ack}, Out); journal_plus_segment({no_pub, del, ack}, {{_MsgId, _IsPersistent}, no_del, no_ack}, - RelSeq, OutDict) -> - dict:erase(RelSeq, OutDict); + RelSeq, Out) -> + array:reset(RelSeq, Out); journal_plus_segment({no_pub, no_del, ack}, {{_MsgId, _IsPersistent}, del, no_ack}, - RelSeq, OutDict) -> - dict:erase(RelSeq, OutDict). + RelSeq, Out) -> + array:reset(RelSeq, Out). %% Remove from the journal entries for a segment, items that are %% duplicates of entries found in the segment itself. Used on start up %% to clean up the journal. -journal_minus_segment(JEntries, SegDict) -> +journal_minus_segment(JEntries, SegEntries) -> array:sparse_foldl( fun (RelSeq, JObj, {JEntriesOut, PubsRemoved, AcksRemoved}) -> - SegEntry = case dict:find(RelSeq, SegDict) of - error -> not_found; - {ok, SObj = {_, _, _}} -> SObj + SegEntry = case array:get(RelSeq, SegEntries) of + undefined -> not_found; + SObj = {_, _, _} -> SObj end, journal_minus_segment(JObj, SegEntry, RelSeq, JEntriesOut, PubsRemoved, AcksRemoved) end, {journal_new(), 0, 0}, JEntries). -%% Here, the OutArray is a fresh journal that we're filling with valid +%% Here, the Out is a fresh journal that we're filling with valid %% entries. PubsRemoved and AcksRemoved only get increased when the a %% publish or ack is in both the journal and the segment. %% Both the same. Must be at least the publish journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, no_ack}, - _RelSeq, OutArray, PubsRemoved, AcksRemoved) -> - {OutArray, PubsRemoved + 1, AcksRemoved}; + _RelSeq, Out, PubsRemoved, AcksRemoved) -> + {Out, PubsRemoved + 1, AcksRemoved}; journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, ack}, - _RelSeq, OutArray, PubsRemoved, AcksRemoved) -> - {OutArray, PubsRemoved + 1, AcksRemoved + 1}; + _RelSeq, Out, PubsRemoved, AcksRemoved) -> + {Out, PubsRemoved + 1, AcksRemoved + 1}; %% Just publish in journal journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack}, not_found, - RelSeq, OutArray, PubsRemoved, AcksRemoved) -> - {array:set(RelSeq, Obj, OutArray), PubsRemoved, AcksRemoved}; + RelSeq, Out, PubsRemoved, AcksRemoved) -> + {array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved}; %% Just deliver in journal journal_minus_segment(Obj = {no_pub, del, no_ack}, {{_MsgId, _IsPersistent}, no_del, no_ack}, - RelSeq, OutArray, PubsRemoved, AcksRemoved) -> - {array:set(RelSeq, Obj, OutArray), PubsRemoved, AcksRemoved}; + RelSeq, Out, PubsRemoved, AcksRemoved) -> + {array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved}; journal_minus_segment({no_pub, del, no_ack}, {{_MsgId, _IsPersistent}, del, no_ack}, - _RelSeq, OutArray, PubsRemoved, AcksRemoved) -> - {OutArray, PubsRemoved, AcksRemoved}; + _RelSeq, Out, PubsRemoved, AcksRemoved) -> + {Out, PubsRemoved, AcksRemoved}; %% Just ack in journal journal_minus_segment(Obj = {no_pub, no_del, ack}, {{_MsgId, _IsPersistent}, del, no_ack}, - RelSeq, OutArray, PubsRemoved, AcksRemoved) -> - {array:set(RelSeq, Obj, OutArray), PubsRemoved, AcksRemoved}; + RelSeq, Out, PubsRemoved, AcksRemoved) -> + {array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved}; journal_minus_segment({no_pub, no_del, ack}, {{_MsgId, _IsPersistent}, del, ack}, - _RelSeq, OutArray, PubsRemoved, AcksRemoved) -> - {OutArray, PubsRemoved, AcksRemoved}; + _RelSeq, Out, PubsRemoved, AcksRemoved) -> + {Out, PubsRemoved, AcksRemoved}; %% Publish and deliver in journal journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack}, not_found, - RelSeq, OutArray, PubsRemoved, AcksRemoved) -> - {array:set(RelSeq, Obj, OutArray), PubsRemoved, AcksRemoved}; + RelSeq, Out, PubsRemoved, AcksRemoved) -> + {array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved}; journal_minus_segment({PubRecord, del, no_ack}, {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, - RelSeq, OutArray, PubsRemoved, AcksRemoved) -> - {array:set(RelSeq, {no_pub, del, no_ack}, OutArray), + RelSeq, Out, PubsRemoved, AcksRemoved) -> + {array:set(RelSeq, {no_pub, del, no_ack}, Out), PubsRemoved + 1, AcksRemoved}; %% Deliver and ack in journal journal_minus_segment(Obj = {no_pub, del, ack}, {{_MsgId, _IsPersistent}, no_del, no_ack}, - RelSeq, OutArray, PubsRemoved, AcksRemoved) -> - {array:set(RelSeq, Obj, OutArray), PubsRemoved, AcksRemoved}; + RelSeq, Out, PubsRemoved, AcksRemoved) -> + {array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved}; journal_minus_segment({no_pub, del, ack}, {{_MsgId, _IsPersistent}, del, no_ack}, - RelSeq, OutArray, PubsRemoved, AcksRemoved) -> - {array:set(RelSeq, {no_pub, no_del, ack}, OutArray), + RelSeq, Out, PubsRemoved, AcksRemoved) -> + {array:set(RelSeq, {no_pub, no_del, ack}, Out), PubsRemoved, AcksRemoved}; journal_minus_segment({no_pub, del, ack}, {{_MsgId, _IsPersistent}, del, ack}, - _RelSeq, OutArray, PubsRemoved, AcksRemoved) -> - {OutArray, PubsRemoved, AcksRemoved + 1}; + _RelSeq, Out, PubsRemoved, AcksRemoved) -> + {Out, PubsRemoved, AcksRemoved + 1}; %% Publish, deliver and ack in journal journal_minus_segment({{_MsgId, _IsPersistent}, del, ack}, not_found, - _RelSeq, OutArray, PubsRemoved, AcksRemoved) -> - {OutArray, PubsRemoved, AcksRemoved}; + _RelSeq, Out, PubsRemoved, AcksRemoved) -> + {Out, PubsRemoved, AcksRemoved}; journal_minus_segment({PubRecord, del, ack}, {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, - RelSeq, OutArray, PubsRemoved, AcksRemoved) -> - {array:set(RelSeq, {no_pub, del, ack}, OutArray), + RelSeq, Out, PubsRemoved, AcksRemoved) -> + {array:set(RelSeq, {no_pub, del, ack}, Out), PubsRemoved + 1, AcksRemoved}; journal_minus_segment({PubRecord, del, ack}, {PubRecord = {_MsgId, _IsPersistent}, del, no_ack}, - RelSeq, OutArray, PubsRemoved, AcksRemoved) -> - {array:set(RelSeq, {no_pub, no_del, ack}, OutArray), + RelSeq, Out, PubsRemoved, AcksRemoved) -> + {array:set(RelSeq, {no_pub, no_del, ack}, Out), PubsRemoved + 1, AcksRemoved}. |
