diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-22 11:53:18 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-22 11:53:18 +0100 |
| commit | f1f88f65323d694ea68366fd8b0dfe3f42b84a61 (patch) | |
| tree | a3f46376103d91309320f22901dc684e93f6422d | |
| parent | 2993e75072b5ba8921d79367bcd8a446a5416f1b (diff) | |
| download | rabbitmq-server-git-f1f88f65323d694ea68366fd8b0dfe3f42b84a61.tar.gz | |
refactor: simplify limit_ram_index
| -rw-r--r-- | src/rabbit_variable_queue.erl | 69 |
1 files changed, 28 insertions, 41 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 25dc5910c7..23879df780 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1204,52 +1204,39 @@ limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) -> ?RAM_INDEX_BATCH_SIZE]), case Reduction < ?RAM_INDEX_BATCH_SIZE of true -> State; - false -> {Reduction1, State1} = - limit_q2_ram_index(Reduction, State), - {_Red2, State2} = - limit_q3_ram_index(Reduction1, State1), - State2 + false -> #vqstate { q2 = Q2, q3 = Q3, + index_state = IndexState } = State, + {Q2a, {Reduction1, IndexState1}} = + limit_ram_index(fun bpqueue:map_fold_filter_l/4, + Q2, {Reduction, IndexState}), + {Q3a, {Reduction2, IndexState2}} = + limit_ram_index(fun bpqueue:map_fold_filter_r/4, + Q3, {Reduction1, IndexState1}), + RamIndexCount1 = RamIndexCount - + (Reduction - Reduction2), + State #vqstate { q2 = Q2a, q3 = Q3a, + index_state = IndexState2, + ram_index_count = RamIndexCount1 } end; true -> State end. -limit_q2_ram_index(Reduction, State = #vqstate { q2 = Q2 }) - when Reduction > 0 -> - {Q2a, Reduction1, State1} = limit_ram_index(fun bpqueue:map_fold_filter_l/4, - Q2, Reduction, State), - {Reduction1, State1 #vqstate { q2 = Q2a }}; -limit_q2_ram_index(Reduction, State) -> - {Reduction, State}. - -limit_q3_ram_index(Reduction, State = #vqstate { q3 = Q3 }) - when Reduction > 0 -> - %% use the _r version so that we prioritise the msgs closest to - %% delta, and least soon to be delivered - {Q3a, Reduction1, State1} = limit_ram_index(fun bpqueue:map_fold_filter_r/4, - Q3, Reduction, State), - {Reduction1, State1 #vqstate { q3 = Q3a }}; -limit_q3_ram_index(Reduction, State) -> - {Reduction, State}. - -limit_ram_index(MapFoldFilterFun, Q, Reduction, - State = #vqstate { index_state = IndexState, - ram_index_count = RamIndexCount }) -> - {Qa, {Reduction1, IndexState1}} = - MapFoldFilterFun( - fun erlang:'not'/1, - fun (MsgStatus, {0, _IndexStateN}) -> - false = MsgStatus #msg_status.index_on_disk, %% ASSERTION - stop; - (MsgStatus, {N, IndexStateN}) when N > 0 -> - false = MsgStatus #msg_status.index_on_disk, %% ASSERTION - {MsgStatus1, IndexStateN1} = - maybe_write_index_to_disk(true, MsgStatus, IndexStateN), - {true, MsgStatus1, {N-1, IndexStateN1}} - end, {Reduction, IndexState}, Q), - RamIndexCount1 = RamIndexCount - (Reduction - Reduction1), - {Qa, Reduction1, State #vqstate { index_state = IndexState1, - ram_index_count = RamIndexCount1 }}. +limit_ram_index(_MapFoldFilterFun, Q, {Reduction, IndexState}) + when Reduction == 0 -> + {Q, {Reduction, IndexState}}; +limit_ram_index(MapFoldFilterFun, Q, {Reduction, IndexState}) -> + MapFoldFilterFun( + fun erlang:'not'/1, + fun (MsgStatus, {0, _IndexStateN}) -> + false = MsgStatus #msg_status.index_on_disk, %% ASSERTION + stop; + (MsgStatus, {N, IndexStateN}) when N > 0 -> + false = MsgStatus #msg_status.index_on_disk, %% ASSERTION + {MsgStatus1, IndexStateN1} = + maybe_write_index_to_disk(true, MsgStatus, IndexStateN), + {true, MsgStatus1, {N-1, IndexStateN1}} + end, {Reduction, IndexState}, Q). maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> State; |
