summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-08-19 21:31:35 +0100
committerMatthias Radestock <matthias@lshift.net>2009-08-19 21:31:35 +0100
commit4de3d262e5c23a8e9e357c7984dbc9a267d0600b (patch)
tree334c39917b5e64219af0c782f277e770f28d61b6 /src
parentd1cb178e854bc7943953019c8535c7bda4ce6783 (diff)
parente6df59a9e679ef0b60a287290e3dd8b33f1f799d (diff)
downloadrabbitmq-server-git-4de3d262e5c23a8e9e357c7984dbc9a267d0600b.tar.gz
merge bug21425 into default
Diffstat (limited to 'src')
-rw-r--r--src/priority_queue.erl40
-rw-r--r--src/rabbit_tests.erl68
2 files changed, 106 insertions, 2 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 732757c41c..c74b39a957 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -55,7 +55,8 @@
-module(priority_queue).
--export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]).
+-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
+ out/1, join/2]).
%%----------------------------------------------------------------------------
@@ -73,6 +74,7 @@
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
+-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
-endif.
@@ -147,6 +149,42 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
+join(A, {queue, [], []}) ->
+ A;
+join({queue, [], []}, B) ->
+ B;
+join({queue, AIn, AOut}, {queue, BIn, BOut}) ->
+ {queue, BIn, AOut ++ lists:reverse(AIn, BOut)};
+join(A = {queue, _, _}, {pqueue, BPQ}) ->
+ {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ),
+ Post1 = case Post of
+ [] -> [ {0, A} ];
+ [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ];
+ _ -> [ {0, A} | Post ]
+ end,
+ {pqueue, Pre ++ Post1};
+join({pqueue, APQ}, B = {queue, _, _}) ->
+ {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ),
+ Post1 = case Post of
+ [] -> [ {0, B} ];
+ [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ];
+ _ -> [ {0, B} | Post ]
+ end,
+ {pqueue, Pre ++ Post1};
+join({pqueue, APQ}, {pqueue, BPQ}) ->
+ {pqueue, merge(APQ, BPQ, [])}.
+
+merge([], BPQ, Acc) ->
+ lists:reverse(Acc, BPQ);
+merge(APQ, [], Acc) ->
+ lists:reverse(Acc, APQ);
+merge([{P, A}|As], [{P, B}|Bs], Acc) ->
+ merge(As, Bs, [ {P, join(A, B)} | Acc ]);
+merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB ->
+ merge(As, Bs, [ {PA, A} | Acc ]);
+merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
+ merge(As, Bs, [ {PB, B} | Acc ]).
+
r2f([]) -> {queue, [], []};
r2f([_] = R) -> {queue, [], R};
r2f([X,Y]) -> {queue, [X], [Y]};
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e5100ccd16..5e1cdfb18a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -75,7 +75,8 @@ test_priority_queue() ->
%% 1-element priority Q
Q1 = priority_queue:in(foo, 1, priority_queue:new()),
- {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1),
+ {true, false, 1, [{1, foo}], [foo]} =
+ test_priority_queue(Q1),
%% 2-element same-priority Q
Q2 = priority_queue:in(bar, 1, Q1),
@@ -91,6 +92,71 @@ test_priority_queue() ->
Q4 = priority_queue:in(foo, -1, priority_queue:new()),
{true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4),
+ %% merge 2 * 1-element no-priority Qs
+ Q5 = priority_queue:join(priority_queue:in(foo, Q),
+ priority_queue:in(bar, Q)),
+ {true, false, 2, [{0, foo}, {0, bar}], [foo, bar]} =
+ test_priority_queue(Q5),
+
+ %% merge 1-element no-priority Q with 1-element priority Q
+ Q6 = priority_queue:join(priority_queue:in(foo, Q),
+ priority_queue:in(bar, 1, Q)),
+ {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} =
+ test_priority_queue(Q6),
+
+ %% merge 1-element priority Q with 1-element no-priority Q
+ Q7 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, Q)),
+ {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} =
+ test_priority_queue(Q7),
+
+ %% merge 2 * 1-element same-priority Qs
+ Q8 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, 1, Q)),
+ {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} =
+ test_priority_queue(Q8),
+
+ %% merge 2 * 1-element different-priority Qs
+ Q9 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, 2, Q)),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ test_priority_queue(Q9),
+
+ %% merge 2 * 1-element different-priority Qs (other way around)
+ Q10 = priority_queue:join(priority_queue:in(bar, 2, Q),
+ priority_queue:in(foo, 1, Q)),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ test_priority_queue(Q10),
+
+ %% merge 2 * 2-element multi-different-priority Qs
+ Q11 = priority_queue:join(Q6, Q5),
+ {true, false, 4, [{1, bar}, {0, foo}, {0, foo}, {0, bar}],
+ [bar, foo, foo, bar]} = test_priority_queue(Q11),
+
+ %% and the other way around
+ Q12 = priority_queue:join(Q5, Q6),
+ {true, false, 4, [{1, bar}, {0, foo}, {0, bar}, {0, foo}],
+ [bar, foo, bar, foo]} = test_priority_queue(Q12),
+
+ %% merge with negative priorities
+ Q13 = priority_queue:join(Q4, Q5),
+ {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} =
+ test_priority_queue(Q13),
+
+ %% and the other way around
+ Q14 = priority_queue:join(Q5, Q4),
+ {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} =
+ test_priority_queue(Q14),
+
+ %% joins with empty queues:
+ Q1 = priority_queue:join(Q, Q1),
+ Q1 = priority_queue:join(Q1, Q),
+
+ %% insert with priority into non-empty zero-priority queue
+ Q15 = priority_queue:in(baz, 1, Q5),
+ {true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} =
+ test_priority_queue(Q15),
+
passed.
priority_queue_in_all(Q, L) ->