diff options
| -rw-r--r-- | src/priority_queue.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_queue_mode_manager.erl | 176 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 56 |
3 files changed, 112 insertions, 152 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 9421f281c9..0c777471ec 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -56,7 +56,7 @@ -module(priority_queue). -export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, - out/1, out/2, pout/1, join/2]). + out/1, join/2]). %%---------------------------------------------------------------------------- @@ -74,8 +74,6 @@ -spec(in/2 :: (any(), pqueue()) -> pqueue()). -spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). -spec(out/1 :: (pqueue()) -> {(empty | {value, any()}), pqueue()}). --spec(out/2 :: (priority(), pqueue()) -> {(empty | {value, any()}), pqueue()}). --spec(pout/1 :: (pqueue()) -> {(empty | {value, any(), priority()}), pqueue()}). -spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). -endif. @@ -151,34 +149,6 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. -out(_Priority, {queue, [], []} = Q) -> - {empty, Q}; -out(Priority, {queue, _, _} = Q) when Priority =< 0 -> - out(Q); -out(_Priority, {queue, _, _} = Q) -> - {empty, Q}; -out(Priority, {pqueue, [{P, _Q} | _Queues]} = Q) when Priority =< (-P) -> - out(Q); -out(_Priority, {pqueue, [_|_]} = Q) -> - {empty, Q}. - -pout({queue, [], []} = Q) -> - {empty, Q}; -pout({queue, _, _} = Q) -> - {{value, V}, Q1} = out(Q), - {{value, V, 0}, Q1}; -pout({pqueue, [{P, Q} | Queues]}) -> - {{value, V}, Q1} = out(Q), - NewQ = case is_empty(Q1) of - true -> case Queues of - [] -> {queue, [], []}; - [{0, OnlyQ}] -> OnlyQ; - [_|_] -> {pqueue, Queues} - end; - false -> {pqueue, [{P, Q1} | Queues]} - end, - {{value, V, -P}, NewQ}. - join(A, {queue, [], []}) -> A; join({queue, [], []}, B) -> diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl index ceb09d928b..194ddf9570 100644 --- a/src/rabbit_queue_mode_manager.erl +++ b/src/rabbit_queue_mode_manager.erl @@ -289,10 +289,10 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, case ActivityNew of active -> StateN; disk -> StateN; - lowrate -> StateN #state { lowrate = - priority_queue:in(Pid, Req, Lazy) }; - hibernate -> StateN #state { hibernate = - queue:in(Pid, Sleepy) } + lowrate -> + StateN #state { lowrate = add_to_lowrate(Pid, Req, Lazy) }; + hibernate -> + StateN #state { hibernate = queue:in(Pid, Sleepy) } end, {noreply, StateN1}; @@ -324,6 +324,10 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +add_to_lowrate(Pid, Alloc, Lazy) -> + Bucket = trunc(math:log(Alloc)), + priority_queue:in({Pid, Bucket, Alloc}, Bucket, Lazy). + find_queue(Pid, Mixed) -> case dict:find(Pid, Mixed) of {ok, Value} -> {mixed, Value}; @@ -331,96 +335,98 @@ find_queue(Pid, Mixed) -> end. tidy_and_sum_lazy(IgnorePid, Lazy, Mixed) -> - tidy_and_sum_lazy(sets:add_element(IgnorePid, sets:new()), - Lazy, Mixed, 0, priority_queue:new()). - -tidy_and_sum_lazy(DupCheckSet, Lazy, Mixed, FreeAcc, LazyAcc) -> - case priority_queue:pout(Lazy) of - {empty, Lazy} -> {FreeAcc, LazyAcc}; - {{value, Pid, Alloc}, Lazy1} -> - case sets:is_element(Pid, DupCheckSet) of - true -> - tidy_and_sum_lazy(DupCheckSet, Lazy1, Mixed, FreeAcc, - LazyAcc); - false -> - DupCheckSet1 = sets:add_element(Pid, DupCheckSet), - case find_queue(Pid, Mixed) of - {mixed, {Alloc, lowrate}} -> - tidy_and_sum_lazy(DupCheckSet1, Lazy1, Mixed, - FreeAcc + Alloc, priority_queue:in - (Pid, Alloc, LazyAcc)); - _ -> - tidy_and_sum_lazy(DupCheckSet1, Lazy1, Mixed, - FreeAcc, LazyAcc) - end - end - end. + tidy_and_sum(IgnorePid, Mixed, + fun (Lazy1) -> + case priority_queue:out(Lazy1) of + {empty, Lazy1} -> + {empty, Lazy1}; + {{value, {Pid, _Bucket, _Alloc}}, Lazy2} -> + {{value, Pid}, Lazy2} + end + end, fun add_to_lowrate/3, Lazy, priority_queue:new(), + lowrate). tidy_and_sum_sleepy(IgnorePid, Sleepy, Mixed) -> - tidy_and_sum_sleepy(sets:add_element(IgnorePid, sets:new()), - Sleepy, Mixed, 0, queue:new()). - -tidy_and_sum_sleepy(DupCheckSet, Sleepy, Mixed, FreeAcc, SleepyAcc) -> - case queue:out(Sleepy) of - {empty, Sleepy} -> {FreeAcc, SleepyAcc}; - {{value, Pid}, Sleepy1} -> - case sets:is_element(Pid, DupCheckSet) of - true -> - tidy_and_sum_sleepy(DupCheckSet, Sleepy1, Mixed, FreeAcc, - SleepyAcc); - false -> - DupCheckSet1 = sets:add_element(Pid, DupCheckSet), - case find_queue(Pid, Mixed) of - {mixed, {Alloc, hibernate}} -> - tidy_and_sum_sleepy(DupCheckSet1, Sleepy1, Mixed, - FreeAcc + Alloc, queue:in - (Pid, SleepyAcc)); - _ -> tidy_and_sum_sleepy(DupCheckSet1, Sleepy1, Mixed, - FreeAcc, SleepyAcc) - end - end + tidy_and_sum(IgnorePid, Mixed, fun queue:out/1, + fun (Pid, _Alloc, Queue) -> + queue:in(Pid, Queue) + end, Sleepy, queue:new(), hibernate). + +tidy_and_sum(IgnorePid, Mixed, Catamorphism, Anamorphism, CataInit, AnaInit, + AtomExpected) -> + tidy_and_sum(sets:add_element(IgnorePid, sets:new()), + Mixed, Catamorphism, Anamorphism, CataInit, AnaInit, 0, + AtomExpected). + +tidy_and_sum(DupCheckSet, Mixed, Catamorphism, Anamorphism, CataInit, AnaInit, + AllocAcc, AtomExpected) -> + case Catamorphism(CataInit) of + {empty, CataInit} -> {AnaInit, AllocAcc}; + {{value, Pid}, CataInit1} -> + {DupCheckSet2, AnaInit2, AllocAcc2} = + case sets:is_element(Pid, DupCheckSet) of + true -> + {DupCheckSet, AnaInit, AllocAcc}; + false -> + {AnaInit1, AllocAcc1} = + case find_queue(Pid, Mixed) of + {mixed, {Alloc, AtomExpected}} -> + {Anamorphism(Pid, Alloc, AnaInit), + Alloc + AllocAcc}; + _ -> + {AnaInit, AllocAcc} + end, + {sets:add_element(Pid, DupCheckSet), AnaInit1, + AllocAcc1} + end, + tidy_and_sum(DupCheckSet2, Mixed, Catamorphism, Anamorphism, + CataInit1, AnaInit2, AllocAcc2, AtomExpected) end. free_upto_lazy(IgnorePid, Callbacks, Lazy, Mixed, Req) -> - free_upto_lazy(IgnorePid, Callbacks, Lazy, Mixed, Req, - priority_queue:new()). - -free_upto_lazy(IgnorePid, Callbacks, Lazy, Mixed, Req, LazyAcc) -> - case priority_queue:pout(Lazy) of - {empty, Lazy} -> {priority_queue:join(Lazy, LazyAcc), Mixed, Req}; - {{value, IgnorePid, Alloc}, Lazy1} -> - free_upto_lazy(IgnorePid, Callbacks, Lazy1, Mixed, Req, - priority_queue:in(IgnorePid, Alloc, LazyAcc)); - {{value, Pid, Alloc}, Lazy1} -> - {Module, Function, Args} = dict:fetch(Pid, Callbacks), - ok = erlang:apply(Module, Function, Args ++ [disk]), - Mixed1 = dict:erase(Pid, Mixed), - case Req > Alloc of - true -> free_upto_lazy(IgnorePid, Callbacks, Lazy1, Mixed1, - Req - Alloc, LazyAcc); - false -> {priority_queue:join(Lazy1, LazyAcc), Mixed1, - Req - Alloc} - end - end. + free_from(Callbacks, Mixed, + fun(Lazy1, LazyAcc) -> + case priority_queue:out(Lazy1) of + {empty, Lazy1} -> + empty; + {{value, {IgnorePid, Bucket, Alloc}}, Lazy2} -> + {skip, Lazy2, + priority_queue:in({IgnorePid, Bucket, Alloc}, + Bucket, LazyAcc)}; + {{value, {Pid, _Bucket, Alloc}}, Lazy3} -> + {value, Lazy3, Pid, Alloc} + end + end, fun priority_queue:join/2, Lazy, priority_queue:new(), Req). free_upto_sleepy(IgnorePid, Callbacks, Sleepy, Mixed, Req) -> - free_upto_sleepy(IgnorePid, Callbacks, Sleepy, Mixed, Req, queue:new()). - -free_upto_sleepy(IgnorePid, Callbacks, Sleepy, Mixed, Req, SleepyAcc) -> - case queue:out(Sleepy) of - {empty, Sleepy} -> {queue:join(Sleepy, SleepyAcc), Mixed, Req}; - {{value, IgnorePid}, Sleepy1} -> - free_upto_sleepy(IgnorePid, Callbacks, Sleepy1, Mixed, Req, - queue:in(IgnorePid, SleepyAcc)); - {{value, Pid}, Sleepy1} -> - {Alloc, hibernate} = dict:fetch(Pid, Mixed), + free_from(Callbacks, Mixed, + fun(Sleepy1, SleepyAcc) -> + case queue:out(Sleepy1) of + {empty, Sleepy1} -> + empty; + {{value, IgnorePid}, Sleepy2} -> + {skip, Sleepy2, queue:in(IgnorePid, SleepyAcc)}; + {{value, Pid}, Sleepy3} -> + {Alloc, hibernate} = dict:fetch(Pid, Mixed), + {value, Sleepy3, Pid, Alloc} + end + end, fun queue:join/2, Sleepy, queue:new(), Req). + +free_from(Callbacks, Mixed, Hylomorphism, BaseCase, CataInit, AnaInit, Req) -> + case Hylomorphism(CataInit, AnaInit) of + empty -> + {BaseCase(CataInit, AnaInit), Mixed, Req}; + {skip, CataInit1, AnaInit1} -> + free_from(Callbacks, Mixed, Hylomorphism, BaseCase, CataInit1, + AnaInit1, Req); + {value, CataInit1, Pid, Alloc} -> {Module, Function, Args} = dict:fetch(Pid, Callbacks), ok = erlang:apply(Module, Function, Args ++ [disk]), Mixed1 = dict:erase(Pid, Mixed), case Req > Alloc of - true -> free_upto_sleepy(IgnorePid, Callbacks, Sleepy1, Mixed1, - Req - Alloc, SleepyAcc); - false -> {queue:join(Sleepy1, SleepyAcc), Mixed1, Req - Alloc} + true -> free_from(Callbacks, Mixed1, Hylomorphism, BaseCase, + CataInit1, AnaInit, Req - Alloc); + false -> {BaseCase(CataInit1, AnaInit), Mixed1, Req - Alloc} end end. @@ -431,10 +437,10 @@ free_upto(Pid, Req, State = #state { available_tokens = Avail, hibernate = Sleepy }) -> case Req > Avail of true -> - {SleepySum, Sleepy1} = tidy_and_sum_sleepy(Pid, Sleepy, Mixed), + {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Pid, Sleepy, Mixed), case Req > Avail + SleepySum of true -> %% not enough in sleepy, have a look in lazy too - {LazySum, Lazy1} = tidy_and_sum_lazy(Pid, Lazy, Mixed), + {Lazy1, LazySum} = tidy_and_sum_lazy(Pid, Lazy, Mixed), case Req > Avail + SleepySum + LazySum of true -> %% can't free enough, just return tidied state State #state { lowrate = Lazy1, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 6e3a92d037..476fff41cc 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -69,7 +69,7 @@ test_priority_queue() -> %% empty Q Q = priority_queue:new(), - {true, true, 0, [], [], [], []} = test_priority_queue(Q), + {true, true, 0, [], []} = test_priority_queue(Q), %% 1-4 element no-priority Q true = lists:all(fun (X) -> X =:= passed end, @@ -78,59 +78,58 @@ test_priority_queue() -> %% 1-element priority Q Q1 = priority_queue:in(foo, 1, priority_queue:new()), - {true, false, 1, [{1, foo}], [foo], [], [{foo, 1}]} = + {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1), %% 2-element same-priority Q Q2 = priority_queue:in(bar, 1, Q1), - {true, false, 2, [{1, foo}, {1, bar}], [foo, bar], [], [{foo, 1}, {bar, 1}]} - = test_priority_queue(Q2), + {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} = + test_priority_queue(Q2), %% 2-element different-priority Q Q3 = priority_queue:in(bar, 2, Q1), - {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [bar], [{bar, 2}, {foo, 1}]} = + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = test_priority_queue(Q3), %% 1-element negative priority Q Q4 = priority_queue:in(foo, -1, priority_queue:new()), - {true, false, 1, [{-1, foo}], [foo], [], [{foo, -1}]} = - test_priority_queue(Q4), + {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4), %% merge 2 * 1-element no-priority Qs Q5 = priority_queue:join(priority_queue:in(foo, Q), priority_queue:in(bar, Q)), - {true, false, 2, [{0, foo}, {0, bar}], [foo, bar], [], [{foo, 0}, {bar, 0}]} - = test_priority_queue(Q5), + {true, false, 2, [{0, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q5), %% merge 1-element no-priority Q with 1-element priority Q Q6 = priority_queue:join(priority_queue:in(foo, Q), priority_queue:in(bar, 1, Q)), - {true, false, 2, [{1, bar}, {0, foo}], [bar, foo], [], [{bar, 1}, {foo, 0}]} - = test_priority_queue(Q6), + {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} = + test_priority_queue(Q6), %% merge 1-element priority Q with 1-element no-priority Q Q7 = priority_queue:join(priority_queue:in(foo, 1, Q), priority_queue:in(bar, Q)), - {true, false, 2, [{1, foo}, {0, bar}], [foo, bar], [], [{foo, 1}, {bar, 0}]} - = test_priority_queue(Q7), + {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q7), %% merge 2 * 1-element same-priority Qs Q8 = priority_queue:join(priority_queue:in(foo, 1, Q), priority_queue:in(bar, 1, Q)), - {true, false, 2, [{1, foo}, {1, bar}], [foo, bar], [], [{foo, 1}, {bar, 1}]} - = test_priority_queue(Q8), + {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} = + test_priority_queue(Q8), %% merge 2 * 1-element different-priority Qs Q9 = priority_queue:join(priority_queue:in(foo, 1, Q), priority_queue:in(bar, 2, Q)), - {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [bar], - [{bar, 2}, {foo, 1}]} = test_priority_queue(Q9), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q9), %% merge 2 * 1-element different-priority Qs (other way around) Q10 = priority_queue:join(priority_queue:in(bar, 2, Q), priority_queue:in(foo, 1, Q)), - {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [bar], - [{bar, 2}, {foo, 1}]} = test_priority_queue(Q10), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q10), passed. @@ -142,33 +141,18 @@ priority_queue_out_all(Q) -> {empty, _} -> []; {{value, V}, Q1} -> [V | priority_queue_out_all(Q1)] end. -priority_queue_out_2_all(Q) -> - case priority_queue:out(2, Q) of - {empty, _} -> []; - {{value, V}, Q1} -> [V | priority_queue_out_2_all(Q1)] - end. - -priority_queue_pout_all(Q) -> - case priority_queue:pout(Q) of - {empty, _} -> []; - {{value, V, P}, Q1} -> [{V, P} | priority_queue_pout_all(Q1)] - end. - test_priority_queue(Q) -> {priority_queue:is_queue(Q), priority_queue:is_empty(Q), priority_queue:len(Q), priority_queue:to_list(Q), - priority_queue_out_all(Q), - priority_queue_out_2_all(Q), - priority_queue_pout_all(Q)}. + priority_queue_out_all(Q)}. test_simple_n_element_queue(N) -> Items = lists:seq(1, N), Q = priority_queue_in_all(priority_queue:new(), Items), ToListRes = [{0, X} || X <- Items], - POutAllRes = [{X, 0} || X <- Items], - {true, false, N, ToListRes, Items, [], POutAllRes} = test_priority_queue(Q), + {true, false, N, ToListRes, Items} = test_priority_queue(Q), passed. test_parsing() -> |
