diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-05 20:58:48 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-05 20:58:48 +0100 |
| commit | 0227797f19d04a7cce315bfeb6c5f94cb2ed9592 (patch) | |
| tree | 9da9416318aa1070236b2e4e2d6c434adcab9c26 | |
| parent | 85812ceafa7ecf102c5a05b976a04140624caa19 (diff) | |
| download | rabbitmq-server-git-0227797f19d04a7cce315bfeb6c5f94cb2ed9592.tar.gz | |
Given a clean shutdown, startup is near-instantaneous regardless of queue length. Note that the most expensive element of startup is loading the msg_store index. Also note that, for some unexplained reason, this currently doesn't work with toke — the toke plugin will need reworking to become available to both msg_stores simultaneously.
| -rw-r--r-- | src/rabbit_msg_store.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 90 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 135 |
4 files changed, 155 insertions(+), 97 deletions(-)
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 418b5d5864..1455b4567b 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -481,6 +481,8 @@ close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } = %%---------------------------------------------------------------------------- init([Server, BaseDir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> + process_flag(trap_exit, true), + ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), @@ -562,8 +564,6 @@ init([Server, BaseDir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> {ok, Offset} = file_handle_cache:position(FileHdl, Offset), ok = file_handle_cache:truncate(FileHdl), - process_flag(trap_exit, true), - {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule, FileSummaryEts), @@ -716,7 +716,8 @@ handle_cast({set_maximum_since_use, Age}, State) -> handle_info(timeout, State) -> noreply(internal_sync(State)); -handle_info({'EXIT', _Pid, Reason}, State) -> +handle_info({'EXIT', Pid, Reason}, State) -> + io:format("~p EXIT! ~p ~p ~p~n", [self(), Reason, Pid, State]), {stop, Reason, State}. terminate(_Reason, State = #msstate { index_state = IndexState, @@ -1292,6 +1293,10 @@ build_index(Gatherer, Left, [], sum_file_size = SumFileSize }) -> case gatherer:fetch(Gatherer) of finished -> + unlink(Gatherer), + receive {'EXIT', Gatherer, _} -> ok + after 0 -> ok + end, ok = index_delete_by_file(undefined, State), Offset = case ets:lookup(FileSummaryEts, Left) of [] -> 0; diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f37d701931..7227481dce 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). 
--export([init/1, terminate/2, terminate_and_erase/1, write_published/4, +-export([init/2, terminate/2, terminate_and_erase/1, write_published/4, write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1, read_segment_entries/2, next_segment_boundary/1, segment_size/0, find_lowest_seq_id_seg_and_next_seq_id/1, @@ -190,13 +190,13 @@ -type(seq_id() :: integer()). -type(seg_dict() :: {dict(), [segment()]}). -type(qistate() :: #qistate { dir :: file_path(), - segments :: seg_dict(), + segments :: 'undefined' | seg_dict(), journal_handle :: hdl(), dirty_count :: integer() }). --spec(init/1 :: (queue_name()) -> - {non_neg_integer(), binary(), binary(), qistate()}). +-spec(init/2 :: (queue_name(), boolean()) -> + {'undefined' | non_neg_integer(), binary(), binary(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(terminate_and_erase/1 :: (qistate()) -> qistate()). -spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate()) @@ -220,21 +220,22 @@ %% Public API %%---------------------------------------------------------------------------- -init(Name) -> +init(Name, MsgStoreRecovered) -> State = blank_state(Name), - {PRef, TRef} = case read_shutdown_terms(State #qistate.dir) of - {error, _} -> - {rabbit_guid:guid(), rabbit_guid:guid()}; - {ok, Terms} -> - case [persistent_ref, transient_ref] -- - proplists:get_keys(Terms) of - [] -> - {proplists:get_value(persistent_ref, Terms), - proplists:get_value(transient_ref, Terms)}; - _ -> - {rabbit_guid:guid(), rabbit_guid:guid()} - end - end, + {PRef, TRef, Terms} = + case read_shutdown_terms(State #qistate.dir) of + {error, _} -> + {rabbit_guid:guid(), rabbit_guid:guid(), []}; + {ok, Terms1} -> + case [persistent_ref, transient_ref] -- + proplists:get_keys(Terms1) of + [] -> + {proplists:get_value(persistent_ref, Terms1), + proplists:get_value(transient_ref, Terms1), Terms1}; + _ -> + {rabbit_guid:guid(), rabbit_guid:guid(), []} + end + end, %% 1. Load the journal completely. 
This will also load segments %% which have entries in the journal and remove duplicates. %% The counts will correctly reflect the combination of the @@ -249,35 +250,40 @@ init(Name) -> %% acks only go to the RAM journal as it doesn't matter if we %% lose them. Also mark delivered if not clean shutdown. Also %% find the number of unacked messages. - AllSegs = all_segment_nums(State2), + AllSegs = CleanShutdown = detect_clean_shutdown(Dir), %% We know the journal is empty here, so we don't need to combine %% with the journal, and we don't need to worry about messages %% that have been acked. {Segments1, Count, DCount1} = - lists:foldl( - fun (Seg, {Segments2, CountAcc, DCountAcc}) -> - Segment = segment_find_or_new(Seg, Dir, Segments2), - {SegEntries, PubCount, AckCount, Segment1} = - load_segment(false, Segment), - {Segment2 = #segment { pubs = PubCount1, acks = AckCount1 }, - DCountAcc1} = - array:sparse_foldl( - fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack}, - {Segment3, DCountAcc2}) -> - {Segment4, DCountDelta} = - maybe_add_to_journal( - rabbit_msg_store:contains( - ?PERSISTENT_MSG_STORE, MsgId), - CleanShutdown, Del, RelSeq, Segment3), - {Segment4, DCountAcc2 + DCountDelta} - end, {Segment1 #segment { pubs = PubCount, - acks = AckCount }, DCountAcc}, - SegEntries), - {segment_store(Segment2, Segments2), - CountAcc + PubCount1 - AckCount1, DCountAcc1} - end, {Segments, 0, DCount}, AllSegs), - {Count, PRef, TRef, + case CleanShutdown andalso MsgStoreRecovered of + false -> + lists:foldl( + fun (Seg, {Segments2, CountAcc, DCountAcc}) -> + Segment = segment_find_or_new(Seg, Dir, Segments2), + {SegEntries, PubCount, AckCount, Segment1} = + load_segment(false, Segment), + {Segment2 = #segment { pubs = PubCount1, acks = AckCount1 }, + DCountAcc1} = + array:sparse_foldl( + fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack}, + {Segment3, DCountAcc2}) -> + {Segment4, DCountDelta} = + maybe_add_to_journal( + rabbit_msg_store:contains( + ?PERSISTENT_MSG_STORE, MsgId), 
+ CleanShutdown, Del, RelSeq, Segment3), + {Segment4, DCountAcc2 + DCountDelta} + end, {Segment1 #segment { pubs = PubCount, + acks = AckCount }, DCountAcc}, + SegEntries), + {segment_store(Segment2, Segments2), + CountAcc + PubCount1 - AckCount1, DCountAcc1} + end, {Segments, 0, DCount}, all_segment_nums(State2)); + true -> + {Segments, undefined, DCount} + end, + {Count, PRef, TRef, Terms, State2 #qistate { segments = Segments1, dirty_count = DCount1 }}. maybe_add_to_journal( true, true, _Del, _RelSeq, Segment) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 22473594a0..788aeeddd3 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1206,7 +1206,7 @@ test_amqqueue(Durable) -> empty_test_queue() -> ok = start_transient_msg_store(), ok = rabbit_queue_index:start_persistent_msg_store([]), - {0, _PRef, _TRef, Qi1} = rabbit_queue_index:init(test_queue()), + {0, _PRef, _TRef, _Terms, Qi1} = rabbit_queue_index:init(test_queue(), false), _Qi2 = rabbit_queue_index:terminate_and_erase(Qi1), ok. 
@@ -1255,7 +1255,7 @@ test_queue_index() -> ok = empty_test_queue(), SeqIdsA = lists:seq(0,9999), SeqIdsB = lists:seq(10000,19999), - {0, _PRef, _TRef, Qi0} = rabbit_queue_index:init(test_queue()), + {0, _PRef, _TRef, _Terms, Qi0} = rabbit_queue_index:init(test_queue(), false), {0, 0, Qi1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0), {Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1), @@ -1270,7 +1270,7 @@ test_queue_index() -> ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]), ok = start_transient_msg_store(), %% should get length back as 0, as all the msgs were transient - {0, _PRef1, _TRef1, Qi6} = rabbit_queue_index:init(test_queue()), + {0, _PRef1, _TRef1, _Terms1, Qi6} = rabbit_queue_index:init(test_queue(), false), {0, SegSize, Qi7} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6), {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), @@ -1285,7 +1285,7 @@ test_queue_index() -> ok = start_transient_msg_store(), %% should get length back as 10000 LenB = length(SeqIdsB), - {LenB, _PRef2, _TRef2, Qi12} = rabbit_queue_index:init(test_queue()), + {LenB, _PRef2, _TRef2, _Terms2, Qi12} = rabbit_queue_index:init(test_queue(), false), {0, TwoSegs, Qi13} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi12), Qi14 = queue_index_deliver(SeqIdsB, Qi13), @@ -1302,7 +1302,7 @@ test_queue_index() -> ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]), ok = start_transient_msg_store(), %% should get length back as 0 because all persistent msgs have been acked - {0, _PRef3, _TRef3, Qi20} = rabbit_queue_index:init(test_queue()), + {0, _PRef3, _TRef3, _Terms3, Qi20} = rabbit_queue_index:init(test_queue(), false), _Qi21 = rabbit_queue_index:terminate_and_erase(Qi20), ok = stop_msg_store(), ok = empty_test_queue(), @@ -1311,7 +1311,7 @@ test_queue_index() -> %% First, partials: %% a) partial pub+del+ack, then move to new segment SeqIdsC = 
lists:seq(0,trunc(SegmentSize/2)), - {0, _PRef4, _TRef4, Qi22} = rabbit_queue_index:init(test_queue()), + {0, _PRef4, _TRef4, _Terms4, Qi22} = rabbit_queue_index:init(test_queue(), false), {Qi23, _SeqIdsMsgIdsC} = queue_index_publish(SeqIdsC, false, Qi22), Qi24 = queue_index_deliver(SeqIdsC, Qi23), Qi25 = rabbit_queue_index:write_acks(SeqIdsC, Qi24), @@ -1322,7 +1322,7 @@ test_queue_index() -> ok = empty_test_queue(), %% b) partial pub+del, then move to new segment, then ack all in old segment - {0, _PRef5, _TRef5, Qi29} = rabbit_queue_index:init(test_queue()), + {0, _PRef5, _TRef5, _Terms5, Qi29} = rabbit_queue_index:init(test_queue(), false), {Qi30, _SeqIdsMsgIdsC2} = queue_index_publish(SeqIdsC, false, Qi29), Qi31 = queue_index_deliver(SeqIdsC, Qi30), {Qi32, _SeqIdsMsgIdsC3} = queue_index_publish([SegmentSize], false, Qi31), @@ -1334,7 +1334,7 @@ test_queue_index() -> %% c) just fill up several segments of all pubs, then +dels, then +acks SeqIdsD = lists:seq(0,SegmentSize*4), - {0, _PRef6, _TRef6, Qi36} = rabbit_queue_index:init(test_queue()), + {0, _PRef6, _TRef6, _Terms6, Qi36} = rabbit_queue_index:init(test_queue(), false), {Qi37, _SeqIdsMsgIdsD} = queue_index_publish(SeqIdsD, false, Qi36), Qi38 = queue_index_deliver(SeqIdsD, Qi37), Qi39 = rabbit_queue_index:write_acks(SeqIdsD, Qi38), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 03db8510db..56a79f4724 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -155,7 +155,9 @@ len, on_sync, msg_store_clients, - persistent_store + persistent_store, + persistent_count, + transient_threshold }). -include("rabbit.hrl"). @@ -212,7 +214,9 @@ len :: non_neg_integer(), on_sync :: {[ack()], [msg_id()], [{pid(), any()}]}, msg_store_clients :: {{any(), binary()}, {any(), binary()}}, - persistent_store :: pid() | atom() + persistent_store :: pid() | atom(), + persistent_count :: non_neg_integer(), + transient_threshold :: non_neg_integer() }). 
-spec(init/2 :: (queue_name(), pid() | atom()) -> vqstate()). @@ -256,14 +260,18 @@ %%---------------------------------------------------------------------------- init(QueueName, PersistentStore) -> - {DeltaCount, PRef, TRef, IndexState} = - rabbit_queue_index:init(QueueName), + MsgStoreRecovered = + rabbit_msg_store:successfully_recovered_state(PersistentStore), + {DeltaCount, PRef, TRef, Terms, IndexState} = + rabbit_queue_index:init(QueueName, MsgStoreRecovered), {DeltaSeqId, NextSeqId, IndexState1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState), - Delta = case DeltaCount of + + DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), + Delta = case DeltaCount1 of 0 -> ?BLANK_DELTA; _ -> #delta { start_seq_id = DeltaSeqId, - count = DeltaCount, + count = DeltaCount1, end_seq_id = NextSeqId } end, Now = now(), @@ -282,24 +290,28 @@ init(QueueName, PersistentStore) -> in_counter = 0, egress_rate = {Now, 0}, avg_egress_rate = 0, - ingress_rate = {Now, DeltaCount}, + ingress_rate = {Now, DeltaCount1}, avg_ingress_rate = 0, rate_timestamp = Now, - len = DeltaCount, + len = DeltaCount1, on_sync = {[], [], []}, msg_store_clients = { {rabbit_msg_store:client_init(PersistentStore, PRef), PRef}, {rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), TRef}}, - persistent_store = PersistentStore + persistent_store = PersistentStore, + persistent_count = DeltaCount1, + transient_threshold = NextSeqId }, maybe_deltas_to_betas(State). terminate(State = #vqstate { + persistent_count = PCount, index_state = IndexState, msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}} }) -> rabbit_msg_store:client_terminate(MSCStateP), rabbit_msg_store:client_terminate(MSCStateT), - Terms = [{persistent_ref, PRef}, {transient_ref, TRef}], + Terms = [{persistent_ref, PRef}, {transient_ref, TRef}, + {persistent_count, PCount}], State #vqstate { index_state = rabbit_queue_index:terminate(Terms, IndexState) }. 
publish(Msg, State) -> @@ -313,7 +325,8 @@ publish_delivered(Msg = #basic_message { guid = MsgId, out_counter = OutCount, in_counter = InCount, msg_store_clients = MSCState, - persistent_store = PersistentStore }) -> + persistent_store = PersistentStore, + persistent_count = PCount }) -> State1 = State #vqstate { out_counter = OutCount + 1, in_counter = InCount + 1 }, MsgStatus = #msg_status { @@ -321,7 +334,11 @@ publish_delivered(Msg = #basic_message { guid = MsgId, is_delivered = true, msg_on_disk = false, index_on_disk = false }, {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState), - State2 = State1 #vqstate { msg_store_clients = MSCState1 }, + State2 = State1 #vqstate { msg_store_clients = MSCState1, + persistent_count = PCount + case IsPersistent of + true -> 1; + false -> 0 + end }, case MsgStatus1 #msg_status.msg_on_disk of true -> {#msg_status { index_on_disk = true }, IndexState1} = @@ -422,7 +439,7 @@ fetch(State = false -> ok = case MsgOnDisk of true -> rabbit_msg_store:remove( - MsgStore, [MsgId]); + MsgStore, [MsgId]); false -> ok end, ack_not_on_disk @@ -434,7 +451,9 @@ fetch(State = index_state = IndexState1, len = Len1 }} end. -ack(AckTags, State = #vqstate { index_state = IndexState }) -> +ack(AckTags, State = #vqstate { index_state = IndexState, + persistent_count = PCount, + persistent_store = PersistentStore }) -> {MsgIdsByStore, SeqIds} = lists:foldl( fun (ack_not_on_disk, Acc) -> Acc; @@ -448,7 +467,11 @@ ack(AckTags, State = #vqstate { index_state = IndexState }) -> ok = dict:fold(fun (MsgStore, MsgIds, ok) -> rabbit_msg_store:remove(MsgStore, MsgIds) end, ok, MsgIdsByStore), - State #vqstate { index_state = IndexState1 }. + PCount1 = PCount - case dict:find(PersistentStore, MsgIdsByStore) of + error -> 0; + {ok, MsgIds} -> length(MsgIds) + end, + State #vqstate { index_state = IndexState1, persistent_count = PCount1 }. len(#vqstate { len = Len }) -> Len. 
@@ -464,7 +487,8 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len, {Len, State1} = purge1(Q4Count, State #vqstate { index_state = IndexState1, q4 = queue:new() }), - {Len, State1 #vqstate { len = 0, ram_msg_count = 0, ram_index_count = 0 }}. + {Len, State1 #vqstate { len = 0, ram_msg_count = 0, ram_index_count = 0, + persistent_count = 0 }}. %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. @@ -472,7 +496,8 @@ delete_and_terminate(State) -> {_PurgeCount, State1 = #vqstate { index_state = IndexState, msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}}, - persistent_store = PersistentStore }} = + persistent_store = PersistentStore, + transient_threshold = TransientThreshold }} = purge(State), IndexState1 = case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id( @@ -481,8 +506,8 @@ delete_and_terminate(State) -> IndexState2; {DeltaSeqId, NextSeqId, IndexState2} -> {_DeleteCount, IndexState3} = - delete1(PersistentStore, NextSeqId, 0, DeltaSeqId, - IndexState2), + delete1(PersistentStore, TransientThreshold, NextSeqId, 0, + DeltaSeqId, IndexState2), IndexState3 end, IndexState4 = rabbit_queue_index:terminate_and_erase(IndexState1), @@ -503,7 +528,9 @@ delete_and_terminate(State) -> %% are now at the tail of the queue. 
requeue(MsgsWithAckTags, State) -> {SeqIds, MsgIdsByStore, - State1 = #vqstate { index_state = IndexState }} = + State1 = #vqstate { index_state = IndexState, + persistent_count = PCount, + persistent_store = PersistentStore }} = lists:foldl( fun ({Msg = #basic_message { guid = MsgId }, AckTag}, {SeqIdsAcc, Dict, StateN}) -> @@ -519,14 +546,20 @@ requeue(MsgsWithAckTags, State) -> {_SeqId, StateN1} = publish(Msg, true, MsgOnDisk, StateN), {SeqIdsAcc1, Dict1, StateN1} end, {[], dict:new(), State}, MsgsWithAckTags), - IndexState1 = case SeqIds of - [] -> IndexState; - _ -> rabbit_queue_index:write_acks(SeqIds, IndexState) - end, + IndexState1 = + case SeqIds of + [] -> IndexState; + _ -> rabbit_queue_index:write_acks(SeqIds, IndexState) + end, ok = dict:fold(fun (MsgStore, MsgIds, ok) -> rabbit_msg_store:release(MsgStore, MsgIds) end, ok, MsgIdsByStore), - State1 #vqstate { index_state = IndexState1 }. + PCount1 = PCount - case dict:find(PersistentStore, MsgIdsByStore) of + error -> 0; + {ok, MsgIds} -> length(MsgIds) + end, + State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1 }. tx_publish(Msg = #basic_message { is_persistent = true, guid = MsgId }, State = #vqstate { msg_store_clients = MSCState, @@ -633,7 +666,7 @@ persistent_msg_ids(Pubs) -> [MsgId || Obj = #basic_message { guid = MsgId } <- Pubs, Obj #basic_message.is_persistent]. -betas_from_segment_entries(List, SeqIdLimit) -> +betas_from_segment_entries(List, SeqIdLimit, TransientThreshold) -> bpqueue:from_list([{true, [#msg_status { msg = undefined, msg_id = MsgId, @@ -644,7 +677,8 @@ betas_from_segment_entries(List, SeqIdLimit) -> index_on_disk = true } || {MsgId, SeqId, IsPersistent, IsDelivered} <- List, - SeqId < SeqIdLimit ]}]). + SeqId < SeqIdLimit, + (IsPersistent orelse SeqId >= TransientThreshold)]}]). 
read_index_segment(SeqId, IndexState) -> SeqId1 = SeqId + rabbit_queue_index:segment_size(), @@ -703,23 +737,25 @@ should_force_index_to_disk(State = %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -delete1(_PersistentStore, NextSeqId, Count, DeltaSeqId, IndexState) - when DeltaSeqId >= NextSeqId -> +delete1(_PersistentStore, _TransientThreshold, NextSeqId, Count, DeltaSeqId, + IndexState) when DeltaSeqId >= NextSeqId -> {Count, IndexState}; -delete1(PersistentStore, NextSeqId, Count, DeltaSeqId, IndexState) -> +delete1(PersistentStore, TransientThreshold, NextSeqId, Count, DeltaSeqId, + IndexState) -> Delta1SeqId = DeltaSeqId + rabbit_queue_index:segment_size(), case rabbit_queue_index:read_segment_entries(DeltaSeqId, IndexState) of {[], IndexState1} -> - delete1(PersistentStore, NextSeqId, Count, Delta1SeqId, - IndexState1); + delete1(PersistentStore, TransientThreshold, NextSeqId, Count, + Delta1SeqId, IndexState1); {List, IndexState1} -> - Q = betas_from_segment_entries(List, Delta1SeqId), + Q = betas_from_segment_entries(List, Delta1SeqId, + TransientThreshold), {QCount, IndexState2} = remove_queue_entries( PersistentStore, fun beta_fold_no_index_on_disk/3, Q, IndexState1), - delete1(PersistentStore, NextSeqId, Count + QCount, Delta1SeqId, - IndexState2) + delete1(PersistentStore, TransientThreshold, NextSeqId, + Count + QCount, Delta1SeqId, IndexState2) end. 
purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState, @@ -886,14 +922,20 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount, publish(Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, IsDelivered, MsgOnDisk, State = - #vqstate { next_seq_id = SeqId, len = Len, in_counter = InCount }) -> + #vqstate { next_seq_id = SeqId, len = Len, in_counter = InCount, + persistent_count = PCount }) -> MsgStatus = #msg_status { msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = false }, + PCount1 = PCount + case IsPersistent of + true -> 1; + false -> 0 + end, {SeqId, publish(test_keep_msg_in_ram(SeqId, State), MsgStatus, State #vqstate { next_seq_id = SeqId + 1, len = Len + 1, - in_counter = InCount + 1 })}. + in_counter = InCount + 1, + persistent_count = PCount1 })}. publish(msg, MsgStatus, #vqstate { index_state = IndexState, ram_msg_count = RamMsgCount, @@ -1097,7 +1139,8 @@ maybe_deltas_to_betas( target_ram_msg_count = TargetRamMsgCount, delta = #delta { start_seq_id = DeltaSeqId, count = DeltaCount, - end_seq_id = DeltaSeqIdEnd }}) -> + end_seq_id = DeltaSeqIdEnd }, + transient_threshold = TransientThreshold}) -> case (not bpqueue:is_empty(Q3)) andalso (0 == TargetRamMsgCount) of true -> State; @@ -1110,7 +1153,7 @@ maybe_deltas_to_betas( State1 = State #vqstate { index_state = IndexState1 }, %% length(List) may be < segment_size because of acks. 
But %% it can't be [] - Q3a = betas_from_segment_entries(List, DeltaSeqIdEnd), + Q3a = betas_from_segment_entries(List, DeltaSeqIdEnd, TransientThreshold), Q3b = bpqueue:join(Q3, Q3a), case DeltaCount - bpqueue:len(Q3a) of 0 -> @@ -1120,11 +1163,15 @@ maybe_deltas_to_betas( q2 = bpqueue:new(), q3 = bpqueue:join(Q3b, Q2) }; N when N > 0 -> - State1 #vqstate { - q3 = Q3b, - delta = #delta { start_seq_id = Delta1SeqId, - count = N, - end_seq_id = DeltaSeqIdEnd } } + State2 = State1 #vqstate { + q3 = Q3b, + delta = #delta { start_seq_id = Delta1SeqId, + count = N, + end_seq_id = DeltaSeqIdEnd } }, + case N == DeltaCount of + true -> maybe_deltas_to_betas(State2); + false -> State2 + end end end. |
