summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-14 19:47:19 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-14 19:47:19 +0100
commit531d817bb9142194c29e6c05dfe08bc930a453cb (patch)
tree4121ca1ca174f3aa738d63edcb3747981b7d3072
parentcade9b2f39c6cab02389accbe163a8ab2c7d4564 (diff)
downloadrabbitmq-server-git-531d817bb9142194c29e6c05dfe08bc930a453cb.tar.gz
removed pout and friends from priority queue as requested. Large refactoring of queue_mode_manager. Appears to still work but needs a rereading in the cold light of day!
-rw-r--r--src/priority_queue.erl32
-rw-r--r--src/rabbit_queue_mode_manager.erl176
-rw-r--r--src/rabbit_tests.erl56
3 files changed, 112 insertions, 152 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 9421f281c9..0c777471ec 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -56,7 +56,7 @@
-module(priority_queue).
-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
- out/1, out/2, pout/1, join/2]).
+ out/1, join/2]).
%%----------------------------------------------------------------------------
@@ -74,8 +74,6 @@
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {(empty | {value, any()}), pqueue()}).
--spec(out/2 :: (priority(), pqueue()) -> {(empty | {value, any()}), pqueue()}).
--spec(pout/1 :: (pqueue()) -> {(empty | {value, any(), priority()}), pqueue()}).
-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
-endif.
@@ -151,34 +149,6 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
-out(_Priority, {queue, [], []} = Q) ->
- {empty, Q};
-out(Priority, {queue, _, _} = Q) when Priority =< 0 ->
- out(Q);
-out(_Priority, {queue, _, _} = Q) ->
- {empty, Q};
-out(Priority, {pqueue, [{P, _Q} | _Queues]} = Q) when Priority =< (-P) ->
- out(Q);
-out(_Priority, {pqueue, [_|_]} = Q) ->
- {empty, Q}.
-
-pout({queue, [], []} = Q) ->
- {empty, Q};
-pout({queue, _, _} = Q) ->
- {{value, V}, Q1} = out(Q),
- {{value, V, 0}, Q1};
-pout({pqueue, [{P, Q} | Queues]}) ->
- {{value, V}, Q1} = out(Q),
- NewQ = case is_empty(Q1) of
- true -> case Queues of
- [] -> {queue, [], []};
- [{0, OnlyQ}] -> OnlyQ;
- [_|_] -> {pqueue, Queues}
- end;
- false -> {pqueue, [{P, Q1} | Queues]}
- end,
- {{value, V, -P}, NewQ}.
-
join(A, {queue, [], []}) ->
A;
join({queue, [], []}, B) ->
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index ceb09d928b..194ddf9570 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -289,10 +289,10 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
case ActivityNew of
active -> StateN;
disk -> StateN;
- lowrate -> StateN #state { lowrate =
- priority_queue:in(Pid, Req, Lazy) };
- hibernate -> StateN #state { hibernate =
- queue:in(Pid, Sleepy) }
+ lowrate ->
+ StateN #state { lowrate = add_to_lowrate(Pid, Req, Lazy) };
+ hibernate ->
+ StateN #state { hibernate = queue:in(Pid, Sleepy) }
end,
{noreply, StateN1};
@@ -324,6 +324,10 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+add_to_lowrate(Pid, Alloc, Lazy) ->
+ Bucket = trunc(math:log(Alloc)),
+ priority_queue:in({Pid, Bucket, Alloc}, Bucket, Lazy).
+
find_queue(Pid, Mixed) ->
case dict:find(Pid, Mixed) of
{ok, Value} -> {mixed, Value};
@@ -331,96 +335,98 @@ find_queue(Pid, Mixed) ->
end.
tidy_and_sum_lazy(IgnorePid, Lazy, Mixed) ->
- tidy_and_sum_lazy(sets:add_element(IgnorePid, sets:new()),
- Lazy, Mixed, 0, priority_queue:new()).
-
-tidy_and_sum_lazy(DupCheckSet, Lazy, Mixed, FreeAcc, LazyAcc) ->
- case priority_queue:pout(Lazy) of
- {empty, Lazy} -> {FreeAcc, LazyAcc};
- {{value, Pid, Alloc}, Lazy1} ->
- case sets:is_element(Pid, DupCheckSet) of
- true ->
- tidy_and_sum_lazy(DupCheckSet, Lazy1, Mixed, FreeAcc,
- LazyAcc);
- false ->
- DupCheckSet1 = sets:add_element(Pid, DupCheckSet),
- case find_queue(Pid, Mixed) of
- {mixed, {Alloc, lowrate}} ->
- tidy_and_sum_lazy(DupCheckSet1, Lazy1, Mixed,
- FreeAcc + Alloc, priority_queue:in
- (Pid, Alloc, LazyAcc));
- _ ->
- tidy_and_sum_lazy(DupCheckSet1, Lazy1, Mixed,
- FreeAcc, LazyAcc)
- end
- end
- end.
+ tidy_and_sum(IgnorePid, Mixed,
+ fun (Lazy1) ->
+ case priority_queue:out(Lazy1) of
+ {empty, Lazy1} ->
+ {empty, Lazy1};
+ {{value, {Pid, _Bucket, _Alloc}}, Lazy2} ->
+ {{value, Pid}, Lazy2}
+ end
+ end, fun add_to_lowrate/3, Lazy, priority_queue:new(),
+ lowrate).
tidy_and_sum_sleepy(IgnorePid, Sleepy, Mixed) ->
- tidy_and_sum_sleepy(sets:add_element(IgnorePid, sets:new()),
- Sleepy, Mixed, 0, queue:new()).
-
-tidy_and_sum_sleepy(DupCheckSet, Sleepy, Mixed, FreeAcc, SleepyAcc) ->
- case queue:out(Sleepy) of
- {empty, Sleepy} -> {FreeAcc, SleepyAcc};
- {{value, Pid}, Sleepy1} ->
- case sets:is_element(Pid, DupCheckSet) of
- true ->
- tidy_and_sum_sleepy(DupCheckSet, Sleepy1, Mixed, FreeAcc,
- SleepyAcc);
- false ->
- DupCheckSet1 = sets:add_element(Pid, DupCheckSet),
- case find_queue(Pid, Mixed) of
- {mixed, {Alloc, hibernate}} ->
- tidy_and_sum_sleepy(DupCheckSet1, Sleepy1, Mixed,
- FreeAcc + Alloc, queue:in
- (Pid, SleepyAcc));
- _ -> tidy_and_sum_sleepy(DupCheckSet1, Sleepy1, Mixed,
- FreeAcc, SleepyAcc)
- end
- end
+ tidy_and_sum(IgnorePid, Mixed, fun queue:out/1,
+ fun (Pid, _Alloc, Queue) ->
+ queue:in(Pid, Queue)
+ end, Sleepy, queue:new(), hibernate).
+
+tidy_and_sum(IgnorePid, Mixed, Catamorphism, Anamorphism, CataInit, AnaInit,
+ AtomExpected) ->
+ tidy_and_sum(sets:add_element(IgnorePid, sets:new()),
+ Mixed, Catamorphism, Anamorphism, CataInit, AnaInit, 0,
+ AtomExpected).
+
+tidy_and_sum(DupCheckSet, Mixed, Catamorphism, Anamorphism, CataInit, AnaInit,
+ AllocAcc, AtomExpected) ->
+ case Catamorphism(CataInit) of
+ {empty, CataInit} -> {AnaInit, AllocAcc};
+ {{value, Pid}, CataInit1} ->
+ {DupCheckSet2, AnaInit2, AllocAcc2} =
+ case sets:is_element(Pid, DupCheckSet) of
+ true ->
+ {DupCheckSet, AnaInit, AllocAcc};
+ false ->
+ {AnaInit1, AllocAcc1} =
+ case find_queue(Pid, Mixed) of
+ {mixed, {Alloc, AtomExpected}} ->
+ {Anamorphism(Pid, Alloc, AnaInit),
+ Alloc + AllocAcc};
+ _ ->
+ {AnaInit, AllocAcc}
+ end,
+ {sets:add_element(Pid, DupCheckSet), AnaInit1,
+ AllocAcc1}
+ end,
+ tidy_and_sum(DupCheckSet2, Mixed, Catamorphism, Anamorphism,
+ CataInit1, AnaInit2, AllocAcc2, AtomExpected)
end.
free_upto_lazy(IgnorePid, Callbacks, Lazy, Mixed, Req) ->
- free_upto_lazy(IgnorePid, Callbacks, Lazy, Mixed, Req,
- priority_queue:new()).
-
-free_upto_lazy(IgnorePid, Callbacks, Lazy, Mixed, Req, LazyAcc) ->
- case priority_queue:pout(Lazy) of
- {empty, Lazy} -> {priority_queue:join(Lazy, LazyAcc), Mixed, Req};
- {{value, IgnorePid, Alloc}, Lazy1} ->
- free_upto_lazy(IgnorePid, Callbacks, Lazy1, Mixed, Req,
- priority_queue:in(IgnorePid, Alloc, LazyAcc));
- {{value, Pid, Alloc}, Lazy1} ->
- {Module, Function, Args} = dict:fetch(Pid, Callbacks),
- ok = erlang:apply(Module, Function, Args ++ [disk]),
- Mixed1 = dict:erase(Pid, Mixed),
- case Req > Alloc of
- true -> free_upto_lazy(IgnorePid, Callbacks, Lazy1, Mixed1,
- Req - Alloc, LazyAcc);
- false -> {priority_queue:join(Lazy1, LazyAcc), Mixed1,
- Req - Alloc}
- end
- end.
+ free_from(Callbacks, Mixed,
+ fun(Lazy1, LazyAcc) ->
+ case priority_queue:out(Lazy1) of
+ {empty, Lazy1} ->
+ empty;
+ {{value, {IgnorePid, Bucket, Alloc}}, Lazy2} ->
+ {skip, Lazy2,
+ priority_queue:in({IgnorePid, Bucket, Alloc},
+ Bucket, LazyAcc)};
+ {{value, {Pid, _Bucket, Alloc}}, Lazy3} ->
+ {value, Lazy3, Pid, Alloc}
+ end
+ end, fun priority_queue:join/2, Lazy, priority_queue:new(), Req).
free_upto_sleepy(IgnorePid, Callbacks, Sleepy, Mixed, Req) ->
- free_upto_sleepy(IgnorePid, Callbacks, Sleepy, Mixed, Req, queue:new()).
-
-free_upto_sleepy(IgnorePid, Callbacks, Sleepy, Mixed, Req, SleepyAcc) ->
- case queue:out(Sleepy) of
- {empty, Sleepy} -> {queue:join(Sleepy, SleepyAcc), Mixed, Req};
- {{value, IgnorePid}, Sleepy1} ->
- free_upto_sleepy(IgnorePid, Callbacks, Sleepy1, Mixed, Req,
- queue:in(IgnorePid, SleepyAcc));
- {{value, Pid}, Sleepy1} ->
- {Alloc, hibernate} = dict:fetch(Pid, Mixed),
+ free_from(Callbacks, Mixed,
+ fun(Sleepy1, SleepyAcc) ->
+ case queue:out(Sleepy1) of
+ {empty, Sleepy1} ->
+ empty;
+ {{value, IgnorePid}, Sleepy2} ->
+ {skip, Sleepy2, queue:in(IgnorePid, SleepyAcc)};
+ {{value, Pid}, Sleepy3} ->
+ {Alloc, hibernate} = dict:fetch(Pid, Mixed),
+ {value, Sleepy3, Pid, Alloc}
+ end
+ end, fun queue:join/2, Sleepy, queue:new(), Req).
+
+free_from(Callbacks, Mixed, Hylomorphism, BaseCase, CataInit, AnaInit, Req) ->
+ case Hylomorphism(CataInit, AnaInit) of
+ empty ->
+ {BaseCase(CataInit, AnaInit), Mixed, Req};
+ {skip, CataInit1, AnaInit1} ->
+ free_from(Callbacks, Mixed, Hylomorphism, BaseCase, CataInit1,
+ AnaInit1, Req);
+ {value, CataInit1, Pid, Alloc} ->
{Module, Function, Args} = dict:fetch(Pid, Callbacks),
ok = erlang:apply(Module, Function, Args ++ [disk]),
Mixed1 = dict:erase(Pid, Mixed),
case Req > Alloc of
- true -> free_upto_sleepy(IgnorePid, Callbacks, Sleepy1, Mixed1,
- Req - Alloc, SleepyAcc);
- false -> {queue:join(Sleepy1, SleepyAcc), Mixed1, Req - Alloc}
+ true -> free_from(Callbacks, Mixed1, Hylomorphism, BaseCase,
+ CataInit1, AnaInit, Req - Alloc);
+ false -> {BaseCase(CataInit1, AnaInit), Mixed1, Req - Alloc}
end
end.
@@ -431,10 +437,10 @@ free_upto(Pid, Req, State = #state { available_tokens = Avail,
hibernate = Sleepy }) ->
case Req > Avail of
true ->
- {SleepySum, Sleepy1} = tidy_and_sum_sleepy(Pid, Sleepy, Mixed),
+ {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Pid, Sleepy, Mixed),
case Req > Avail + SleepySum of
true -> %% not enough in sleepy, have a look in lazy too
- {LazySum, Lazy1} = tidy_and_sum_lazy(Pid, Lazy, Mixed),
+ {Lazy1, LazySum} = tidy_and_sum_lazy(Pid, Lazy, Mixed),
case Req > Avail + SleepySum + LazySum of
true -> %% can't free enough, just return tidied state
State #state { lowrate = Lazy1,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 6e3a92d037..476fff41cc 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -69,7 +69,7 @@ test_priority_queue() ->
%% empty Q
Q = priority_queue:new(),
- {true, true, 0, [], [], [], []} = test_priority_queue(Q),
+ {true, true, 0, [], []} = test_priority_queue(Q),
%% 1-4 element no-priority Q
true = lists:all(fun (X) -> X =:= passed end,
@@ -78,59 +78,58 @@ test_priority_queue() ->
%% 1-element priority Q
Q1 = priority_queue:in(foo, 1, priority_queue:new()),
- {true, false, 1, [{1, foo}], [foo], [], [{foo, 1}]} =
+ {true, false, 1, [{1, foo}], [foo]} =
test_priority_queue(Q1),
%% 2-element same-priority Q
Q2 = priority_queue:in(bar, 1, Q1),
- {true, false, 2, [{1, foo}, {1, bar}], [foo, bar], [], [{foo, 1}, {bar, 1}]}
- = test_priority_queue(Q2),
+ {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} =
+ test_priority_queue(Q2),
%% 2-element different-priority Q
Q3 = priority_queue:in(bar, 2, Q1),
- {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [bar], [{bar, 2}, {foo, 1}]} =
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
test_priority_queue(Q3),
%% 1-element negative priority Q
Q4 = priority_queue:in(foo, -1, priority_queue:new()),
- {true, false, 1, [{-1, foo}], [foo], [], [{foo, -1}]} =
- test_priority_queue(Q4),
+ {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4),
%% merge 2 * 1-element no-priority Qs
Q5 = priority_queue:join(priority_queue:in(foo, Q),
priority_queue:in(bar, Q)),
- {true, false, 2, [{0, foo}, {0, bar}], [foo, bar], [], [{foo, 0}, {bar, 0}]}
- = test_priority_queue(Q5),
+ {true, false, 2, [{0, foo}, {0, bar}], [foo, bar]} =
+ test_priority_queue(Q5),
%% merge 1-element no-priority Q with 1-element priority Q
Q6 = priority_queue:join(priority_queue:in(foo, Q),
priority_queue:in(bar, 1, Q)),
- {true, false, 2, [{1, bar}, {0, foo}], [bar, foo], [], [{bar, 1}, {foo, 0}]}
- = test_priority_queue(Q6),
+ {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} =
+ test_priority_queue(Q6),
%% merge 1-element priority Q with 1-element no-priority Q
Q7 = priority_queue:join(priority_queue:in(foo, 1, Q),
priority_queue:in(bar, Q)),
- {true, false, 2, [{1, foo}, {0, bar}], [foo, bar], [], [{foo, 1}, {bar, 0}]}
- = test_priority_queue(Q7),
+ {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} =
+ test_priority_queue(Q7),
%% merge 2 * 1-element same-priority Qs
Q8 = priority_queue:join(priority_queue:in(foo, 1, Q),
priority_queue:in(bar, 1, Q)),
- {true, false, 2, [{1, foo}, {1, bar}], [foo, bar], [], [{foo, 1}, {bar, 1}]}
- = test_priority_queue(Q8),
+ {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} =
+ test_priority_queue(Q8),
%% merge 2 * 1-element different-priority Qs
Q9 = priority_queue:join(priority_queue:in(foo, 1, Q),
priority_queue:in(bar, 2, Q)),
- {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [bar],
- [{bar, 2}, {foo, 1}]} = test_priority_queue(Q9),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ test_priority_queue(Q9),
%% merge 2 * 1-element different-priority Qs (other way around)
Q10 = priority_queue:join(priority_queue:in(bar, 2, Q),
priority_queue:in(foo, 1, Q)),
- {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [bar],
- [{bar, 2}, {foo, 1}]} = test_priority_queue(Q10),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ test_priority_queue(Q10),
passed.
@@ -142,33 +141,18 @@ priority_queue_out_all(Q) ->
{empty, _} -> [];
{{value, V}, Q1} -> [V | priority_queue_out_all(Q1)]
end.
-priority_queue_out_2_all(Q) ->
- case priority_queue:out(2, Q) of
- {empty, _} -> [];
- {{value, V}, Q1} -> [V | priority_queue_out_2_all(Q1)]
- end.
-
-priority_queue_pout_all(Q) ->
- case priority_queue:pout(Q) of
- {empty, _} -> [];
- {{value, V, P}, Q1} -> [{V, P} | priority_queue_pout_all(Q1)]
- end.
-
test_priority_queue(Q) ->
{priority_queue:is_queue(Q),
priority_queue:is_empty(Q),
priority_queue:len(Q),
priority_queue:to_list(Q),
- priority_queue_out_all(Q),
- priority_queue_out_2_all(Q),
- priority_queue_pout_all(Q)}.
+ priority_queue_out_all(Q)}.
test_simple_n_element_queue(N) ->
Items = lists:seq(1, N),
Q = priority_queue_in_all(priority_queue:new(), Items),
ToListRes = [{0, X} || X <- Items],
- POutAllRes = [{X, 0} || X <- Items],
- {true, false, N, ToListRes, Items, [], POutAllRes} = test_priority_queue(Q),
+ {true, false, N, ToListRes, Items} = test_priority_queue(Q),
passed.
test_parsing() ->