summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-07-28 14:45:06 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-07-28 14:45:06 +0100
commit9d08c2abc95443ce9a4ffef1098a7ec48d879a20 (patch)
tree150e5082c9ff5294eae542915274ace44be86c82 /src
parent5ad5ca885c05a04fd2274bf71571c8aec1966edc (diff)
downloadrabbitmq-server-git-9d08c2abc95443ce9a4ffef1098a7ec48d879a20.tar.gz
Make priority_queue be able to have a concept of infinity as a priority
Diffstat (limited to 'src')
-rw-r--r--src/priority_queue.erl37
-rw-r--r--src/rabbit_tests.erl36
2 files changed, 64 insertions, 9 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 4a94b24b49..ccc87cd5ac 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -47,7 +47,7 @@
-ifdef(use_specs).
--type(priority() :: integer()).
+-type(priority() :: integer() | 'infinity').
-type(squeue() :: {queue, [any()], [any()]}).
-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
@@ -71,8 +71,9 @@ new() ->
is_queue({queue, R, F}) when is_list(R), is_list(F) ->
true;
is_queue({pqueue, Queues}) when is_list(Queues) ->
- lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end,
- Queues);
+ lists:all(fun ({infinity, Q}) -> is_queue(Q);
+ ({P, Q}) -> is_integer(P) andalso is_queue(Q)
+ end, Queues);
is_queue(_) ->
false.
@@ -89,7 +90,12 @@ len({pqueue, Queues}) ->
to_list({queue, In, Out}) when is_list(In), is_list(Out) ->
[{0, V} || V <- Out ++ lists:reverse(In, [])];
to_list({pqueue, Queues}) ->
- [{-P, V} || {P, Q} <- Queues, {0, V} <- to_list(Q)].
+ [{P1, V} || {P, Q} <- Queues,
+ case P of
+ infinity -> P1 = P, true;
+ _ -> P1 = -P, true
+ end,
+ {0, V} <- to_list(Q)].
in(Item, Q) ->
in(Item, 0, Q).
@@ -103,12 +109,23 @@ in(X, Priority, _Q = {queue, [], []}) ->
in(X, Priority, Q = {queue, _, _}) ->
in(X, Priority, {pqueue, [{0, Q}]});
in(X, Priority, {pqueue, Queues}) ->
- P = -Priority,
+ P = case Priority of
+ infinity -> Priority;
+ _ -> -Priority
+ end,
{pqueue, case lists:keysearch(P, 1, Queues) of
{value, {_, Q}} ->
lists:keyreplace(P, 1, Queues, {P, in(X, Q)});
+ false when P == infinity ->
+ [{P, {queue, [X], []}} | Queues];
false ->
- lists:keysort(1, [{P, {queue, [X], []}} | Queues])
+ case Queues of
+ [{infinity, InfQueue} | Queues1] ->
+ [{infinity, InfQueue} |
+ lists:keysort(1, [{P, {queue, [X], []}} | Queues1])];
+ _ ->
+ lists:keysort(1, [{P, {queue, [X], []}} | Queues])
+ end
end}.
out({queue, [], []} = Q) ->
@@ -141,7 +158,8 @@ join({queue, [], []}, B) ->
join({queue, AIn, AOut}, {queue, BIn, BOut}) ->
{queue, BIn, AOut ++ lists:reverse(AIn, BOut)};
join(A = {queue, _, _}, {pqueue, BPQ}) ->
- {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ),
+ {Pre, Post} =
+ lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, BPQ),
Post1 = case Post of
[] -> [ {0, A} ];
[ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ];
@@ -149,7 +167,8 @@ join(A = {queue, _, _}, {pqueue, BPQ}) ->
end,
{pqueue, Pre ++ Post1};
join({pqueue, APQ}, B = {queue, _, _}) ->
- {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ),
+ {Pre, Post} =
+ lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, APQ),
Post1 = case Post of
[] -> [ {0, B} ];
[ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ];
@@ -165,7 +184,7 @@ merge(APQ, [], Acc) ->
lists:reverse(Acc, APQ);
merge([{P, A}|As], [{P, B}|Bs], Acc) ->
merge(As, Bs, [ {P, join(A, B)} | Acc ]);
-merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB ->
+merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity ->
merge(As, Bs, [ {PA, A} | Acc ]);
merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
merge(As, Bs, [ {PB, B} | Acc ]).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 2e4544114a..283a5c4a97 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -203,6 +203,42 @@ test_priority_queue() ->
{true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} =
test_priority_queue(Q15),
+ %% 1-element infinity priority Q
+ Q16 = priority_queue:in(foo, infinity, Q),
+ {true, false, 1, [{infinity, foo}], [foo]} = test_priority_queue(Q16),
+
+ %% add infinity to 0-priority Q
+ Q17 = priority_queue:in(foo, infinity, priority_queue:in(bar, Q)),
+ {true, false, 2, [{infinity, foo}, {0, bar}], [foo, bar]} =
+ test_priority_queue(Q17),
+
+ %% and the other way around
+ Q18 = priority_queue:in(bar, priority_queue:in(foo, infinity, Q)),
+ {true, false, 2, [{infinity, foo}, {0, bar}], [foo, bar]} =
+ test_priority_queue(Q18),
+
+ %% add infinity to mixed-priority Q
+ Q19 = priority_queue:in(qux, infinity, Q3),
+ {true, false, 3, [{infinity, qux}, {2, bar}, {1, foo}], [qux, bar, foo]} =
+ test_priority_queue(Q19),
+
+ %% merge the above with a negative priority Q
+ Q20 = priority_queue:join(Q19, Q4),
+ {true, false, 4, [{infinity, qux}, {2, bar}, {1, foo}, {-1, foo}],
+ [qux, bar, foo, foo]} = test_priority_queue(Q20),
+
+ %% merge two infinity priority queues
+ Q21 = priority_queue:join(priority_queue:in(foo, infinity, Q),
+ priority_queue:in(bar, infinity, Q)),
+ {true, false, 2, [{infinity, foo}, {infinity, bar}], [foo, bar]} =
+ test_priority_queue(Q21),
+
+ %% merge two mixed priority with infinity queues
+ Q22 = priority_queue:join(Q18, Q20),
+ {true, false, 6, [{infinity, foo}, {infinity, qux}, {2, bar}, {1, foo},
+ {0, bar}, {-1, foo}], [foo, qux, bar, foo, bar, foo]} =
+ test_priority_queue(Q22),
+
passed.
priority_queue_in_all(Q, L) ->