summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-06-22 11:53:18 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-06-22 11:53:18 +0100
commitf1f88f65323d694ea68366fd8b0dfe3f42b84a61 (patch)
treea3f46376103d91309320f22901dc684e93f6422d
parent2993e75072b5ba8921d79367bcd8a446a5416f1b (diff)
downloadrabbitmq-server-git-f1f88f65323d694ea68366fd8b0dfe3f42b84a61.tar.gz
refactor: simplify limit_ram_index
-rw-r--r--src/rabbit_variable_queue.erl69
1 files changed, 28 insertions, 41 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 25dc5910c7..23879df780 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1204,52 +1204,39 @@ limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) ->
?RAM_INDEX_BATCH_SIZE]),
case Reduction < ?RAM_INDEX_BATCH_SIZE of
true -> State;
- false -> {Reduction1, State1} =
- limit_q2_ram_index(Reduction, State),
- {_Red2, State2} =
- limit_q3_ram_index(Reduction1, State1),
- State2
+ false -> #vqstate { q2 = Q2, q3 = Q3,
+ index_state = IndexState } = State,
+ {Q2a, {Reduction1, IndexState1}} =
+ limit_ram_index(fun bpqueue:map_fold_filter_l/4,
+ Q2, {Reduction, IndexState}),
+ {Q3a, {Reduction2, IndexState2}} =
+ limit_ram_index(fun bpqueue:map_fold_filter_r/4,
+ Q3, {Reduction1, IndexState1}),
+ RamIndexCount1 = RamIndexCount -
+ (Reduction - Reduction2),
+ State #vqstate { q2 = Q2a, q3 = Q3a,
+ index_state = IndexState2,
+ ram_index_count = RamIndexCount1 }
end;
true ->
State
end.
-limit_q2_ram_index(Reduction, State = #vqstate { q2 = Q2 })
- when Reduction > 0 ->
- {Q2a, Reduction1, State1} = limit_ram_index(fun bpqueue:map_fold_filter_l/4,
- Q2, Reduction, State),
- {Reduction1, State1 #vqstate { q2 = Q2a }};
-limit_q2_ram_index(Reduction, State) ->
- {Reduction, State}.
-
-limit_q3_ram_index(Reduction, State = #vqstate { q3 = Q3 })
- when Reduction > 0 ->
- %% use the _r version so that we prioritise the msgs closest to
- %% delta, and least soon to be delivered
- {Q3a, Reduction1, State1} = limit_ram_index(fun bpqueue:map_fold_filter_r/4,
- Q3, Reduction, State),
- {Reduction1, State1 #vqstate { q3 = Q3a }};
-limit_q3_ram_index(Reduction, State) ->
- {Reduction, State}.
-
-limit_ram_index(MapFoldFilterFun, Q, Reduction,
- State = #vqstate { index_state = IndexState,
- ram_index_count = RamIndexCount }) ->
- {Qa, {Reduction1, IndexState1}} =
- MapFoldFilterFun(
- fun erlang:'not'/1,
- fun (MsgStatus, {0, _IndexStateN}) ->
- false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
- stop;
- (MsgStatus, {N, IndexStateN}) when N > 0 ->
- false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
- {MsgStatus1, IndexStateN1} =
- maybe_write_index_to_disk(true, MsgStatus, IndexStateN),
- {true, MsgStatus1, {N-1, IndexStateN1}}
- end, {Reduction, IndexState}, Q),
- RamIndexCount1 = RamIndexCount - (Reduction - Reduction1),
- {Qa, Reduction1, State #vqstate { index_state = IndexState1,
- ram_index_count = RamIndexCount1 }}.
+limit_ram_index(_MapFoldFilterFun, Q, {Reduction, IndexState})
+ when Reduction == 0 ->
+ {Q, {Reduction, IndexState}};
+limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) ->
+ MapFoldFilterFun(
+ fun erlang:'not'/1,
+ fun (MsgStatus, {0, _IndexStateN}) ->
+ false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
+ stop;
+ (MsgStatus, {N, IndexStateN}) when N > 0 ->
+ false = MsgStatus #msg_status.index_on_disk, %% ASSERTION
+ {MsgStatus1, IndexStateN1} =
+ maybe_write_index_to_disk(true, MsgStatus, IndexStateN),
+ {true, MsgStatus1, {N-1, IndexStateN1}}
+ end, {Reduction, IndexState}, Q).
maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
State;