diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/gen_server2.erl | 62 | ||||
| -rw-r--r-- | src/priority_queue.erl | 153 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 58 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_multi.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 63 |
9 files changed, 370 insertions, 47 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 11bb66d743..ba8becfca9 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -16,6 +16,11 @@ %% The original code could reorder messages when communicating with a %% process on a remote node that was not currently connected. %% +%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3 +%% allow callers to attach priorities to requests. Requests with +%% higher priorities are processed before requests with lower +%% priorities. The default priority is 0. +%% %% All modifications are (C) 2009 LShift Ltd. %% ``The contents of this file are subject to the Erlang Public License, @@ -107,8 +112,8 @@ %% API -export([start/3, start/4, start_link/3, start_link/4, - call/2, call/3, - cast/2, reply/2, + call/2, call/3, pcall/3, pcall/4, + cast/2, pcast/3, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, enter_loop/3, enter_loop/4, enter_loop/5]). @@ -188,6 +193,22 @@ call(Name, Request, Timeout) -> exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) end. +pcall(Name, Priority, Request) -> + case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}}) + end. + +pcall(Name, Priority, Request, Timeout) -> + case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}}) + end. + %% ----------------------------------------------------------------- %% Make a cast to a generic server. %% ----------------------------------------------------------------- @@ -207,6 +228,22 @@ do_cast(Dest, Request) -> cast_msg(Request) -> {'$gen_cast',Request}. +pcast({global,Name}, Priority, Request) -> + catch global:send(Name, cast_msg(Priority, Request)), + ok; +pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) -> + do_cast(Dest, Priority, Request); +pcast(Dest, Priority, Request) when is_atom(Dest) -> + do_cast(Dest, Priority, Request); +pcast(Dest, Priority, Request) when is_pid(Dest) -> + do_cast(Dest, Priority, Request). + +do_cast(Dest, Priority, Request) -> + do_send(Dest, cast_msg(Priority, Request)), + ok. + +cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}. + %% ----------------------------------------------------------------- %% Send a reply to the client. %% ----------------------------------------------------------------- @@ -276,7 +313,7 @@ enter_loop(Mod, Options, State, ServerName, Timeout) -> Name = get_proc_name(ServerName), Parent = get_parent(), Debug = debug_options(Name, Options), - Queue = queue:new(), + Queue = priority_queue:new(), loop(Parent, Name, State, Mod, Timeout, Queue, Debug). %%%======================================================================== @@ -294,7 +331,7 @@ init_it(Starter, self, Name, Mod, Args, Options) -> init_it(Starter, self(), Name, Mod, Args, Options); init_it(Starter, Parent, Name, Mod, Args, Options) -> Debug = debug_options(Name, Options), - Queue = queue:new(), + Queue = priority_queue:new(), case catch Mod:init(Args) of {ok, State} -> proc_lib:init_ack(Starter, {ok, self()}), @@ -326,9 +363,9 @@ init_it(Starter, Parent, Name, Mod, Args, Options) -> loop(Parent, Name, State, Mod, Time, Queue, Debug) -> receive Input -> loop(Parent, Name, State, Mod, - Time, queue:in(Input, Queue), Debug) + Time, in(Input, Queue), Debug) after 0 -> - case queue:out(Queue) of + case priority_queue:out(Queue) of {{value, Msg}, Queue1} -> process_msg(Parent, Name, State, Mod, Time, Queue1, Debug, Msg); @@ -336,14 +373,21 @@ loop(Parent, Name, State, Mod, Time, Queue, Debug) -> receive Input -> loop(Parent, Name, State, Mod, - Time, queue:in(Input, Queue1), Debug) + Time, in(Input, Queue1), Debug) after Time -> process_msg(Parent, Name, State, Mod, Time, Queue1, Debug, timeout) end end end. - + +in({'$gen_pcast', {Priority, Msg}}, Queue) -> + priority_queue:in({'$gen_cast', Msg}, Priority, Queue); +in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> + priority_queue:in({'$gen_call', From, Msg}, Priority, Queue); +in(Input, Queue) -> + priority_queue:in(Input, Queue). + process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) -> case Msg of {system, From, Req} -> @@ -850,5 +894,5 @@ format_status(Opt, StatusData) -> {data, [{"Status", SysState}, {"Parent", Parent}, {"Logged events", Log}, - {"Queued messages", queue:to_list(Queue)}]} | + {"Queued messages", priority_queue:to_list(Queue)}]} | Specfic]. diff --git a/src/priority_queue.erl b/src/priority_queue.erl new file mode 100644 index 0000000000..88ad0c182d --- /dev/null +++ b/src/priority_queue.erl @@ -0,0 +1,153 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +%% Priority queues have essentially the same interface as ordinary +%% queues, except that a) there is an in/3 that takes a priority, and +%% b) we have only implemented the core API we need. +%% +%% Priorities should be integers - the higher the value the higher the +%% priority - but we don't actually check that. +%% +%% in/2 inserts items with priority 0. +%% +%% We optimise the case where a priority queue is being used just like +%% an ordinary queue. When that is the case we represent the priority +%% queue as an ordinary queue. We could just call into the 'queue' +%% module for that, but for efficiency we implement the relevant +%% functions directly in here, thus saving on inter-module calls and +%% eliminating a level of boxing. +%% +%% When the queue contains items with non-zero priorities, it is +%% represented as a sorted kv list with the inverted Priority as the +%% key and an ordinary queue as the value. Here again we use our own +%% ordinary queue implemention for efficiency, often making recursive +%% calls into the same function knowing that ordinary queues represent +%% a base case. + + +-module(priority_queue). + +-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(priority() :: integer()). +-type(squeue() :: {queue, [any()], [any()]}). +-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}). + +-spec(new/0 :: () -> pqueue()). +-spec(is_queue/1 :: (any()) -> bool()). +-spec(is_empty/1 :: (pqueue()) -> bool()). +-spec(len/1 :: (pqueue()) -> non_neg_integer()). +-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]). +-spec(in/2 :: (any(), pqueue()) -> pqueue()). +-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). +-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). + +-endif. + +%%---------------------------------------------------------------------------- + +new() -> + {queue, [], []}. + +is_queue({queue, R, F}) when is_list(R), is_list(F) -> + true; +is_queue({pqueue, Queues}) when is_list(Queues) -> + lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end, + Queues); +is_queue(_) -> + false. + +is_empty({queue, [], []}) -> + true; +is_empty(_) -> + false. + +len({queue, R, F}) when is_list(R), is_list(F) -> + length(R) + length(F); +len({pqueue, Queues}) -> + lists:sum([len(Q) || {_, Q} <- Queues]). + +to_list({queue, In, Out}) when is_list(In), is_list(Out) -> + [{0, V} || V <- Out ++ lists:reverse(In, [])]; +to_list({pqueue, Queues}) -> + [{-P, V} || {P, Q} <- Queues, {0, V} <- to_list(Q)]. + +in(Item, Q) -> + in(Item, 0, Q). + +in(X, 0, {queue, [_] = In, []}) -> + {queue, [X], In}; +in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) -> + {queue, [X|In], Out}; +in(X, Priority, Q = {queue, [], []}) -> + in(X, Priority, {pqueue, []}); +in(X, Priority, Q = {queue, _, _}) -> + in(X, Priority, {pqueue, [{0, Q}]}); +in(X, Priority, {pqueue, Queues}) -> + P = -Priority, + {pqueue, case lists:keysearch(P, 1, Queues) of + {value, {_, Q}} -> + lists:keyreplace(P, 1, Queues, {P, in(X, Q)}); + false -> + lists:keysort(1, [{P, {queue, [X], []}} | Queues]) + end}. + +out({queue, [], []} = Q) -> + {empty, Q}; +out({queue, [V], []}) -> + {{value, V}, {queue, [], []}}; +out({queue, [Y|In], []}) -> + [V|Out] = lists:reverse(In, []), + {{value, V}, {queue, [Y], Out}}; +out({queue, In, [V]}) when is_list(In) -> + {{value,V}, r2f(In)}; +out({queue, In,[V|Out]}) when is_list(In) -> + {{value, V}, {queue, In, Out}}; +out({pqueue, [{P, Q} | Queues]}) -> + {R, Q1} = out(Q), + NewQ = case is_empty(Q1) of + true -> case Queues of + [] -> {queue, [], []}; + [{0, OnlyQ}] -> OnlyQ; + [_|_] -> {pqueue, Queues} + end; + false -> {pqueue, [{P, Q1} | Queues]} + end, + {R, NewQ}. + +r2f([]) -> {queue, [], []}; +r2f([_] = R) -> {queue, [], R}; +r2f([X,Y]) -> {queue, [X], [Y]}; +r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fdc99c380d..eb076e94d6 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -213,10 +213,10 @@ list(VHostPath) -> map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server2:call(QPid, info, infinity). + gen_server2:pcall(QPid, 9, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server2:call(QPid, {info, Items}, infinity) of + case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -305,28 +305,29 @@ internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of - [] -> {error, not_found}; - [Q] -> - ok = delete_queue(Q), + [] -> {error, not_found}; + [_] -> + ok = rabbit_exchange:delete_queue_bindings(QueueName), + ok = mnesia:delete({rabbit_queue, QueueName}), ok = mnesia:delete({rabbit_durable_queue, QueueName}), ok end end). -delete_queue(#amqqueue{name = QueueName}) -> - ok = rabbit_exchange:delete_bindings_for_queue(QueueName), - ok = mnesia:delete({rabbit_queue, QueueName}), - ok. - on_node_down(Node) -> rabbit_misc:execute_mnesia_transaction( fun () -> qlc:fold( - fun (Q, Acc) -> ok = delete_queue(Q), Acc end, + fun (QueueName, Acc) -> + ok = rabbit_exchange:delete_transient_queue_bindings( + QueueName), + ok = mnesia:delete({rabbit_queue, QueueName}), + Acc + end, ok, - qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) + qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) end). pseudo_queue(QueueName, Pid) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a9fcd8e01a..246d04608a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -573,7 +573,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_configure_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of - {error, exchange_not_found} -> + {error, not_found} -> rabbit_misc:protocol_error( not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); {error, in_use} -> @@ -740,12 +740,16 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of + {error, exchange_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); {error, queue_not_found} -> rabbit_misc:protocol_error( not_found, "no ~s", [rabbit_misc:rs(QueueName)]); - {error, exchange_not_found} -> + {error, exchange_and_queue_not_found} -> rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); + not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName), + rabbit_misc:rs(QueueName)]); {error, binding_not_found} -> rabbit_misc:protocol_error( not_found, "no binding ~s between ~s and ~s", diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index e6717d689f..6649899ade 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -38,6 +38,19 @@ -define(RPC_TIMEOUT, 30000). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start/0 :: () -> no_return()). +-spec(stop/0 :: () -> 'ok'). +-spec(action/4 :: (atom(), erlang_node(), [string()], + fun ((string(), [any()]) -> 'ok')) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + start() -> FullCommand = init:get_plain_arguments(), #params{quiet = Quiet, node = Node, command = Command, args = Args} = diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 493bc365d5..43c77c92d8 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -40,7 +40,7 @@ route/3]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). --export([delete_bindings_for_queue/1]). +-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). -export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). %% EXTENDED API @@ -58,9 +58,11 @@ -ifdef(use_specs). -type(publish_res() :: {'ok', [pid()]} | - not_found() | {'error', 'unroutable' | 'no_consumers'}). --type(bind_res() :: 'ok' | - {'error', 'queue_not_found' | 'exchange_not_found'}). + not_found() | {'error', 'unroutable' | 'not_delivered'}). +-type(bind_res() :: 'ok' | {'error', + 'queue_not_found' | + 'exchange_not_found' | + 'exchange_and_queue_not_found'}). -spec(recover/0 :: () -> 'ok'). -spec(declare/4 :: (exchange_name(), exchange_type(), bool(), amqp_table()) -> exchange()). -spec(check_type/1 :: (binary()) -> atom()). @@ -85,11 +87,12 @@ bind_res() | {'error', 'binding_not_found'}). -spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). --spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok'). +-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). +-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). -spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> - 'ok' | {'error', 'exchange_not_found'} | {'error', 'in_use'}). + 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> [{exchange_name(), routing_key(), amqp_table()}]). -spec(list_exchange_bindings/1 :: (exchange_name()) -> @@ -290,7 +293,7 @@ lookup_qpids(Queues) -> %% refactored to its own module, especially seeing as unbind will have %% to be implemented for 0.91 ? -delete_bindings_for_exchange(ExchangeName) -> +delete_exchange_bindings(ExchangeName) -> [begin ok = mnesia:delete_object(rabbit_reverse_route, reverse_route(Route), write), @@ -302,9 +305,16 @@ delete_bindings_for_exchange(ExchangeName) -> write)], ok. -delete_bindings_for_queue(QueueName) -> +delete_queue_bindings(QueueName) -> + delete_queue_bindings(QueueName, fun delete_forward_routes/1). + +delete_transient_queue_bindings(QueueName) -> + delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). + +delete_queue_bindings(QueueName, FwdDeleteFun) -> + Exchanges = exchanges_for_queue(QueueName), [begin - ok = delete_forward_routes(reverse_route(Route)), + ok = FwdDeleteFun(reverse_route(Route)), ok = mnesia:delete_object(rabbit_reverse_route, Route, write) end || Route <- mnesia:match_object( rabbit_reverse_route, @@ -318,6 +328,18 @@ delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), ok = mnesia:delete_object(rabbit_durable_route, Route, write). +delete_transient_forward_routes(Route) -> + ok = mnesia:delete_object(rabbit_route, Route, write). + +exchanges_for_queue(QueueName) -> + MatchHead = reverse_route( + #route{binding = #binding{exchange_name = '$1', + queue_name = QueueName, + _ = '_'}}), + sets:to_list( + sets:from_list( + mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). + has_bindings(ExchangeName) -> MatchHead = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, @@ -340,18 +362,20 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)). call_with_exchange(Exchange, Fun) -> rabbit_misc:execute_mnesia_transaction( fun() -> case mnesia:read({rabbit_exchange, Exchange}) of - [] -> {error, exchange_not_found}; + [] -> {error, not_found}; [X] -> Fun(X) end end). call_with_exchange_and_queue(Exchange, Queue, Fun) -> - call_with_exchange( - Exchange, - fun(X) -> case mnesia:read({rabbit_queue, Queue}) of - [] -> {error, queue_not_found}; - [Q] -> Fun(X, Q) - end + rabbit_misc:execute_mnesia_transaction( + fun() -> case {mnesia:read({rabbit_exchange, Exchange}), + mnesia:read({rabbit_queue, Queue})} of + {[X], [Q]} -> Fun(X, Q); + {[ ], [_]} -> {error, exchange_not_found}; + {[_], [ ]} -> {error, queue_not_found}; + {[ ], [ ]} -> {error, exchange_and_queue_not_found} + end end). add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> @@ -534,7 +558,7 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) -> end. unconditional_delete(#exchange{name = ExchangeName}) -> - ok = delete_bindings_for_exchange(ExchangeName), + ok = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), ok = mnesia:delete({rabbit_exchange, ExchangeName}). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index de7bc010b2..eced0b3cbe 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -106,7 +106,7 @@ -spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}). -spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). --spec(format_stderr/2 :: (string(), [any()]) -> 'true'). +-spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). @@ -375,9 +375,19 @@ ensure_parent_dirs_exist(Filename) -> end. format_stderr(Fmt, Args) -> - Port = open_port({fd, 0, 2}, [out]), - port_command(Port, io_lib:format(Fmt, Args)), - port_close(Port). + case os:type() of + {unix, _} -> + Port = open_port({fd, 0, 2}, [out]), + port_command(Port, io_lib:format(Fmt, Args)), + port_close(Port); + {win32, _} -> + %% stderr on Windows is buffered and I can't figure out a + %% way to trigger a fflush(stderr) in Erlang. So rather + %% than risk losing output we write to stdout instead, + %% which appears to be unbuffered. + io:format(Fmt, Args) + end, + ok. manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> Iterate(fun (App, Acc) -> diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 5e8edd53a1..d91975359a 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -36,6 +36,17 @@ -define(RPC_SLEEP, 500). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start/0 :: () -> no_return()). +-spec(stop/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + start() -> RpcTimeout = case init:get_argument(maxwait) of diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 6312e8e364..8f0a3a8973 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -45,6 +45,7 @@ test_content_prop_roundtrip(Datum, Binary) -> Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion all_tests() -> + passed = test_priority_queue(), passed = test_parsing(), passed = test_topic_matching(), passed = test_log_management(), @@ -55,6 +56,62 @@ all_tests() -> passed = test_server_status(), passed. +test_priority_queue() -> + + false = priority_queue:is_queue(not_a_queue), + + %% empty Q + Q = priority_queue:new(), + {true, true, 0, [], []} = test_priority_queue(Q), + + %% 1-4 element no-priority Q + true = lists:all(fun (X) -> X =:= passed end, + lists:map(fun test_simple_n_element_queue/1, + lists:seq(1, 4))), + + %% 1-element priority Q + Q1 = priority_queue:in(foo, 1, priority_queue:new()), + {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1), + + %% 2-element same-priority Q + Q2 = priority_queue:in(bar, 1, Q1), + {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} = + test_priority_queue(Q2), + + %% 2-element different-priority Q + Q3 = priority_queue:in(bar, 2, Q1), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q3), + + %% 1-element negative priority Q + Q4 = priority_queue:in(foo, -1, priority_queue:new()), + {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4), + + passed. + +priority_queue_in_all(Q, L) -> + lists:foldl(fun (X, Acc) -> priority_queue:in(X, Acc) end, Q, L). + +priority_queue_out_all(Q) -> + case priority_queue:out(Q) of + {empty, _} -> []; + {{value, V}, Q1} -> [V | priority_queue_out_all(Q1)] + end. + +test_priority_queue(Q) -> + {priority_queue:is_queue(Q), + priority_queue:is_empty(Q), + priority_queue:len(Q), + priority_queue:to_list(Q), + priority_queue_out_all(Q)}. + +test_simple_n_element_queue(N) -> + Items = lists:seq(1, N), + Q = priority_queue_in_all(priority_queue:new(), Items), + ToListRes = [{0, X} || X <- Items], + {true, false, N, ToListRes, Items} = test_priority_queue(Q), + passed. + test_parsing() -> passed = test_content_properties(), passed. @@ -430,7 +487,13 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(stop_app, []), {error, {no_running_cluster_nodes, _, _}} = control_action(reset, []), + + %% leave system clustered, with the secondary node as a ram node ok = control_action(force_reset, []), + ok = control_action(start_app, []), + ok = control_action(force_reset, SecondaryNode, []), + ok = control_action(cluster, SecondaryNode, [NodeS]), + ok = control_action(start_app, SecondaryNode, []), passed. |
