summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-10-12 20:55:20 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-10-12 20:55:20 +0100
commit1746889b95a47b592c65f2675a9487f39807dddf (patch)
treed906e0da648b084aaa57b3da2a590a75cf97351f /src
parent8e9bcf586365bb1be70db2cc10775e18dbe9625a (diff)
downloadrabbitmq-server-git-1746889b95a47b592c65f2675a9487f39807dddf.tar.gz
Because I've got rid of ram_index_count (i.e. betas count), and because the guard around whether or not betas_to_deltas is now based on q2 and q3 len rather than ram_index_count, we no longer need to have the separate gammas_to_deltas fun, which means we should no longer have the problem with it being called too often as we should be able to rely on the chunk stuff working out.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl68
1 files changed, 20 insertions, 48 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9951359a00..d418a10ff8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -756,7 +756,6 @@ needs_timeout(State) ->
fun (_Quota, State1) -> {0, State1} end,
fun (_Quota, State1) -> State1 end,
fun (_Quota, State1) -> {0, State1} end,
- fun null_gamma_delta/1,
State) of
{true, _State} -> idle;
{false, _State} -> false
@@ -764,21 +763,6 @@ needs_timeout(State) ->
true -> timed
end.
-null_gamma_delta(#vqstate { q2 = Q2, q3 = Q3 } = State) ->
- {null_gamma_delta_msg(?QUEUE:peek(Q2), ?QUEUE:peek(Q2),
- fun (SeqId) -> SeqId end) orelse
- null_gamma_delta_msg(?QUEUE:peek_r(Q3), ?QUEUE:peek(Q3),
- fun rabbit_queue_index:next_segment_boundary/1),
- State}.
-
-null_gamma_delta_msg({value, #msg_status { seq_id = SeqId1,
- index_on_disk = true }},
- {value, #msg_status { seq_id = SeqId2 }},
- LimitFun) ->
- LimitFun =:= undefined orelse SeqId1 >= LimitFun(SeqId2);
-null_gamma_delta_msg(_, _, _) ->
- false.
-
timeout(State) ->
a(reduce_memory_use(confirm_commit_index(State))).
@@ -1448,10 +1432,10 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%% one segment's worth of messages in q3 - and thus would risk
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun, _GammaDeltaFun,
+reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun,
State = #vqstate {target_ram_count = infinity}) ->
{false, State};
-reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, GammaDeltaFun,
+reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
State = #vqstate {
ram_ack_index = RamAckIndex,
ram_msg_count = RamMsgCount,
@@ -1483,14 +1467,11 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, GammaDeltaFun,
{true, State2}
end,
- {Reduce1, State3} =
- case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
- permitted_beta_count(State1)) of
- ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
- _ -> {Reduce, State1}
- end,
- {Reduce2, State4} = GammaDeltaFun(State3),
- {Reduce1 orelse Reduce2, State4}.
+ case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
+ permitted_beta_count(State1)) of
+ ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
+ _ -> {Reduce, State1}
+ end.
limit_ram_acks(0, State) ->
{0, State};
@@ -1515,7 +1496,6 @@ reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun push_betas_to_deltas/2,
fun limit_ram_acks/2,
- fun push_gammas_to_deltas/1,
State),
State1.
@@ -1664,31 +1644,36 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
q3 = Q3,
index_state = IndexState }) ->
PushState = {Quota, Delta, IndexState},
- {Q3a, PushState1} = push_with_limit(
+ {Q3a, PushState1} = push_betas_to_deltas(
fun ?QUEUE:out_r/1,
fun rabbit_queue_index:next_segment_boundary/1,
- Q3, fun push_betas_to_deltas1/4, PushState),
- {Q2a, PushState2} = push_with_limit(
+ Q3, PushState),
+ {Q2a, PushState2} = push_betas_to_deltas(
fun ?QUEUE:out/1,
fun (Q2MinSeqId) -> Q2MinSeqId end,
- Q2, fun push_betas_to_deltas1/4, PushState1),
+ Q2, PushState1),
{_, Delta1, IndexState1} = PushState2,
State #vqstate { q2 = Q2a,
delta = Delta1,
q3 = Q3a,
index_state = IndexState1 }.
-push_with_limit(Generator, LimitFun, Q, PushFun, PushState) ->
+push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
case ?QUEUE:is_empty(Q) of
true ->
{Q, PushState};
false ->
- {{value, #msg_status { seq_id = MinSeqId }}, _Qa} = ?QUEUE:out(Q),
- {{value, #msg_status { seq_id = MaxSeqId }}, _Qb} = ?QUEUE:out_r(Q),
+ {value, #msg_status { seq_id = MinSeqId }} = ?QUEUE:peek(Q),
+ {value, #msg_status { seq_id = MaxSeqId }} = ?QUEUE:peek_r(Q),
Limit = LimitFun(MinSeqId),
case MaxSeqId < Limit of
true -> {Q, PushState};
- false -> PushFun(Generator, Limit, Q, PushState)
+ false -> {Q1, {Quota, Delta, IndexState}} =
+ push_betas_to_deltas1(
+ Generator, Limit, Q, PushState),
+ {Q2, Delta1} =
+ push_gammas_to_deltas(Generator, Limit, Q1, Delta),
+ {Q2, {Quota, Delta1, IndexState}}
end
end.
@@ -1719,19 +1704,6 @@ push_betas_to_deltas1(Generator, Limit, Q,
{Quota1, Delta1, IndexState1})
end.
-push_gammas_to_deltas(State = #vqstate { q2 = Q2,
- delta = Delta,
- q3 = Q3 }) ->
- {Q2a, Delta1} = push_with_limit(
- fun ?QUEUE:out/1,
- fun (Q2MinSeqId) -> Q2MinSeqId end,
- Q2, fun push_gammas_to_deltas/4, Delta),
- {Q3a, Delta2} = push_with_limit(
- fun ?QUEUE:out_r/1,
- fun rabbit_queue_index:next_segment_boundary/1,
- Q3, fun push_gammas_to_deltas/4, Delta1),
- {Delta2 =/= Delta, State #vqstate { q2 = Q2a, delta = Delta2, q3 = Q3a }}.
-
push_gammas_to_deltas(Generator, Limit, Q, Delta) ->
case Generator(Q) of
{{value, #msg_status { seq_id = SeqId, index_on_disk = true }}, Q1}