diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-12 20:55:20 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-12 20:55:20 +0100 |
| commit | 1746889b95a47b592c65f2675a9487f39807dddf (patch) | |
| tree | d906e0da648b084aaa57b3da2a590a75cf97351f /src | |
| parent | 8e9bcf586365bb1be70db2cc10775e18dbe9625a (diff) | |
| download | rabbitmq-server-git-1746889b95a47b592c65f2675a9487f39807dddf.tar.gz | |
Because I've got rid of ram_index_count (i.e. betas count), and because the guard around whether or not betas_to_deltas is now based on q2 and q3 len rather than ram_index_count, we no longer need to have the separate gammas_to_deltas fun, which means we should no longer have the problem with it being called too often as we should be able to rely on the chunk stuff working out.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 68 |
1 files changed, 20 insertions, 48 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9951359a00..d418a10ff8 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -756,7 +756,6 @@ needs_timeout(State) -> fun (_Quota, State1) -> {0, State1} end, fun (_Quota, State1) -> State1 end, fun (_Quota, State1) -> {0, State1} end, - fun null_gamma_delta/1, State) of {true, _State} -> idle; {false, _State} -> false @@ -764,21 +763,6 @@ needs_timeout(State) -> true -> timed end. -null_gamma_delta(#vqstate { q2 = Q2, q3 = Q3 } = State) -> - {null_gamma_delta_msg(?QUEUE:peek(Q2), ?QUEUE:peek(Q2), - fun (SeqId) -> SeqId end) orelse - null_gamma_delta_msg(?QUEUE:peek_r(Q3), ?QUEUE:peek(Q3), - fun rabbit_queue_index:next_segment_boundary/1), - State}. - -null_gamma_delta_msg({value, #msg_status { seq_id = SeqId1, - index_on_disk = true }}, - {value, #msg_status { seq_id = SeqId2 }}, - LimitFun) -> - LimitFun =:= undefined orelse SeqId1 >= LimitFun(SeqId2); -null_gamma_delta_msg(_, _, _) -> - false. - timeout(State) -> a(reduce_memory_use(confirm_commit_index(State))). @@ -1448,10 +1432,10 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. %% one segment's worth of messages in q3 - and thus would risk %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. -reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun, _GammaDeltaFun, +reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun, State = #vqstate {target_ram_count = infinity}) -> {false, State}; -reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, GammaDeltaFun, +reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, State = #vqstate { ram_ack_index = RamAckIndex, ram_msg_count = RamMsgCount, @@ -1483,14 +1467,11 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, GammaDeltaFun, {true, State2} end, - {Reduce1, State3} = - case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), - permitted_beta_count(State1)) of - ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)}; - _ -> {Reduce, State1} - end, - {Reduce2, State4} = GammaDeltaFun(State3), - {Reduce1 orelse Reduce2, State4}. + case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), + permitted_beta_count(State1)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)}; + _ -> {Reduce, State1} + end. limit_ram_acks(0, State) -> {0, State}; @@ -1515,7 +1496,6 @@ reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun push_betas_to_deltas/2, fun limit_ram_acks/2, - fun push_gammas_to_deltas/1, State), State1. @@ -1664,31 +1644,36 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, q3 = Q3, index_state = IndexState }) -> PushState = {Quota, Delta, IndexState}, - {Q3a, PushState1} = push_with_limit( + {Q3a, PushState1} = push_betas_to_deltas( fun ?QUEUE:out_r/1, fun rabbit_queue_index:next_segment_boundary/1, - Q3, fun push_betas_to_deltas1/4, PushState), - {Q2a, PushState2} = push_with_limit( + Q3, PushState), + {Q2a, PushState2} = push_betas_to_deltas( fun ?QUEUE:out/1, fun (Q2MinSeqId) -> Q2MinSeqId end, - Q2, fun push_betas_to_deltas1/4, PushState1), + Q2, PushState1), {_, Delta1, IndexState1} = PushState2, State #vqstate { q2 = Q2a, delta = Delta1, q3 = Q3a, index_state = IndexState1 }. -push_with_limit(Generator, LimitFun, Q, PushFun, PushState) -> +push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> case ?QUEUE:is_empty(Q) of true -> {Q, PushState}; false -> - {{value, #msg_status { seq_id = MinSeqId }}, _Qa} = ?QUEUE:out(Q), - {{value, #msg_status { seq_id = MaxSeqId }}, _Qb} = ?QUEUE:out_r(Q), + {value, #msg_status { seq_id = MinSeqId }} = ?QUEUE:peek(Q), + {value, #msg_status { seq_id = MaxSeqId }} = ?QUEUE:peek_r(Q), Limit = LimitFun(MinSeqId), case MaxSeqId < Limit of true -> {Q, PushState}; - false -> PushFun(Generator, Limit, Q, PushState) + false -> {Q1, {Quota, Delta, IndexState}} = + push_betas_to_deltas1( + Generator, Limit, Q, PushState), + {Q2, Delta1} = + push_gammas_to_deltas(Generator, Limit, Q1, Delta), + {Q2, {Quota, Delta1, IndexState}} end end. @@ -1719,19 +1704,6 @@ push_betas_to_deltas1(Generator, Limit, Q, {Quota1, Delta1, IndexState1}) end. -push_gammas_to_deltas(State = #vqstate { q2 = Q2, - delta = Delta, - q3 = Q3 }) -> - {Q2a, Delta1} = push_with_limit( - fun ?QUEUE:out/1, - fun (Q2MinSeqId) -> Q2MinSeqId end, - Q2, fun push_gammas_to_deltas/4, Delta), - {Q3a, Delta2} = push_with_limit( - fun ?QUEUE:out_r/1, - fun rabbit_queue_index:next_segment_boundary/1, - Q3, fun push_gammas_to_deltas/4, Delta1), - {Delta2 =/= Delta, State #vqstate { q2 = Q2a, delta = Delta2, q3 = Q3a }}. - push_gammas_to_deltas(Generator, Limit, Q, Delta) -> case Generator(Q) of {{value, #msg_status { seq_id = SeqId, index_on_disk = true }}, Q1} |
