diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_tests.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 286 |
2 files changed, 132 insertions, 161 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 606c4fe816..44c9b49905 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2189,12 +2189,7 @@ test_variable_queue_requeue(VQ0) -> Seq = lists:seq(1, Count), VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish(false, Count, VQ1), - {VQ3, Acks} = lists:foldl( - fun (_N, {VQN, AckTags}) -> - {{#basic_message{}, false, AckTag, _}, VQM} = - rabbit_variable_queue:fetch(true, VQN), - {VQM, [AckTag | AckTags]} - end, {VQ2, []}, Seq), + {VQ3, Acks} = variable_queue_fetch(Count, false, false, Count, VQ2), Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 -> [Ack | Acc]; (_, Acc) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 03004e102c..60c3dfd2d3 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -248,7 +248,6 @@ ram_msg_count, ram_msg_count_prev, ram_ack_count_prev, - ram_index_count, out_counter, in_counter, rates, @@ -336,7 +335,6 @@ target_ram_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), - ram_index_count :: non_neg_integer(), out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), rates :: rates(), @@ -490,7 +488,6 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState3, len = 0, ram_msg_count = 0, - ram_index_count = 0, persistent_count = PCount1 })}. publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, @@ -718,6 +715,7 @@ needs_timeout(State) -> fun (_Quota, State1) -> {0, State1} end, fun (_Quota, State1) -> State1 end, fun (_Quota, State1) -> {0, State1} end, + fun null_gamma_delta/1, State) of {true, _State} -> idle; {false, _State} -> false @@ -725,6 +723,21 @@ needs_timeout(State) -> true -> timed end. +null_gamma_delta(#vqstate { q2 = Q2, q3 = Q3 } = State) -> + {null_gamma_delta_msg(?QUEUE:out(Q2), ?QUEUE:out(Q2), + fun (SeqId) -> SeqId end) orelse + null_gamma_delta_msg(?QUEUE:out_r(Q3), ?QUEUE:out(Q3), + fun rabbit_queue_index:next_segment_boundary/1), + State}. + +null_gamma_delta_msg({{value, #msg_status { seq_id = SeqId1, + index_on_disk = true }}, _Q}, + {{value, #msg_status { seq_id = SeqId2 }}, _Q2}, + LimitFun) -> + SeqId1 >= LimitFun(SeqId2); +null_gamma_delta_msg(_, _, _) -> + false. + timeout(State) -> a(reduce_memory_use(confirm_commit_index(State))). @@ -738,7 +751,6 @@ status(#vqstate { ram_ack_index = RAI, target_ram_count = TargetRamCount, ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, next_seq_id = NextSeqId, persistent_count = PersistentCount, rates = #rates { avg_egress = AvgEgressRate, @@ -755,7 +767,6 @@ status(#vqstate { {target_ram_count , TargetRamCount}, {ram_msg_count , RamMsgCount}, {ram_ack_count , gb_trees:size(RAI)}, - {ram_index_count , RamIndexCount}, {next_seq_id , NextSeqId}, {persistent_count , PersistentCount}, {avg_ingress_rate , AvgIngressRate}, @@ -774,10 +785,9 @@ discard(_Msg, _ChPid, State) -> State. %%---------------------------------------------------------------------------- a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - len = Len, - persistent_count = PersistentCount, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount }) -> + len = Len, + persistent_count = PersistentCount, + ram_msg_count = RamMsgCount }) -> E1 = ?QUEUE:is_empty(Q1), E2 = ?QUEUE:is_empty(Q2), ED = Delta#delta.count == 0, @@ -793,7 +803,6 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, true = Len >= 0, true = PersistentCount >= 0, true = RamMsgCount >= 0, - true = RamIndexCount >= 0, State. @@ -910,44 +919,25 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) -> {Filtered, rabbit_queue_index:ack( Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. -%% the first arg is the older delta -combine_deltas(?BLANK_DELTA_PATTERN(X), ?BLANK_DELTA_PATTERN(Y)) -> - ?BLANK_DELTA; -combine_deltas(?BLANK_DELTA_PATTERN(X), #delta { start_seq_id = Start, - count = Count, - end_seq_id = End } = B) -> - true = Start + Count =< End, %% ASSERTION - B; -combine_deltas(#delta { start_seq_id = Start, - count = Count, - end_seq_id = End } = A, ?BLANK_DELTA_PATTERN(Y)) -> - true = Start + Count =< End, %% ASSERTION - A; -combine_deltas(#delta { start_seq_id = StartLow, - count = CountLow, - end_seq_id = EndLow }, - #delta { start_seq_id = StartHigh, - count = CountHigh, - end_seq_id = EndHigh }) -> - Count = CountLow + CountHigh, - true = (StartLow =< StartHigh) %% ASSERTIONS - andalso ((StartLow + CountLow) =< EndLow) - andalso ((StartHigh + CountHigh) =< EndHigh) - andalso ((StartLow + Count) =< EndHigh), - #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }. - -expand_delta(SeqId, Delta) -> - DeltaInc = #delta { start_seq_id = SeqId, - count = 1, - end_seq_id = SeqId + 1 }, - case Delta of - ?BLANK_DELTA -> - DeltaInc; - #delta { start_seq_id = StartSeqId } when SeqId < StartSeqId -> - combine_deltas(DeltaInc, Delta); - #delta { end_seq_id = EndSeqId } when SeqId >= EndSeqId -> - combine_deltas(Delta, DeltaInc) - end. +expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) -> + #delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 }; +expand_delta(SeqId, #delta { start_seq_id = StartSeqId, + count = Count, + end_seq_id = EndSeqId } = Delta) + when SeqId < StartSeqId -> + true = StartSeqId + Count =< EndSeqId, %% ASSERTION + Delta #delta { start_seq_id = SeqId, count = Count + 1 }; +expand_delta(SeqId, #delta { start_seq_id = StartSeqId, + count = Count, + end_seq_id = EndSeqId } = Delta) + when SeqId >= EndSeqId -> + true = StartSeqId + Count =< EndSeqId, %% ASSERTION + Delta #delta { count = Count + 1, end_seq_id = SeqId + 1 }; +expand_delta(_SeqId, #delta { start_seq_id = StartSeqId, + count = Count, + end_seq_id = EndSeqId } = Delta) -> + true = StartSeqId + Count + 1 =< EndSeqId, %% ASSERTION + Delta #delta { count = Count + 1 }. update_rate(Now, Then, Count, {OThen, OCount}) -> %% avg over the current period and the previous @@ -992,7 +982,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, ram_msg_count = 0, ram_msg_count_prev = 0, ram_ack_count_prev = 0, - ram_index_count = 0, out_counter = 0, in_counter = 0, rates = blank_rate(Now, DeltaCount1), @@ -1012,12 +1001,10 @@ blank_rate(Timestamp, IngressLength) -> avg_ingress = 0.0, timestamp = Timestamp }. -in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk }, - State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) -> +in_r(MsgStatus = #msg_status { msg = undefined }, + State = #vqstate { q3 = Q3, q4 = Q4 }) -> case ?QUEUE:is_empty(Q4) of - true -> State #vqstate { - q3 = ?QUEUE:in_r(MsgStatus, Q3), - ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }; + true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) } @@ -1335,14 +1322,12 @@ publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) -> {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}. -publish_beta(#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, State) -> - {#msg_status { index_on_disk = IndexOnDisk, msg = Msg} = MsgStatus1, - #vqstate { ram_index_count = RamIndexCount, - ram_msg_count = RamMsgCount } = State1} = - maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State), +publish_beta(MsgStatus, State) -> + {#msg_status { msg = Msg} = MsgStatus1, + #vqstate { ram_msg_count = RamMsgCount } = State1} = + maybe_write_to_disk(true, false, MsgStatus, State), {MsgStatus1, State1 #vqstate { - ram_msg_count = RamMsgCount + one_if(Msg =/= undefined), - ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }}. + ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}. %% Rebuild queue, inserting sequence ids to maintain ordering queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, MsgPropsFun, State) -> @@ -1375,14 +1360,10 @@ delta_merge([], Delta, MsgIds, _MsgPropsFun, State) -> {Delta, MsgIds, State}; delta_merge(SeqIds, Delta, MsgIds, MsgPropsFun, State) -> lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) -> - {#msg_status { msg_id = MsgId, - index_on_disk = IndexOnDisk, - msg_on_disk = MsgOnDisk} = MsgStatus, - State1} = + {#msg_status { msg_id = MsgId } = MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun, State0), {_MsgStatus, State2} = - maybe_write_to_disk(not MsgOnDisk, not IndexOnDisk, - MsgStatus, State1), + maybe_write_to_disk(true, true, MsgStatus, State1), {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2} end, {Delta, MsgIds, State}, SeqIds). @@ -1426,10 +1407,10 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. %% one segment's worth of messages in q3 - and thus would risk %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. -reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun, +reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun, _GammaDeltaFun, State = #vqstate {target_ram_count = infinity}) -> {false, State}; -reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, +reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, GammaDeltaFun, State = #vqstate { ram_ack_index = RamAckIndex, ram_msg_count = RamMsgCount, @@ -1440,7 +1421,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, avg_egress = AvgAckEgress } }) -> - {Reduce, State1} = + {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} = case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), TargetRamCount) of 0 -> {false, State}; @@ -1461,15 +1442,14 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, {true, State2} end, - %% AlphaBetaFun may have produced gammas that are bordering - %% delta. We must ensure that we push these into delta, which is - %% largely a no-op. This is why we call BetaDeltaFun even with a - %% quota of 0. - case chunk_size(State1 #vqstate.ram_index_count, - permitted_beta_count(State1)) of - ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)}; - _ -> {Reduce, BetaDeltaFun(0, State1)} - end. + {Reduce1, State3} = + case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), + permitted_beta_count(State1)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)}; + _ -> {Reduce, State1} + end, + {Reduce2, State4} = GammaDeltaFun(State3), + {Reduce1 orelse Reduce2, State4}. limit_ram_acks(0, State) -> {0, State}; @@ -1494,23 +1474,20 @@ reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun push_betas_to_deltas/2, fun limit_ram_acks/2, + fun push_gammas_to_deltas/1, State), State1. permitted_beta_count(#vqstate { len = 0 }) -> infinity; +permitted_beta_count(#vqstate { target_ram_count = 0 }) -> + rabbit_queue_index:next_segment_boundary(0); permitted_beta_count(#vqstate { len = Len, q1 = Q1, - q3 = Q3, q4 = Q4 }) -> BetaDeltaLen = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4), - Permitted = BetaDeltaLen - trunc(BetaDeltaLen * BetaDeltaLen / Len), - case ?QUEUE:out(Q3) of - {empty, _Q3} -> Permitted; - {{value, #msg_status { seq_id = MinSeqId }}, _Q3} -> - lists:max([Permitted, rabbit_queue_index:next_segment_boundary( - MinSeqId) - MinSeqId]) - end. + lists:max([BetaDeltaLen - ((BetaDeltaLen * BetaDeltaLen) div Len), + rabbit_queue_index:next_segment_boundary(0)]). chunk_size(Current, Permitted) when Permitted =:= infinity orelse Permitted >= Current -> @@ -1518,41 +1495,35 @@ chunk_size(Current, Permitted) chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]). -fetch_from_q3(State = #vqstate { - q1 = Q1, - q2 = Q2, - delta = #delta { count = DeltaCount }, - q3 = Q3, - q4 = Q4, - ram_index_count = RamIndexCount}) -> +fetch_from_q3(State = #vqstate { q1 = Q1, + q2 = Q2, + delta = #delta { count = DeltaCount }, + q3 = Q3, + q4 = Q4 }) -> case ?QUEUE:out(Q3) of {empty, _Q3} -> {empty, State}; - {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk }}, Q3a} -> - RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), - true = RamIndexCount1 >= 0, %% ASSERTION - State1 = State #vqstate { q3 = Q3a, - ram_index_count = RamIndexCount1 }, - State2 = - case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of - {true, true} -> - %% q3 is now empty, it wasn't before; delta is - %% still empty. So q2 must be empty, and we - %% know q4 is empty otherwise we wouldn't be - %% loading from q3. As such, we can just set - %% q4 to Q1. - true = ?QUEUE:is_empty(Q2), %% ASSERTION - true = ?QUEUE:is_empty(Q4), %% ASSERTION - State1 #vqstate { q1 = ?QUEUE:new(), - q4 = Q1 }; - {true, false} -> - maybe_deltas_to_betas(State1); - {false, _} -> - %% q3 still isn't empty, we've not touched - %% delta, so the invariants between q1, q2, - %% delta and q3 are maintained - State1 - end, + {{value, MsgStatus}, Q3a} -> + State1 = State #vqstate { q3 = Q3a }, + State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of + {true, true} -> + %% q3 is now empty, it wasn't before; + %% delta is still empty. So q2 must be + %% empty, and we know q4 is empty + %% otherwise we wouldn't be loading from + %% q3. As such, we can just set q4 to Q1. + true = ?QUEUE:is_empty(Q2), %% ASSERTION + true = ?QUEUE:is_empty(Q4), %% ASSERTION + State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 }; + {true, false} -> + maybe_deltas_to_betas(State1); + {false, _} -> + %% q3 still isn't empty, we've not + %% touched delta, so the invariants + %% between q1, q2, delta and q3 are + %% maintained + State1 + end, {loaded, {MsgStatus, State2}} end. @@ -1639,42 +1610,35 @@ maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> {empty, _Q} -> {Quota, State}; {{value, MsgStatus}, Qa} -> - {MsgStatus1 = #msg_status { msg_on_disk = true, - index_on_disk = IndexOnDisk }, - State1 = #vqstate { ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount }} = + {MsgStatus1 = #msg_status { msg_on_disk = true }, + State1 = #vqstate { ram_msg_count = RamMsgCount }} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk), - State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1, - ram_index_count = RamIndexCount1 }, + State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1 }, maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, Consumer(MsgStatus2, Qa, State2)) end. -push_betas_to_deltas(Quota, - State = #vqstate { q2 = Q2, - delta = Delta, - q3 = Q3, - index_state = IndexState, - ram_index_count = RamIndexCount }) -> - PushState = {Quota, Delta, RamIndexCount, IndexState}, - {Q2a, PushState1} = push_betas_to_deltas( +push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, + delta = Delta, + q3 = Q3, + index_state = IndexState }) -> + PushState = {Quota, Delta, IndexState}, + {Q2a, PushState1} = push_with_limit( fun ?QUEUE:out/1, fun (Q2MinSeqId) -> Q2MinSeqId end, - Q2, PushState), - {Q3a, PushState2} = push_betas_to_deltas( + Q2, fun push_betas_to_deltas1/4, PushState), + {Q3a, PushState2} = push_with_limit( fun ?QUEUE:out_r/1, fun rabbit_queue_index:next_segment_boundary/1, - Q3, PushState1), - {_, Delta1, RamIndexCount1, IndexState1} = PushState2, - State #vqstate { q2 = Q2a, - delta = Delta1, - q3 = Q3a, - index_state = IndexState1, - ram_index_count = RamIndexCount1 }. - -push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> + Q3, fun push_betas_to_deltas1/4, PushState1), + {_, Delta1, IndexState1} = PushState2, + State #vqstate { q2 = Q2a, + delta = Delta1, + q3 = Q3a, + index_state = IndexState1 }. + +push_with_limit(Generator, LimitFun, Q, PushFun, PushState) -> case ?QUEUE:is_empty(Q) of true -> {Q, PushState}; @@ -1684,16 +1648,15 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> Limit = LimitFun(MinSeqId), case MaxSeqId < Limit of true -> {Q, PushState}; - false -> push_betas_to_deltas1(Generator, Limit, Q, PushState) + false -> PushFun(Generator, Limit, Q, PushState) end end. +push_betas_to_deltas1(_Generator, _Limit, Q, + {0, _Delta, _IndexState} = PushState) -> + {Q, PushState}; push_betas_to_deltas1(Generator, Limit, Q, - {0, Delta, RamIndexCount, IndexState}) -> - {Qb, Delta1} = push_gammas_to_deltas(Generator, Limit, Q, Delta), - {Qb, {0, Delta1, RamIndexCount, IndexState}}; -push_betas_to_deltas1(Generator, Limit, Q, - {Quota, Delta, RamIndexCount, IndexState} = PushState) -> + {Quota, Delta, IndexState} = PushState) -> case Generator(Q) of {empty, _Q} -> {Q, PushState}; @@ -1702,20 +1665,33 @@ push_betas_to_deltas1(Generator, Limit, Q, {Q, PushState}; {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk, seq_id = SeqId }}, Qa} -> - {Quota1, RamIndexCount1, IndexState1} = + {Quota1, IndexState1} = case IndexOnDisk of - true -> {Quota, RamIndexCount, IndexState}; + true -> {Quota, IndexState}; false -> {#msg_status { index_on_disk = true }, IndexState2} = maybe_write_index_to_disk(true, MsgStatus, IndexState), - {Quota - 1, RamIndexCount - 1, IndexState2} + {Quota - 1, IndexState2} end, Delta1 = expand_delta(SeqId, Delta), push_betas_to_deltas1(Generator, Limit, Qa, - {Quota1, Delta1, RamIndexCount1, IndexState1}) + {Quota1, Delta1, IndexState1}) end. +push_gammas_to_deltas(State = #vqstate { q2 = Q2, + delta = Delta, + q3 = Q3 }) -> + {Q2a, Delta1} = push_with_limit( + fun ?QUEUE:out/1, + fun (Q2MinSeqId) -> Q2MinSeqId end, + Q2, fun push_gammas_to_deltas/4, Delta), + {Q3a, Delta2} = push_with_limit( + fun ?QUEUE:out_r/1, + fun rabbit_queue_index:next_segment_boundary/1, + Q3, fun push_gammas_to_deltas/4, Delta1), + {Delta2 =/= Delta, State #vqstate { q2 = Q2a, delta = Delta2, q3 = Q3a }}. + push_gammas_to_deltas(Generator, Limit, Q, Delta) -> case Generator(Q) of {{value, #msg_status { seq_id = SeqId, index_on_disk = true }}, Q1} |
