summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_tests.erl18
-rw-r--r--src/rabbit_variable_queue.erl789
2 files changed, 465 insertions, 342 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 856a8c4647..16332f325c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1225,7 +1225,7 @@ fresh_variable_queue() ->
assert_prop(S0, len, 0),
assert_prop(S0, q1, 0),
assert_prop(S0, q2, 0),
- assert_prop(S0, gamma, #gamma { start_seq_id = undefined,
+ assert_prop(S0, delta, #delta { start_seq_id = undefined,
count = 0,
end_seq_id = undefined }),
assert_prop(S0, q3, 0),
@@ -1234,7 +1234,7 @@ fresh_variable_queue() ->
test_variable_queue() ->
passed = test_variable_queue_dynamic_duration_change(),
- passed = test_variable_queue_partial_segments_gamma_thing(),
+ passed = test_variable_queue_partial_segments_delta_thing(),
passed.
test_variable_queue_dynamic_duration_change() ->
@@ -1253,7 +1253,7 @@ test_variable_queue_dynamic_duration_change() ->
%% just publish and fetch some persistent msgs, this hits the the
%% partial segment path in queue_index due to the period when
- %% duration was 0 and the entire queue was gamma.
+ %% duration was 0 and the entire queue was delta.
{_SeqIds1, VQ7} = variable_queue_publish(true, 20, VQ6),
{VQ8, AckTags1} = variable_queue_fetch(20, true, false, 20, VQ7),
VQ9 = rabbit_variable_queue:ack(AckTags1, VQ8),
@@ -1288,7 +1288,7 @@ test_variable_queue_dynamic_duration_change_f(Len, VQ0) ->
test_variable_queue_dynamic_duration_change_f(Len, VQ3)
end.
-test_variable_queue_partial_segments_gamma_thing() ->
+test_variable_queue_partial_segments_delta_thing() ->
SegmentSize = rabbit_queue_index:segment_size(),
HalfSegment = SegmentSize div 2,
VQ0 = fresh_variable_queue(),
@@ -1296,21 +1296,21 @@ test_variable_queue_partial_segments_gamma_thing() ->
variable_queue_publish(true, SegmentSize + HalfSegment, VQ0),
VQ2 = rabbit_variable_queue:remeasure_rates(VQ1),
VQ3 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ2),
- %% one segment in q3 as betas, and half a segment in gamma
+ %% one segment in q3 as betas, and half a segment in delta
S3 = rabbit_variable_queue:status(VQ3),
io:format("~p~n", [S3]),
- assert_prop(S3, gamma, #gamma { start_seq_id = SegmentSize,
+ assert_prop(S3, delta, #delta { start_seq_id = SegmentSize,
count = HalfSegment,
end_seq_id = SegmentSize + HalfSegment }),
assert_prop(S3, q3, SegmentSize),
assert_prop(S3, len, SegmentSize + HalfSegment),
VQ4 = rabbit_variable_queue:set_queue_ram_duration_target(infinity, VQ3),
{[_SeqId], VQ5} = variable_queue_publish(true, 1, VQ4),
- %% should have 1 alpha, but it's in the same segment as the gammas
+ %% should have 1 alpha, but it's in the same segment as the deltas
S5 = rabbit_variable_queue:status(VQ5),
io:format("~p~n", [S5]),
assert_prop(S5, q1, 1),
- assert_prop(S5, gamma, #gamma { start_seq_id = SegmentSize,
+ assert_prop(S5, delta, #delta { start_seq_id = SegmentSize,
count = HalfSegment,
end_seq_id = SegmentSize + HalfSegment }),
assert_prop(S5, q3, SegmentSize),
@@ -1320,7 +1320,7 @@ test_variable_queue_partial_segments_gamma_thing() ->
%% the half segment should now be in q3 as betas
S6 = rabbit_variable_queue:status(VQ6),
io:format("~p~n", [S6]),
- assert_prop(S6, gamma, #gamma { start_seq_id = undefined,
+ assert_prop(S6, delta, #delta { start_seq_id = undefined,
count = 0,
end_seq_id = undefined }),
assert_prop(S6, q1, 1),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7c1ef6875d..6c7fad1212 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -43,14 +43,14 @@
-record(vqstate,
{ q1,
q2,
- gamma,
+ delta,
q3,
q4,
duration_target,
target_ram_msg_count,
ram_msg_count,
ram_msg_count_prev,
- queue,
+ ram_index_count,
index_state,
next_seq_id,
out_counter,
@@ -68,32 +68,46 @@
-include("rabbit.hrl").
-include("rabbit_queue.hrl").
+-record(msg_status,
+ { msg,
+ msg_id,
+ seq_id,
+ is_persistent,
+ is_delivered,
+ msg_on_disk,
+ index_on_disk
+ }).
+
+-define(RAM_INDEX_TARGET_RATIO, 32768).
+
%%----------------------------------------------------------------------------
-%% Basic premise is that msgs move from q1 -> q2 -> gamma -> q3 -> q4
+%% WRONG - UPDATE ME!
+
+%% Basic premise is that msgs move from q1 -> q2 -> delta -> q3 -> q4
%% but they can only do so in the right form. q1 and q4 only hold
%% alphas (msgs in ram), q2 and q3 only hold betas (msg on disk, index
-%% in ram), and gamma is just a count of the number of index entries
+%% in ram), and delta is just a count of the number of index entries
%% on disk at that stage (msg on disk, index on disk).
%%
%% When a msg arrives, we decide in which form it should be. It is
%% then added to the right-most appropriate queue, maintaining
%% order. Thus if the msg is to be an alpha, it will be added to q1,
-%% unless all of q2, gamma and q3 are empty, in which case it will go
-%% to q4. If it is to be a beta, it will be added to q2 unless gamma
+%% unless all of q2, delta and q3 are empty, in which case it will go
+%% to q4. If it is to be a beta, it will be added to q2 unless delta
%% is empty, in which case it will go to q3.
%%
%% The major invariant is that if the msg is to be a beta, q1 will be
-%% empty, and if it is to be a gamma then both q1 and q2 will be empty.
+%% empty, and if it is to be a delta then both q1 and q2 will be empty.
%%
%% When taking msgs out of the queue, if q4 is empty then we read
-%% directly from q3, or gamma, if q3 is empty. If q3 and gamma are
+%% directly from q3, or delta, if q3 is empty. If q3 and delta are
%% empty then we have an invariant that q2 must be empty because q2
-%% can only grow if gamma is non empty.
+%% can only grow if delta is non empty.
%%
%% A further invariant is that if the queue is non empty, either q4 or
-%% q3 contains at least one entry. I.e. we never allow gamma to
-%% contain all msgs in the queue. Also, if q4 is non empty and gamma
+%% q3 contains at least one entry. I.e. we never allow delta to
+%% contain all msgs in the queue. Also, if q4 is non empty and delta
%% is non empty then q3 must be non empty.
%%----------------------------------------------------------------------------
@@ -106,15 +120,15 @@
| 'ack_not_on_disk').
-type(vqstate() :: #vqstate {
q1 :: queue(),
- q2 :: queue(),
- gamma :: gamma(),
- q3 :: queue(),
+ q2 :: {non_neg_integer(), queue()},
+ delta :: delta(),
+ q3 :: {non_neg_integer(), queue()},
q4 :: queue(),
duration_target :: non_neg_integer(),
target_ram_msg_count :: non_neg_integer(),
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
- queue :: queue_name(),
+ ram_index_count :: non_neg_integer(),
index_state :: any(),
next_seq_id :: seq_id(),
out_counter :: non_neg_integer(),
@@ -161,7 +175,7 @@
-endif.
--define(BLANK_GAMMA, #gamma { start_seq_id = undefined,
+-define(BLANK_DELTA, #delta { start_seq_id = undefined,
count = 0,
end_seq_id = undefined }).
@@ -170,40 +184,40 @@
%%----------------------------------------------------------------------------
init(QueueName) ->
- {GammaCount, IndexState} =
+ {DeltaCount, IndexState} =
rabbit_queue_index:init(QueueName),
- {GammaSeqId, NextSeqId, IndexState1} =
+ {DeltaSeqId, NextSeqId, IndexState1} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState),
- Gamma = case GammaCount of
- 0 -> ?BLANK_GAMMA;
- _ -> #gamma { start_seq_id = GammaSeqId,
- count = GammaCount,
+ Delta = case DeltaCount of
+ 0 -> ?BLANK_DELTA;
+ _ -> #delta { start_seq_id = DeltaSeqId,
+ count = DeltaCount,
end_seq_id = NextSeqId }
end,
Now = now(),
State =
- #vqstate { q1 = queue:new(), q2 = queue:new(),
- gamma = Gamma,
- q3 = queue:new(), q4 = queue:new(),
+ #vqstate { q1 = queue:new(), q2 = {0, queue:new()},
+ delta = Delta,
+ q3 = {0, queue:new()}, q4 = queue:new(),
duration_target = undefined,
target_ram_msg_count = undefined,
ram_msg_count = 0,
ram_msg_count_prev = 0,
- queue = QueueName,
+ ram_index_count = 0,
index_state = IndexState1,
next_seq_id = NextSeqId,
out_counter = 0,
in_counter = 0,
egress_rate = {Now, 0},
avg_egress_rate = 0,
- ingress_rate = {Now, GammaCount},
+ ingress_rate = {Now, DeltaCount},
avg_ingress_rate = 0,
rate_timestamp = Now,
- len = GammaCount,
+ len = DeltaCount,
on_sync = {[], [], []},
msg_store_read_state = rabbit_msg_store:client_init()
},
- maybe_gammas_to_betas(State).
+ maybe_deltas_to_betas(State).
terminate(State = #vqstate { index_state = IndexState,
msg_store_read_state = MSCState }) ->
@@ -221,11 +235,14 @@ publish_delivered(Msg = #basic_message { guid = MsgId,
in_counter = InCount}) ->
State1 = State #vqstate { out_counter = OutCount + 1,
in_counter = InCount + 1 },
- case maybe_write_msg_to_disk(false, false, Msg) of
+ MsgStatus = #msg_status {
+ msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent,
+ is_delivered = false, msg_on_disk = false, index_on_disk = false },
+ MsgStatus1 = maybe_write_msg_to_disk(false, MsgStatus),
+ case MsgStatus1 #msg_status.msg_on_disk of
true ->
- {true, IndexState1} =
- maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId,
- true, IndexState),
+ {#msg_status { index_on_disk = true }, IndexState1} =
+ maybe_write_index_to_disk(false, MsgStatus1, IndexState),
{{ack_index_and_store, MsgId, SeqId},
State1 #vqstate { index_state = IndexState1,
next_seq_id = SeqId + 1 }};
@@ -289,12 +306,11 @@ fetch(State =
index_state = IndexState, len = Len }) ->
case queue:out(Q4) of
{empty, _Q4} ->
- fetch_from_q3_or_gamma(State);
- {{value,
- #alpha { msg = Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent },
- seq_id = SeqId, is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
+ fetch_from_q3_or_delta(State);
+ {{value, #msg_status {
+ msg = Msg, msg_id = MsgId, seq_id = SeqId,
+ is_persistent = IsPersistent, is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
Q4a} ->
{IndexState1, IndexOnDisk1} =
case IndexOnDisk of
@@ -313,20 +329,17 @@ fetch(State =
false ->
{IndexState, false}
end,
- _MsgOnDisk1 = IndexOnDisk1 =
+ AckTag =
case IndexOnDisk1 of
true -> true = IsPersistent, %% ASSERTION
- true = MsgOnDisk; %% ASSERTION
+ true = MsgOnDisk, %% ASSERTION
+ {ack_index_and_store, MsgId, SeqId};
false -> ok = case MsgOnDisk andalso not IsPersistent of
true -> rabbit_msg_store:remove([MsgId]);
false -> ok
end,
- false
+ ack_not_on_disk
end,
- AckTag = case IndexOnDisk1 of
- true -> {ack_index_and_store, MsgId, SeqId};
- false -> ack_not_on_disk
- end,
Len1 = Len - 1,
{{Msg, IsDelivered, AckTag, Len1},
State #vqstate { q4 = Q4a, out_counter = OutCount + 1,
@@ -362,7 +375,7 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
{Len, State1} =
purge1(Q4Count, State #vqstate { index_state = IndexState1,
q4 = queue:new() }),
- {Len, State1 #vqstate { len = 0 }}.
+ {Len, State1 #vqstate { len = 0, ram_msg_count = 0, ram_index_count = 0 }}.
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
@@ -373,9 +386,9 @@ delete(State) ->
IndexState) of
{N, N, IndexState2} ->
IndexState2;
- {GammaSeqId, NextSeqId, IndexState2} ->
+ {DeltaSeqId, NextSeqId, IndexState2} ->
{_DeleteCount, IndexState3} =
- delete1(NextSeqId, 0, GammaSeqId, IndexState2),
+ delete1(NextSeqId, 0, DeltaSeqId, IndexState2),
IndexState3
end,
IndexState4 = rabbit_queue_index:terminate_and_erase(IndexState1),
@@ -383,27 +396,26 @@ delete(State) ->
%% [{Msg, AckTag}]
%% We guarantee that after fetch, only persistent msgs are left on
-%% disk. This means that in a requeue, we set
-%% PersistentMsgsAlreadyOnDisk to true, thus avoiding calls to
-%% msg_store:write for persistent msgs. It also means that we don't
-%% need to worry about calling msg_store:remove (as ack would do)
-%% because transient msgs won't be on disk anyway, thus they won't
-%% need to be removed. However, we do call msg_store:release so that
-%% the cache isn't held full of msgs which are now at the tail of the
-%% queue.
+%% disk. This means that in a requeue, we set MsgOnDisk to true, thus
+%% avoiding calls to msg_store:write for persistent msgs. It also
+%% means that we don't need to worry about calling msg_store:remove
+%% (as ack would do) because transient msgs won't be on disk anyway,
+%% thus they won't need to be removed. However, we do call
+%% msg_store:release so that the cache isn't held full of msgs which
+%% are now at the tail of the queue.
requeue(MsgsWithAckTags, State) ->
{SeqIds, MsgIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun ({Msg = #basic_message { guid = MsgId }, AckTag},
{SeqIdsAcc, MsgIdsAcc, StateN}) ->
- {_SeqId, StateN1} = publish(Msg, true, true, StateN),
- {SeqIdsAcc1, MsgIdsAcc1} =
+ {SeqIdsAcc1, MsgIdsAcc1, MsgOnDisk} =
case AckTag of
ack_not_on_disk ->
- {SeqIdsAcc, MsgIdsAcc};
+ {SeqIdsAcc, MsgIdsAcc, false};
{ack_index_and_store, MsgId, SeqId} ->
- {[SeqId | SeqIdsAcc], [MsgId | MsgIdsAcc]}
+ {[SeqId | SeqIdsAcc], [MsgId | MsgIdsAcc], true}
end,
+ {_SeqId, StateN1} = publish(Msg, true, MsgOnDisk, StateN),
{SeqIdsAcc1, MsgIdsAcc1, StateN1}
end, {[], [], State}, MsgsWithAckTags),
IndexState1 = case SeqIds of
@@ -416,8 +428,13 @@ requeue(MsgsWithAckTags, State) ->
end,
State1 #vqstate { index_state = IndexState1 }.
-tx_publish(Msg = #basic_message { is_persistent = true }, State) ->
- true = maybe_write_msg_to_disk(true, false, Msg),
+tx_publish(Msg = #basic_message { is_persistent = true, guid = MsgId },
+ State) ->
+ MsgStatus = #msg_status {
+ msg = Msg, msg_id = MsgId, seq_id = undefined, is_persistent = true,
+ is_delivered = false, msg_on_disk = false, index_on_disk = false },
+ #msg_status { msg_on_disk = true } =
+ maybe_write_msg_to_disk(false, MsgStatus),
State;
tx_publish(_Msg, State) ->
State.
@@ -457,7 +474,7 @@ tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms} }) ->
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent },
{SeqIdsAcc, StateN}) ->
- {SeqId, StateN1} = publish(Msg, false, true, StateN),
+ {SeqId, StateN1} = publish(Msg, false, IsPersistent, StateN),
SeqIdsAcc1 = case IsPersistent of
true -> [SeqId | SeqIdsAcc];
false -> SeqIdsAcc
@@ -478,22 +495,25 @@ flush_journal(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state =
rabbit_queue_index:flush_journal(IndexState) }.
-status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4,
+status(#vqstate { q1 = Q1, q2 = {Q2Len, _Q2},
+ delta = Delta, q3 = {Q3Len, _Q3}, q4 = Q4,
len = Len, on_sync = {_, _, From},
target_ram_msg_count = TargetRamMsgCount,
ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount,
avg_egress_rate = AvgEgressRate,
avg_ingress_rate = AvgIngressRate,
next_seq_id = NextSeqId }) ->
[ {q1, queue:len(Q1)},
- {q2, queue:len(Q2)},
- {gamma, Gamma},
- {q3, queue:len(Q3)},
+ {q2, Q2Len},
+ {delta, Delta},
+ {q3, Q3Len},
{q4, queue:len(Q4)},
{len, Len},
{outstanding_txns, length(From)},
{target_ram_msg_count, TargetRamMsgCount},
{ram_msg_count, RamMsgCount},
+ {ram_index_count, RamIndexCount},
{avg_egress_rate, AvgEgressRate},
{avg_ingress_rate, AvgIngressRate},
{next_seq_id, NextSeqId} ].
@@ -511,23 +531,45 @@ persistent_msg_ids(Pubs) ->
[MsgId || Obj = #basic_message { guid = MsgId } <- Pubs,
Obj #basic_message.is_persistent].
-entry_salient_details(#alpha { msg = #basic_message { guid = MsgId },
- seq_id = SeqId, is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk }) ->
- {MsgId, SeqId, IsDelivered, MsgOnDisk, IndexOnDisk};
-entry_salient_details(#beta { msg_id = MsgId, seq_id = SeqId,
- is_delivered = IsDelivered,
- index_on_disk = IndexOnDisk }) ->
- {MsgId, SeqId, IsDelivered, true, IndexOnDisk}.
-
betas_from_segment_entries(List, SeqIdLimit) ->
- queue:from_list([#beta { msg_id = MsgId, seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- index_on_disk = true }
- || {MsgId, SeqId, IsPersistent, IsDelivered} <- List,
- SeqId < SeqIdLimit ]).
+ List1 = [#msg_status { msg = undefined,
+ msg_id = MsgId,
+ seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true
+ }
+ || {MsgId, SeqId, IsPersistent, IsDelivered} <- List,
+ SeqId < SeqIdLimit ],
+ {length(List1), queue:from_list([{true, queue:from_list(List1)}])}.
+
+join_betas({HeadLen, Head}, {TailLen, Tail}) ->
+ {HeadLen + TailLen, join_betas1(Head, Tail)}.
+
+join_betas1(Head, Tail) ->
+ case {queue:out_r(Head), queue:out(Tail)} of
+ {{empty, _Head}, _} ->
+ Tail;
+ {_, {empty, _Tail}} ->
+ Head;
+ {{{value, {IndexOnDisk, InnerQHead}}, Head1},
+ {{value, {IndexOnDisk, InnerQTail}}, Tail1}} ->
+ queue:join(
+ queue:in({IndexOnDisk,
+ queue:join(InnerQHead, InnerQTail)}, Head1),
+ Tail1);
+ {_, _} -> queue:join(Head, Tail)
+ end.
+
+grab_beta(Gen, Q) ->
+ case Gen(Q) of
+ {empty, _Q} ->
+ empty;
+ {{value, {_IndexOnDisk, InnerQ}}, _Q} ->
+ {{value, MsgStatus}, _InnerQ} = Gen(InnerQ),
+ MsgStatus
+ end.
read_index_segment(SeqId, IndexState) ->
SeqId1 = SeqId + rabbit_queue_index:segment_size(),
@@ -541,39 +583,39 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) ->
content = rabbit_binary_parser:clear_decoded_content(
rabbit_binary_generator:ensure_content_encoded(Content)) }.
-%% the first arg is the older gamma
-combine_gammas(#gamma { count = 0 }, #gamma { count = 0 }) ->
- ?BLANK_GAMMA;
-combine_gammas(#gamma { count = 0 }, #gamma { } = B) -> B;
-combine_gammas(#gamma { } = A, #gamma { count = 0 }) -> A;
-combine_gammas(#gamma { start_seq_id = SeqIdLow, count = CountLow},
- #gamma { start_seq_id = SeqIdHigh, count = CountHigh,
+%% the first arg is the older delta
+combine_deltas(#delta { count = 0 }, #delta { count = 0 }) ->
+ ?BLANK_DELTA;
+combine_deltas(#delta { count = 0 }, #delta { } = B) -> B;
+combine_deltas(#delta { } = A, #delta { count = 0 }) -> A;
+combine_deltas(#delta { start_seq_id = SeqIdLow, count = CountLow},
+ #delta { start_seq_id = SeqIdHigh, count = CountHigh,
end_seq_id = SeqIdEnd }) ->
true = SeqIdLow =< SeqIdHigh, %% ASSERTION
Count = CountLow + CountHigh,
true = Count =< SeqIdEnd - SeqIdLow, %% ASSERTION
- #gamma { start_seq_id = SeqIdLow, count = Count, end_seq_id = SeqIdEnd }.
+ #delta { start_seq_id = SeqIdLow, count = Count, end_seq_id = SeqIdEnd }.
%%----------------------------------------------------------------------------
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-delete1(NextSeqId, Count, GammaSeqId, IndexState)
- when GammaSeqId >= NextSeqId ->
+delete1(NextSeqId, Count, DeltaSeqId, IndexState)
+ when DeltaSeqId >= NextSeqId ->
{Count, IndexState};
-delete1(NextSeqId, Count, GammaSeqId, IndexState) ->
- Gamma1SeqId = GammaSeqId + rabbit_queue_index:segment_size(),
- case rabbit_queue_index:read_segment_entries(GammaSeqId, IndexState) of
+delete1(NextSeqId, Count, DeltaSeqId, IndexState) ->
+ Delta1SeqId = DeltaSeqId + rabbit_queue_index:segment_size(),
+ case rabbit_queue_index:read_segment_entries(DeltaSeqId, IndexState) of
{[], IndexState1} ->
- delete1(NextSeqId, Count, Gamma1SeqId, IndexState1);
+ delete1(NextSeqId, Count, Delta1SeqId, IndexState1);
{List, IndexState1} ->
- Q = betas_from_segment_entries(List, Gamma1SeqId),
+ {QCount, Q} = betas_from_segment_entries(List, Delta1SeqId),
{QCount, IndexState2} = remove_queue_entries(Q, IndexState1),
- delete1(NextSeqId, Count + QCount, Gamma1SeqId, IndexState2)
+ delete1(NextSeqId, Count + QCount, Delta1SeqId, IndexState2)
end.
-purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) ->
- case queue:is_empty(Q3) of
+purge1(Count, State = #vqstate { q3 = {Q3Len, Q3}, index_state = IndexState }) ->
+ case 0 == Q3Len of
true ->
{Q1Count, IndexState1} =
remove_queue_entries(State #vqstate.q1, IndexState),
@@ -582,30 +624,31 @@ purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) ->
false ->
{Q3Count, IndexState1} = remove_queue_entries(Q3, IndexState),
purge1(Count + Q3Count,
- maybe_gammas_to_betas(
+ maybe_deltas_to_betas(
State #vqstate { index_state = IndexState1,
- q3 = queue:new() }))
+ q3 = {0, queue:new()} }))
end.
remove_queue_entries(Q, IndexState) ->
{Count, MsgIds, SeqIds, IndexState1} =
lists:foldl(
- fun (Entry, {CountN, MsgIdsAcc, SeqIdsAcc, IndexStateN}) ->
- {MsgId, SeqId, IsDelivered, MsgOnDisk, IndexOnDisk} =
- entry_salient_details(Entry),
+ fun (#msg_status { msg_id = MsgId, seq_id = SeqId,
+ is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
+ {CountN, MsgIdsAcc, SeqIdsAcc, IndexStateN}) ->
+ MsgIdsAcc1 = case MsgOnDisk of
+ true -> [MsgId | MsgIdsAcc];
+ false -> MsgIdsAcc
+ end,
+ SeqIdsAcc1 = case IndexOnDisk of
+ true -> [SeqId | SeqIdsAcc];
+ false -> SeqIdsAcc
+ end,
IndexStateN1 = case IndexOnDisk andalso not IsDelivered of
true -> rabbit_queue_index:write_delivered(
SeqId, IndexStateN);
false -> IndexStateN
end,
- SeqIdsAcc1 = case IndexOnDisk of
- true -> [SeqId | SeqIdsAcc];
- false -> SeqIdsAcc
- end,
- MsgIdsAcc1 = case MsgOnDisk of
- true -> [MsgId | MsgIdsAcc];
- false -> MsgIdsAcc
- end,
{CountN + 1, MsgIdsAcc1, SeqIdsAcc1, IndexStateN1}
%% we need to write the delivered records in order otherwise
%% we upset the qi. So don't reverse.
@@ -621,45 +664,56 @@ remove_queue_entries(Q, IndexState) ->
end,
{Count, IndexState2}.
-fetch_from_q3_or_gamma(State = #vqstate {
- q1 = Q1, q2 = Q2, gamma = #gamma { count = GammaCount },
- q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount,
+fetch_from_q3_or_delta(State = #vqstate {
+ q1 = Q1, q2 = {Q2Len, _Q2}, delta = #delta { count = DeltaCount },
+ q3 = {Q3Len, Q3}, q4 = Q4, ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount,
msg_store_read_state = MSCState }) ->
case queue:out(Q3) of
{empty, _Q3} ->
- 0 = GammaCount, %% ASSERTION
- true = queue:is_empty(Q2), %% ASSERTION
+ 0 = DeltaCount, %% ASSERTION
+ 0 = Q2Len, %% ASSERTION
+ 0 = Q3Len, %% ASSERTION
true = queue:is_empty(Q1), %% ASSERTION
{empty, State};
- {{value,
- #beta { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
- is_persistent = IsPersistent, index_on_disk = IndexOnDisk }},
- Q3a} ->
+ {{value, {IndexOnDisk, InnerQ}}, Q3a} ->
+ {{value, MsgStatus = #msg_status {
+ msg = undefined, msg_id = MsgId,
+ is_persistent = IsPersistent
+ }}, InnerQ1} = queue:out(InnerQ),
+ Q3LenB = Q3Len - 1,
+ Q3b = {Q3LenB, case queue:is_empty(InnerQ1) of
+ true -> Q3a;
+ false -> queue:in_r({IndexOnDisk, InnerQ1}, Q3a)
+ end},
{{ok, Msg = #basic_message { is_persistent = IsPersistent,
- guid = MsgId }}, MSCState1} =
+ guid = MsgId }}, MSCState1} =
rabbit_msg_store:read(MsgId, MSCState),
- Q4a = queue:in(
- #alpha { msg = Msg, seq_id = SeqId,
- is_delivered = IsDelivered, msg_on_disk = true,
- index_on_disk = IndexOnDisk }, Q4),
- State1 = State #vqstate { q3 = Q3a, q4 = Q4a,
+ Q4a = queue:in(MsgStatus #msg_status { msg = Msg }, Q4),
+ RamIndexCount1 = case IndexOnDisk of
+ true -> RamIndexCount;
+ false -> RamIndexCount - 1
+ end,
+ true = RamIndexCount1 >= 0, %% ASSERTION
+ State1 = State #vqstate { q3 = Q3b, q4 = Q4a,
ram_msg_count = RamMsgCount + 1,
+ ram_index_count = RamIndexCount1,
msg_store_read_state = MSCState1 },
State2 =
- case {queue:is_empty(Q3a), 0 == GammaCount} of
+ case {0 == Q3LenB, 0 == DeltaCount} of
{true, true} ->
- %% q3 is now empty, it wasn't before; gamma is
+ %% q3 is now empty, it wasn't before; delta is
%% still empty. So q2 must be empty, and q1
%% can now be joined onto q4
- true = queue:is_empty(Q2), %% ASSERTION
+ 0 = Q2Len, %% ASSERTION
State1 #vqstate { q1 = queue:new(),
q4 = queue:join(Q4a, Q1) };
{true, false} ->
- maybe_gammas_to_betas(State1);
+ maybe_deltas_to_betas(State1);
{false, _} ->
%% q3 still isn't empty, we've not touched
- %% gamma, so the invariants between q1, q2,
- %% gamma and q3 are maintained
+ %% delta, so the invariants between q1, q2,
+ %% delta and q3 are maintained
State1
end,
fetch(State2)
@@ -673,7 +727,7 @@ reduce_memory_use(State =
#vqstate { target_ram_msg_count = TargetRamMsgCount }) ->
State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)),
case TargetRamMsgCount of
- 0 -> push_betas_to_gammas(State1);
+ 0 -> push_betas_to_deltas(State1);
_ -> State1
end.
@@ -683,7 +737,7 @@ reduce_memory_use(State =
test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount,
ram_msg_count = RamMsgCount,
- q1 = Q1, q3 = Q3 }) ->
+ q1 = Q1, q3 = {_Q3Len, Q3} }) ->
case TargetRamMsgCount of
undefined ->
msg;
@@ -691,17 +745,19 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount,
case queue:out(Q3) of
{empty, _Q3} ->
%% if TargetRamMsgCount == 0, we know we have no
- %% alphas. If q3 is empty then gamma must be empty
+ %% alphas. If q3 is empty then delta must be empty
%% too, so create a beta, which should end up in
%% q3
index;
- {{value, #beta { seq_id = OldSeqId }}, _Q3a} ->
- %% Don't look at the current gamma as it may be
+ {{value, {_IndexOnDisk, InnerQ}}, _Q3a} ->
+ {{value, #msg_status { seq_id = OldSeqId }}, _InnerQ} =
+ queue:out(InnerQ),
+ %% Don't look at the current delta as it may be
%% empty. If the SeqId is still within the current
%% segment, it'll be a beta, else it'll go into
- %% gamma
+ %% delta
case SeqId >= rabbit_queue_index:next_segment_boundary(OldSeqId) of
- true -> neither;
+ true -> neither;
false -> index
end
end;
@@ -716,178 +772,198 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount,
end
end.
-publish(Msg, IsDelivered, PersistentMsgsAlreadyOnDisk,
- State = #vqstate { next_seq_id = SeqId, len = Len,
- in_counter = InCount }) ->
- {SeqId, publish(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered,
- PersistentMsgsAlreadyOnDisk,
+publish(Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId },
+ IsDelivered, MsgOnDisk, State =
+ #vqstate { next_seq_id = SeqId, len = Len, in_counter = InCount }) ->
+ MsgStatus = #msg_status {
+ msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent,
+ is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
+ index_on_disk = false },
+ {SeqId, publish(test_keep_msg_in_ram(SeqId, State), MsgStatus,
State #vqstate { next_seq_id = SeqId + 1, len = Len + 1,
in_counter = InCount + 1 })}.
-publish(msg, Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent },
- SeqId, IsDelivered, PersistentMsgsAlreadyOnDisk,
- State = #vqstate { index_state = IndexState,
- ram_msg_count = RamMsgCount }) ->
- MsgOnDisk =
- maybe_write_msg_to_disk(false, PersistentMsgsAlreadyOnDisk, Msg),
- {IndexOnDisk, IndexState1} =
- maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId,
- IsDelivered, IndexState),
- Entry = #alpha { msg = Msg, seq_id = SeqId, is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk },
+publish(msg, MsgStatus, State = #vqstate { index_state = IndexState,
+ ram_msg_count = RamMsgCount }) ->
+ MsgStatus1 = maybe_write_msg_to_disk(false, MsgStatus),
+ {MsgStatus2, IndexState1} =
+ maybe_write_index_to_disk(false, MsgStatus1, IndexState),
State1 = State #vqstate { ram_msg_count = RamMsgCount + 1,
index_state = IndexState1 },
- store_alpha_entry(Entry, State1);
-
-publish(index, Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent },
- SeqId, IsDelivered, PersistentMsgsAlreadyOnDisk,
- State = #vqstate { index_state = IndexState, q1 = Q1 }) ->
- true = maybe_write_msg_to_disk(true, PersistentMsgsAlreadyOnDisk, Msg),
- {IndexOnDisk, IndexState1} =
- maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId,
- IsDelivered, IndexState),
- Entry = #beta { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
- is_persistent = IsPersistent, index_on_disk = IndexOnDisk },
- State1 = State #vqstate { index_state = IndexState1 },
+ store_alpha_entry(MsgStatus2, State1);
+
+publish(index, MsgStatus, State =
+ #vqstate { index_state = IndexState, q1 = Q1,
+ ram_index_count = RamIndexCount,
+ target_ram_msg_count = TargetRamMsgCount }) ->
+ MsgStatus1 = #msg_status { msg_on_disk = true } =
+ maybe_write_msg_to_disk(true, MsgStatus),
+ ForceIndex = case TargetRamMsgCount of
+ undefined ->
+ false;
+ _ ->
+ RamIndexCount >= (?RAM_INDEX_TARGET_RATIO *
+ TargetRamMsgCount)
+ end,
+ {MsgStatus2, IndexState1} =
+ maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
+ RamIndexCount1 = case MsgStatus2 #msg_status.index_on_disk of
+ true -> RamIndexCount;
+ false -> RamIndexCount + 1
+ end,
+ State1 = State #vqstate { index_state = IndexState1,
+ ram_index_count = RamIndexCount1 },
true = queue:is_empty(Q1), %% ASSERTION
- store_beta_entry(Entry, State1);
-
-publish(neither, Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent },
- SeqId, IsDelivered, PersistentMsgsAlreadyOnDisk,
- State = #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2,
- gamma = Gamma }) ->
- true = maybe_write_msg_to_disk(true, PersistentMsgsAlreadyOnDisk, Msg),
- {true, IndexState1} =
- maybe_write_index_to_disk(true, IsPersistent, MsgId, SeqId,
- IsDelivered, IndexState),
- true = queue:is_empty(Q1) andalso queue:is_empty(Q2), %% ASSERTION
- %% gamma may be empty, seq_id > next_segment_boundary from q3
+ store_beta_entry(MsgStatus2, State1);
+
+publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State =
+ #vqstate { index_state = IndexState, q1 = Q1, q2 = {Q2Len, _Q2},
+ delta = Delta }) ->
+ MsgStatus1 = #msg_status { msg_on_disk = true } =
+ maybe_write_msg_to_disk(true, MsgStatus),
+ {#msg_status { index_on_disk = true }, IndexState1} =
+ maybe_write_index_to_disk(true, MsgStatus1, IndexState),
+ true = queue:is_empty(Q1) andalso 0 == Q2Len, %% ASSERTION
+ %% delta may be empty, seq_id > next_segment_boundary from q3
%% head, so we need to find where the segment boundary is before
%% or equal to seq_id
- GammaSeqId = rabbit_queue_index:next_segment_boundary(SeqId) -
+ DeltaSeqId = rabbit_queue_index:next_segment_boundary(SeqId) -
rabbit_queue_index:segment_size(),
- Gamma1 = #gamma { start_seq_id = GammaSeqId, count = 1,
+ Delta1 = #delta { start_seq_id = DeltaSeqId, count = 1,
end_seq_id = SeqId + 1 },
State #vqstate { index_state = IndexState1,
- gamma = combine_gammas(Gamma, Gamma1) }.
-
-store_alpha_entry(Entry = #alpha {}, State =
- #vqstate { q1 = Q1, q2 = Q2,
- gamma = #gamma { count = GammaCount },
- q3 = Q3, q4 = Q4 }) ->
- case queue:is_empty(Q2) andalso GammaCount == 0 andalso
- queue:is_empty(Q3) of
- true ->
- State #vqstate { q4 = queue:in(Entry, Q4) };
- false ->
- maybe_push_q1_to_betas(State #vqstate { q1 = queue:in(Entry, Q1) })
+ delta = combine_deltas(Delta, Delta1) }.
+
+store_alpha_entry(MsgStatus, State =
+ #vqstate { q1 = Q1, q2 = {Q2Len, _Q2},
+ delta = #delta { count = DeltaCount },
+ q3 = {Q3Len, _Q3}, q4 = Q4 }) ->
+ case 0 == Q2Len andalso 0 == DeltaCount andalso 0 == Q3Len of
+ true -> true = queue:is_empty(Q1), %% ASSERTION
+ State #vqstate { q4 = queue:in(MsgStatus, Q4) };
+ false -> maybe_push_q1_to_betas(
+ State #vqstate { q1 = queue:in(MsgStatus, Q1) })
end.
-store_beta_entry(Entry = #beta {}, State =
- #vqstate { q2 = Q2, gamma = #gamma { count = GammaCount },
- q3 = Q3 }) ->
- case GammaCount == 0 of
- true -> State #vqstate { q3 = queue:in(Entry, Q3) };
- false -> State #vqstate { q2 = queue:in(Entry, Q2) }
+store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true },
+ State = #vqstate { q2 = {Q2Len, Q2},
+ delta = #delta { count = DeltaCount },
+ q3 = {Q3Len, Q3} }) ->
+ MsgStatus1 = MsgStatus #msg_status { msg = undefined },
+ case DeltaCount == 0 of
+ true -> State #vqstate { q3 = {Q3Len + 1,
+ store_beta_entry1(
+ fun queue:out_r/1, fun queue:in/2,
+ MsgStatus1, Q3)} };
+ false -> State #vqstate { q2 = {Q2Len + 1,
+ store_beta_entry1(
+ fun queue:out_r/1, fun queue:in/2,
+ MsgStatus1, Q2)} }
end.
-%% Bool IsPersistent PersistentMsgsAlreadyOnDisk | WriteToDisk?
-%% -----------------------------------------------+-------------
-%% false false false | false 1
-%% false true false | true 2
-%% false false true | false 3
-%% false true true | false 4
-%% true false false | true 5
-%% true true false | true 6
-%% true false true | true 7
-%% true true true | false 8
-
-%% (Bool and not (IsPersistent and PersistentMsgsAlreadyOnDisk)) or | 5 6 7
-%% (IsPersistent and (not PersistentMsgsAlreadyOnDisk)) | 2 6
-maybe_write_msg_to_disk(Bool, PersistentMsgsAlreadyOnDisk,
- Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent })
- when (Bool andalso not (IsPersistent andalso PersistentMsgsAlreadyOnDisk))
- orelse (IsPersistent andalso not PersistentMsgsAlreadyOnDisk) ->
+store_beta_entry1(Gen, Cons, MsgStatus =
+ #msg_status { index_on_disk = IndexOnDisk }, Q) ->
+ case Gen(Q) of
+ {{value, {IndexOnDisk, InnerQ}}, QTail} ->
+ Cons({IndexOnDisk, Cons(MsgStatus, InnerQ)}, QTail);
+ {_EmptyOrNotIndexOnDisk, _QTail} ->
+ Cons({IndexOnDisk, Cons(MsgStatus, queue:new())}, Q)
+ end.
+
+maybe_write_msg_to_disk(_Force, MsgStatus =
+ #msg_status { msg_on_disk = true }) ->
+ MsgStatus;
+maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
+ msg = Msg, msg_id = MsgId,
+ is_persistent = IsPersistent })
+ when Force orelse IsPersistent ->
ok = rabbit_msg_store:write(MsgId, ensure_binary_properties(Msg)),
- true;
-maybe_write_msg_to_disk(_Bool, true, #basic_message { is_persistent = true }) ->
- true;
-maybe_write_msg_to_disk(_Bool, _PersistentMsgsAlreadyOnDisk, _Msg) ->
- false.
-
-maybe_write_index_to_disk(Bool, IsPersistent, MsgId, SeqId, IsDelivered,
- IndexState) when Bool orelse IsPersistent ->
+ MsgStatus #msg_status { msg_on_disk = true };
+maybe_write_msg_to_disk(_Force, MsgStatus) ->
+ MsgStatus.
+
+maybe_write_index_to_disk(_Force, MsgStatus =
+ #msg_status { index_on_disk = true }, IndexState) ->
+ true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
+ {MsgStatus, IndexState};
+maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
+ msg_id = MsgId, seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered }, IndexState)
+ when Force orelse IsPersistent ->
+ true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
IndexState1 = rabbit_queue_index:write_published(
MsgId, SeqId, IsPersistent, IndexState),
- {true, case IsDelivered of
- true -> rabbit_queue_index:write_delivered(SeqId, IndexState1);
- false -> IndexState1
- end};
-maybe_write_index_to_disk(_Bool, _IsPersistent, _MsgId, _SeqId, _IsDelivered,
- IndexState) ->
- {false, IndexState}.
+ {MsgStatus #msg_status { index_on_disk = true },
+ case IsDelivered of
+ true -> rabbit_queue_index:write_delivered(SeqId, IndexState1);
+ false -> IndexState1
+ end};
+maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
+ {MsgStatus, IndexState}.
%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------
-maybe_gammas_to_betas(State = #vqstate { gamma = #gamma { count = 0 } }) ->
+maybe_deltas_to_betas(State = #vqstate { delta = #delta { count = 0 } }) ->
State;
-maybe_gammas_to_betas(State =
- #vqstate { index_state = IndexState, q2 = Q2, q3 = Q3,
- target_ram_msg_count = TargetRamMsgCount,
- gamma = #gamma { start_seq_id = GammaSeqId,
- count = GammaCount,
- end_seq_id = GammaSeqIdEnd }}
- ) ->
- case (not queue:is_empty(Q3)) andalso 0 == TargetRamMsgCount of
+maybe_deltas_to_betas(
+ State = #vqstate { index_state = IndexState,
+ q2 = Q2All, q3 = {Q3Len, _Q3} = Q3All,
+ target_ram_msg_count = TargetRamMsgCount,
+ delta = #delta { start_seq_id = DeltaSeqId,
+ count = DeltaCount,
+ end_seq_id = DeltaSeqIdEnd }}) ->
+ case (0 < Q3Len) andalso (0 == TargetRamMsgCount) of
true ->
State;
false ->
%% either q3 is empty, in which case we load at least one
%% segment, or TargetRamMsgCount > 0, meaning we should
%% really be holding all the betas in memory.
- {List, IndexState1, Gamma1SeqId} =
- read_index_segment(GammaSeqId, IndexState),
+ {List, IndexState1, Delta1SeqId} =
+ read_index_segment(DeltaSeqId, IndexState),
State1 = State #vqstate { index_state = IndexState1 },
%% length(List) may be < segment_size because of acks. But
%% it can't be []
- Q3b = betas_from_segment_entries(List, GammaSeqIdEnd),
- Q3a = queue:join(Q3, Q3b),
- case GammaCount - queue:len(Q3b) of
+ Q3bAll = {Q3bLen, _Q3b} =
+ betas_from_segment_entries(List, DeltaSeqIdEnd),
+ Q3a = join_betas(Q3All, Q3bAll),
+ case DeltaCount - Q3bLen of
0 ->
- %% gamma is now empty, but it wasn't before, so
+ %% delta is now empty, but it wasn't before, so
%% can now join q2 onto q3
- State1 #vqstate { gamma = ?BLANK_GAMMA,
- q2 = queue:new(),
- q3 = queue:join(Q3a, Q2) };
+ State1 #vqstate { delta = ?BLANK_DELTA,
+ q2 = {0, queue:new()},
+ q3 = join_betas(Q3a, Q2All) };
N when N > 0 ->
State1 #vqstate {
q3 = Q3a,
- gamma = #gamma { start_seq_id = Gamma1SeqId,
+ delta = #delta { start_seq_id = Delta1SeqId,
count = N,
- end_seq_id = GammaSeqIdEnd } }
+ end_seq_id = DeltaSeqIdEnd } }
end
end.
maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) ->
maybe_push_alphas_to_betas(
fun queue:out/1,
- fun (Beta, Q1a, State1) ->
- %% these could legally go to q3 if gamma and q2 are empty
- store_beta_entry(Beta, State1 #vqstate { q1 = Q1a })
+ fun (MsgStatus, Q1a, State1) ->
+ %% these could legally go to q3 if delta and q2 are empty
+ store_beta_entry(MsgStatus, State1 #vqstate { q1 = Q1a })
end, Q1, State).
maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) ->
maybe_push_alphas_to_betas(
fun queue:out_r/1,
- fun (Beta, Q4a, State1 = #vqstate { q3 = Q3 }) ->
+ fun (MsgStatus, Q4a, State1 = #vqstate { q3 = {Q3Len, Q3} }) ->
+ MsgStatus1 = MsgStatus #msg_status { msg = undefined },
%% these must go to q3
- State1 #vqstate { q3 = queue:in_r(Beta, Q3), q4 = Q4a }
+ State1 #vqstate { q3 = {Q3Len + 1,
+ store_beta_entry1(
+ fun queue:out/1, fun queue:in_r/2,
+ MsgStatus1, Q3)}, q4 = Q4a }
end, Q4, State).
maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State =
@@ -895,112 +971,159 @@ maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State =
target_ram_msg_count = TargetRamMsgCount })
when TargetRamMsgCount == undefined orelse TargetRamMsgCount >= RamMsgCount ->
State;
-maybe_push_alphas_to_betas(Generator, Consumer, Q, State =
- #vqstate { ram_msg_count = RamMsgCount }) ->
+maybe_push_alphas_to_betas(
+ Generator, Consumer, Q, State =
+ #vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount,
+ target_ram_msg_count = TargetRamMsgCount,
+ index_state = IndexState }) ->
case Generator(Q) of
{empty, _Q} -> State;
- {{value,
- #alpha { msg = Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent },
- seq_id = SeqId, is_delivered = IsDelivered,
- index_on_disk = IndexOnDisk }},
- Qa} ->
- true = maybe_write_msg_to_disk(true, true, Msg),
- Beta = #beta { msg_id = MsgId, seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- index_on_disk = IndexOnDisk },
- State1 = State #vqstate { ram_msg_count = RamMsgCount - 1 },
+ {{value, MsgStatus}, Qa} ->
+ MsgStatus1 = maybe_write_msg_to_disk(true, MsgStatus),
+ ForceIndex = case TargetRamMsgCount of
+ undefined ->
+ false;
+ _ ->
+ RamIndexCount >= (?RAM_INDEX_TARGET_RATIO *
+ TargetRamMsgCount)
+ end,
+ {MsgStatus2, IndexState1} =
+ maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
+ RamIndexCount1 = case MsgStatus2 #msg_status.index_on_disk of
+ true -> RamIndexCount;
+ false -> RamIndexCount + 1
+ end,
+ State1 = State #vqstate { ram_msg_count = RamMsgCount - 1,
+ ram_index_count = RamIndexCount1,
+ index_state = IndexState1 },
maybe_push_alphas_to_betas(Generator, Consumer, Qa,
- Consumer(Beta, Qa, State1))
+ Consumer(MsgStatus2, Qa, State1))
end.
-push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3,
+push_betas_to_deltas(State = #vqstate { q2 = {Q2Len, Q2}, delta = Delta,
+ q3 = {Q3Len, Q3},
+ ram_index_count = RamIndexCount,
index_state = IndexState }) ->
%% HighSeqId is high in the sense that it must be higher than the
- %% seq_id in Gamma, but it's also the lowest of the betas that we
- %% transfer from q2 to gamma.
- {HighSeqId, Len1, Q2a, IndexState1} =
- push_betas_to_gammas(fun queue:out/1, undefined, Q2, IndexState),
+ %% seq_id in Delta, but it's also the lowest of the betas that we
+ %% transfer from q2 to delta.
+ {HighSeqId, Q2Len, Q2a, RamIndexCount1, IndexState1} =
+ push_betas_to_deltas(
+ fun queue:out/1,
+ fun (IndexOnDisk, InnerQ, Q) ->
+ join_betas1(queue:from_list([{IndexOnDisk, InnerQ}]), Q)
+ end, undefined, Q2, RamIndexCount, IndexState),
+ true = queue:is_empty(Q2a), %% ASSERTION
EndSeqId = case queue:out_r(Q2) of
- {empty, _Q2} -> undefined;
- {{value, #beta { seq_id = EndSeqId1 }}, _Q2} -> EndSeqId1 + 1
+ {empty, _Q2} ->
+ undefined;
+ {{value, {_IndexOnDisk, InnerQ}}, _Q2} ->
+ {{value, #msg_status { seq_id = EndSeqId1 }}, _InnerQ} =
+ queue:out_r(InnerQ),
+ EndSeqId1 + 1
end,
- Gamma1 = #gamma { start_seq_id = Gamma1SeqId } =
- combine_gammas(Gamma, #gamma { start_seq_id = HighSeqId,
- count = Len1,
+ Delta1 = #delta { start_seq_id = Delta1SeqId } =
+ combine_deltas(Delta, #delta { start_seq_id = HighSeqId,
+ count = Q2Len,
end_seq_id = EndSeqId }),
- State1 = State #vqstate { q2 = Q2a, gamma = Gamma1,
- index_state = IndexState1 },
+ State1 = State #vqstate { q2 = {0, Q2a}, delta = Delta1,
+ index_state = IndexState1,
+ ram_index_count = RamIndexCount1 },
case queue:out(Q3) of
- {empty, _Q3} -> State1;
- {{value, #beta { seq_id = SeqId }}, _Q3a} ->
- {{value, #beta { seq_id = SeqIdMax }}, _Q3b} = queue:out_r(Q3),
+ {empty, _Q3} ->
+ State1;
+ {{value, {_IndexOnDisk1, InnerQ1}}, _Q3} ->
+ {{value, #msg_status { seq_id = SeqId }}, _InnerQ1} =
+ queue:out(InnerQ1),
+ #msg_status { seq_id = SeqIdMax } =
+ grab_beta(fun queue:out_r/1, Q3),
Limit = rabbit_queue_index:next_segment_boundary(SeqId),
%% ASSERTION
- true = Gamma1SeqId == undefined orelse Gamma1SeqId > SeqIdMax,
+ true = Delta1SeqId == undefined orelse Delta1SeqId > SeqIdMax,
case SeqIdMax < Limit of
true -> %% already only holding LTE one segment indices in q3
State1;
false ->
- %% ASSERTION (sadly large!)
- %% This says that if Gamma1SeqId /= undefined then
- %% the gap from Limit to Gamma1SeqId is an integer
+ %% ASSERTION
+ %% This says that if Delta1SeqId /= undefined then
+ %% the gap from Limit to Delta1SeqId is an integer
%% multiple of segment_size
- 0 = case Gamma1SeqId of
+ 0 = case Delta1SeqId of
undefined -> 0;
- _ -> (Gamma1SeqId - Limit) rem
+ _ -> (Delta1SeqId - Limit) rem
rabbit_queue_index:segment_size()
end,
%% SeqIdMax is low in the sense that it must be
- %% lower than the seq_id in gamma1, in fact either
- %% gamma1 has undefined as its seq_id or there
+ %% lower than the seq_id in delta1, in fact either
+ %% delta1 has undefined as its seq_id or there
%% does not exist a seq_id X s.t. X > SeqIdMax and
- %% X < gamma1's seq_id (would be +1 if it wasn't
+ %% X < delta1's seq_id (would be +1 if it wasn't
%% for the possibility of gaps in the seq_ids).
%% But because we use queue:out_r, SeqIdMax is
%% actually also the highest seq_id of the betas we
- %% transfer from q3 to gammas.
- {SeqIdMax, Len2, Q3b, IndexState2} =
- push_betas_to_gammas(fun queue:out_r/1, Limit, Q3,
- IndexState1),
- Gamma2 = combine_gammas(#gamma { start_seq_id = Limit,
+ %% transfer from q3 to deltas.
+ {SeqIdMax, Len2, Q3b, RamIndexCount2, IndexState2} =
+ push_betas_to_deltas(
+ fun queue:out_r/1,
+ fun (IndexOnDisk, InnerQ, Q) ->
+ join_betas1(Q, queue:from_list(
+ [{IndexOnDisk, InnerQ}]))
+ end, Limit, Q3, RamIndexCount1, IndexState1),
+ Delta2 = combine_deltas(#delta { start_seq_id = Limit,
count = Len2,
end_seq_id = SeqIdMax+1 },
- Gamma1),
- State1 #vqstate { q3 = Q3b, gamma = Gamma2,
- index_state = IndexState2 }
+ Delta1),
+ State1 #vqstate { q3 = {Q3Len - Len2, Q3b}, delta = Delta2,
+ index_state = IndexState2,
+ ram_index_count = RamIndexCount2 }
end
end.
-push_betas_to_gammas(Generator, Limit, Q, IndexState) ->
+push_betas_to_deltas(
+ Generator, Consumer, Limit, Q, RamIndexCount, IndexState) ->
case Generator(Q) of
- {empty, Qa} -> {undefined, 0, Qa, IndexState};
- {{value, #beta { seq_id = SeqId }}, _Qa} ->
- {Count, Qb, IndexState1} =
- push_betas_to_gammas(Generator, Limit, Q, 0, IndexState),
- {SeqId, Count, Qb, IndexState1}
+ {empty, Qa} -> {undefined, 0, Qa, RamIndexCount, IndexState};
+ {{value, {IndexOnDisk, InnerQ}}, Qa} ->
+ {{value, #msg_status { seq_id = SeqId }}, _Qb} = Generator(InnerQ),
+ {Count, Qb, RamIndexCount1, IndexState1} =
+ push_betas_to_deltas(
+ Generator, Consumer, Limit, IndexOnDisk, InnerQ, Qa, 0,
+ RamIndexCount, IndexState),
+ {SeqId, Count, Qb, RamIndexCount1, IndexState1}
end.
-push_betas_to_gammas(Generator, Limit, Q, Count, IndexState) ->
+push_betas_to_deltas(
+ Generator, Consumer, Limit, Q, Count, RamIndexCount, IndexState) ->
case Generator(Q) of
- {empty, Qa} -> {Count, Qa, IndexState};
- {{value, #beta { seq_id = SeqId }}, _Qa}
+ {empty, Qa} ->
+ {Count, Qa, RamIndexCount, IndexState};
+ {{value, {IndexOnDisk, InnerQ}}, Qa} ->
+ push_betas_to_deltas(
+ Generator, Consumer, Limit, IndexOnDisk, InnerQ, Qa, Count,
+ RamIndexCount, IndexState)
+ end.
+
+push_betas_to_deltas(Generator, Consumer, Limit, IndexOnDisk, InnerQ, Q,
+ Count, RamIndexCount, IndexState) ->
+ case Generator(InnerQ) of
+ {empty, _InnerQ} ->
+ push_betas_to_deltas(Generator, Consumer, Limit, Q, Count,
+ RamIndexCount, IndexState);
+ {{value, #msg_status { seq_id = SeqId }}, _InnerQ}
when Limit /= undefined andalso SeqId < Limit ->
- {Count, Q, IndexState};
- {{value, #beta { msg_id = MsgId, seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- index_on_disk = IndexOnDisk}}, Qa} ->
- IndexState1 =
+ {Count, Consumer(IndexOnDisk, InnerQ, Q), RamIndexCount,
+ IndexState};
+ {{value, MsgStatus}, InnerQa} ->
+ {RamIndexCount1, IndexState1} =
case IndexOnDisk of
- true -> IndexState;
+ true -> {RamIndexCount, IndexState};
false ->
- {true, IndexState2} =
- maybe_write_index_to_disk(
- true, IsPersistent, MsgId,
- SeqId, IsDelivered, IndexState),
- IndexState2
+ {#msg_status { index_on_disk = true }, IndexState2} =
+ maybe_write_index_to_disk(true, MsgStatus,
+ IndexState),
+ {RamIndexCount - 1, IndexState2}
end,
- push_betas_to_gammas(Generator, Limit, Qa, Count + 1, IndexState1)
+ push_betas_to_deltas(
+ Generator, Consumer, Limit, IndexOnDisk, InnerQa, Q, Count + 1,
+ RamIndexCount1, IndexState1)
end.