summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl11
-rw-r--r--src/rabbit_queue_index.erl90
-rw-r--r--src/rabbit_tests.erl16
-rw-r--r--src/rabbit_variable_queue.erl135
4 files changed, 155 insertions, 97 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 418b5d5864..1455b4567b 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -481,6 +481,8 @@ close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } =
%%----------------------------------------------------------------------------
init([Server, BaseDir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
+ process_flag(trap_exit, true),
+
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
@@ -562,8 +564,6 @@ init([Server, BaseDir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
{ok, Offset} = file_handle_cache:position(FileHdl, Offset),
ok = file_handle_cache:truncate(FileHdl),
- process_flag(trap_exit, true),
-
{ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule,
FileSummaryEts),
@@ -716,7 +716,8 @@ handle_cast({set_maximum_since_use, Age}, State) ->
handle_info(timeout, State) ->
noreply(internal_sync(State));
-handle_info({'EXIT', _Pid, Reason}, State) ->
+handle_info({'EXIT', Pid, Reason}, State) ->
+ io:format("~p EXIT! ~p ~p ~p~n", [self(), Reason, Pid, State]),
{stop, Reason, State}.
terminate(_Reason, State = #msstate { index_state = IndexState,
@@ -1292,6 +1293,10 @@ build_index(Gatherer, Left, [],
sum_file_size = SumFileSize }) ->
case gatherer:fetch(Gatherer) of
finished ->
+ unlink(Gatherer),
+ receive {'EXIT', Gatherer, _} -> ok
+ after 0 -> ok
+ end,
ok = index_delete_by_file(undefined, State),
Offset = case ets:lookup(FileSummaryEts, Left) of
[] -> 0;
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index f37d701931..7227481dce 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/1, terminate/2, terminate_and_erase/1, write_published/4,
+-export([init/2, terminate/2, terminate_and_erase/1, write_published/4,
write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1,
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
find_lowest_seq_id_seg_and_next_seq_id/1,
@@ -190,13 +190,13 @@
-type(seq_id() :: integer()).
-type(seg_dict() :: {dict(), [segment()]}).
-type(qistate() :: #qistate { dir :: file_path(),
- segments :: seg_dict(),
+ segments :: 'undefined' | seg_dict(),
journal_handle :: hdl(),
dirty_count :: integer()
}).
--spec(init/1 :: (queue_name()) ->
- {non_neg_integer(), binary(), binary(), qistate()}).
+-spec(init/2 :: (queue_name(), boolean()) ->
+ {'undefined' | non_neg_integer(), binary(), binary(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(terminate_and_erase/1 :: (qistate()) -> qistate()).
-spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate())
@@ -220,21 +220,22 @@
%% Public API
%%----------------------------------------------------------------------------
-init(Name) ->
+init(Name, MsgStoreRecovered) ->
State = blank_state(Name),
- {PRef, TRef} = case read_shutdown_terms(State #qistate.dir) of
- {error, _} ->
- {rabbit_guid:guid(), rabbit_guid:guid()};
- {ok, Terms} ->
- case [persistent_ref, transient_ref] --
- proplists:get_keys(Terms) of
- [] ->
- {proplists:get_value(persistent_ref, Terms),
- proplists:get_value(transient_ref, Terms)};
- _ ->
- {rabbit_guid:guid(), rabbit_guid:guid()}
- end
- end,
+ {PRef, TRef, Terms} =
+ case read_shutdown_terms(State #qistate.dir) of
+ {error, _} ->
+ {rabbit_guid:guid(), rabbit_guid:guid(), []};
+ {ok, Terms1} ->
+ case [persistent_ref, transient_ref] --
+ proplists:get_keys(Terms1) of
+ [] ->
+ {proplists:get_value(persistent_ref, Terms1),
+ proplists:get_value(transient_ref, Terms1), Terms1};
+ _ ->
+ {rabbit_guid:guid(), rabbit_guid:guid(), []}
+ end
+ end,
%% 1. Load the journal completely. This will also load segments
%% which have entries in the journal and remove duplicates.
%% The counts will correctly reflect the combination of the
@@ -249,35 +250,40 @@ init(Name) ->
%% acks only go to the RAM journal as it doesn't matter if we
%% lose them. Also mark delivered if not clean shutdown. Also
%% find the number of unacked messages.
- AllSegs = all_segment_nums(State2),
+ AllSegs =
CleanShutdown = detect_clean_shutdown(Dir),
%% We know the journal is empty here, so we don't need to combine
%% with the journal, and we don't need to worry about messages
%% that have been acked.
{Segments1, Count, DCount1} =
- lists:foldl(
- fun (Seg, {Segments2, CountAcc, DCountAcc}) ->
- Segment = segment_find_or_new(Seg, Dir, Segments2),
- {SegEntries, PubCount, AckCount, Segment1} =
- load_segment(false, Segment),
- {Segment2 = #segment { pubs = PubCount1, acks = AckCount1 },
- DCountAcc1} =
- array:sparse_foldl(
- fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack},
- {Segment3, DCountAcc2}) ->
- {Segment4, DCountDelta} =
- maybe_add_to_journal(
- rabbit_msg_store:contains(
- ?PERSISTENT_MSG_STORE, MsgId),
- CleanShutdown, Del, RelSeq, Segment3),
- {Segment4, DCountAcc2 + DCountDelta}
- end, {Segment1 #segment { pubs = PubCount,
- acks = AckCount }, DCountAcc},
- SegEntries),
- {segment_store(Segment2, Segments2),
- CountAcc + PubCount1 - AckCount1, DCountAcc1}
- end, {Segments, 0, DCount}, AllSegs),
- {Count, PRef, TRef,
+ case CleanShutdown andalso MsgStoreRecovered of
+ false ->
+ lists:foldl(
+ fun (Seg, {Segments2, CountAcc, DCountAcc}) ->
+ Segment = segment_find_or_new(Seg, Dir, Segments2),
+ {SegEntries, PubCount, AckCount, Segment1} =
+ load_segment(false, Segment),
+ {Segment2 = #segment { pubs = PubCount1, acks = AckCount1 },
+ DCountAcc1} =
+ array:sparse_foldl(
+ fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack},
+ {Segment3, DCountAcc2}) ->
+ {Segment4, DCountDelta} =
+ maybe_add_to_journal(
+ rabbit_msg_store:contains(
+ ?PERSISTENT_MSG_STORE, MsgId),
+ CleanShutdown, Del, RelSeq, Segment3),
+ {Segment4, DCountAcc2 + DCountDelta}
+ end, {Segment1 #segment { pubs = PubCount,
+ acks = AckCount }, DCountAcc},
+ SegEntries),
+ {segment_store(Segment2, Segments2),
+ CountAcc + PubCount1 - AckCount1, DCountAcc1}
+ end, {Segments, 0, DCount}, all_segment_nums(State2));
+ true ->
+ {Segments, undefined, DCount}
+ end,
+ {Count, PRef, TRef, Terms,
State2 #qistate { segments = Segments1, dirty_count = DCount1 }}.
maybe_add_to_journal( true, true, _Del, _RelSeq, Segment) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 22473594a0..788aeeddd3 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1206,7 +1206,7 @@ test_amqqueue(Durable) ->
empty_test_queue() ->
ok = start_transient_msg_store(),
ok = rabbit_queue_index:start_persistent_msg_store([]),
- {0, _PRef, _TRef, Qi1} = rabbit_queue_index:init(test_queue()),
+ {0, _PRef, _TRef, _Terms, Qi1} = rabbit_queue_index:init(test_queue(), false),
_Qi2 = rabbit_queue_index:terminate_and_erase(Qi1),
ok.
@@ -1255,7 +1255,7 @@ test_queue_index() ->
ok = empty_test_queue(),
SeqIdsA = lists:seq(0,9999),
SeqIdsB = lists:seq(10000,19999),
- {0, _PRef, _TRef, Qi0} = rabbit_queue_index:init(test_queue()),
+ {0, _PRef, _TRef, _Terms, Qi0} = rabbit_queue_index:init(test_queue(), false),
{0, 0, Qi1} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0),
{Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1),
@@ -1270,7 +1270,7 @@ test_queue_index() ->
ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]),
ok = start_transient_msg_store(),
%% should get length back as 0, as all the msgs were transient
- {0, _PRef1, _TRef1, Qi6} = rabbit_queue_index:init(test_queue()),
+ {0, _PRef1, _TRef1, _Terms1, Qi6} = rabbit_queue_index:init(test_queue(), false),
{0, SegSize, Qi7} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6),
{Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
@@ -1285,7 +1285,7 @@ test_queue_index() ->
ok = start_transient_msg_store(),
%% should get length back as 10000
LenB = length(SeqIdsB),
- {LenB, _PRef2, _TRef2, Qi12} = rabbit_queue_index:init(test_queue()),
+ {LenB, _PRef2, _TRef2, _Terms2, Qi12} = rabbit_queue_index:init(test_queue(), false),
{0, TwoSegs, Qi13} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi12),
Qi14 = queue_index_deliver(SeqIdsB, Qi13),
@@ -1302,7 +1302,7 @@ test_queue_index() ->
ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]),
ok = start_transient_msg_store(),
%% should get length back as 0 because all persistent msgs have been acked
- {0, _PRef3, _TRef3, Qi20} = rabbit_queue_index:init(test_queue()),
+ {0, _PRef3, _TRef3, _Terms3, Qi20} = rabbit_queue_index:init(test_queue(), false),
_Qi21 = rabbit_queue_index:terminate_and_erase(Qi20),
ok = stop_msg_store(),
ok = empty_test_queue(),
@@ -1311,7 +1311,7 @@ test_queue_index() ->
%% First, partials:
%% a) partial pub+del+ack, then move to new segment
SeqIdsC = lists:seq(0,trunc(SegmentSize/2)),
- {0, _PRef4, _TRef4, Qi22} = rabbit_queue_index:init(test_queue()),
+ {0, _PRef4, _TRef4, _Terms4, Qi22} = rabbit_queue_index:init(test_queue(), false),
{Qi23, _SeqIdsMsgIdsC} = queue_index_publish(SeqIdsC, false, Qi22),
Qi24 = queue_index_deliver(SeqIdsC, Qi23),
Qi25 = rabbit_queue_index:write_acks(SeqIdsC, Qi24),
@@ -1322,7 +1322,7 @@ test_queue_index() ->
ok = empty_test_queue(),
%% b) partial pub+del, then move to new segment, then ack all in old segment
- {0, _PRef5, _TRef5, Qi29} = rabbit_queue_index:init(test_queue()),
+ {0, _PRef5, _TRef5, _Terms5, Qi29} = rabbit_queue_index:init(test_queue(), false),
{Qi30, _SeqIdsMsgIdsC2} = queue_index_publish(SeqIdsC, false, Qi29),
Qi31 = queue_index_deliver(SeqIdsC, Qi30),
{Qi32, _SeqIdsMsgIdsC3} = queue_index_publish([SegmentSize], false, Qi31),
@@ -1334,7 +1334,7 @@ test_queue_index() ->
%% c) just fill up several segments of all pubs, then +dels, then +acks
SeqIdsD = lists:seq(0,SegmentSize*4),
- {0, _PRef6, _TRef6, Qi36} = rabbit_queue_index:init(test_queue()),
+ {0, _PRef6, _TRef6, _Terms6, Qi36} = rabbit_queue_index:init(test_queue(), false),
{Qi37, _SeqIdsMsgIdsD} = queue_index_publish(SeqIdsD, false, Qi36),
Qi38 = queue_index_deliver(SeqIdsD, Qi37),
Qi39 = rabbit_queue_index:write_acks(SeqIdsD, Qi38),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 03db8510db..56a79f4724 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -155,7 +155,9 @@
len,
on_sync,
msg_store_clients,
- persistent_store
+ persistent_store,
+ persistent_count,
+ transient_threshold
}).
-include("rabbit.hrl").
@@ -212,7 +214,9 @@
len :: non_neg_integer(),
on_sync :: {[ack()], [msg_id()], [{pid(), any()}]},
msg_store_clients :: {{any(), binary()}, {any(), binary()}},
- persistent_store :: pid() | atom()
+ persistent_store :: pid() | atom(),
+ persistent_count :: non_neg_integer(),
+ transient_threshold :: non_neg_integer()
}).
-spec(init/2 :: (queue_name(), pid() | atom()) -> vqstate()).
@@ -256,14 +260,18 @@
%%----------------------------------------------------------------------------
init(QueueName, PersistentStore) ->
- {DeltaCount, PRef, TRef, IndexState} =
- rabbit_queue_index:init(QueueName),
+ MsgStoreRecovered =
+ rabbit_msg_store:successfully_recovered_state(PersistentStore),
+ {DeltaCount, PRef, TRef, Terms, IndexState} =
+ rabbit_queue_index:init(QueueName, MsgStoreRecovered),
{DeltaSeqId, NextSeqId, IndexState1} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState),
- Delta = case DeltaCount of
+
+ DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
+ Delta = case DeltaCount1 of
0 -> ?BLANK_DELTA;
_ -> #delta { start_seq_id = DeltaSeqId,
- count = DeltaCount,
+ count = DeltaCount1,
end_seq_id = NextSeqId }
end,
Now = now(),
@@ -282,24 +290,28 @@ init(QueueName, PersistentStore) ->
in_counter = 0,
egress_rate = {Now, 0},
avg_egress_rate = 0,
- ingress_rate = {Now, DeltaCount},
+ ingress_rate = {Now, DeltaCount1},
avg_ingress_rate = 0,
rate_timestamp = Now,
- len = DeltaCount,
+ len = DeltaCount1,
on_sync = {[], [], []},
msg_store_clients = {
{rabbit_msg_store:client_init(PersistentStore, PRef), PRef},
{rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), TRef}},
- persistent_store = PersistentStore
+ persistent_store = PersistentStore,
+ persistent_count = DeltaCount1,
+ transient_threshold = NextSeqId
},
maybe_deltas_to_betas(State).
terminate(State = #vqstate {
+ persistent_count = PCount,
index_state = IndexState,
msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}} }) ->
rabbit_msg_store:client_terminate(MSCStateP),
rabbit_msg_store:client_terminate(MSCStateT),
- Terms = [{persistent_ref, PRef}, {transient_ref, TRef}],
+ Terms = [{persistent_ref, PRef}, {transient_ref, TRef},
+ {persistent_count, PCount}],
State #vqstate { index_state = rabbit_queue_index:terminate(Terms, IndexState) }.
publish(Msg, State) ->
@@ -313,7 +325,8 @@ publish_delivered(Msg = #basic_message { guid = MsgId,
out_counter = OutCount,
in_counter = InCount,
msg_store_clients = MSCState,
- persistent_store = PersistentStore }) ->
+ persistent_store = PersistentStore,
+ persistent_count = PCount }) ->
State1 = State #vqstate { out_counter = OutCount + 1,
in_counter = InCount + 1 },
MsgStatus = #msg_status {
@@ -321,7 +334,11 @@ publish_delivered(Msg = #basic_message { guid = MsgId,
is_delivered = true, msg_on_disk = false, index_on_disk = false },
{MsgStatus1, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false,
MsgStatus, MSCState),
- State2 = State1 #vqstate { msg_store_clients = MSCState1 },
+ State2 = State1 #vqstate { msg_store_clients = MSCState1,
+ persistent_count = PCount + case IsPersistent of
+ true -> 1;
+ false -> 0
+ end },
case MsgStatus1 #msg_status.msg_on_disk of
true ->
{#msg_status { index_on_disk = true }, IndexState1} =
@@ -422,7 +439,7 @@ fetch(State =
false -> ok = case MsgOnDisk of
true ->
rabbit_msg_store:remove(
- MsgStore, [MsgId]);
+ MsgStore, [MsgId]);
false -> ok
end,
ack_not_on_disk
@@ -434,7 +451,9 @@ fetch(State =
index_state = IndexState1, len = Len1 }}
end.
-ack(AckTags, State = #vqstate { index_state = IndexState }) ->
+ack(AckTags, State = #vqstate { index_state = IndexState,
+ persistent_count = PCount,
+ persistent_store = PersistentStore }) ->
{MsgIdsByStore, SeqIds} =
lists:foldl(
fun (ack_not_on_disk, Acc) -> Acc;
@@ -448,7 +467,11 @@ ack(AckTags, State = #vqstate { index_state = IndexState }) ->
ok = dict:fold(fun (MsgStore, MsgIds, ok) ->
rabbit_msg_store:remove(MsgStore, MsgIds)
end, ok, MsgIdsByStore),
- State #vqstate { index_state = IndexState1 }.
+ PCount1 = PCount - case dict:find(PersistentStore, MsgIdsByStore) of
+ error -> 0;
+ {ok, MsgIds} -> length(MsgIds)
+ end,
+ State #vqstate { index_state = IndexState1, persistent_count = PCount1 }.
len(#vqstate { len = Len }) ->
Len.
@@ -464,7 +487,8 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len,
{Len, State1} =
purge1(Q4Count, State #vqstate { index_state = IndexState1,
q4 = queue:new() }),
- {Len, State1 #vqstate { len = 0, ram_msg_count = 0, ram_index_count = 0 }}.
+ {Len, State1 #vqstate { len = 0, ram_msg_count = 0, ram_index_count = 0,
+ persistent_count = 0 }}.
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
@@ -472,7 +496,8 @@ delete_and_terminate(State) ->
{_PurgeCount, State1 = #vqstate {
index_state = IndexState,
msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}},
- persistent_store = PersistentStore }} =
+ persistent_store = PersistentStore,
+ transient_threshold = TransientThreshold }} =
purge(State),
IndexState1 =
case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(
@@ -481,8 +506,8 @@ delete_and_terminate(State) ->
IndexState2;
{DeltaSeqId, NextSeqId, IndexState2} ->
{_DeleteCount, IndexState3} =
- delete1(PersistentStore, NextSeqId, 0, DeltaSeqId,
- IndexState2),
+ delete1(PersistentStore, TransientThreshold, NextSeqId, 0,
+ DeltaSeqId, IndexState2),
IndexState3
end,
IndexState4 = rabbit_queue_index:terminate_and_erase(IndexState1),
@@ -503,7 +528,9 @@ delete_and_terminate(State) ->
%% are now at the tail of the queue.
requeue(MsgsWithAckTags, State) ->
{SeqIds, MsgIdsByStore,
- State1 = #vqstate { index_state = IndexState }} =
+ State1 = #vqstate { index_state = IndexState,
+ persistent_count = PCount,
+ persistent_store = PersistentStore }} =
lists:foldl(
fun ({Msg = #basic_message { guid = MsgId }, AckTag},
{SeqIdsAcc, Dict, StateN}) ->
@@ -519,14 +546,20 @@ requeue(MsgsWithAckTags, State) ->
{_SeqId, StateN1} = publish(Msg, true, MsgOnDisk, StateN),
{SeqIdsAcc1, Dict1, StateN1}
end, {[], dict:new(), State}, MsgsWithAckTags),
- IndexState1 = case SeqIds of
- [] -> IndexState;
- _ -> rabbit_queue_index:write_acks(SeqIds, IndexState)
- end,
+ IndexState1 =
+ case SeqIds of
+ [] -> IndexState;
+ _ -> rabbit_queue_index:write_acks(SeqIds, IndexState)
+ end,
ok = dict:fold(fun (MsgStore, MsgIds, ok) ->
rabbit_msg_store:release(MsgStore, MsgIds)
end, ok, MsgIdsByStore),
- State1 #vqstate { index_state = IndexState1 }.
+ PCount1 = PCount - case dict:find(PersistentStore, MsgIdsByStore) of
+ error -> 0;
+ {ok, MsgIds} -> length(MsgIds)
+ end,
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1 }.
tx_publish(Msg = #basic_message { is_persistent = true, guid = MsgId },
State = #vqstate { msg_store_clients = MSCState,
@@ -633,7 +666,7 @@ persistent_msg_ids(Pubs) ->
[MsgId || Obj = #basic_message { guid = MsgId } <- Pubs,
Obj #basic_message.is_persistent].
-betas_from_segment_entries(List, SeqIdLimit) ->
+betas_from_segment_entries(List, SeqIdLimit, TransientThreshold) ->
bpqueue:from_list([{true,
[#msg_status { msg = undefined,
msg_id = MsgId,
@@ -644,7 +677,8 @@ betas_from_segment_entries(List, SeqIdLimit) ->
index_on_disk = true
}
|| {MsgId, SeqId, IsPersistent, IsDelivered} <- List,
- SeqId < SeqIdLimit ]}]).
+ SeqId < SeqIdLimit,
+ (IsPersistent orelse SeqId >= TransientThreshold)]}]).
read_index_segment(SeqId, IndexState) ->
SeqId1 = SeqId + rabbit_queue_index:segment_size(),
@@ -703,23 +737,25 @@ should_force_index_to_disk(State =
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-delete1(_PersistentStore, NextSeqId, Count, DeltaSeqId, IndexState)
- when DeltaSeqId >= NextSeqId ->
+delete1(_PersistentStore, _TransientThreshold, NextSeqId, Count, DeltaSeqId,
+ IndexState) when DeltaSeqId >= NextSeqId ->
{Count, IndexState};
-delete1(PersistentStore, NextSeqId, Count, DeltaSeqId, IndexState) ->
+delete1(PersistentStore, TransientThreshold, NextSeqId, Count, DeltaSeqId,
+ IndexState) ->
Delta1SeqId = DeltaSeqId + rabbit_queue_index:segment_size(),
case rabbit_queue_index:read_segment_entries(DeltaSeqId, IndexState) of
{[], IndexState1} ->
- delete1(PersistentStore, NextSeqId, Count, Delta1SeqId,
- IndexState1);
+ delete1(PersistentStore, TransientThreshold, NextSeqId, Count,
+ Delta1SeqId, IndexState1);
{List, IndexState1} ->
- Q = betas_from_segment_entries(List, Delta1SeqId),
+ Q = betas_from_segment_entries(List, Delta1SeqId,
+ TransientThreshold),
{QCount, IndexState2} =
remove_queue_entries(
PersistentStore, fun beta_fold_no_index_on_disk/3,
Q, IndexState1),
- delete1(PersistentStore, NextSeqId, Count + QCount, Delta1SeqId,
- IndexState2)
+ delete1(PersistentStore, TransientThreshold, NextSeqId,
+ Count + QCount, Delta1SeqId, IndexState2)
end.
purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState,
@@ -886,14 +922,20 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount,
publish(Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId },
IsDelivered, MsgOnDisk, State =
- #vqstate { next_seq_id = SeqId, len = Len, in_counter = InCount }) ->
+ #vqstate { next_seq_id = SeqId, len = Len, in_counter = InCount,
+ persistent_count = PCount }) ->
MsgStatus = #msg_status {
msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = false },
+ PCount1 = PCount + case IsPersistent of
+ true -> 1;
+ false -> 0
+ end,
{SeqId, publish(test_keep_msg_in_ram(SeqId, State), MsgStatus,
State #vqstate { next_seq_id = SeqId + 1, len = Len + 1,
- in_counter = InCount + 1 })}.
+ in_counter = InCount + 1,
+ persistent_count = PCount1 })}.
publish(msg, MsgStatus, #vqstate {
index_state = IndexState, ram_msg_count = RamMsgCount,
@@ -1097,7 +1139,8 @@ maybe_deltas_to_betas(
target_ram_msg_count = TargetRamMsgCount,
delta = #delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
- end_seq_id = DeltaSeqIdEnd }}) ->
+ end_seq_id = DeltaSeqIdEnd },
+ transient_threshold = TransientThreshold}) ->
case (not bpqueue:is_empty(Q3)) andalso (0 == TargetRamMsgCount) of
true ->
State;
@@ -1110,7 +1153,7 @@ maybe_deltas_to_betas(
State1 = State #vqstate { index_state = IndexState1 },
%% length(List) may be < segment_size because of acks. But
%% it can't be []
- Q3a = betas_from_segment_entries(List, DeltaSeqIdEnd),
+ Q3a = betas_from_segment_entries(List, DeltaSeqIdEnd, TransientThreshold),
Q3b = bpqueue:join(Q3, Q3a),
case DeltaCount - bpqueue:len(Q3a) of
0 ->
@@ -1120,11 +1163,15 @@ maybe_deltas_to_betas(
q2 = bpqueue:new(),
q3 = bpqueue:join(Q3b, Q2) };
N when N > 0 ->
- State1 #vqstate {
- q3 = Q3b,
- delta = #delta { start_seq_id = Delta1SeqId,
- count = N,
- end_seq_id = DeltaSeqIdEnd } }
+ State2 = State1 #vqstate {
+ q3 = Q3b,
+ delta = #delta { start_seq_id = Delta1SeqId,
+ count = N,
+ end_seq_id = DeltaSeqIdEnd } },
+ case N == DeltaCount of
+ true -> maybe_deltas_to_betas(State2);
+ false -> State2
+ end
end
end.