summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-09 17:06:30 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-09 17:06:30 +0100
commitb69a05cf71eca2b083fe5c370eb7a2595d0f52e8 (patch)
tree35647e92c997cb5c0591e46080fb8a32c7a24544
parent30d8f863ab27e650d57559b1fd20ae43a4d709f8 (diff)
downloadrabbitmq-server-git-b69a05cf71eca2b083fe5c370eb7a2595d0f52e8.tar.gz
Initial work to permit low priority background tasks to be catered for.
-rw-r--r--src/gen_server2.erl213
-rw-r--r--src/priority_queue.erl22
-rw-r--r--src/rabbit_disk_queue.erl14
-rw-r--r--src/rabbit_misc.erl19
-rw-r--r--src/rabbit_tests.erl46
5 files changed, 236 insertions, 78 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index e46f2645bd..2784090afc 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -37,6 +37,28 @@
%% Explicit timeouts (i.e. not 'binary') from the handle_* functions
%% are still supported, and do not have any effect on the current
%% timeout value.
+%%
+%% 6) init/1 can also return (either a further arg in addition to
+%% timeout above, or as a key-value list with the timeout as {timeout,
+%% Timeout}) a minimum priority (key: min_priority). This can also be
+%% returned from handle_* functions (i.e. {noreply, NewState} or
+%% {noreply, NewState, Timeout} or {noreply, NewState, Timeout,
+%% MinPri} or {noreply, NewState, [{min_priority, MinPri}]} or
+%% {noreply, NewState, [{min_priority, MinPri}, {timeout,
+%% Timeout}]}). What this does is to only allow messages greater than
+%% the indicated priority through to the module. To allow any message
+%% through (as is the default), use 'any'. One effect of this is that
+%% when hibernating, the process can be woken up to receive a message
+%% which it then realises it is not interested in. When this happens,
+%% handle_info(roused_and_disinterested, State) will be called as soon
+%% as there are no further messages to process (i.e. upon waking, the
+%% message queue is drained, and a timeout of 0 is used). A suggested
+%% use of this is to cater for low priority background casts, which
+%% can be sent with negative priorities, and to use a priority of 0 or
+%% higher for everything else. Then, if you return from handle_* with
+%% a timeout of 0 and find handle_info(timeout, State) being called,
+%% you can then return with a min_priority of 'any' and pick up the
+%% low priority messages.
%% All modifications are (C) 2009 LShift Ltd.
@@ -133,7 +155,8 @@
cast/2, pcast/3, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
- enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]).
+ enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6,
+ wake_hib/8]).
-export([behaviour_info/1]).
@@ -322,22 +345,32 @@ multi_call(Nodes, Name, Req, Timeout)
%% process, including registering a name for it.
%%-----------------------------------------------------------------
enter_loop(Mod, Options, State) ->
- enter_loop(Mod, Options, State, self(), infinity).
+ enter_loop(Mod, Options, State, self(), []).
enter_loop(Mod, Options, State, ServerName = {_, _}) ->
- enter_loop(Mod, Options, State, ServerName, infinity);
+ enter_loop(Mod, Options, State, ServerName, []);
+
+enter_loop(Mod, Options, State, Opts) when is_list(Opts) ->
+ enter_loop(Mod, Options, State, self(), Opts);
enter_loop(Mod, Options, State, Timeout) ->
- enter_loop(Mod, Options, State, self(), Timeout).
+ enter_loop(Mod, Options, State, self(), [{timeout, Timeout}]).
-enter_loop(Mod, Options, State, ServerName, Timeout) ->
+enter_loop(Mod, Options, State, ServerName, Opts) when is_list(Opts) ->
Name = get_proc_name(ServerName),
Parent = get_parent(),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
+ [{timeout, Timeout}, {min_priority, MinPri}] = extract_timeout_minpri(Opts),
{Timeout1, TimeoutState} = build_timeout_state(Timeout),
- loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue, Debug).
+ loop(Parent, Name, State, Mod, Timeout1, TimeoutState, MinPri, Queue, Debug);
+
+enter_loop(Mod, Options, State, ServerName, Timeout) ->
+ enter_loop(Mod, Options, State, ServerName, [{timeout, Timeout}]).
+enter_loop(Mod, Options, State, ServerName, Timeout, MinPri) ->
+ enter_loop(Mod, Options, State, ServerName,
+ [{timeout, Timeout}, {min_priority, MinPri}]).
%%%========================================================================
%%% Gen-callback functions
%%%========================================================================
@@ -357,13 +390,19 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
Queue = priority_queue:new(),
case catch Mod:init(Args) of
{ok, State} ->
- proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug);
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, infinity, undefined,
+ any, Queue, Debug);
{ok, State, Timeout} ->
proc_lib:init_ack(Starter, {ok, self()}),
{Timeout1, TimeoutState} = build_timeout_state(Timeout),
- loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue,
- Debug);
+ loop(Parent, Name, State, Mod, Timeout1, TimeoutState,
+ any, Queue, Debug);
+ {ok, State, Timeout, MinPri} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ {Timeout1, TimeoutState} = build_timeout_state(Timeout),
+ loop(Parent, Name, State, Mod, Timeout1, TimeoutState,
+ MinPri, Queue, Debug);
{stop, Reason} ->
%% For consistency, we must make sure that the
%% registered name (if any) is unregistered before
@@ -407,57 +446,70 @@ build_timeout_state(Timeout) ->
_ -> {Timeout, undefined}
end.
+extract_timeout_minpri(Opts) ->
+ rabbit_misc:keygets([{timeout, infinity}, {min_priority, any}], Opts).
+
%%%========================================================================
%%% Internal functions
%%%========================================================================
%%% ---------------------------------------------------
%%% The MAIN loop.
%%% ---------------------------------------------------
-loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
- proc_lib:hibernate(?MODULE,wake_hib,
- [Parent, Name, State, Mod, undefined, Queue, Debug]);
-loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined}, Queue,
- Debug) ->
+loop(Parent, Name, State, Mod, hibernate, undefined, MinPri, Queue, Debug) ->
+ proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, undefined,
+ MinPri, Queue, Debug]);
+loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined},
+ MinPri, Queue, Debug) ->
proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod,
- {Current, Min, now()}, Queue, Debug]);
-loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
+ {Current, Min, now()},
+ MinPri, Queue, Debug]);
+loop(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue, Debug) ->
receive
Input -> loop(Parent, Name, State, Mod,
- Time, TimeoutState, in(Input, Queue), Debug)
+ Time, TimeoutState, MinPri, in(Input, Queue), Debug)
after 0 ->
process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
- Queue, Debug, false)
+ MinPri, Queue, Debug, false)
end.
-process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
+process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue,
Debug, Hib) ->
- case priority_queue:out(Queue) of
+ Res = case MinPri of
+ any -> priority_queue:out(Queue);
+ _ -> priority_queue:out(MinPri, Queue)
+ end,
+ case Res of
{{value, Msg}, Queue1} ->
process_msg(Parent, Name, State, Mod,
Time, TimeoutState, Queue1, Debug, Hib, Msg);
{empty, Queue1} ->
- Time1 = case {Time, TimeoutState} of
- {binary, {Current, _Min, undefined}} -> Current;
+ Time1 = case {Hib, Time, TimeoutState} of
+ {true, _, _} -> 0;
+ {_, binary, {Current, _Min, undefined}} -> Current;
_ -> Time
end,
receive
Input ->
loop(Parent, Name, State, Mod,
- Time, TimeoutState, in(Input, Queue1), Debug)
+ Time, TimeoutState, MinPri, in(Input, Queue1), Debug)
after Time1 ->
process_msg(Parent, Name, State, Mod,
- Time, TimeoutState, Queue1, Debug, Hib, timeout)
+ Time, TimeoutState, Queue1, Debug, Hib,
+ case Hib of
+ true -> roused_and_disinterested;
+ false -> timeout
+ end)
end
end.
-wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+wake_hib(Parent, Name, State, Mod, TimeoutState, MinPri, Queue, Debug) ->
Msg = receive
Input ->
Input
end,
TimeoutState1 = adjust_hibernate_after(TimeoutState),
process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState1,
- in(Msg, Queue), Debug, true).
+ MinPri, in(Msg, Queue), Debug, true).
adjust_hibernate_after(undefined) ->
undefined;
@@ -707,14 +759,34 @@ handle_msg({'$gen_call', From, Msg},
case catch Mod:handle_call(Msg, From, State) of
{reply, Reply, NState} ->
reply(From, Reply),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
- {reply, Reply, NState, Time1} ->
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
+ []);
+ {reply, Reply, NState, Opts} when is_list(Opts) ->
+ reply(From, Reply),
+ [{timeout, Time}, {min_priority, MinPri}] =
+ extract_timeout_minpri(Opts),
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
+ []);
+ {reply, Reply, NState, Time} ->
reply(From, Reply),
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, []);
+ {reply, Reply, NState, Time, MinPri} ->
+ reply(From, Reply),
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
+ []);
{noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
- {noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
+ []);
+ {noreply, NState, Opts} when is_list(Opts) ->
+ [{timeout, Time}, {min_priority, MinPri}] =
+ extract_timeout_minpri(Opts),
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
+ []);
+ {noreply, NState, Time} ->
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, []);
+ {noreply, NState, Time, MinPri} ->
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
+ []);
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Name, Msg, Mod, NState, [])),
@@ -734,20 +806,44 @@ handle_msg({'$gen_call', From, Msg},
case catch Mod:handle_call(Msg, From, State) of
{reply, Reply, NState} ->
Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
+ Debug1);
+ {reply, Reply, NState, Opts} when is_list(Opts) ->
+ Debug1 = reply(Name, From, Reply, NState, Debug),
+ [{timeout, Time}, {min_priority, MinPri}] =
+ extract_timeout_minpri(Opts),
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
+ Debug1);
+ {reply, Reply, NState, Time} ->
+ Debug1 = reply(Name, From, Reply, NState, Debug),
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue,
Debug1);
- {reply, Reply, NState, Time1} ->
+ {reply, Reply, NState, Time, MinPri} ->
Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
+ Debug1);
{noreply, NState} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
+ Debug1);
+ {noreply, NState, Opts} when is_list(Opts) ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ [{timeout, Time}, {min_priority, MinPri}] =
+ extract_timeout_minpri(Opts),
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
Debug1);
- {noreply, NState, Time1} ->
+ {noreply, NState, Time} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue,
+ Debug1);
+ {noreply, NState, Time, MinPri} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
+ Debug1);
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
@@ -767,9 +863,18 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
TimeoutState, Queue) ->
case Reply of
{noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
- {noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
+ []);
+ {noreply, NState, Opts} when is_list(Opts) ->
+ [{timeout, Time}, {min_priority, MinPri}] =
+ extract_timeout_minpri(Opts),
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
+ []);
+ {noreply, NState, Time} ->
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, []);
+ {noreply, NState, Time, MinPri} ->
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
+ []);
{stop, Reason, NState} ->
terminate(Reason, Name, Msg, Mod, NState, []);
{'EXIT', What} ->
@@ -784,12 +889,25 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue,
{noreply, NState} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue,
Debug1);
- {noreply, NState, Time1} ->
+ {noreply, NState, Opts} when is_list(Opts) ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
+ [{timeout, Time}, {min_priority, MinPri}] =
+ extract_timeout_minpri(Opts),
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
+ Debug1);
+ {noreply, NState, Time} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue,
+ Debug1);
+ {noreply, NState, Time, MinPri} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue,
+ Debug1);
{stop, Reason, NState} ->
terminate(Reason, Name, Msg, Mod, NState, Debug);
{'EXIT', What} ->
@@ -807,8 +925,9 @@ reply(Name, {To, Tag}, Reply, State, Debug) ->
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
-system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) ->
- loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug).
+system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, MinPri,
+ Queue]) ->
+ loop(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue, Debug).
-ifdef(use_specs).
-spec system_terminate(_, _, _, [_]) -> no_return().
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 9683809933..9421f281c9 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -56,7 +56,7 @@
-module(priority_queue).
-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
- out/1, pout/1, join/2]).
+ out/1, out/2, pout/1, join/2]).
%%----------------------------------------------------------------------------
@@ -73,8 +73,9 @@
-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
--spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
--spec(pout/1 :: (pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
+-spec(out/1 :: (pqueue()) -> {(empty | {value, any()}), pqueue()}).
+-spec(out/2 :: (priority(), pqueue()) -> {(empty | {value, any()}), pqueue()}).
+-spec(pout/1 :: (pqueue()) -> {(empty | {value, any(), priority()}), pqueue()}).
-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
-endif.
@@ -150,8 +151,19 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
-pout({queue, [], []}) ->
- {empty, {queue, [], []}};
+out(_Priority, {queue, [], []} = Q) ->
+ {empty, Q};
+out(Priority, {queue, _, _} = Q) when Priority =< 0 ->
+ out(Q);
+out(_Priority, {queue, _, _} = Q) ->
+ {empty, Q};
+out(Priority, {pqueue, [{P, _Q} | _Queues]} = Q) when Priority =< (-P) ->
+ out(Q);
+out(_Priority, {pqueue, [_|_]} = Q) ->
+ {empty, Q}.
+
+pout({queue, [], []} = Q) ->
+ {empty, Q};
pout({queue, _, _} = Q) ->
{{value, V}, Q1} = out(Q),
{{value, V, 0}, Q1};
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 3656694eb7..a537e45687 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -817,7 +817,7 @@ fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) ->
decrement_cache(MsgId, #dqstate { message_cache = Cache }) ->
true = try case ets:update_counter(Cache, MsgId, {4, -1}) of
- 0 -> ets:delete(Cache, MsgId);
+ N when N =< 0 -> ets:delete(Cache, MsgId);
_N -> true
end
catch error:badarg ->
@@ -1114,22 +1114,24 @@ internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) ->
mnesia:transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
- requeue_next_messages(Q, N, ReadSeqId, WriteSeqId)
+ requeue_next_messages(Q, State, N, ReadSeqId, WriteSeqId)
end
),
true = ets:insert(Sequences, {Q, ReadSeqIdN, WriteSeqIdN}),
{ok, State}
end.
-requeue_next_messages(_Q, 0, ReadSeq, WriteSeq) ->
+requeue_next_messages(_Q, _State, 0, ReadSeq, WriteSeq) ->
{ReadSeq, WriteSeq};
-requeue_next_messages(Q, N, ReadSeq, WriteSeq) ->
- [Obj] = mnesia:read(rabbit_disk_queue, {Q, ReadSeq}, write),
+requeue_next_messages(Q, State, N, ReadSeq, WriteSeq) ->
+ [Obj = #dq_msg_loc { msg_id = MsgId }] =
+ mnesia:read(rabbit_disk_queue, {Q, ReadSeq}, write),
ok = mnesia:write(rabbit_disk_queue,
Obj #dq_msg_loc {queue_and_seq_id = {Q, WriteSeq}},
write),
ok = mnesia:delete(rabbit_disk_queue, {Q, ReadSeq}, write),
- requeue_next_messages(Q, N - 1, ReadSeq + 1, WriteSeq + 1).
+ decrement_cache(MsgId, State),
+ requeue_next_messages(Q, State, N - 1, ReadSeq + 1, WriteSeq + 1).
internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
case sequence_lookup(Sequences, Q) of
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index e66eb6b088..176ddddb03 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -52,7 +52,7 @@
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
--export([unfold/2, ceil/1]).
+-export([unfold/2, ceil/1, keygets/2]).
-import(mnesia).
-import(lists).
@@ -116,6 +116,8 @@
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> number()).
+-spec(keygets/2 :: ([({K, V} | {K, non_neg_integer(), V})], [any()]) ->
+ [({K, V} | any())]).
-endif.
@@ -448,3 +450,18 @@ ceil(N) when N - trunc(N) > 0 ->
1 + trunc(N);
ceil(N) ->
N.
+
+keygets(Keys, KeyList) ->
+ lists:reverse(
+ lists:foldl(
+ fun({Key, Pos, Default}, Acc) ->
+ case lists:keysearch(Key, Pos, KeyList) of
+ false -> [{Key, Default} | Acc];
+ {value, T} -> [T | Acc]
+ end;
+ ({Key, Default}, Acc) ->
+ case lists:keysearch(Key, 1, KeyList) of
+ false -> [{Key, Default} | Acc];
+ {value, T} -> [T | Acc]
+ end
+ end, [], Keys)).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8ab8267725..6d76d23fe9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -65,7 +65,7 @@ test_priority_queue() ->
%% empty Q
Q = priority_queue:new(),
- {true, true, 0, [], [], []} = test_priority_queue(Q),
+ {true, true, 0, [], [], [], []} = test_priority_queue(Q),
%% 1-4 element no-priority Q
true = lists:all(fun (X) -> X =:= passed end,
@@ -74,57 +74,59 @@ test_priority_queue() ->
%% 1-element priority Q
Q1 = priority_queue:in(foo, 1, priority_queue:new()),
- {true, false, 1, [{1, foo}], [foo], [{foo, 1}]} = test_priority_queue(Q1),
+ {true, false, 1, [{1, foo}], [foo], [], [{foo, 1}]} =
+ test_priority_queue(Q1),
%% 2-element same-priority Q
Q2 = priority_queue:in(bar, 1, Q1),
- {true, false, 2, [{1, foo}, {1, bar}], [foo, bar], [{foo, 1}, {bar, 1}]} =
- test_priority_queue(Q2),
+ {true, false, 2, [{1, foo}, {1, bar}], [foo, bar], [], [{foo, 1}, {bar, 1}]}
+ = test_priority_queue(Q2),
%% 2-element different-priority Q
Q3 = priority_queue:in(bar, 2, Q1),
- {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [{bar, 2}, {foo, 1}]} =
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [bar], [{bar, 2}, {foo, 1}]} =
test_priority_queue(Q3),
%% 1-element negative priority Q
Q4 = priority_queue:in(foo, -1, priority_queue:new()),
- {true, false, 1, [{-1, foo}], [foo], [{foo, -1}]} = test_priority_queue(Q4),
+ {true, false, 1, [{-1, foo}], [foo], [], [{foo, -1}]} =
+ test_priority_queue(Q4),
%% merge 2 * 1-element no-priority Qs
Q5 = priority_queue:join(priority_queue:in(foo, Q),
priority_queue:in(bar, Q)),
- {true, false, 2, [{0, foo}, {0, bar}], [foo, bar], [{foo, 0}, {bar, 0}]} =
- test_priority_queue(Q5),
+ {true, false, 2, [{0, foo}, {0, bar}], [foo, bar], [], [{foo, 0}, {bar, 0}]}
+ = test_priority_queue(Q5),
%% merge 1-element no-priority Q with 1-element priority Q
Q6 = priority_queue:join(priority_queue:in(foo, Q),
priority_queue:in(bar, 1, Q)),
- {true, false, 2, [{1, bar}, {0, foo}], [bar, foo], [{bar, 1}, {foo, 0}]} =
- test_priority_queue(Q6),
+ {true, false, 2, [{1, bar}, {0, foo}], [bar, foo], [], [{bar, 1}, {foo, 0}]}
+ = test_priority_queue(Q6),
%% merge 1-element priority Q with 1-element no-priority Q
Q7 = priority_queue:join(priority_queue:in(foo, 1, Q),
priority_queue:in(bar, Q)),
- {true, false, 2, [{1, foo}, {0, bar}], [foo, bar], [{foo, 1}, {bar, 0}]} =
- test_priority_queue(Q7),
+ {true, false, 2, [{1, foo}, {0, bar}], [foo, bar], [], [{foo, 1}, {bar, 0}]}
+ = test_priority_queue(Q7),
%% merge 2 * 1-element same-priority Qs
Q8 = priority_queue:join(priority_queue:in(foo, 1, Q),
priority_queue:in(bar, 1, Q)),
- {true, false, 2, [{1, foo}, {1, bar}], [foo, bar], [{foo, 1}, {bar, 1}]} =
- test_priority_queue(Q8),
+ {true, false, 2, [{1, foo}, {1, bar}], [foo, bar], [], [{foo, 1}, {bar, 1}]}
+ = test_priority_queue(Q8),
%% merge 2 * 1-element different-priority Qs
Q9 = priority_queue:join(priority_queue:in(foo, 1, Q),
priority_queue:in(bar, 2, Q)),
- {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [{bar, 2}, {foo, 1}]} =
- test_priority_queue(Q9),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [bar],
+ [{bar, 2}, {foo, 1}]} = test_priority_queue(Q9),
%% merge 2 * 1-element different-priority Qs (other way around)
Q10 = priority_queue:join(priority_queue:in(bar, 2, Q),
priority_queue:in(foo, 1, Q)),
- {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [{bar, 2}, {foo, 1}]} =
- test_priority_queue(Q10),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [bar],
+ [{bar, 2}, {foo, 1}]} = test_priority_queue(Q10),
passed.
@@ -136,6 +138,11 @@ priority_queue_out_all(Q) ->
{empty, _} -> [];
{{value, V}, Q1} -> [V | priority_queue_out_all(Q1)]
end.
+priority_queue_out_2_all(Q) ->
+ case priority_queue:out(2, Q) of
+ {empty, _} -> [];
+ {{value, V}, Q1} -> [V | priority_queue_out_2_all(Q1)]
+ end.
priority_queue_pout_all(Q) ->
case priority_queue:pout(Q) of
@@ -149,6 +156,7 @@ test_priority_queue(Q) ->
priority_queue:len(Q),
priority_queue:to_list(Q),
priority_queue_out_all(Q),
+ priority_queue_out_2_all(Q),
priority_queue_pout_all(Q)}.
test_simple_n_element_queue(N) ->
@@ -156,7 +164,7 @@ test_simple_n_element_queue(N) ->
Q = priority_queue_in_all(priority_queue:new(), Items),
ToListRes = [{0, X} || X <- Items],
POutAllRes = [{X, 0} || X <- Items],
- {true, false, N, ToListRes, Items, POutAllRes} = test_priority_queue(Q),
+ {true, false, N, ToListRes, Items, [], POutAllRes} = test_priority_queue(Q),
passed.
test_parsing() ->