summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-12-04 13:02:00 +0000
committerMatthew Sackman <matthew@lshift.net>2009-12-04 13:02:00 +0000
commitbf4f2458f688d259179bae41a1e0fcac95553e39 (patch)
tree7065240e3916c6b49dc83e77ef581cd893f7e139 /src
parent8a435be43cc41ddc626eb04faa399f3e96227fbf (diff)
downloadrabbitmq-server-git-bf4f2458f688d259179bae41a1e0fcac95553e39.tar.gz
segments now stored in array, not dict
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl188
1 files changed, 93 insertions, 95 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index c95c803365..56cbee1e7c 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -172,11 +172,11 @@ init(Name) ->
lists:foldl(
fun (Seg, {Segments2, CountAcc, DCountAcc}) ->
Segment = segment_find_or_new(Seg, Dir, Segments2),
- {SegDict, _PubCount, _AckCount, Segment1} =
+ {SegEntries, _PubCount, _AckCount, Segment1} =
load_segment(false, Segment),
{Segment2 = #segment { pubs = PubCount, acks = AckCount },
DCountAcc1} =
- dict:fold(
+ array:sparse_foldl(
fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack},
{Segment3, DCountAcc2}) ->
InMsgStore = rabbit_msg_store:contains(MsgId),
@@ -198,7 +198,7 @@ init(Name) ->
RelSeq, del, Segment3)),
DCountAcc2 + 2}
end
- end, {Segment1, DCountAcc}, SegDict),
+ end, {Segment1, DCountAcc}, SegEntries),
{segment_store(Segment2, Segments2),
CountAcc + PubCount - AckCount, DCountAcc1}
end, {Segments, 0, DCount}, AllSegs),
@@ -283,18 +283,16 @@ read_segment_entries(InitSeqId, State = #qistate { segments = Segments,
dir = Dir }) ->
{Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
Segment = segment_find_or_new(Seg, Dir, Segments),
- {SegDict, _PubCount, _AckCount,
+ {SegEntries, _PubCount, _AckCount,
Segment1 = #segment { journal_entries = JEntries }} =
load_segment(false, Segment),
- SegDict1 = journal_plus_segment(JEntries, SegDict),
+ SegEntries1 = journal_plus_segment(JEntries, SegEntries),
%% deliberately sort the list desc, because foldl will reverse it
- RelSeqs = rev_sort(dict:fetch_keys(SegDict1)),
- {lists:foldl(fun (RelSeq, Acc) ->
- {{MsgId, IsPersistent}, IsDelivered, no_ack} =
- dict:fetch(RelSeq, SegDict1),
- [ {MsgId, reconstruct_seq_id(Seg, RelSeq),
- IsPersistent, IsDelivered == del} | Acc ]
- end, [], RelSeqs),
+ {array:sparse_foldr(
+ fun (RelSeq, {{MsgId, IsPersistent}, IsDelivered, no_ack}, Acc) ->
+ [ {MsgId, reconstruct_seq_id(Seg, RelSeq),
+ IsPersistent, IsDelivered == del} | Acc ]
+ end, [], SegEntries1),
State #qistate { segments = segment_store(Segment1, Segments) }}.
next_segment_boundary(SeqId) ->
@@ -421,9 +419,6 @@ blank_state(QueueName) ->
journal_new() ->
array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
-rev_sort(List) ->
- lists:sort(fun (A, B) -> B < A end, List).
-
seq_id_to_seg_and_rel_seq_id(SeqId) ->
{ SeqId div ?SEGMENT_ENTRY_COUNT, SeqId rem ?SEGMENT_ENTRY_COUNT }.
@@ -599,8 +594,9 @@ terminate(StoreShutdown, State =
%% Does not do any combining with the journal at all. The PubCount
%% that comes back is the number of publishes in the segment. The
%% number of unacked msgs is PubCount - AckCount. If KeepAcks is
-%% false, then dict:size(SegDict) == PubCount - AckCount. If KeepAcks
-%% is true, then dict:size(SegDict) == PubCount.
+%% false, then array:sparse_size(SegEntries) == PubCount -
+%% AckCount. If KeepAcks is true, then array:sparse_size(SegEntries)
+%% == PubCount.
load_segment(KeepAcks,
Segment = #segment { path = Path, handle = SegHdl }) ->
SegmentExists = case SegHdl of
@@ -613,21 +609,22 @@ load_segment(KeepAcks,
true ->
{Hdl, Segment1} = get_segment_handle(Segment),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- {SegDict, PubCount, AckCount} =
- load_segment_entries(KeepAcks, Hdl, dict:new(), 0, 0),
- {SegDict, PubCount, AckCount, Segment1}
+ {SegEntries, PubCount, AckCount} =
+ load_segment_entries(KeepAcks, Hdl, journal_new(), 0, 0),
+ {SegEntries, PubCount, AckCount, Segment1}
end.
-load_segment_entries(KeepAcks, Hdl, SegDict, PubCount, AckCount) ->
+load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) ->
case file_handle_cache:read(Hdl, 1) of
{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} ->
{ok, LSB} = file_handle_cache:read(
Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1),
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
- {AckCount1, SegDict1} =
- deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict),
- load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount, AckCount1);
+ {AckCount1, SegEntries1} =
+ deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegEntries),
+ load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount,
+ AckCount1);
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} ->
%% because we specify /binary, and binaries are complete
@@ -636,23 +633,24 @@ load_segment_entries(KeepAcks, Hdl, SegDict, PubCount, AckCount) ->
file_handle_cache:read(
Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1),
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
- SegDict1 =
- dict:store(RelSeq,
- {{MsgId, 1 == IsPersistentNum}, no_del, no_ack},
- SegDict),
- load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount+1, AckCount);
+ SegEntries1 =
+ array:set(RelSeq,
+ {{MsgId, 1 == IsPersistentNum}, no_del, no_ack},
+ SegEntries),
+ load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount + 1,
+ AckCount);
_ErrOrEoF ->
- {SegDict, PubCount, AckCount}
+ {SegEntries, PubCount, AckCount}
end.
-deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict) ->
- case dict:find(RelSeq, SegDict) of
- {ok, {PubRecord, no_del, no_ack}} ->
- {AckCount, dict:store(RelSeq, {PubRecord, del, no_ack}, SegDict)};
- {ok, {PubRecord, del, no_ack}} when KeepAcks ->
- {AckCount + 1, dict:store(RelSeq, {PubRecord, del, ack}, SegDict)};
- {ok, {_PubRecord, del, no_ack}} ->
- {AckCount + 1, dict:erase(RelSeq, SegDict)}
+deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegEntries) ->
+ case array:get(RelSeq, SegEntries) of
+ {PubRecord, no_del, no_ack} ->
+ {AckCount, array:set(RelSeq, {PubRecord, del, no_ack}, SegEntries)};
+ {PubRecord, del, no_ack} when KeepAcks ->
+ {AckCount + 1, array:set(RelSeq, {PubRecord, del, ack}, SegEntries)};
+ {_PubRecord, del, no_ack} ->
+ {AckCount + 1, array:reset(RelSeq, SegEntries)}
end.
%% Loading Journal. This isn't idempotent and will mess up the counts
@@ -671,13 +669,13 @@ load_journal(State) ->
%% We want to keep acks in so that we can remove
%% them if duplicates are in the journal. The counts
%% here are purely from the segment itself.
- {SegDict, PubCountInSeg, AckCountInSeg, Segment1} =
+ {SegEntries, PubCountInSeg, AckCountInSeg, Segment1} =
load_segment(true, Segment),
%% Removed counts here are the number of pubs and
%% acks that are duplicates - i.e. found in both the
%% segment and journal.
{JEntries1, PubsRemoved, AcksRemoved} =
- journal_minus_segment(JEntries, SegDict),
+ journal_minus_segment(JEntries, SegEntries),
PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved,
AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved,
Segment1 #segment { journal_entries = JEntries1,
@@ -759,136 +757,136 @@ add_to_journal(RelSeq, Action, SegJArray) ->
%% holding for that segment in memory. There must be no
%% duplicates. Used when providing segment entries to the variable
%% queue.
-journal_plus_segment(JEntries, SegDict) ->
+journal_plus_segment(JEntries, SegEntries) ->
array:sparse_foldl(
- fun (RelSeq, JObj, SegDictOut) ->
- SegEntry = case dict:find(RelSeq, SegDictOut) of
- error -> not_found;
- {ok, SObj = {_, _, _}} -> SObj
+ fun (RelSeq, JObj, SegEntriesOut) ->
+ SegEntry = case array:get(RelSeq, SegEntriesOut) of
+ undefined -> not_found;
+ SObj = {_, _, _} -> SObj
end,
- journal_plus_segment(JObj, SegEntry, RelSeq, SegDictOut)
- end, SegDict, JEntries).
+ journal_plus_segment(JObj, SegEntry, RelSeq, SegEntriesOut)
+ end, SegEntries, JEntries).
-%% Here, the OutDict is the SegDict which we may be adding to (for
+%% Here, the Out is the Seg Array which we may be adding to (for
%% items only in the journal), modifying (bits in both), or erasing
%% from (ack in journal, not segment).
journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack},
not_found,
- RelSeq, OutDict) ->
- dict:store(RelSeq, Obj, OutDict);
+ RelSeq, Out) ->
+ array:set(RelSeq, Obj, Out);
journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack},
not_found,
- RelSeq, OutDict) ->
- dict:store(RelSeq, Obj, OutDict);
+ RelSeq, Out) ->
+ array:set(RelSeq, Obj, Out);
journal_plus_segment({{_MsgId, _IsPersistent}, del, ack},
not_found,
- RelSeq, OutDict) ->
- dict:erase(RelSeq, OutDict);
+ RelSeq, Out) ->
+ array:reset(RelSeq, Out);
journal_plus_segment({no_pub, del, no_ack},
{PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack},
- RelSeq, OutDict) ->
- dict:store(RelSeq, {PubRecord, del, no_ack}, OutDict);
+ RelSeq, Out) ->
+ array:set(RelSeq, {PubRecord, del, no_ack}, Out);
journal_plus_segment({no_pub, del, ack},
{{_MsgId, _IsPersistent}, no_del, no_ack},
- RelSeq, OutDict) ->
- dict:erase(RelSeq, OutDict);
+ RelSeq, Out) ->
+ array:reset(RelSeq, Out);
journal_plus_segment({no_pub, no_del, ack},
{{_MsgId, _IsPersistent}, del, no_ack},
- RelSeq, OutDict) ->
- dict:erase(RelSeq, OutDict).
+ RelSeq, Out) ->
+ array:reset(RelSeq, Out).
%% Remove from the journal entries for a segment, items that are
%% duplicates of entries found in the segment itself. Used on start up
%% to clean up the journal.
-journal_minus_segment(JEntries, SegDict) ->
+journal_minus_segment(JEntries, SegEntries) ->
array:sparse_foldl(
fun (RelSeq, JObj, {JEntriesOut, PubsRemoved, AcksRemoved}) ->
- SegEntry = case dict:find(RelSeq, SegDict) of
- error -> not_found;
- {ok, SObj = {_, _, _}} -> SObj
+ SegEntry = case array:get(RelSeq, SegEntries) of
+ undefined -> not_found;
+ SObj = {_, _, _} -> SObj
end,
journal_minus_segment(JObj, SegEntry, RelSeq, JEntriesOut,
PubsRemoved, AcksRemoved)
end, {journal_new(), 0, 0}, JEntries).
-%% Here, the OutArray is a fresh journal that we're filling with valid
+%% Here, the Out is a fresh journal that we're filling with valid
%% entries. PubsRemoved and AcksRemoved only get increased when the a
%% publish or ack is in both the journal and the segment.
%% Both the same. Must be at least the publish
journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, no_ack},
- _RelSeq, OutArray, PubsRemoved, AcksRemoved) ->
- {OutArray, PubsRemoved + 1, AcksRemoved};
+ _RelSeq, Out, PubsRemoved, AcksRemoved) ->
+ {Out, PubsRemoved + 1, AcksRemoved};
journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, ack},
- _RelSeq, OutArray, PubsRemoved, AcksRemoved) ->
- {OutArray, PubsRemoved + 1, AcksRemoved + 1};
+ _RelSeq, Out, PubsRemoved, AcksRemoved) ->
+ {Out, PubsRemoved + 1, AcksRemoved + 1};
%% Just publish in journal
journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack},
not_found,
- RelSeq, OutArray, PubsRemoved, AcksRemoved) ->
- {array:set(RelSeq, Obj, OutArray), PubsRemoved, AcksRemoved};
+ RelSeq, Out, PubsRemoved, AcksRemoved) ->
+ {array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved};
%% Just deliver in journal
journal_minus_segment(Obj = {no_pub, del, no_ack},
{{_MsgId, _IsPersistent}, no_del, no_ack},
- RelSeq, OutArray, PubsRemoved, AcksRemoved) ->
- {array:set(RelSeq, Obj, OutArray), PubsRemoved, AcksRemoved};
+ RelSeq, Out, PubsRemoved, AcksRemoved) ->
+ {array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved};
journal_minus_segment({no_pub, del, no_ack},
{{_MsgId, _IsPersistent}, del, no_ack},
- _RelSeq, OutArray, PubsRemoved, AcksRemoved) ->
- {OutArray, PubsRemoved, AcksRemoved};
+ _RelSeq, Out, PubsRemoved, AcksRemoved) ->
+ {Out, PubsRemoved, AcksRemoved};
%% Just ack in journal
journal_minus_segment(Obj = {no_pub, no_del, ack},
{{_MsgId, _IsPersistent}, del, no_ack},
- RelSeq, OutArray, PubsRemoved, AcksRemoved) ->
- {array:set(RelSeq, Obj, OutArray), PubsRemoved, AcksRemoved};
+ RelSeq, Out, PubsRemoved, AcksRemoved) ->
+ {array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved};
journal_minus_segment({no_pub, no_del, ack},
{{_MsgId, _IsPersistent}, del, ack},
- _RelSeq, OutArray, PubsRemoved, AcksRemoved) ->
- {OutArray, PubsRemoved, AcksRemoved};
+ _RelSeq, Out, PubsRemoved, AcksRemoved) ->
+ {Out, PubsRemoved, AcksRemoved};
%% Publish and deliver in journal
journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack},
not_found,
- RelSeq, OutArray, PubsRemoved, AcksRemoved) ->
- {array:set(RelSeq, Obj, OutArray), PubsRemoved, AcksRemoved};
+ RelSeq, Out, PubsRemoved, AcksRemoved) ->
+ {array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved};
journal_minus_segment({PubRecord, del, no_ack},
{PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack},
- RelSeq, OutArray, PubsRemoved, AcksRemoved) ->
- {array:set(RelSeq, {no_pub, del, no_ack}, OutArray),
+ RelSeq, Out, PubsRemoved, AcksRemoved) ->
+ {array:set(RelSeq, {no_pub, del, no_ack}, Out),
PubsRemoved + 1, AcksRemoved};
%% Deliver and ack in journal
journal_minus_segment(Obj = {no_pub, del, ack},
{{_MsgId, _IsPersistent}, no_del, no_ack},
- RelSeq, OutArray, PubsRemoved, AcksRemoved) ->
- {array:set(RelSeq, Obj, OutArray), PubsRemoved, AcksRemoved};
+ RelSeq, Out, PubsRemoved, AcksRemoved) ->
+ {array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved};
journal_minus_segment({no_pub, del, ack},
{{_MsgId, _IsPersistent}, del, no_ack},
- RelSeq, OutArray, PubsRemoved, AcksRemoved) ->
- {array:set(RelSeq, {no_pub, no_del, ack}, OutArray),
+ RelSeq, Out, PubsRemoved, AcksRemoved) ->
+ {array:set(RelSeq, {no_pub, no_del, ack}, Out),
PubsRemoved, AcksRemoved};
journal_minus_segment({no_pub, del, ack},
{{_MsgId, _IsPersistent}, del, ack},
- _RelSeq, OutArray, PubsRemoved, AcksRemoved) ->
- {OutArray, PubsRemoved, AcksRemoved + 1};
+ _RelSeq, Out, PubsRemoved, AcksRemoved) ->
+ {Out, PubsRemoved, AcksRemoved + 1};
%% Publish, deliver and ack in journal
journal_minus_segment({{_MsgId, _IsPersistent}, del, ack},
not_found,
- _RelSeq, OutArray, PubsRemoved, AcksRemoved) ->
- {OutArray, PubsRemoved, AcksRemoved};
+ _RelSeq, Out, PubsRemoved, AcksRemoved) ->
+ {Out, PubsRemoved, AcksRemoved};
journal_minus_segment({PubRecord, del, ack},
{PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack},
- RelSeq, OutArray, PubsRemoved, AcksRemoved) ->
- {array:set(RelSeq, {no_pub, del, ack}, OutArray),
+ RelSeq, Out, PubsRemoved, AcksRemoved) ->
+ {array:set(RelSeq, {no_pub, del, ack}, Out),
PubsRemoved + 1, AcksRemoved};
journal_minus_segment({PubRecord, del, ack},
{PubRecord = {_MsgId, _IsPersistent}, del, no_ack},
- RelSeq, OutArray, PubsRemoved, AcksRemoved) ->
- {array:set(RelSeq, {no_pub, no_del, ack}, OutArray),
+ RelSeq, Out, PubsRemoved, AcksRemoved) ->
+ {array:set(RelSeq, {no_pub, no_del, ack}, Out),
PubsRemoved + 1, AcksRemoved}.