diff options
| -rw-r--r-- | src/bpqueue.erl | 164 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 89 |
3 files changed, 192 insertions, 75 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl index 7237473f32..a556ec2342 100644 --- a/src/bpqueue.erl +++ b/src/bpqueue.erl @@ -63,12 +63,12 @@ -spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]). -spec(map_fold_filter_l/4 :: (fun ((prefix()) -> boolean()), - fun ((value(), B) -> {prefix(), value(), B}), B, bpqueue()) -> - {bpqueue(), B}). + fun ((value(), B) -> ({prefix(), value(), B} | 'stop')), B, + bpqueue()) -> {bpqueue(), B}). -spec(map_fold_filter_r/4 :: (fun ((prefix()) -> boolean()), - fun ((value(), B) -> {prefix(), value(), B}), B, bpqueue()) -> - {bpqueue(), B}). + fun ((value(), B) -> ({prefix(), value(), B} | 'stop')), B, + bpqueue()) -> {bpqueue(), B}). -endif. @@ -107,6 +107,40 @@ in_r(Prefix, Value, {N, Q}) -> queue:in_r({Prefix, queue:in(Value, queue:new())}, Q) end}. +in_q(Prefix, Queue, BPQ = {0, Q}) -> + case queue:len(Queue) of + 0 -> BPQ; + N -> {N, queue:in({Prefix, Queue}, Q)} + end; +in_q(Prefix, Queue, BPQ = {N, Q}) -> + case queue:len(Queue) of + 0 -> BPQ; + M -> {N + M, + case queue:out_r(Q) of + {{value, {Prefix, InnerQ}}, Q1} -> + queue:in({Prefix, queue:join(InnerQ, Queue)}, Q1); + {{value, {_Prefix, _InnerQ}}, _Q1} -> + queue:in({Prefix, Queue}, Q) + end} + end. + +in_q_r(Prefix, Queue, BPQ = {0, Q}) -> + case queue:len(Queue) of + 0 -> BPQ; + N -> {N, queue:in({Prefix, Queue}, Q)} + end; +in_q_r(Prefix, Queue, BPQ = {N, Q}) -> + case queue:len(Queue) of + 0 -> BPQ; + M -> {N + M, + case queue:out(Q) of + {{value, {Prefix, InnerQ}}, Q1} -> + queue:in_r({Prefix, queue:join(Queue, InnerQ)}, Q1); + {{value, {_Prefix, _InnerQ}}, _Q1} -> + queue:in_r({Prefix, Queue}, Q) + end} + end. + out({0, _Q} = BPQ) -> {empty, BPQ}; out({N, Q}) -> @@ -195,7 +229,7 @@ to_list1({Prefix, InnerQ}) -> %% map_fold_filter_[lr](FilterFun, Fun, Init, BPQ) -> {BPQ, Init} %% where FilterFun(Prefix) -> boolean() -%% Fun(Value, Init) -> {Prefix, Value, Init} +%% Fun(Value, Init) -> {Prefix, Value, Init} | stop %% %% The filter fun allows you to skip very quickly over blocks that %% you're not interested in. Such blocks appear in the resulting bpq @@ -204,50 +238,106 @@ to_list1({Prefix, InnerQ}) -> %% value, and also to modify the Init/Acc (just like a fold). map_fold_filter_l(_PFilter, _Fun, Init, BPQ = {0, _Q}) -> {BPQ, Init}; -map_fold_filter_l(PFilter, Fun, Init, {_N, Q}) -> - map_fold_filter_l1(PFilter, Fun, Init, Q, new()). +map_fold_filter_l(PFilter, Fun, Init, {N, Q}) -> + map_fold_filter_l1(N, PFilter, Fun, Init, Q, new()). -map_fold_filter_l1(PFilter, Fun, Init, Q, QNew) -> +map_fold_filter_l1(Len, PFilter, Fun, Init, Q, QNew) -> case queue:out(Q) of {empty, _Q} -> {QNew, Init}; {{value, {Prefix, InnerQ}}, Q1} -> - InnerList = queue:to_list(InnerQ), - {Init1, QNew1} = - case PFilter(Prefix) of - true -> - lists:foldl( - fun (Value, {Acc, QNew2}) -> - {Prefix1, Value1, Acc1} = Fun(Value, Acc), - {Acc1, in(Prefix1, Value1, QNew2)} - end, {Init, QNew}, InnerList); - false -> - {Init, join(QNew, from_list([{Prefix, InnerList}]))} - end, - map_fold_filter_l1(PFilter, Fun, Init1, Q1, QNew1) + case PFilter(Prefix) of + true -> + {Init1, QNew1, Cont} = + map_fold_filter_l2( + Fun, Prefix, Prefix, Init, InnerQ, QNew, queue:new()), + case Cont of + false -> + {join(QNew1, {Len - len(QNew1), Q1}), Init1}; + true -> + map_fold_filter_l1( + Len, PFilter, Fun, Init1, Q1, QNew1) + end; + false -> + map_fold_filter_l1( + Len, PFilter, Fun, Init, Q1, in_q(Prefix, InnerQ, QNew)) + end + end. + +map_fold_filter_l2(Fun, OrigPrefix, Prefix, Init, InnerQ, QNew, InnerQNew) -> + case queue:out(InnerQ) of + {empty, _Q} -> + {Init, in_q(OrigPrefix, InnerQ, + in_q(Prefix, InnerQNew, QNew)), true}; + {{value, Value}, InnerQ1} -> + case Fun(Value, Init) of + stop -> + {Init, in_q(OrigPrefix, InnerQ, + in_q(Prefix, InnerQNew, QNew)), false}; + {Prefix1, Value1, Init1} -> + case Prefix1 =:= Prefix of + true -> + map_fold_filter_l2( + Fun, OrigPrefix, Prefix, Init1, InnerQ1, QNew, + queue:in(Value1, InnerQNew)); + false -> + map_fold_filter_l2( + Fun, OrigPrefix, Prefix1, Init1, InnerQ1, + in_q(Prefix, InnerQNew, QNew), + queue:in(Value1, queue:new())) + end + end end. map_fold_filter_r(_PFilter, _Fun, Init, BPQ = {0, _Q}) -> {BPQ, Init}; -map_fold_filter_r(PFilter, Fun, Init, {_N, Q}) -> - map_fold_filter_r1(PFilter, Fun, Init, Q, new()). +map_fold_filter_r(PFilter, Fun, Init, {N, Q}) -> + map_fold_filter_r1(N, PFilter, Fun, Init, Q, new()). -map_fold_filter_r1(PFilter, Fun, Init, Q, QNew) -> +map_fold_filter_r1(Len, PFilter, Fun, Init, Q, QNew) -> case queue:out_r(Q) of {empty, _Q} -> {QNew, Init}; {{value, {Prefix, InnerQ}}, Q1} -> - InnerList = queue:to_list(InnerQ), - {Init1, QNew1} = - case PFilter(Prefix) of - true -> - lists:foldr( - fun (Value, {Acc, QNew2}) -> - {Prefix1, Value1, Acc1} = Fun(Value, Acc), - {Acc1, in_r(Prefix1, Value1, QNew2)} - end, {Init, QNew}, InnerList); - false -> - {Init, join(from_list([{Prefix, InnerList}]), QNew)} - end, - map_fold_filter_r1(PFilter, Fun, Init1, Q1, QNew1) + case PFilter(Prefix) of + true -> + {Init1, QNew1, Cont} = + map_fold_filter_r2( + Fun, Prefix, Prefix, Init, InnerQ, QNew, queue:new()), + case Cont of + false -> + {join({Len - len(QNew1), Q1}, QNew1), Init1}; + true -> + map_fold_filter_r1( + Len, PFilter, Fun, Init1, Q1, QNew1) + end; + false -> + map_fold_filter_r1( + Len, PFilter, Fun, Init, Q1, in_q_r(Prefix, InnerQ, QNew)) + end + end. + +map_fold_filter_r2(Fun, OrigPrefix, Prefix, Init, InnerQ, QNew, InnerQNew) -> + case queue:out_r(InnerQ) of + {empty, _Q} -> + {Init, in_q_r(OrigPrefix, InnerQ, + in_q_r(Prefix, InnerQNew, QNew)), true}; + {{value, Value}, InnerQ1} -> + case Fun(Value, Init) of + stop -> + {Init, in_q_r(OrigPrefix, InnerQ, + in_q_r(Prefix, InnerQNew, QNew)), false}; + {Prefix1, Value1, Init1} -> + case Prefix1 =:= Prefix of + true -> + map_fold_filter_r2( + Fun, OrigPrefix, Prefix, Init1, InnerQ1, QNew, + queue:in_r(Value1, InnerQNew)); + false -> + map_fold_filter_r2( + Fun, OrigPrefix, Prefix1, Init1, InnerQ1, + in_q_r(Prefix, InnerQNew, QNew), + queue:in(Value1, queue:new())) + end + end end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 45b480177d..291f4cb0e8 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -239,7 +239,8 @@ test_bpqueue() -> fun (foo) -> true; (_) -> false end, - fun (V, Num) -> {bar, -V, V - Num} end, + fun (2, _Num) -> stop; + (V, Num) -> {bar, -V, V - Num} end, 0, Qn) end, @@ -248,14 +249,15 @@ test_bpqueue() -> fun (foo) -> true; (_) -> false end, - fun (V, Num) -> {bar, -V, V - Num} end, + fun (2, _Num) -> stop; + (V, Num) -> {bar, -V, V - Num} end, 0, Qn) end, - {Q9, 1} = F1(Q1), %% 2 - (1 - 0) == 1 - [{bar, [-1, -2, 3]}] = bpqueue:to_list(Q9), - {Q10, -1} = F2(Q1), %% 1 - (2 - 0) == -1 - [{bar, [-1, -2, 3]}] = bpqueue:to_list(Q10), + {Q9, 1} = F1(Q1), + [{bar, [-1]}, {foo, [2]}, {bar, [3]}] = bpqueue:to_list(Q9), + {Q10, 0} = F2(Q1), + [{foo, [1, 2]}, {bar, [3]}] = bpqueue:to_list(Q10), {Q11, 0} = F1(Q), [] = bpqueue:to_list(Q11), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6c5efbd5f8..a62a90ceae 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -78,8 +78,20 @@ index_on_disk }). --define(RAM_INDEX_TARGET_RATIO, 256). --define(RAM_INDEX_MAX_WORK, 32). +%% If there are N msgs in the q, and M of them are betas, then it is +%% required that RAM_INDEX_BETA_RATIO * (M/N) * M of those have their +%% index on disk. Eg if RAM_INDEX_BETA_RATIO is 1.0, and there are 36 +%% msgs in the queue, of which 12 are betas, then 4 of those betas +%% must have their index on disk. +-define(RAM_INDEX_BETA_RATIO, 0.8). +%% When we discover, on publish, that we should write some indices to +%% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of +%% betas that we must be due to write indices for before we do any +%% work at all. This is both a minimum and a maximum - we don't write +%% fewer than RAM_INDEX_BATCH_SIZE indices out in one go, and we don't +%% write more - we can always come back on the next publish to do +%% more. +-define(RAM_INDEX_BATCH_SIZE, 1024). %%---------------------------------------------------------------------------- @@ -577,6 +589,26 @@ beta_fold_no_index_on_disk(Fun, Init, Q) -> Fun(Value, Acc) end, Init, Q). +permitted_ram_index_count(#vqstate { len = 0 }) -> + undefined; +permitted_ram_index_count(#vqstate { len = Len, q2 = Q2, q3 = Q3 }) -> + case bpqueue:len(Q2) + bpqueue:len(Q3) of + 0 -> + undefined; + BetaLength -> + %% the fraction of the queue that are betas + BetaFrac = BetaLength / Len, + BetaLength - trunc(BetaFrac * BetaLength * ?RAM_INDEX_BETA_RATIO) + end. + + +should_force_index_to_disk(State = + #vqstate { ram_index_count = RamIndexCount }) -> + case permitted_ram_index_count(State) of + undefined -> false; + Permitted -> RamIndexCount >= Permitted + end. + %%---------------------------------------------------------------------------- %% Internal major helpers for Public API %%---------------------------------------------------------------------------- @@ -771,17 +803,10 @@ publish(msg, MsgStatus, State = #vqstate { index_state = IndexState, publish(index, MsgStatus, State = #vqstate { index_state = IndexState, q1 = Q1, - ram_index_count = RamIndexCount, - target_ram_msg_count = TargetRamMsgCount }) -> + ram_index_count = RamIndexCount }) -> MsgStatus1 = #msg_status { msg_on_disk = true } = maybe_write_msg_to_disk(true, MsgStatus), - ForceIndex = case TargetRamMsgCount of - undefined -> - false; - _ -> - RamIndexCount >= (?RAM_INDEX_TARGET_RATIO * - TargetRamMsgCount) - end, + ForceIndex = should_force_index_to_disk(State), {MsgStatus2, IndexState1} = maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), RamIndexCount1 = case MsgStatus2 #msg_status.index_on_disk of @@ -872,17 +897,24 @@ maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> %% Phase changes %%---------------------------------------------------------------------------- -limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount, - target_ram_msg_count = TargetRamMsgCount }) - when RamIndexCount > ?RAM_INDEX_TARGET_RATIO * TargetRamMsgCount -> - Reduction = lists:min([?RAM_INDEX_MAX_WORK, - RamIndexCount - (?RAM_INDEX_TARGET_RATIO * - TargetRamMsgCount)]), - {Reduction1, State1} = limit_q2_ram_index(Reduction, State), - {_Reduction2, State2} = limit_q3_ram_index(Reduction1, State1), - State2; -limit_ram_index(State) -> - State. +limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) -> + case permitted_ram_index_count(State) of + undefined -> + State; + Permitted when RamIndexCount > Permitted -> + Reduction = lists:min([RamIndexCount - Permitted, + ?RAM_INDEX_BATCH_SIZE]), + case Reduction < ?RAM_INDEX_BATCH_SIZE of + true -> + State; + false -> + {Reduction1, State1} = limit_q2_ram_index(Reduction, State), + {_Red2, State2} = limit_q3_ram_index(Reduction1, State1), + State2 + end; + _ -> + State + end. limit_q2_ram_index(Reduction, State = #vqstate { q2 = Q2 }) when Reduction > 0 -> @@ -908,9 +940,9 @@ limit_ram_index(MapFoldFilterFun, Q, Reduction, State = {Qa, {Reduction1, IndexState1}} = MapFoldFilterFun( fun erlang:'not'/1, - fun (MsgStatus, {0, _IndexStateN} = Acc) -> + fun (MsgStatus, {0, _IndexStateN}) -> false = MsgStatus #msg_status.index_on_disk, %% ASSERTION - {false, MsgStatus, Acc}; + stop; (MsgStatus, {N, IndexStateN}) when N > 0 -> false = MsgStatus #msg_status.index_on_disk, %% ASSERTION {MsgStatus1, IndexStateN1} = @@ -986,19 +1018,12 @@ maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State = maybe_push_alphas_to_betas( Generator, Consumer, Q, State = #vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, - target_ram_msg_count = TargetRamMsgCount, index_state = IndexState }) -> case Generator(Q) of {empty, _Q} -> State; {{value, MsgStatus}, Qa} -> MsgStatus1 = maybe_write_msg_to_disk(true, MsgStatus), - ForceIndex = case TargetRamMsgCount of - undefined -> - false; - _ -> - RamIndexCount >= (?RAM_INDEX_TARGET_RATIO * - TargetRamMsgCount) - end, + ForceIndex = should_force_index_to_disk(State), {MsgStatus2, IndexState1} = maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), RamIndexCount1 = case MsgStatus2 #msg_status.index_on_disk of |
