diff options
| -rw-r--r-- | src/bpqueue.erl | 70 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 83 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 56 |
3 files changed, 206 insertions, 3 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl index 5e7471f71d..7237473f32 100644 --- a/src/bpqueue.erl +++ b/src/bpqueue.erl @@ -37,7 +37,8 @@ %% prefix. len/1 returns the flattened length of the queue and is O(1) -export([new/0, is_empty/1, len/1, in/3, in_r/3, out/1, out_r/1, join/2, - fold/3, from_list/1, to_list/1]). + fold/3, from_list/1, to_list/1, map_fold_filter_l/4, + map_fold_filter_r/4]). %%---------------------------------------------------------------------------- @@ -60,6 +61,14 @@ -spec(fold/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B). -spec(from_list/1 :: ([{prefix(), [value()]}]) -> bpqueue()). -spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]). +-spec(map_fold_filter_l/4 :: + (fun ((prefix()) -> boolean()), + fun ((value(), B) -> {prefix(), value(), B}), B, bpqueue()) -> + {bpqueue(), B}). +-spec(map_fold_filter_r/4 :: + (fun ((prefix()) -> boolean()), + fun ((value(), B) -> {prefix(), value(), B}), B, bpqueue()) -> + {bpqueue(), B}). -endif. @@ -183,3 +192,62 @@ to_list({_N, Q}) -> to_list1({Prefix, InnerQ}) -> {Prefix, queue:to_list(InnerQ)}. + +%% map_fold_filter_[lr](FilterFun, Fun, Init, BPQ) -> {BPQ, Init} +%% where FilterFun(Prefix) -> boolean() +%% Fun(Value, Init) -> {Prefix, Value, Init} +%% +%% The filter fun allows you to skip very quickly over blocks that +%% you're not interested in. Such blocks appear in the resulting bpq +%% without modification. The Fun is then used both to map the value, +%% which also allows you to change the prefix (and thus block) of the +%% value, and also to modify the Init/Acc (just like a fold). +map_fold_filter_l(_PFilter, _Fun, Init, BPQ = {0, _Q}) -> + {BPQ, Init}; +map_fold_filter_l(PFilter, Fun, Init, {_N, Q}) -> + map_fold_filter_l1(PFilter, Fun, Init, Q, new()). + +map_fold_filter_l1(PFilter, Fun, Init, Q, QNew) -> + case queue:out(Q) of + {empty, _Q} -> + {QNew, Init}; + {{value, {Prefix, InnerQ}}, Q1} -> + InnerList = queue:to_list(InnerQ), + {Init1, QNew1} = + case PFilter(Prefix) of + true -> + lists:foldl( + fun (Value, {Acc, QNew2}) -> + {Prefix1, Value1, Acc1} = Fun(Value, Acc), + {Acc1, in(Prefix1, Value1, QNew2)} + end, {Init, QNew}, InnerList); + false -> + {Init, join(QNew, from_list([{Prefix, InnerList}]))} + end, + map_fold_filter_l1(PFilter, Fun, Init1, Q1, QNew1) + end. + +map_fold_filter_r(_PFilter, _Fun, Init, BPQ = {0, _Q}) -> + {BPQ, Init}; +map_fold_filter_r(PFilter, Fun, Init, {_N, Q}) -> + map_fold_filter_r1(PFilter, Fun, Init, Q, new()). + +map_fold_filter_r1(PFilter, Fun, Init, Q, QNew) -> + case queue:out_r(Q) of + {empty, _Q} -> + {QNew, Init}; + {{value, {Prefix, InnerQ}}, Q1} -> + InnerList = queue:to_list(InnerQ), + {Init1, QNew1} = + case PFilter(Prefix) of + true -> + lists:foldr( + fun (Value, {Acc, QNew2}) -> + {Prefix1, Value1, Acc1} = Fun(Value, Acc), + {Acc1, in_r(Prefix1, Value1, QNew2)} + end, {Init, QNew}, InnerList); + false -> + {Init, join(from_list([{Prefix, InnerList}]), QNew)} + end, + map_fold_filter_r1(PFilter, Fun, Init1, Q1, QNew1) + end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 16332f325c..45b480177d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -55,6 +55,7 @@ all_tests() -> passed = test_queue_index(), passed = test_variable_queue(), passed = test_priority_queue(), + passed = test_bpqueue(), passed = test_unfold(), passed = test_parsing(), passed = test_topic_matching(), @@ -181,6 +182,88 @@ test_priority_queue(Q) -> priority_queue:to_list(Q), priority_queue_out_all(Q)}. +test_bpqueue() -> + Q = bpqueue:new(), + true = bpqueue:is_empty(Q), + 0 = bpqueue:len(Q), + + Q1 = bpqueue:in(bar, 3, bpqueue:in(foo, 2, bpqueue:in(foo, 1, Q))), + false = bpqueue:is_empty(Q1), + 3 = bpqueue:len(Q1), + [{foo, [1, 2]}, {bar, [3]}] = bpqueue:to_list(Q1), + + Q2 = bpqueue:in_r(bar, 3, bpqueue:in_r(foo, 2, bpqueue:in_r(foo, 1, Q))), + false = bpqueue:is_empty(Q2), + 3 = bpqueue:len(Q2), + [{bar, [3]}, {foo, [2, 1]}] = bpqueue:to_list(Q2), + + {empty, _Q} = bpqueue:out(Q), + {{value, foo, 1}, Q3} = bpqueue:out(Q1), + {{value, foo, 2}, Q4} = bpqueue:out(Q3), + {{value, bar, 3}, _Q5} = bpqueue:out(Q4), + + {empty, _Q} = bpqueue:out_r(Q), + {{value, foo, 1}, Q6} = bpqueue:out_r(Q2), + {{value, foo, 2}, Q7} = bpqueue:out_r(Q6), + {{value, bar, 3}, _Q8} = bpqueue:out_r(Q7), + + [{foo, [1, 2]}, {bar, [3]}] = bpqueue:to_list(bpqueue:join(Q, Q1)), + [{bar, [3]}, {foo, [2, 1]}] = bpqueue:to_list(bpqueue:join(Q2, Q)), + [{foo, [1, 2]}, {bar, [3, 3]}, {foo, [2,1]}] = + bpqueue:to_list(bpqueue:join(Q1, Q2)), + + [{foo, [1, 2]}, {bar, [3]}, {foo, [1, 2]}, {bar, [3]}] = + bpqueue:to_list(bpqueue:join(Q1, Q1)), + + [{foo, [1, 2]}, {bar, [3]}] = + bpqueue:to_list( + bpqueue:from_list( + [{x, []}, {foo, [1]}, {y, []}, {foo, [2]}, {bar, [3]}, {z, []}])), + + [{undefined, [a]}] = bpqueue:to_list(bpqueue:from_list([{undefined, [a]}])), + + {4, [a,b,c,d]} = + bpqueue:fold( + fun (Prefix, Value, {Prefix, Acc}) -> + {Prefix + 1, [Value | Acc]} + end, + {0, []}, bpqueue:from_list([{0,[d]}, {1,[c]}, {2,[b]}, {3,[a]}])), + + ok = bpqueue:fold(fun (Prefix, Value, ok) -> {error, Prefix, Value} end, + ok, Q), + + [] = bpqueue:to_list(Q), + + F1 = fun (Qn) -> + bpqueue:map_fold_filter_l( + fun (foo) -> true; + (_) -> false + end, + fun (V, Num) -> {bar, -V, V - Num} end, + 0, Qn) + end, + + F2 = fun (Qn) -> + bpqueue:map_fold_filter_r( + fun (foo) -> true; + (_) -> false + end, + fun (V, Num) -> {bar, -V, V - Num} end, + 0, Qn) + end, + + {Q9, 1} = F1(Q1), %% 2 - (1 - 0) == 1 + [{bar, [-1, -2, 3]}] = bpqueue:to_list(Q9), + {Q10, -1} = F2(Q1), %% 1 - (2 - 0) == -1 + [{bar, [-1, -2, 3]}] = bpqueue:to_list(Q10), + + {Q11, 0} = F1(Q), + [] = bpqueue:to_list(Q11), + {Q12, 0} = F2(Q), + [] = bpqueue:to_list(Q12), + + passed. + test_simple_n_element_queue(N) -> Items = lists:seq(1, N), Q = priority_queue_in_all(priority_queue:new(), Items), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8205f79f66..e821cf6b72 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -78,7 +78,8 @@ index_on_disk }). --define(RAM_INDEX_TARGET_RATIO, 32768). +-define(RAM_INDEX_TARGET_RATIO, 1024). +-define(RAM_INDEX_MAX_WORK, 32). %%---------------------------------------------------------------------------- @@ -226,7 +227,8 @@ terminate(State = #vqstate { index_state = IndexState, State #vqstate { index_state = rabbit_queue_index:terminate(IndexState) }. publish(Msg, State) -> - publish(Msg, false, false, State). + State1 = limit_ram_index(State), + publish(Msg, false, false, State1). publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, @@ -870,6 +872,56 @@ maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> %% Phase changes %%---------------------------------------------------------------------------- +limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount, + target_ram_msg_count = TargetRamMsgCount }) + when RamIndexCount > ?RAM_INDEX_TARGET_RATIO * TargetRamMsgCount -> + Reduction = lists:min([?RAM_INDEX_MAX_WORK, + RamIndexCount - (?RAM_INDEX_TARGET_RATIO * + TargetRamMsgCount)]), + io:format("~p~n", [Reduction]), + {Reduction1, State1} = limit_q2_ram_index(Reduction, State), + {_Reduction2, State2} = limit_q3_ram_index(Reduction1, State1), + State2; +limit_ram_index(State) -> + State. + +limit_q2_ram_index(Reduction, State = #vqstate { q2 = Q2 }) + when Reduction > 0 -> + {Q2a, Reduction1, State1} = limit_ram_index(fun bpqueue:map_fold_filter_l/4, + Q2, Reduction, State), + {Reduction1, State1 #vqstate { q2 = Q2a }}; +limit_q2_ram_index(Reduction, State) -> + {Reduction, State}. + +limit_q3_ram_index(Reduction, State = #vqstate { q3 = Q3 }) + when Reduction > 0 -> + %% use the _r version so that we prioritise the msgs closest to + %% delta, and least soon to be delivered + {Q3a, Reduction1, State1} = limit_ram_index(fun bpqueue:map_fold_filter_r/4, + Q3, Reduction, State), + {Reduction1, State1 #vqstate { q3 = Q3a }}; +limit_q3_ram_index(Reduction, State) -> + {Reduction, State}. + +limit_ram_index(MapFoldFilterFun, Q, Reduction, State = + #vqstate { ram_index_count = RamIndexCount, + index_state = IndexState }) -> + {Qa, {Reduction1, IndexState1}} = + MapFoldFilterFun( + fun erlang:'not'/1, + fun (MsgStatus, {0, _IndexStateN} = Acc) -> + false = MsgStatus #msg_status.index_on_disk, %% ASSERTION + {false, MsgStatus, Acc}; + (MsgStatus, {N, IndexStateN}) when N > 0 -> + false = MsgStatus #msg_status.index_on_disk, %% ASSERTION + {MsgStatus1, IndexStateN1} = + maybe_write_index_to_disk(true, MsgStatus, IndexStateN), + {true, MsgStatus1, {N-1, IndexStateN1}} + end, {Reduction, IndexState}, Q), + RamIndexCount1 = RamIndexCount - (Reduction - Reduction1), + {Qa, Reduction1, State #vqstate { index_state = IndexState1, + ram_index_count = RamIndexCount1 }}. + maybe_deltas_to_betas(State = #vqstate { delta = #delta { count = 0 } }) -> State; maybe_deltas_to_betas( |
