diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-05-18 12:53:54 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-05-18 12:53:54 +0100 |
| commit | afdeb07ef3abf056976078ea3b68a3cb6a94f266 (patch) | |
| tree | a0d59ce73ac8002d5399bc830972b4c67840b346 /src | |
| parent | c75f81a640cff9ecb4a6e4e5c25b207c9399d94d (diff) | |
| download | rabbitmq-server-git-afdeb07ef3abf056976078ea3b68a3cb6a94f266.tar.gz | |
Reworked reading from the queue index: each read is still limited to at most one segment, but now uses a more natural start/end interface (start inclusive, end exclusive).
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 122 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 83 |
3 files changed, 120 insertions, 91 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index a6753910b1..7cf36193f6 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -32,8 +32,8 @@ -module(rabbit_queue_index). -export([init/3, terminate/2, terminate_and_erase/1, publish/4, - deliver/2, ack/2, sync/2, flush/1, read_segment_entries/2, - next_segment_boundary/1, current_segment_boundary/1, bounds/1, + deliver/2, ack/2, sync/2, flush/1, read/3, + current_segment_boundary/1, next_segment_boundary/1, bounds/1, recover/1]). -define(CLEAN_FILENAME, "clean.dot"). @@ -83,15 +83,15 @@ %% journal is still held in this mapping). Actions are stored directly %% in this state. Thus at the point of flushing the journal, firstly %% no reading from disk is necessary, but secondly if the known number -%% of acks and publishes in a segment qare equal, given the known -%% state of the segment file combined with the journal, no writing -%% needs to be done to the segment file either (in fact it is deleted -%% if it exists at all). This is safe given that the set of acks is a -%% subset of the set of publishes. When it's necessary to sync -%% messages because of transactions, it's only necessary to fsync on -%% the journal: when entries are distributed from the journal to -%% segment files, those segments appended to are fsync'd prior to the -%% journal being truncated. +%% of acks and publishes in a segment are equal, given the known state +%% of the segment file combined with the journal, no writing needs to +%% be done to the segment file either (in fact it is deleted if it +%% exists at all). This is safe given that the set of acks is a subset +%% of the set of publishes. When it's necessary to sync messages +%% because of transactions, it's only necessary to fsync on the +%% journal: when entries are distributed from the journal to segment +%% files, those segments appended to are fsync'd prior to the journal +%% being truncated. 
%% %% This module is also responsible for scanning the queue index files %% and seeding the message store on start up. @@ -191,10 +191,11 @@ -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). --spec(read_segment_entries/2 :: (seq_id(), qistate()) -> - {[{guid(), seq_id(), boolean(), boolean()}], qistate()}). --spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). +-spec(read/3 :: (seq_id(), seq_id(), qistate()) -> + {[{guid(), seq_id(), boolean(), boolean()}], + seq_id() | 'undefined', qistate()}). -spec(current_segment_boundary/1 :: (seq_id()) -> seq_id()). +-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). -spec(recover/1 :: ([queue_name()]) -> {[[any()]], startup_fun_state()}). @@ -332,33 +333,50 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> flush(State) -> flush_journal(State). -read_segment_entries(InitSeqId, State = #qistate { segments = Segments, - dir = Dir }) -> - {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), - Segment = segment_find_or_new(Seg, Dir, Segments), - {SegEntries, _PubCount, _AckCount, Segment1} = load_segment(false, Segment), +read(StartEnd, StartEnd, State) -> + {[], undefined, State}; +read(Start, End, State = #qistate { segments = Segments, + dir = Dir }) when Start =< End -> + %% Start is inclusive, End is exclusive. 
+ {StartSeg, StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start), + {EndSeg, EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End), + Start1 = reconstruct_seq_id(StartSeg + 1, 0), + Again = case End =< Start1 of + true -> undefined; + false -> Start1 + end, + MaxRelSeq = case StartSeg =:= EndSeg of + true -> EndRelSeq; + false -> ?SEGMENT_ENTRY_COUNT + end, + Segment = segment_find_or_new(StartSeg, Dir, Segments), + {SegEntries, _PubCount, _AckCount, Segment1} = + load_segment(false, StartRelSeq, MaxRelSeq, Segment), #segment { journal_entries = JEntries } = Segment1, {array:sparse_foldr( fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) -> - [ {Guid, reconstruct_seq_id(Seg, RelSeq), + [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), IsPersistent, IsDelivered == del} | Acc ] - end, [], journal_plus_segment(JEntries, SegEntries)), + end, [], + journal_plus_segment(JEntries, SegEntries, StartRelSeq, MaxRelSeq)), + Again, State #qistate { segments = segment_store(Segment1, Segments) }}. -next_segment_boundary(SeqId) -> - {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - reconstruct_seq_id(Seg + 1, 0). - current_segment_boundary(SeqId) -> {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), reconstruct_seq_id(Seg, 0). +next_segment_boundary(SeqId) -> + {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), + reconstruct_seq_id(Seg + 1, 0). + bounds(State) -> SegNums = all_segment_nums(State), - %% We don't want the lowest seq_id, merely the seq_id of the start - %% of the lowest segment. That seq_id may not actually exist, but - %% that's fine. The important thing is that the segment exists and - %% the seq_id reported is on a segment boundary. + %% Don't bother trying to figure out the lowest seq_id, merely the + %% seq_id of the start of the lowest segment. That seq_id may not + %% actually exist, but that's fine. The important thing is that + %% the segment exists and the seq_id reported is on a segment + %% boundary. 
%% We also don't really care about the max seq_id. Just start the %% next segment: it makes life much easier. @@ -460,7 +478,7 @@ terminate(StoreShutdown, Terms, State = recover_segment(ContainsCheckFun, CleanShutdown, Segment) -> {SegEntries, PubCount, AckCount, Segment1} = - load_segment(false, Segment), + load_segment(false, 0, ?SEGMENT_ENTRY_COUNT, Segment), array:sparse_foldl( fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment3) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, @@ -518,7 +536,8 @@ queue_index_walker_reader(QueueName, Gatherer) -> State1 = lists:foldl( fun (Seg, State2) -> SeqId = reconstruct_seq_id(Seg, 0), - {Messages, State3} = read_segment_entries(SeqId, State2), + {Messages, undefined, State3} = + read(SeqId, next_segment_boundary(SeqId), State2), [ok = gatherer:in(Gatherer, {Guid, 1}) || {Guid, _SeqId, true, _IsDelivered} <- Messages], State3 @@ -633,7 +652,7 @@ load_journal(State) -> %% them if duplicates are in the journal. The counts %% here are purely from the segment itself. {SegEntries, PubCountInSeg, AckCountInSeg, Segment1} = - load_segment(true, Segment), + load_segment(true, 0, ?SEGMENT_ENTRY_COUNT, Segment), %% Removed counts here are the number of pubs and %% acks that are duplicates - i.e. found in both the %% segment and journal. @@ -806,8 +825,9 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> %% number of unacked msgs is PubCount - AckCount. If KeepAcks is %% false, then array:sparse_size(SegEntries) == PubCount - %% AckCount. If KeepAcks is true, then array:sparse_size(SegEntries) -%% == PubCount. -load_segment(KeepAcks, Segment = #segment { path = Path, handle = SegHdl }) -> +%% == PubCount. StartRelSeq is inclusive, EndRelSeq is exclusive. 
+load_segment(KeepAcks, StartRelSeq, EndRelSeq, + Segment = #segment { path = Path, handle = SegHdl }) -> SegmentExists = case SegHdl of undefined -> filelib:is_file(Path); _ -> true @@ -817,20 +837,24 @@ load_segment(KeepAcks, Segment = #segment { path = Path, handle = SegHdl }) -> true -> {Hdl, Segment1} = get_segment_handle(Segment), {ok, 0} = file_handle_cache:position(Hdl, bof), {SegEntries, PubCount, AckCount} = - load_segment_entries(KeepAcks, Hdl, array_new(), 0, 0), + load_segment_entries(KeepAcks, StartRelSeq, EndRelSeq, Hdl, + array_new(), 0, 0), {SegEntries, PubCount, AckCount, Segment1} end. -load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) -> +load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries, PubCount, + AckCount) -> case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>} -> + RelSeq:?REL_SEQ_BITS>>} + when StartRel =< RelSeq andalso RelSeq < EndRel -> {AckCount1, SegEntries1} = deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegEntries), - load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount, - AckCount1); + load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries1, + PubCount, AckCount1); {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> + IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} + when StartRel =< RelSeq andalso RelSeq < EndRel -> %% because we specify /binary, and binaries are complete %% bytes, the size spec is in bytes, not bits. 
{ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES), @@ -838,8 +862,11 @@ load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) -> array:set(RelSeq, {{Guid, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries), - load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount + 1, - AckCount); + load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries1, + PubCount + 1, AckCount); + {ok, _SomeBinary} -> + load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries, + PubCount, AckCount); _ErrOrEoF -> {SegEntries, PubCount, AckCount} end. @@ -867,15 +894,18 @@ bool_to_int(false) -> 0. %% Combine what we have just read from a segment file with what we're %% holding for that segment in memory. There must be no %% duplicates. Used when providing segment entries to the variable -%% queue. -journal_plus_segment(JEntries, SegEntries) -> +%% queue. RelStart is inclusive, RelEnd is exclusive. +journal_plus_segment(JEntries, SegEntries, RelStart, RelEnd) -> array:sparse_foldl( - fun (RelSeq, JObj, SegEntriesOut) -> + fun (RelSeq, JObj, SegEntriesOut) + when RelStart =< RelSeq andalso RelSeq < RelEnd -> SegEntry = array:get(RelSeq, SegEntriesOut), case journal_plus_segment1(JObj, SegEntry) of undefined -> array:reset(RelSeq, SegEntriesOut); Obj -> array:set(RelSeq, Obj, SegEntriesOut) - end + end; + (_RelSeq, _JObj, SegEntriesOut) -> + SegEntriesOut end, SegEntries, JEntries). 
%% Here, the result is the item which we may be adding to (for items diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 358f857b12..9821367d59 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1458,7 +1458,7 @@ test_queue_index() -> {0, 0, Qi1} = rabbit_queue_index:bounds(Qi0), {Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1), {0, SegmentSize, Qi3} = rabbit_queue_index:bounds(Qi2), - {ReadA, Qi4} = rabbit_queue_index:read_segment_entries(0, Qi3), + {ReadA, undefined, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3), ok = verify_read_with_published(false, false, ReadA, lists:reverse(SeqIdsGuidsA)), %% call terminate twice to prove it's idempotent @@ -1470,7 +1470,7 @@ test_queue_index() -> {0, 0, Qi7} = rabbit_queue_index:bounds(Qi6), {Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7), {0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8), - {ReadB, Qi10} = rabbit_queue_index:read_segment_entries(0, Qi9), + {ReadB, undefined, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9), ok = verify_read_with_published(false, true, ReadB, lists:reverse(SeqIdsGuidsB)), _Qi11 = rabbit_queue_index:terminate([], Qi10), @@ -1481,7 +1481,7 @@ test_queue_index() -> {LenB, _Terms2, Qi12} = test_queue_init(), {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12), Qi14 = queue_index_deliver(SeqIdsB, Qi13), - {ReadC, Qi15} = rabbit_queue_index:read_segment_entries(0, Qi14), + {ReadC, undefined, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), ok = verify_read_with_published(true, true, ReadC, lists:reverse(SeqIdsGuidsB)), Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 856b1f0c36..992cf19a72 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -175,9 +175,9 @@ }). 
-record(delta, - { start_seq_id, + { start_seq_id, %% start_seq_id is inclusive count, - end_seq_id %% note the end_seq_id is always >, not >= + end_seq_id %% end_seq_id is exclusive }). -record(tx, { pending_messages, pending_acks }). @@ -797,7 +797,7 @@ persistent_guids(Pubs) -> [Guid || Obj = #basic_message { guid = Guid } <- Pubs, Obj #basic_message.is_persistent]. -betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) -> +betas_from_segment_entries(List, TransientThreshold, IndexState) -> {Filtered, IndexState1} = lists:foldr( fun ({Guid, SeqId, IsPersistent, IsDelivered}, @@ -811,28 +811,27 @@ betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) -> end, {FilteredAcc, rabbit_queue_index:ack( [SeqId], IndexStateAcc1)}; - false -> case SeqId < SeqIdLimit of - true -> {[#msg_status { - msg = undefined, - guid = Guid, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = true, - index_on_disk = true - } | FilteredAcc], - IndexStateAcc}; - false -> {FilteredAcc, IndexStateAcc} - end + false -> {[#msg_status { msg = undefined, + guid = Guid, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true + } | FilteredAcc], + IndexStateAcc} end end, {[], IndexState}, List), {bpqueue:from_list([{true, Filtered}]), IndexState1}. 
-read_index_segment(SeqId, IndexState) -> - SeqId1 = rabbit_queue_index:next_segment_boundary(SeqId), - case rabbit_queue_index:read_segment_entries(SeqId, IndexState) of - {[], IndexState1} -> read_index_segment(SeqId1, IndexState1); - {List, IndexState1} -> {List, IndexState1, SeqId1} +read_one_index_segment(StartSeqId, EndSeqId, IndexState) + when StartSeqId =< EndSeqId -> + case rabbit_queue_index:read(StartSeqId, EndSeqId, IndexState) of + {List, Again, IndexState1} when List /= [] orelse Again =:= undefined -> + {List, IndexState1, + rabbit_queue_index:next_segment_boundary(StartSeqId)}; + {[], StartSeqId1, IndexState1} -> + read_one_index_segment(StartSeqId1, EndSeqId, IndexState1) end. ensure_binary_properties(Msg = #basic_message { content = Content }) -> @@ -959,26 +958,27 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns}, State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }. delete1(_PersistentStore, _TransientThreshold, NextSeqId, Count, DeltaSeqId, - IndexState) when DeltaSeqId >= NextSeqId -> + IndexState) when DeltaSeqId =:= undefined + orelse DeltaSeqId >= NextSeqId -> {Count, IndexState}; delete1(PersistentStore, TransientThreshold, NextSeqId, Count, DeltaSeqId, IndexState) -> - Delta1SeqId = rabbit_queue_index:next_segment_boundary(DeltaSeqId), - case rabbit_queue_index:read_segment_entries(DeltaSeqId, IndexState) of - {[], IndexState1} -> - delete1(PersistentStore, TransientThreshold, NextSeqId, Count, - Delta1SeqId, IndexState1); - {List, IndexState1} -> - {Q, IndexState2} = - betas_from_segment_entries( - List, Delta1SeqId, TransientThreshold, IndexState1), - {QCount, IndexState3} = - remove_queue_entries( - PersistentStore, fun beta_fold_no_index_on_disk/3, - Q, IndexState2), - delete1(PersistentStore, TransientThreshold, NextSeqId, - Count + QCount, Delta1SeqId, IndexState3) - end. 
+ {List, Again, IndexState1} = + rabbit_queue_index:read(DeltaSeqId, NextSeqId, IndexState), + {IndexState2, Count1} = + case List of + [] -> {IndexState1, Count}; + _ -> {Q, IndexState3} = + betas_from_segment_entries( + List, TransientThreshold, IndexState1), + {Count2, IndexState4} = + remove_queue_entries( + PersistentStore, fun beta_fold_no_index_on_disk/3, + Q, IndexState3), + {IndexState4, Count2 + Count} + end, + delete1(PersistentStore, TransientThreshold, NextSeqId, Count1, Again, + IndexState2). purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState, persistent_store = PersistentStore }) -> @@ -1370,13 +1370,12 @@ maybe_deltas_to_betas( %% segment, or TargetRamMsgCount > 0, meaning we should %% really be holding all the betas in memory. {List, IndexState1, Delta1SeqId} = - read_index_segment(DeltaSeqId, IndexState), + read_one_index_segment(DeltaSeqId, DeltaSeqIdEnd, IndexState), %% length(List) may be < segment_size because of acks. It %% could be [] if we ignored every message in the segment %% due to it being transient and below the threshold - {Q3a, IndexState2} = - betas_from_segment_entries( - List, DeltaSeqIdEnd, TransientThreshold, IndexState1), + {Q3a, IndexState2} = betas_from_segment_entries( + List, TransientThreshold, IndexState1), State1 = State #vqstate { index_state = IndexState2 }, case bpqueue:len(Q3a) of 0 -> |
