diff options
| -rw-r--r-- | src/rabbit_queue_index.erl | 89 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 128 |
3 files changed, 117 insertions, 130 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 44df5976da..241766b823 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,8 +31,8 @@ -module(rabbit_queue_index). --export([init/3, terminate/2, terminate_and_erase/1, write_published/4, - write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1, +-export([init/3, terminate/2, terminate_and_erase/1, publish/4, + deliver/2, ack/2, sync/2, flush/1, read_segment_entries/2, next_segment_boundary/1, segment_size/0, find_lowest_seq_id_seg_and_next_seq_id/1, recover/1]). @@ -189,12 +189,11 @@ {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(terminate_and_erase/1 :: (qistate()) -> qistate()). --spec(write_published/4 :: (guid(), seq_id(), boolean(), qistate()) - -> qistate()). --spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()). --spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()). --spec(sync_seq_ids/2 :: ([seq_id()], qistate()) -> qistate()). --spec(flush_journal/1 :: (qistate()) -> qistate()). +-spec(publish/4 :: (guid(), seq_id(), boolean(), qistate()) -> qistate()). +-spec(deliver/2 :: (seq_id(), qistate()) -> qistate()). +-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). +-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). +-spec(flush/1 :: (qistate()) -> qistate()). -spec(read_segment_entries/2 :: (seq_id(), qistate()) -> {[{guid(), seq_id(), boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). @@ -291,7 +290,7 @@ terminate_and_erase(State) -> ok = rabbit_misc:recursive_delete([State1 #qistate.dir]), State1. -write_published(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> +publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> ?GUID_BYTES = size(Guid), {JournalHdl, State1} = get_journal_handle(State), ok = file_handle_cache:append( @@ -301,15 +300,15 @@ write_published(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]), maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)). -write_delivered(SeqId, State) -> +deliver(SeqId, State) -> {JournalHdl, State1} = get_journal_handle(State), ok = file_handle_cache:append( JournalHdl, <<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>), maybe_flush_journal(add_to_journal(SeqId, del, State1)). -write_acks([], State) -> +ack([], State) -> State; -write_acks(SeqIds, State) -> +ack(SeqIds, State) -> {JournalHdl, State1} = get_journal_handle(State), ok = file_handle_cache:append( JournalHdl, [<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>> || @@ -318,43 +317,15 @@ write_acks(SeqIds, State) -> add_to_journal(SeqId, ack, StateN) end, State1, SeqIds)). -sync_seq_ids([], State) -> +sync([], State) -> State; -sync_seq_ids(_SeqIds, State = #qistate { journal_handle = undefined }) -> +sync(_SeqIds, State = #qistate { journal_handle = undefined }) -> State; -sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> +sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> ok = file_handle_cache:sync(JournalHdl), State. -flush_journal(State = #qistate { dirty_count = 0 }) -> - State; -flush_journal(State = #qistate { segments = Segments }) -> - Segments1 = - segment_fold( - fun (_Seg, #segment { journal_entries = JEntries, - pubs = PubCount, - acks = AckCount } = Segment, SegmentsN) -> - case PubCount > 0 andalso PubCount == AckCount of - true -> ok = delete_segment(Segment), - SegmentsN; - false -> segment_store( - append_journal_to_segment(Segment, JEntries), - SegmentsN) - end - end, segments_new(), Segments), - {JournalHdl, State1} = - get_journal_handle(State #qistate { segments = Segments1 }), - ok = file_handle_cache:clear(JournalHdl), - State1 #qistate { dirty_count = 0 }. - -append_journal_to_segment(Segment, JEntries) -> - case array:sparse_size(JEntries) of - 0 -> Segment; - _ -> {Hdl, Segment1} = get_segment_handle(Segment), - array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries), - ok = file_handle_cache:sync(Hdl), - Segment1 #segment { journal_entries = array_new() } - end. +flush(State) -> flush_journal(State). read_segment_entries(InitSeqId, State = #qistate { segments = Segments, dir = Dir }) -> @@ -599,6 +570,36 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount }) maybe_flush_journal(State) -> State. +flush_journal(State = #qistate { dirty_count = 0 }) -> + State; +flush_journal(State = #qistate { segments = Segments }) -> + Segments1 = + segment_fold( + fun (_Seg, #segment { journal_entries = JEntries, + pubs = PubCount, + acks = AckCount } = Segment, SegmentsN) -> + case PubCount > 0 andalso PubCount == AckCount of + true -> ok = delete_segment(Segment), + SegmentsN; + false -> segment_store( + append_journal_to_segment(Segment, JEntries), + SegmentsN) + end + end, segments_new(), Segments), + {JournalHdl, State1} = + get_journal_handle(State #qistate { segments = Segments1 }), + ok = file_handle_cache:clear(JournalHdl), + State1 #qistate { dirty_count = 0 }. + +append_journal_to_segment(Segment, JEntries) -> + case array:sparse_size(JEntries) of + 0 -> Segment; + _ -> {Hdl, Segment1} = get_segment_handle(Segment), + array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries), + ok = file_handle_cache:sync(Hdl), + Segment1 #segment { journal_entries = array_new() } + end. + get_journal_handle(State = #qistate { journal_handle = undefined, dir = Dir }) -> Path = filename:join(Dir, ?JOURNAL_FILENAME), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b9f6dfd6a9..97d74fc93f 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1414,8 +1414,8 @@ queue_index_publish(SeqIds, Persistent, Qi) -> lists:foldl( fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) -> Guid = rabbit_guid:guid(), - QiM = rabbit_queue_index:write_published(Guid, SeqId, Persistent, - QiN), + QiM = rabbit_queue_index:publish( + Guid, SeqId, Persistent, QiN), {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid, Guid, MSCStateN), {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM} @@ -1425,13 +1425,11 @@ queue_index_publish(SeqIds, Persistent, Qi) -> {A, B}. queue_index_deliver(SeqIds, Qi) -> - lists:foldl( - fun (SeqId, QiN) -> - rabbit_queue_index:write_delivered(SeqId, QiN) - end, Qi, SeqIds). + lists:foldl(fun (SeqId, QiN) -> rabbit_queue_index:deliver(SeqId, QiN) end, + Qi, SeqIds). -queue_index_flush_journal(Qi) -> - rabbit_queue_index:flush_journal(Qi). +queue_index_flush(Qi) -> + rabbit_queue_index:flush(Qi). verify_read_with_published(_Delivered, _Persistent, [], _) -> ok; @@ -1491,8 +1489,8 @@ test_queue_index() -> {ReadC, Qi15} = rabbit_queue_index:read_segment_entries(0, Qi14), ok = verify_read_with_published(true, true, ReadC, lists:reverse(SeqIdsGuidsB)), - Qi16 = rabbit_queue_index:write_acks(SeqIdsB, Qi15), - Qi17 = queue_index_flush_journal(Qi16), + Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15), + Qi17 = queue_index_flush(Qi16), %% Everything will have gone now because #pubs == #acks {0, 0, Qi18} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17), @@ -1512,8 +1510,8 @@ test_queue_index() -> {0, _Terms4, Qi22} = test_queue_init(), {Qi23, _SeqIdsGuidsC} = queue_index_publish(SeqIdsC, false, Qi22), Qi24 = queue_index_deliver(SeqIdsC, Qi23), - Qi25 = rabbit_queue_index:write_acks(SeqIdsC, Qi24), - Qi26 = queue_index_flush_journal(Qi25), + Qi25 = rabbit_queue_index:ack(SeqIdsC, Qi24), + Qi26 = queue_index_flush(Qi25), {Qi27, _SeqIdsGuidsC1} = queue_index_publish([SegmentSize], false, Qi26), _Qi28 = rabbit_queue_index:terminate_and_erase(Qi27), ok = stop_msg_store(), @@ -1524,8 +1522,8 @@ test_queue_index() -> {Qi30, _SeqIdsGuidsC2} = queue_index_publish(SeqIdsC, false, Qi29), Qi31 = queue_index_deliver(SeqIdsC, Qi30), {Qi32, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize], false, Qi31), - Qi33 = rabbit_queue_index:write_acks(SeqIdsC, Qi32), - Qi34 = queue_index_flush_journal(Qi33), + Qi33 = rabbit_queue_index:ack(SeqIdsC, Qi32), + Qi34 = queue_index_flush(Qi33), _Qi35 = rabbit_queue_index:terminate_and_erase(Qi34), ok = stop_msg_store(), ok = empty_test_queue(), @@ -1535,8 +1533,8 @@ test_queue_index() -> {0, _Terms6, Qi36} = test_queue_init(), {Qi37, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD, false, Qi36), Qi38 = queue_index_deliver(SeqIdsD, Qi37), - Qi39 = rabbit_queue_index:write_acks(SeqIdsD, Qi38), - Qi40 = queue_index_flush_journal(Qi39), + Qi39 = rabbit_queue_index:ack(SeqIdsD, Qi38), + Qi40 = queue_index_flush(Qi39), _Qi41 = rabbit_queue_index:terminate_and_erase(Qi40), ok = stop_msg_store(), ok = rabbit_variable_queue:start([]), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 39ef3ec421..079c14eb9a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -360,7 +360,7 @@ delete_and_terminate(State) -> remove_pending_ack(false, State1), %% flushing here is good because it deletes all full segments, %% leaving only partial segments around. - IndexState1 = rabbit_queue_index:flush_journal(IndexState), + IndexState1 = rabbit_queue_index:flush(IndexState), IndexState2 = case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id( IndexState1) of @@ -456,7 +456,7 @@ fetch(AckRequired, State = %% 1. Mark it delivered if necessary IndexState1 = case IndexOnDisk andalso not IsDelivered of - true -> rabbit_queue_index:write_delivered( + true -> rabbit_queue_index:deliver( SeqId, IndexState); false -> IndexState end, @@ -465,29 +465,25 @@ fetch(AckRequired, State = MsgStore = find_msg_store(IsPersistent, PersistentStore), IndexState2 = case MsgOnDisk andalso not AckRequired of - true -> %% Remove from disk now - ok = case MsgOnDisk of - true -> - rabbit_msg_store:remove(MsgStore, [Guid]); - false -> - ok - end, - case IndexOnDisk of - true -> - rabbit_queue_index:write_acks([SeqId], - IndexState1); - false -> - IndexState1 - end; - false -> - IndexState1 + %% Remove from disk now + true -> ok = case MsgOnDisk of + true -> rabbit_msg_store:remove( + MsgStore, [Guid]); + false -> ok + end, + case IndexOnDisk of + true -> rabbit_queue_index:ack( + [SeqId], IndexState1); + false -> IndexState1 + end; + false -> IndexState1 end, %% 3. If it's on disk, not persistent and an ack's %% required then remove it from the queue index only. IndexState3 = case IndexOnDisk andalso AckRequired andalso not IsPersistent of - true -> rabbit_queue_index:write_acks([SeqId], IndexState2); + true -> rabbit_queue_index:ack([SeqId], IndexState2); false -> IndexState2 end, @@ -539,7 +535,7 @@ ack(AckTags, State = #vqstate { index_state = IndexState, [SeqId | SeqIds], PAN1} end end, {dict:new(), [], PA}, AckTags), - IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState), + IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = dict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:remove(MsgStore, Guids) end, ok, GuidsByStore), @@ -635,7 +631,7 @@ requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) -> StateN3} end end, {[], dict:new(), State}, AckTags), - IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState), + IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = dict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:release(MsgStore, Guids) end, ok, GuidsByStore), @@ -707,8 +703,7 @@ needs_sync(_) -> true. sync(State) -> tx_commit_index(State). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> - State #vqstate { index_state = - rabbit_queue_index:flush_journal(IndexState) }. + State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, on_sync = {_, _, From}, @@ -759,21 +754,19 @@ remove_pending_ack(KeepPersistent, {SeqIdsAcc, Dict, dict:erase(SeqId, PAN)} end, {[], dict:new(), PA}, PA), case KeepPersistent of - true -> - State1 = State #vqstate { pending_ack = PA1 }, - case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of - error -> State1; - {ok, Guids} -> ok = rabbit_msg_store:remove( - ?TRANSIENT_MSG_STORE, Guids), - State1 - end; - false -> - IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState), - ok = dict:fold(fun (MsgStore, Guids, ok) -> - rabbit_msg_store:remove(MsgStore, Guids) - end, ok, GuidsByStore), - State #vqstate { pending_ack = dict:new(), - index_state = IndexState1 } + true -> State1 = State #vqstate { pending_ack = PA1 }, + case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of + error -> State1; + {ok, Guids} -> ok = rabbit_msg_store:remove( + ?TRANSIENT_MSG_STORE, Guids), + State1 + end; + false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), + ok = dict:fold(fun (MsgStore, Guids, ok) -> + rabbit_msg_store:remove(MsgStore, Guids) + end, ok, GuidsByStore), + State #vqstate { pending_ack = dict:new(), + index_state = IndexState1 } end. lookup_tx(Txn) -> @@ -812,30 +805,27 @@ betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) -> fun ({Guid, SeqId, IsPersistent, IsDelivered}, {FilteredAcc, IndexStateAcc}) -> case SeqId < TransientThreshold andalso not IsPersistent of - true -> - IndexStateAcc1 = - case IsDelivered of - false -> rabbit_queue_index:write_delivered( - SeqId, IndexStateAcc); - true -> IndexStateAcc - end, - {FilteredAcc, rabbit_queue_index:write_acks( - [SeqId], IndexStateAcc1)}; - false -> - case SeqId < SeqIdLimit of - true -> - {[#msg_status { msg = undefined, - guid = Guid, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = true, - index_on_disk = true - } | FilteredAcc], - IndexStateAcc}; - false -> - {FilteredAcc, IndexStateAcc} - end + true -> IndexStateAcc1 = + case IsDelivered of + false -> rabbit_queue_index:deliver( + SeqId, IndexStateAcc); + true -> IndexStateAcc + end, + {FilteredAcc, rabbit_queue_index:ack( + [SeqId], IndexStateAcc1)}; + false -> case SeqId < SeqIdLimit of + true -> {[#msg_status { + msg = undefined, + guid = Guid, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true + } | FilteredAcc], + IndexStateAcc}; + false -> {FilteredAcc, IndexStateAcc} + end end end, {[], IndexState}, List), {bpqueue:from_list([{true, Filtered}]), IndexState1}. @@ -966,8 +956,7 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns}, false -> SeqIdsAcc end, StateN1} end, {Acks, State1}, Pubs), - IndexState1 = - rabbit_queue_index:sync_seq_ids(SeqIds, IndexState), + IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }. @@ -1024,7 +1013,7 @@ remove_queue_entries(PersistentStore, Fold, Q, IndexState) -> IndexState2 = case SeqIds of [] -> IndexState1; - _ -> rabbit_queue_index:write_acks(SeqIds, IndexState1) + _ -> rabbit_queue_index:ack(SeqIds, IndexState1) end, {Count, IndexState2}. @@ -1047,8 +1036,7 @@ remove_queue_entries1( false -> SeqIdsAcc end, IndexStateN1 = case IndexOnDisk andalso not IsDelivered of - true -> rabbit_queue_index:write_delivered( - SeqId, IndexStateN); + true -> rabbit_queue_index:deliver(SeqId, IndexStateN); false -> IndexStateN end, {PersistentStore, CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}. @@ -1298,11 +1286,11 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { is_delivered = IsDelivered }, IndexState) when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION - IndexState1 = rabbit_queue_index:write_published( - Guid, SeqId, IsPersistent, IndexState), + IndexState1 = rabbit_queue_index:publish(Guid, SeqId, IsPersistent, + IndexState), {MsgStatus #msg_status { index_on_disk = true }, case IsDelivered of - true -> rabbit_queue_index:write_delivered(SeqId, IndexState1); + true -> rabbit_queue_index:deliver(SeqId, IndexState1); false -> IndexState1 end}; maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> |
