diff options
| -rw-r--r-- | src/priority_queue.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 36 |
2 files changed, 64 insertions, 9 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 4a94b24b49..ccc87cd5ac 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -47,7 +47,7 @@ -ifdef(use_specs). --type(priority() :: integer()). +-type(priority() :: integer() | 'infinity'). -type(squeue() :: {queue, [any()], [any()]}). -type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}). @@ -71,8 +71,9 @@ new() -> is_queue({queue, R, F}) when is_list(R), is_list(F) -> true; is_queue({pqueue, Queues}) when is_list(Queues) -> - lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end, - Queues); + lists:all(fun ({infinity, Q}) -> is_queue(Q); + ({P, Q}) -> is_integer(P) andalso is_queue(Q) + end, Queues); is_queue(_) -> false. @@ -89,7 +90,12 @@ len({pqueue, Queues}) -> to_list({queue, In, Out}) when is_list(In), is_list(Out) -> [{0, V} || V <- Out ++ lists:reverse(In, [])]; to_list({pqueue, Queues}) -> - [{-P, V} || {P, Q} <- Queues, {0, V} <- to_list(Q)]. + [{P1, V} || {P, Q} <- Queues, + case P of + infinity -> P1 = P, true; + _ -> P1 = -P, true + end, + {0, V} <- to_list(Q)]. in(Item, Q) -> in(Item, 0, Q). @@ -103,12 +109,23 @@ in(X, Priority, _Q = {queue, [], []}) -> in(X, Priority, Q = {queue, _, _}) -> in(X, Priority, {pqueue, [{0, Q}]}); in(X, Priority, {pqueue, Queues}) -> - P = -Priority, + P = case Priority of + infinity -> Priority; + _ -> -Priority + end, {pqueue, case lists:keysearch(P, 1, Queues) of {value, {_, Q}} -> lists:keyreplace(P, 1, Queues, {P, in(X, Q)}); + false when P == infinity -> + [{P, {queue, [X], []}} | Queues]; false -> - lists:keysort(1, [{P, {queue, [X], []}} | Queues]) + case Queues of + [{infinity, InfQueue} | Queues1] -> + [{infinity, InfQueue} | + lists:keysort(1, [{P, {queue, [X], []}} | Queues1])]; + _ -> + lists:keysort(1, [{P, {queue, [X], []}} | Queues]) + end end}. out({queue, [], []} = Q) -> @@ -141,7 +158,8 @@ join({queue, [], []}, B) -> join({queue, AIn, AOut}, {queue, BIn, BOut}) -> {queue, BIn, AOut ++ lists:reverse(AIn, BOut)}; join(A = {queue, _, _}, {pqueue, BPQ}) -> - {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ), + {Pre, Post} = + lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, BPQ), Post1 = case Post of [] -> [ {0, A} ]; [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ]; @@ -149,7 +167,8 @@ join(A = {queue, _, _}, {pqueue, BPQ}) -> end, {pqueue, Pre ++ Post1}; join({pqueue, APQ}, B = {queue, _, _}) -> - {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ), + {Pre, Post} = + lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, APQ), Post1 = case Post of [] -> [ {0, B} ]; [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ]; @@ -165,7 +184,7 @@ merge(APQ, [], Acc) -> lists:reverse(Acc, APQ); merge([{P, A}|As], [{P, B}|Bs], Acc) -> merge(As, Bs, [ {P, join(A, B)} | Acc ]); -merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB -> +merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity -> merge(As, Bs, [ {PA, A} | Acc ]); merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> merge(As, Bs, [ {PB, B} | Acc ]). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2e4544114a..283a5c4a97 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -203,6 +203,42 @@ test_priority_queue() -> {true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} = test_priority_queue(Q15), + %% 1-element infinity priority Q + Q16 = priority_queue:in(foo, infinity, Q), + {true, false, 1, [{infinity, foo}], [foo]} = test_priority_queue(Q16), + + %% add infinity to 0-priority Q + Q17 = priority_queue:in(foo, infinity, priority_queue:in(bar, Q)), + {true, false, 2, [{infinity, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q17), + + %% and the other way around + Q18 = priority_queue:in(bar, priority_queue:in(foo, infinity, Q)), + {true, false, 2, [{infinity, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q18), + + %% add infinity to mixed-priority Q + Q19 = priority_queue:in(qux, infinity, Q3), + {true, false, 3, [{infinity, qux}, {2, bar}, {1, foo}], [qux, bar, foo]} = + test_priority_queue(Q19), + + %% merge the above with a negative priority Q + Q20 = priority_queue:join(Q19, Q4), + {true, false, 4, [{infinity, qux}, {2, bar}, {1, foo}, {-1, foo}], + [qux, bar, foo, foo]} = test_priority_queue(Q20), + + %% merge two infinity priority queues + Q21 = priority_queue:join(priority_queue:in(foo, infinity, Q), + priority_queue:in(bar, infinity, Q)), + {true, false, 2, [{infinity, foo}, {infinity, bar}], [foo, bar]} = + test_priority_queue(Q21), + + %% merge two mixed priority with infinity queues + Q22 = priority_queue:join(Q18, Q20), + {true, false, 6, [{infinity, foo}, {infinity, qux}, {2, bar}, {1, foo}, + {0, bar}, {-1, foo}], [foo, qux, bar, foo, bar, foo]} = + test_priority_queue(Q22), + passed. priority_queue_in_all(Q, L) -> |
