diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-07-09 17:06:30 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-07-09 17:06:30 +0100 |
| commit | b69a05cf71eca2b083fe5c370eb7a2595d0f52e8 (patch) | |
| tree | 35647e92c997cb5c0591e46080fb8a32c7a24544 | |
| parent | 30d8f863ab27e650d57559b1fd20ae43a4d709f8 (diff) | |
| download | rabbitmq-server-git-b69a05cf71eca2b083fe5c370eb7a2595d0f52e8.tar.gz | |
Initial work to permit low priority background tasks to be catered for.
| -rw-r--r-- | src/gen_server2.erl | 213 | ||||
| -rw-r--r-- | src/priority_queue.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 46 |
5 files changed, 236 insertions, 78 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index e46f2645bd..2784090afc 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -37,6 +37,28 @@ %% Explicit timeouts (i.e. not 'binary') from the handle_* functions %% are still supported, and do not have any effect on the current %% timeout value. +%% +%% 6) init/1 can also return (either a further arg in addition to +%% timeout above, or as a key-value list with the timeout as {timeout, +%% Timeout}) a minimum priority (key: min_priority). This can also be +%% returned from handle_* functions (i.e. {noreply, NewState} or +%% {noreply, NewState, Timeout} or {noreply, NewState, Timeout, +%% MinPri} or {noreply, NewState, [{min_priority, MinPri}]} or +%% {noreply, NewState, [{min_priority, MinPri}, {timeout, +%% Timeout}]}). What this does is to only allow messages greater than +%% the indicated priority through to the module. To allow any message +%% through (as is the default), use 'any'. One effect of this is that +%% when hibernating, the process can be woken up to receive a message +%% which it then realises it is not interested in. When this happens, +%% handle_info(roused_and_disinterested, State) will be called as soon +%% as there are no further messages to process (i.e. upon waking, the +%% message queue is drained, and a timeout of 0 is used). A suggested +%% use of this is to cater for low priority background casts, which +%% can be sent with negative priorities, and to use a priority of 0 or +%% higher for everything else. Then, if you return from handle_* with +%% a timeout of 0 and find handle_info(timeout, State) being called, +%% you can then return with a min_priority of 'any' and pick up the +%% low priority messages. %% All modifications are (C) 2009 LShift Ltd. @@ -133,7 +155,8 @@ cast/2, pcast/3, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]). + enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, + wake_hib/8]). -export([behaviour_info/1]). @@ -322,22 +345,32 @@ multi_call(Nodes, Name, Req, Timeout) %% process, including registering a name for it. %%----------------------------------------------------------------- enter_loop(Mod, Options, State) -> - enter_loop(Mod, Options, State, self(), infinity). + enter_loop(Mod, Options, State, self(), []). enter_loop(Mod, Options, State, ServerName = {_, _}) -> - enter_loop(Mod, Options, State, ServerName, infinity); + enter_loop(Mod, Options, State, ServerName, []); + +enter_loop(Mod, Options, State, Opts) when is_list(Opts) -> + enter_loop(Mod, Options, State, self(), Opts); enter_loop(Mod, Options, State, Timeout) -> - enter_loop(Mod, Options, State, self(), Timeout). + enter_loop(Mod, Options, State, self(), [{timeout, Timeout}]). -enter_loop(Mod, Options, State, ServerName, Timeout) -> +enter_loop(Mod, Options, State, ServerName, Opts) when is_list(Opts) -> Name = get_proc_name(ServerName), Parent = get_parent(), Debug = debug_options(Name, Options), Queue = priority_queue:new(), + [{timeout, Timeout}, {min_priority, MinPri}] = extract_timeout_minpri(Opts), {Timeout1, TimeoutState} = build_timeout_state(Timeout), - loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue, Debug). + loop(Parent, Name, State, Mod, Timeout1, TimeoutState, MinPri, Queue, Debug); + +enter_loop(Mod, Options, State, ServerName, Timeout) -> + enter_loop(Mod, Options, State, ServerName, [{timeout, Timeout}]). +enter_loop(Mod, Options, State, ServerName, Timeout, MinPri) -> + enter_loop(Mod, Options, State, ServerName, + [{timeout, Timeout}, {min_priority, MinPri}]). %%%======================================================================== %%% Gen-callback functions %%%======================================================================== @@ -357,13 +390,19 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> Queue = priority_queue:new(), case catch Mod:init(Args) of {ok, State} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug); + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, infinity, undefined, + any, Queue, Debug); {ok, State, Timeout} -> proc_lib:init_ack(Starter, {ok, self()}), {Timeout1, TimeoutState} = build_timeout_state(Timeout), - loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue, - Debug); + loop(Parent, Name, State, Mod, Timeout1, TimeoutState, + any, Queue, Debug); + {ok, State, Timeout, MinPri} -> + proc_lib:init_ack(Starter, {ok, self()}), + {Timeout1, TimeoutState} = build_timeout_state(Timeout), + loop(Parent, Name, State, Mod, Timeout1, TimeoutState, + MinPri, Queue, Debug); {stop, Reason} -> %% For consistency, we must make sure that the %% registered name (if any) is unregistered before @@ -407,57 +446,70 @@ build_timeout_state(Timeout) -> _ -> {Timeout, undefined} end. +extract_timeout_minpri(Opts) -> + rabbit_misc:keygets([{timeout, infinity}, {min_priority, any}], Opts). + %%%======================================================================== %%% Internal functions %%%======================================================================== %%% --------------------------------------------------- %%% The MAIN loop. %%% --------------------------------------------------- -loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> - proc_lib:hibernate(?MODULE,wake_hib, - [Parent, Name, State, Mod, undefined, Queue, Debug]); -loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined}, Queue, - Debug) -> +loop(Parent, Name, State, Mod, hibernate, undefined, MinPri, Queue, Debug) -> + proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, undefined, + MinPri, Queue, Debug]); +loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined}, + MinPri, Queue, Debug) -> proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, - {Current, Min, now()}, Queue, Debug]); -loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> + {Current, Min, now()}, + MinPri, Queue, Debug]); +loop(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue, Debug) -> receive Input -> loop(Parent, Name, State, Mod, - Time, TimeoutState, in(Input, Queue), Debug) + Time, TimeoutState, MinPri, in(Input, Queue), Debug) after 0 -> process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, - Queue, Debug, false) + MinPri, Queue, Debug, false) end. -process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, +process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue, Debug, Hib) -> - case priority_queue:out(Queue) of + Res = case MinPri of + any -> priority_queue:out(Queue); + _ -> priority_queue:out(MinPri, Queue) + end, + case Res of {{value, Msg}, Queue1} -> process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue1, Debug, Hib, Msg); {empty, Queue1} -> - Time1 = case {Time, TimeoutState} of - {binary, {Current, _Min, undefined}} -> Current; + Time1 = case {Hib, Time, TimeoutState} of + {true, _, _} -> 0; + {_, binary, {Current, _Min, undefined}} -> Current; _ -> Time end, receive Input -> loop(Parent, Name, State, Mod, - Time, TimeoutState, in(Input, Queue1), Debug) + Time, TimeoutState, MinPri, in(Input, Queue1), Debug) after Time1 -> process_msg(Parent, Name, State, Mod, - Time, TimeoutState, Queue1, Debug, Hib, timeout) + Time, TimeoutState, Queue1, Debug, Hib, + case Hib of + true -> roused_and_disinterested; + false -> timeout + end) end end. -wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +wake_hib(Parent, Name, State, Mod, TimeoutState, MinPri, Queue, Debug) -> Msg = receive Input -> Input end, TimeoutState1 = adjust_hibernate_after(TimeoutState), process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState1, - in(Msg, Queue), Debug, true). + MinPri, in(Msg, Queue), Debug, true). adjust_hibernate_after(undefined) -> undefined; @@ -707,14 +759,34 @@ handle_msg({'$gen_call', From, Msg}, case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); - {reply, Reply, NState, Time1} -> + loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue, + []); + {reply, Reply, NState, Opts} when is_list(Opts) -> + reply(From, Reply), + [{timeout, Time}, {min_priority, MinPri}] = + extract_timeout_minpri(Opts), + loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, + []); + {reply, Reply, NState, Time} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); + loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, []); + {reply, Reply, NState, Time, MinPri} -> + reply(From, Reply), + loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, + []); {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); - {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue, + []); + {noreply, NState, Opts} when is_list(Opts) -> + [{timeout, Time}, {min_priority, MinPri}] = + extract_timeout_minpri(Opts), + loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, + []); + {noreply, NState, Time} -> + loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, []); + {noreply, NState, Time, MinPri} -> + loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, + []); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, [])), @@ -734,20 +806,44 @@ handle_msg({'$gen_call', From, Msg}, case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, + loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue, + Debug1); + {reply, Reply, NState, Opts} when is_list(Opts) -> + Debug1 = reply(Name, From, Reply, NState, Debug), + [{timeout, Time}, {min_priority, MinPri}] = + extract_timeout_minpri(Opts), + loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, + Debug1); + {reply, Reply, NState, Time} -> + Debug1 = reply(Name, From, Reply, NState, Debug), + loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, Debug1); - {reply, Reply, NState, Time1} -> + {reply, Reply, NState, Time, MinPri} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, + Debug1); {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, + loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue, + Debug1); + {noreply, NState, Opts} when is_list(Opts) -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + [{timeout, Time}, {min_priority, MinPri}] = + extract_timeout_minpri(Opts), + loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, Debug1); - {noreply, NState, Time1} -> + {noreply, NState, Time} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, + Debug1); + {noreply, NState, Time, MinPri} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, + Debug1); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), @@ -767,9 +863,18 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue) -> case Reply of {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); - {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue, + []); + {noreply, NState, Opts} when is_list(Opts) -> + [{timeout, Time}, {min_priority, MinPri}] = + extract_timeout_minpri(Opts), + loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, + []); + {noreply, NState, Time} -> + loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, []); + {noreply, NState, Time, MinPri} -> + loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, + []); {stop, Reason, NState} -> terminate(Reason, Name, Msg, Mod, NState, []); {'EXIT', What} -> @@ -784,12 +889,25 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue, {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, + loop(Parent, Name, NState, Mod, infinity, TimeoutState, any, Queue, Debug1); - {noreply, NState, Time1} -> + {noreply, NState, Opts} when is_list(Opts) -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); + [{timeout, Time}, {min_priority, MinPri}] = + extract_timeout_minpri(Opts), + loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, + Debug1); + {noreply, NState, Time} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, Time, TimeoutState, any, Queue, + Debug1); + {noreply, NState, Time, MinPri} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, Time, TimeoutState, MinPri, Queue, + Debug1); {stop, Reason, NState} -> terminate(Reason, Name, Msg, Mod, NState, Debug); {'EXIT', What} -> @@ -807,8 +925,9 @@ reply(Name, {To, Tag}, Reply, State, Debug) -> %%----------------------------------------------------------------- %% Callback functions for system messages handling. %%----------------------------------------------------------------- -system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) -> - loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug). +system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, MinPri, + Queue]) -> + loop(Parent, Name, State, Mod, Time, TimeoutState, MinPri, Queue, Debug). -ifdef(use_specs). -spec system_terminate(_, _, _, [_]) -> no_return(). diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 9683809933..9421f281c9 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -56,7 +56,7 @@ -module(priority_queue). -export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, - out/1, pout/1, join/2]). + out/1, out/2, pout/1, join/2]). %%---------------------------------------------------------------------------- @@ -73,8 +73,9 @@ -spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]). -spec(in/2 :: (any(), pqueue()) -> pqueue()). -spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). --spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). --spec(pout/1 :: (pqueue()) -> {empty | {value, any(), priority()}, pqueue()}). +-spec(out/1 :: (pqueue()) -> {(empty | {value, any()}), pqueue()}). +-spec(out/2 :: (priority(), pqueue()) -> {(empty | {value, any()}), pqueue()}). +-spec(pout/1 :: (pqueue()) -> {(empty | {value, any(), priority()}), pqueue()}). -spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). -endif. @@ -150,8 +151,19 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. -pout({queue, [], []}) -> - {empty, {queue, [], []}}; +out(_Priority, {queue, [], []} = Q) -> + {empty, Q}; +out(Priority, {queue, _, _} = Q) when Priority =< 0 -> + out(Q); +out(_Priority, {queue, _, _} = Q) -> + {empty, Q}; +out(Priority, {pqueue, [{P, _Q} | _Queues]} = Q) when Priority =< (-P) -> + out(Q); +out(_Priority, {pqueue, [_|_]} = Q) -> + {empty, Q}. + +pout({queue, [], []} = Q) -> + {empty, Q}; pout({queue, _, _} = Q) -> {{value, V}, Q1} = out(Q), {{value, V, 0}, Q1}; diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 3656694eb7..a537e45687 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -817,7 +817,7 @@ fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) -> decrement_cache(MsgId, #dqstate { message_cache = Cache }) -> true = try case ets:update_counter(Cache, MsgId, {4, -1}) of - 0 -> ets:delete(Cache, MsgId); + N when N =< 0 -> ets:delete(Cache, MsgId); _N -> true end catch error:badarg -> @@ -1114,22 +1114,24 @@ internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) -> mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - requeue_next_messages(Q, N, ReadSeqId, WriteSeqId) + requeue_next_messages(Q, State, N, ReadSeqId, WriteSeqId) end ), true = ets:insert(Sequences, {Q, ReadSeqIdN, WriteSeqIdN}), {ok, State} end. -requeue_next_messages(_Q, 0, ReadSeq, WriteSeq) -> +requeue_next_messages(_Q, _State, 0, ReadSeq, WriteSeq) -> {ReadSeq, WriteSeq}; -requeue_next_messages(Q, N, ReadSeq, WriteSeq) -> - [Obj] = mnesia:read(rabbit_disk_queue, {Q, ReadSeq}, write), +requeue_next_messages(Q, State, N, ReadSeq, WriteSeq) -> + [Obj = #dq_msg_loc { msg_id = MsgId }] = + mnesia:read(rabbit_disk_queue, {Q, ReadSeq}, write), ok = mnesia:write(rabbit_disk_queue, Obj #dq_msg_loc {queue_and_seq_id = {Q, WriteSeq}}, write), ok = mnesia:delete(rabbit_disk_queue, {Q, ReadSeq}, write), - requeue_next_messages(Q, N - 1, ReadSeq + 1, WriteSeq + 1). + decrement_cache(MsgId, State), + requeue_next_messages(Q, State, N - 1, ReadSeq + 1, WriteSeq + 1). internal_purge(Q, State = #dqstate { sequences = Sequences }) -> case sequence_lookup(Sequences, Q) of diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index e66eb6b088..176ddddb03 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -52,7 +52,7 @@ -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). --export([unfold/2, ceil/1]). +-export([unfold/2, ceil/1, keygets/2]). -import(mnesia). -import(lists). @@ -116,6 +116,8 @@ -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). -spec(ceil/1 :: (number()) -> number()). +-spec(keygets/2 :: ([({K, V} | {K, non_neg_integer(), V})], [any()]) -> + [({K, V} | any())]). -endif. @@ -448,3 +450,18 @@ ceil(N) when N - trunc(N) > 0 -> 1 + trunc(N); ceil(N) -> N. + +keygets(Keys, KeyList) -> + lists:reverse( + lists:foldl( + fun({Key, Pos, Default}, Acc) -> + case lists:keysearch(Key, Pos, KeyList) of + false -> [{Key, Default} | Acc]; + {value, T} -> [T | Acc] + end; + ({Key, Default}, Acc) -> + case lists:keysearch(Key, 1, KeyList) of + false -> [{Key, Default} | Acc]; + {value, T} -> [T | Acc] + end + end, [], Keys)). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 8ab8267725..6d76d23fe9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -65,7 +65,7 @@ test_priority_queue() -> %% empty Q Q = priority_queue:new(), - {true, true, 0, [], [], []} = test_priority_queue(Q), + {true, true, 0, [], [], [], []} = test_priority_queue(Q), %% 1-4 element no-priority Q true = lists:all(fun (X) -> X =:= passed end, @@ -74,57 +74,59 @@ test_priority_queue() -> %% 1-element priority Q Q1 = priority_queue:in(foo, 1, priority_queue:new()), - {true, false, 1, [{1, foo}], [foo], [{foo, 1}]} = test_priority_queue(Q1), + {true, false, 1, [{1, foo}], [foo], [], [{foo, 1}]} = + test_priority_queue(Q1), %% 2-element same-priority Q Q2 = priority_queue:in(bar, 1, Q1), - {true, false, 2, [{1, foo}, {1, bar}], [foo, bar], [{foo, 1}, {bar, 1}]} = - test_priority_queue(Q2), + {true, false, 2, [{1, foo}, {1, bar}], [foo, bar], [], [{foo, 1}, {bar, 1}]} + = test_priority_queue(Q2), %% 2-element different-priority Q Q3 = priority_queue:in(bar, 2, Q1), - {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [{bar, 2}, {foo, 1}]} = + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [bar], [{bar, 2}, {foo, 1}]} = test_priority_queue(Q3), %% 1-element negative priority Q Q4 = priority_queue:in(foo, -1, priority_queue:new()), - {true, false, 1, [{-1, foo}], [foo], [{foo, -1}]} = test_priority_queue(Q4), + {true, false, 1, [{-1, foo}], [foo], [], [{foo, -1}]} = + test_priority_queue(Q4), %% merge 2 * 1-element no-priority Qs Q5 = priority_queue:join(priority_queue:in(foo, Q), priority_queue:in(bar, Q)), - {true, false, 2, [{0, foo}, {0, bar}], [foo, bar], [{foo, 0}, {bar, 0}]} = - test_priority_queue(Q5), + {true, false, 2, [{0, foo}, {0, bar}], [foo, bar], [], [{foo, 0}, {bar, 0}]} + = test_priority_queue(Q5), %% merge 1-element no-priority Q with 1-element priority Q Q6 = priority_queue:join(priority_queue:in(foo, Q), priority_queue:in(bar, 1, Q)), - {true, false, 2, [{1, bar}, {0, foo}], [bar, foo], [{bar, 1}, {foo, 0}]} = - test_priority_queue(Q6), + {true, false, 2, [{1, bar}, {0, foo}], [bar, foo], [], [{bar, 1}, {foo, 0}]} + = test_priority_queue(Q6), %% merge 1-element priority Q with 1-element no-priority Q Q7 = priority_queue:join(priority_queue:in(foo, 1, Q), priority_queue:in(bar, Q)), - {true, false, 2, [{1, foo}, {0, bar}], [foo, bar], [{foo, 1}, {bar, 0}]} = - test_priority_queue(Q7), + {true, false, 2, [{1, foo}, {0, bar}], [foo, bar], [], [{foo, 1}, {bar, 0}]} + = test_priority_queue(Q7), %% merge 2 * 1-element same-priority Qs Q8 = priority_queue:join(priority_queue:in(foo, 1, Q), priority_queue:in(bar, 1, Q)), - {true, false, 2, [{1, foo}, {1, bar}], [foo, bar], [{foo, 1}, {bar, 1}]} = - test_priority_queue(Q8), + {true, false, 2, [{1, foo}, {1, bar}], [foo, bar], [], [{foo, 1}, {bar, 1}]} + = test_priority_queue(Q8), %% merge 2 * 1-element different-priority Qs Q9 = priority_queue:join(priority_queue:in(foo, 1, Q), priority_queue:in(bar, 2, Q)), - {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [{bar, 2}, {foo, 1}]} = - test_priority_queue(Q9), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [bar], + [{bar, 2}, {foo, 1}]} = test_priority_queue(Q9), %% merge 2 * 1-element different-priority Qs (other way around) Q10 = priority_queue:join(priority_queue:in(bar, 2, Q), priority_queue:in(foo, 1, Q)), - {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [{bar, 2}, {foo, 1}]} = - test_priority_queue(Q10), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo], [bar], + [{bar, 2}, {foo, 1}]} = test_priority_queue(Q10), passed. @@ -136,6 +138,11 @@ priority_queue_out_all(Q) -> {empty, _} -> []; {{value, V}, Q1} -> [V | priority_queue_out_all(Q1)] end. +priority_queue_out_2_all(Q) -> + case priority_queue:out(2, Q) of + {empty, _} -> []; + {{value, V}, Q1} -> [V | priority_queue_out_2_all(Q1)] + end. priority_queue_pout_all(Q) -> case priority_queue:pout(Q) of @@ -149,6 +156,7 @@ test_priority_queue(Q) -> priority_queue:len(Q), priority_queue:to_list(Q), priority_queue_out_all(Q), + priority_queue_out_2_all(Q), priority_queue_pout_all(Q)}. test_simple_n_element_queue(N) -> @@ -156,7 +164,7 @@ test_simple_n_element_queue(N) -> Q = priority_queue_in_all(priority_queue:new(), Items), ToListRes = [{0, X} || X <- Items], POutAllRes = [{X, 0} || X <- Items], - {true, false, N, ToListRes, Items, POutAllRes} = test_priority_queue(Q), + {true, false, N, ToListRes, Items, [], POutAllRes} = test_priority_queue(Q), passed. test_parsing() -> |
