diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2015-09-02 11:32:12 +0200 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien.pedron@dumbbell.fr> | 2015-10-20 11:10:48 +0200 |
| commit | acab9f2f00a30b99fbf8da93f16fce950af23089 (patch) | |
| tree | 5c876bdc1eb4ffc5e07b6e069fdc0a964aeb0c7f | |
| parent | 93d830ee5a3f94477de2c8c4a216d71ad729dad0 (diff) | |
| download | rabbitmq-server-git-acab9f2f00a30b99fbf8da93f16fce950af23089.tar.gz | |
Tests are moved to rabbitmq-test
| -rw-r--r-- | test/src/credit_flow_test.erl | 50 | ||||
| -rw-r--r-- | test/src/gm_qc.erl | 384 | ||||
| -rw-r--r-- | test/src/gm_soak_test.erl | 136 | ||||
| -rw-r--r-- | test/src/gm_speed_test.erl | 85 | ||||
| -rw-r--r-- | test/src/gm_tests.erl | 186 | ||||
| -rw-r--r-- | test/src/mirrored_supervisor_tests.erl | 307 | ||||
| -rw-r--r-- | test/src/mirrored_supervisor_tests_gs.erl | 66 | ||||
| -rw-r--r-- | test/src/on_disk_store_tunable_parameter_validation_test.erl | 47 | ||||
| -rw-r--r-- | test/src/rabbit_backing_queue_qc.erl | 473 | ||||
| -rw-r--r-- | test/src/rabbit_runtime_parameters_test.erl | 72 | ||||
| -rw-r--r-- | test/src/rabbit_tests.erl | 3214 | ||||
| -rw-r--r-- | test/src/rabbit_tests_event_receiver.erl | 58 | ||||
| -rw-r--r-- | test/src/supervisor2_tests.erl | 75 | ||||
| -rw-r--r-- | test/src/test_sup.erl | 93 | ||||
| -rw-r--r-- | test/src/vm_memory_monitor_tests.erl | 35 |
15 files changed, 0 insertions, 5281 deletions
diff --git a/test/src/credit_flow_test.erl b/test/src/credit_flow_test.erl deleted file mode 100644 index 4910da3db3..0000000000 --- a/test/src/credit_flow_test.erl +++ /dev/null @@ -1,50 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2011-2015 Pivotal Software, Inc. All rights reserved. -%% - --module(credit_flow_test). - --export([test_credit_flow_settings/0]). - -test_credit_flow_settings() -> - %% default values - passed = test_proc(200, 50), - - application:set_env(rabbit, credit_flow_default_credit, {100, 20}), - passed = test_proc(100, 20), - - application:unset_env(rabbit, credit_flow_default_credit), - - % back to defaults - passed = test_proc(200, 50), - passed. - -test_proc(InitialCredit, MoreCreditAfter) -> - Pid = spawn(fun dummy/0), - Pid ! {credit, self()}, - {InitialCredit, MoreCreditAfter} = - receive - {credit, Val} -> Val - end, - passed. - -dummy() -> - credit_flow:send(self()), - receive - {credit, From} -> - From ! {credit, get(credit_flow_default_credit)}; - _ -> - dummy() - end. diff --git a/test/src/gm_qc.erl b/test/src/gm_qc.erl deleted file mode 100644 index ce6bb40975..0000000000 --- a/test/src/gm_qc.erl +++ /dev/null @@ -1,384 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. 
You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2011-2015 Pivotal Software, Inc. All rights reserved. -%% - --module(gm_qc). --ifdef(use_proper_qc). - --include_lib("proper/include/proper.hrl"). - --define(GROUP, test_group). --define(MAX_SIZE, 5). --define(MSG_TIMEOUT, 1000000). %% micros - --export([prop_gm_test/0]). - --behaviour(proper_statem). --export([initial_state/0, command/1, precondition/2, postcondition/3, - next_state/3]). - --behaviour(gm). --export([joined/2, members_changed/3, handle_msg/3, handle_terminate/2]). - -%% Helpers --export([do_join/0, do_leave/1, do_send/1, do_proceed1/1, do_proceed2/2]). - -%% For insertion into gm --export([call/3, cast/2, monitor/1, demonitor/1, execute_mnesia_transaction/1]). - --record(state, {seq, %% symbolic and dynamic - instrumented, %% dynamic only - outstanding, %% dynamic only - monitors, %% dynamic only - all_join, %% for symbolic - to_join, %% dynamic only - to_leave %% for symbolic - }). - -prop_gm_test() -> - case ?INSTR_MOD of - ?MODULE -> ok; - _ -> exit(compile_with_INSTRUMENT_FOR_QC) - end, - process_flag(trap_exit, true), - erlang:register(?MODULE, self()), - ?FORALL(Cmds, commands(?MODULE), gm_test(Cmds)). - -gm_test(Cmds) -> - {_H, State, Res} = run_commands(?MODULE, Cmds), - cleanup(State), - ?WHENFAIL( - io:format("Result: ~p~n", [Res]), - aggregate(command_names(Cmds), Res =:= ok)). 
- -cleanup(S) -> - S2 = ensure_joiners_joined_and_msgs_received(S), - All = gms_joined(S2), - All = gms(S2), %% assertion - none to join - check_stale_members(All), - [gm:leave(GM) || GM <- All], - drain_and_proceed_gms(S2), - [await_death(GM) || GM <- All], - gm:forget_group(?GROUP), - ok. - -check_stale_members(All) -> - GMs = [P || P <- processes(), is_gm_process(?GROUP, P)], - case GMs -- All of - [] -> ok; - Rest -> exit({forgot, Rest}) - end. - -is_gm_process(Group, P) -> - case process_info(P, dictionary) of - undefined -> false; - {dictionary, D} -> {gm, Group} =:= proplists:get_value(process_name, D) - end. - -await_death(P) -> - MRef = erlang:monitor(process, P), - await_death(MRef, P). - -await_death(MRef, P) -> - receive - {'DOWN', MRef, process, P, _} -> ok; - {'DOWN', _, _, _, _} -> await_death(MRef, P); - {'EXIT', _, normal} -> await_death(MRef, P); - {'EXIT', _, Reason} -> exit(Reason); - {joined, _GM} -> await_death(MRef, P); - {left, _GM} -> await_death(MRef, P); - Anything -> exit({stray_msg, Anything}) - end. - -%% --------------------------------------------------------------------------- -%% proper_statem -%% --------------------------------------------------------------------------- - -initial_state() -> #state{seq = 1, - outstanding = dict:new(), - instrumented = dict:new(), - monitors = dict:new(), - all_join = sets:new(), - to_join = sets:new(), - to_leave = sets:new()}. - -command(S) -> - case {length(gms_symb_not_left(S)), length(gms_symb(S))} of - {0, 0} -> qc_join(S); - {0, _} -> frequency([{1, qc_join(S)}, - {3, qc_proceed1(S)}, - {5, qc_proceed2(S)}]); - _ -> frequency([{1, qc_join(S)}, - {1, qc_leave(S)}, - {10, qc_send(S)}, - {5, qc_proceed1(S)}, - {15, qc_proceed2(S)}]) - end. - -qc_join(_S) -> {call,?MODULE,do_join, []}. -qc_leave(S) -> {call,?MODULE,do_leave,[oneof(gms_symb_not_left(S))]}. -qc_send(S) -> {call,?MODULE,do_send, [oneof(gms_symb_not_left(S))]}. -qc_proceed1(S) -> {call,?MODULE,do_proceed1, [oneof(gms_symb(S))]}. 
-qc_proceed2(S) -> {call,?MODULE,do_proceed2, [oneof(gms_symb(S)), - oneof(gms_symb(S))]}. - -precondition(S, {call, ?MODULE, do_join, []}) -> - length(gms_symb(S)) < ?MAX_SIZE; - -precondition(_S, {call, ?MODULE, do_leave, [_GM]}) -> - true; - -precondition(_S, {call, ?MODULE, do_send, [_GM]}) -> - true; - -precondition(_S, {call, ?MODULE, do_proceed1, [_GM]}) -> - true; - -precondition(_S, {call, ?MODULE, do_proceed2, [GM1, GM2]}) -> - GM1 =/= GM2. - -postcondition(_S, {call, _M, _F, _A}, _Res) -> - true. - -next_state(S = #state{to_join = ToSet, - all_join = AllSet}, GM, {call, ?MODULE, do_join, []}) -> - S#state{to_join = sets:add_element(GM, ToSet), - all_join = sets:add_element(GM, AllSet)}; - -next_state(S = #state{to_leave = Set}, _Res, {call, ?MODULE, do_leave, [GM]}) -> - S#state{to_leave = sets:add_element(GM, Set)}; - -next_state(S = #state{seq = Seq, - outstanding = Outstanding}, _Res, - {call, ?MODULE, do_send, [GM]}) -> - case is_pid(GM) andalso lists:member(GM, gms_joined(S)) of - true -> - %% Dynamic state, i.e. runtime - Msg = [{sequence, Seq}, - {sent_to, GM}, - {dests, gms_joined(S)}], - gm:broadcast(GM, Msg), - Outstanding1 = dict:map( - fun (_GM, Set) -> - gb_sets:add_element(Msg, Set) - end, Outstanding), - drain(S#state{seq = Seq + 1, - outstanding = Outstanding1}); - false -> - S - end; - -next_state(S, _Res, {call, ?MODULE, do_proceed1, [Pid]}) -> - proceed(Pid, S); - -next_state(S, _Res, {call, ?MODULE, do_proceed2, [From, To]}) -> - proceed({From, To}, S). - -proceed(K, S = #state{instrumented = Msgs}) -> - case dict:find(K, Msgs) of - {ok, Q} -> case queue:out(Q) of - {{value, Thing}, Q2} -> - S2 = proceed(K, Thing, S), - S2#state{instrumented = dict:store(K, Q2, Msgs)}; - {empty, _} -> - S - end; - error -> S - end. - -%% --------------------------------------------------------------------------- -%% GM -%% --------------------------------------------------------------------------- - -joined(Pid, _Members) -> Pid ! 
{joined, self()}, - ok. -members_changed(_Pid, _Bs, _Ds) -> ok. -handle_msg(Pid, _From, Msg) -> Pid ! {gm, self(), Msg}, ok. -handle_terminate(Pid, _Reason) -> Pid ! {left, self()}. - -%% --------------------------------------------------------------------------- -%% Helpers -%% --------------------------------------------------------------------------- - -do_join() -> - {ok, GM} = gm:start_link(?GROUP, ?MODULE, self(), - fun execute_mnesia_transaction/1), - GM. - -do_leave(GM) -> - gm:leave(GM), - GM. - -%% We need to update the state, so do the work in next_state -do_send( _GM) -> ok. -do_proceed1(_Pid) -> ok. -do_proceed2(_From, _To) -> ok. - -%% All GMs, joined and to join -gms(#state{outstanding = Outstanding, - to_join = ToJoin}) -> - dict:fetch_keys(Outstanding) ++ sets:to_list(ToJoin). - -%% All GMs, joined and to join -gms_joined(#state{outstanding = Outstanding}) -> - dict:fetch_keys(Outstanding). - -%% All GMs including those that have left (symbolic) -gms_symb(#state{all_join = AllJoin}) -> - sets:to_list(AllJoin). - -%% All GMs not including those that have left (symbolic) -gms_symb_not_left(#state{all_join = AllJoin, - to_leave = ToLeave}) -> - sets:to_list(sets:subtract(AllJoin, ToLeave)). - -drain(S) -> - receive - Msg -> drain(handle_msg(Msg, S)) - after 10 -> S - end. - -drain_and_proceed_gms(S0) -> - S = #state{instrumented = Msgs} = drain(S0), - case dict:size(Msgs) of - 0 -> S; - _ -> S1 = dict:fold( - fun (Key, Q, Si) -> - lists:foldl( - fun (Msg, Sij) -> - proceed(Key, Msg, Sij) - end, Si, queue:to_list(Q)) - end, S, Msgs), - drain_and_proceed_gms(S1#state{instrumented = dict:new()}) - end. - -handle_msg({gm, GM, Msg}, S = #state{outstanding = Outstanding}) -> - case dict:find(GM, Outstanding) of - {ok, Set} -> - Set2 = gb_sets:del_element(Msg, Set), - S#state{outstanding = dict:store(GM, Set2, Outstanding)}; - error -> - %% Message from GM that has already died. OK. 
- S - end; -handle_msg({instrumented, Key, Thing}, S = #state{instrumented = Msgs}) -> - Q1 = case dict:find(Key, Msgs) of - {ok, Q} -> queue:in(Thing, Q); - error -> queue:from_list([Thing]) - end, - S#state{instrumented = dict:store(Key, Q1, Msgs)}; -handle_msg({joined, GM}, S = #state{outstanding = Outstanding, - to_join = ToJoin}) -> - S#state{outstanding = dict:store(GM, gb_sets:empty(), Outstanding), - to_join = sets:del_element(GM, ToJoin)}; -handle_msg({left, GM}, S = #state{outstanding = Outstanding, - to_join = ToJoin}) -> - true = dict:is_key(GM, Outstanding) orelse sets:is_element(GM, ToJoin), - S#state{outstanding = dict:erase(GM, Outstanding), - to_join = sets:del_element(GM, ToJoin)}; -handle_msg({'DOWN', MRef, _, From, _} = Msg, S = #state{monitors = Mons}) -> - To = dict:fetch(MRef, Mons), - handle_msg({instrumented, {From, To}, {info, Msg}}, - S#state{monitors = dict:erase(MRef, Mons)}); -handle_msg({'EXIT', _From, normal}, S) -> - S; -handle_msg({'EXIT', _From, Reason}, _S) -> - %% We just trapped exits to get nicer SASL logging. - exit(Reason). - -proceed({_From, To}, {cast, Msg}, S) -> gen_server2:cast(To, Msg), S; -proceed({_From, To}, {info, Msg}, S) -> To ! Msg, S; -proceed({From, _To}, {wait, Ref}, S) -> From ! {proceed, Ref}, S; -proceed({From, To}, {mon, Ref}, S) -> add_monitor(From, To, Ref, S); -proceed(_Pid, {demon, MRef}, S) -> erlang:demonitor(MRef), S; -proceed(Pid, {wait, Ref}, S) -> Pid ! {proceed, Ref}, S. - -%% NB From here is To in handle_msg/DOWN above, since the msg is going -%% the other way -add_monitor(From, To, Ref, S = #state{monitors = Mons}) -> - MRef = erlang:monitor(process, To), - From ! {mref, Ref, MRef}, - S#state{monitors = dict:store(MRef, From, Mons)}. 
- -%% ---------------------------------------------------------------------------- -%% Assertions -%% ---------------------------------------------------------------------------- - -ensure_joiners_joined_and_msgs_received(S0) -> - S = drain_and_proceed_gms(S0), - case outstanding_joiners(S) of - true -> ensure_joiners_joined_and_msgs_received(S); - false -> case outstanding_msgs(S) of - [] -> S; - Out -> exit({outstanding_msgs, Out}) - end - end. - -outstanding_joiners(#state{to_join = ToJoin}) -> - sets:size(ToJoin) > 0. - -outstanding_msgs(#state{outstanding = Outstanding}) -> - dict:fold(fun (GM, Set, OS) -> - case gb_sets:is_empty(Set) of - true -> OS; - false -> [{GM, gb_sets:to_list(Set)} | OS] - end - end, [], Outstanding). - -%% --------------------------------------------------------------------------- -%% For insertion into GM -%% --------------------------------------------------------------------------- - -call(Pid, Msg, infinity) -> - Ref = make_ref(), - whereis(?MODULE) ! {instrumented, {self(), Pid}, {wait, Ref}}, - receive - {proceed, Ref} -> ok - end, - gen_server2:call(Pid, Msg, infinity). - -cast(Pid, Msg) -> - whereis(?MODULE) ! {instrumented, {self(), Pid}, {cast, Msg}}, - ok. - -monitor(Pid) -> - Ref = make_ref(), - whereis(?MODULE) ! {instrumented, {self(), Pid}, {mon, Ref}}, - receive - {mref, Ref, MRef} -> MRef - end. - -demonitor(MRef) -> - whereis(?MODULE) ! {instrumented, self(), {demon, MRef}}, - true. - -execute_mnesia_transaction(Fun) -> - Ref = make_ref(), - whereis(?MODULE) ! {instrumented, self(), {wait, Ref}}, - receive - {proceed, Ref} -> ok - end, - rabbit_misc:execute_mnesia_transaction(Fun). - --else. - --export([prop_disabled/0]). - -prop_disabled() -> - exit({compiled_without_proper, - "PropEr was not present during compilation of the test module. " - "Hence all tests are disabled."}). - --endif. 
diff --git a/test/src/gm_soak_test.erl b/test/src/gm_soak_test.erl deleted file mode 100644 index 64baa035b7..0000000000 --- a/test/src/gm_soak_test.erl +++ /dev/null @@ -1,136 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. -%% - --module(gm_soak_test). - --export([test/0]). --export([joined/2, members_changed/3, handle_msg/3, handle_terminate/2]). - --behaviour(gm). - --include("gm_specs.hrl"). - -%% --------------------------------------------------------------------------- -%% Soak test -%% --------------------------------------------------------------------------- - -get_state() -> - get(state). - -with_state(Fun) -> - put(state, Fun(get_state())). - -inc() -> - case 1 + get(count) of - 100000 -> Now = time_compat:monotonic_time(), - Start = put(ts, Now), - Diff = time_compat:convert_time_unit(Now - Start, - native, - micro_seconds), - Rate = 100000 / (Diff / 1000000), - io:format("~p seeing ~p msgs/sec~n", [self(), Rate]), - put(count, 0); - N -> put(count, N) - end. - -joined([], Members) -> - io:format("Joined ~p (~p members)~n", [self(), length(Members)]), - put(state, dict:from_list([{Member, empty} || Member <- Members])), - put(count, 0), - put(ts, time_compat:monotonic_time()), - ok. 
- -members_changed([], Births, Deaths) -> - with_state( - fun (State) -> - State1 = - lists:foldl( - fun (Born, StateN) -> - false = dict:is_key(Born, StateN), - dict:store(Born, empty, StateN) - end, State, Births), - lists:foldl( - fun (Died, StateN) -> - true = dict:is_key(Died, StateN), - dict:store(Died, died, StateN) - end, State1, Deaths) - end), - ok. - -handle_msg([], From, {test_msg, Num}) -> - inc(), - with_state( - fun (State) -> - ok = case dict:find(From, State) of - {ok, died} -> - exit({{from, From}, - {received_posthumous_delivery, Num}}); - {ok, empty} -> ok; - {ok, Num} -> ok; - {ok, Num1} when Num < Num1 -> - exit({{from, From}, - {duplicate_delivery_of, Num}, - {expecting, Num1}}); - {ok, Num1} -> - exit({{from, From}, - {received_early, Num}, - {expecting, Num1}}); - error -> - exit({{from, From}, - {received_premature_delivery, Num}}) - end, - dict:store(From, Num + 1, State) - end), - ok. - -handle_terminate([], Reason) -> - io:format("Left ~p (~p)~n", [self(), Reason]), - ok. - -spawn_member() -> - spawn_link( - fun () -> - random:seed(erlang:phash2([node()]), - time_compat:monotonic_time(), - time_compat:unique_integer()), - %% start up delay of no more than 10 seconds - timer:sleep(random:uniform(10000)), - {ok, Pid} = gm:start_link( - ?MODULE, ?MODULE, [], - fun rabbit_misc:execute_mnesia_transaction/1), - Start = random:uniform(10000), - send_loop(Pid, Start, Start + random:uniform(10000)), - gm:leave(Pid), - spawn_more() - end). - -spawn_more() -> - [spawn_member() || _ <- lists:seq(1, 4 - random:uniform(4))]. - -send_loop(_Pid, Target, Target) -> - ok; -send_loop(Pid, Count, Target) when Target > Count -> - case random:uniform(3) of - 3 -> gm:confirmed_broadcast(Pid, {test_msg, Count}); - _ -> gm:broadcast(Pid, {test_msg, Count}) - end, - timer:sleep(random:uniform(5) - 1), %% sleep up to 4 ms - send_loop(Pid, Count + 1, Target). - -test() -> - ok = gm:create_tables(), - spawn_member(), - spawn_member(). 
diff --git a/test/src/gm_speed_test.erl b/test/src/gm_speed_test.erl deleted file mode 100644 index b0693fc136..0000000000 --- a/test/src/gm_speed_test.erl +++ /dev/null @@ -1,85 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. -%% - --module(gm_speed_test). - --export([test/3]). --export([joined/2, members_changed/3, handle_msg/3, handle_terminate/2]). --export([wile_e_coyote/2]). - --behaviour(gm). - --include("gm_specs.hrl"). - -%% callbacks - -joined(Owner, _Members) -> - Owner ! joined, - ok. - -members_changed(_Owner, _Births, _Deaths) -> - ok. - -handle_msg(Owner, _From, ping) -> - Owner ! ping, - ok. - -handle_terminate(Owner, _Reason) -> - Owner ! terminated, - ok. - -%% other - -wile_e_coyote(Time, WriteUnit) -> - {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(), - fun rabbit_misc:execute_mnesia_transaction/1), - receive joined -> ok end, - timer:sleep(1000), %% wait for all to join - timer:send_after(Time, stop), - Start = time_compat:monotonic_time(), - {Sent, Received} = loop(Pid, WriteUnit, 0, 0), - End = time_compat:monotonic_time(), - ok = gm:leave(Pid), - receive terminated -> ok end, - Elapsed = time_compat:convert_time_unit(End - Start, - native, - micro_seconds) / 1000000, - io:format("Sending rate: ~p msgs/sec~nReceiving rate: ~p msgs/sec~n~n", - [Sent/Elapsed, Received/Elapsed]), - ok. 
- -loop(Pid, WriteUnit, Sent, Received) -> - case read(Received) of - {stop, Received1} -> {Sent, Received1}; - {ok, Received1} -> ok = write(Pid, WriteUnit), - loop(Pid, WriteUnit, Sent + WriteUnit, Received1) - end. - -read(Count) -> - receive - ping -> read(Count + 1); - stop -> {stop, Count} - after 5 -> - {ok, Count} - end. - -write(_Pid, 0) -> ok; -write(Pid, N) -> ok = gm:broadcast(Pid, ping), - write(Pid, N - 1). - -test(Time, WriteUnit, Nodes) -> - ok = gm:create_tables(), - [spawn(Node, ?MODULE, wile_e_coyote, [Time, WriteUnit]) || Node <- Nodes]. diff --git a/test/src/gm_tests.erl b/test/src/gm_tests.erl deleted file mode 100644 index 8daac11125..0000000000 --- a/test/src/gm_tests.erl +++ /dev/null @@ -1,186 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. -%% - --module(gm_tests). - --export([test_join_leave/0, - test_broadcast/0, - test_confirmed_broadcast/0, - test_member_death/0, - test_receive_in_order/0, - all_tests/0]). --export([joined/2, members_changed/3, handle_msg/3, handle_terminate/2]). - --behaviour(gm). - --include("gm_specs.hrl"). - --define(RECEIVE_OR_THROW(Body, Bool, Error), - receive Body -> - true = Bool, - passed - after 1000 -> - throw(Error) - end). - -joined(Pid, Members) -> - Pid ! {joined, self(), Members}, - ok. - -members_changed(Pid, Births, Deaths) -> - Pid ! 
{members_changed, self(), Births, Deaths}, - ok. - -handle_msg(Pid, From, Msg) -> - Pid ! {msg, self(), From, Msg}, - ok. - -handle_terminate(Pid, Reason) -> - Pid ! {termination, self(), Reason}, - ok. - -%% --------------------------------------------------------------------------- -%% Functional tests -%% --------------------------------------------------------------------------- - -all_tests() -> - passed = test_join_leave(), - passed = test_broadcast(), - passed = test_confirmed_broadcast(), - passed = test_member_death(), - passed = test_receive_in_order(), - passed. - -test_join_leave() -> - with_two_members(fun (_Pid, _Pid2) -> passed end). - -test_broadcast() -> - test_broadcast(fun gm:broadcast/2). - -test_confirmed_broadcast() -> - test_broadcast(fun gm:confirmed_broadcast/2). - -test_member_death() -> - with_two_members( - fun (Pid, Pid2) -> - {ok, Pid3} = gm:start_link( - ?MODULE, ?MODULE, self(), - fun rabbit_misc:execute_mnesia_transaction/1), - passed = receive_joined(Pid3, [Pid, Pid2, Pid3], - timeout_joining_gm_group_3), - passed = receive_birth(Pid, Pid3, timeout_waiting_for_birth_3_1), - passed = receive_birth(Pid2, Pid3, timeout_waiting_for_birth_3_2), - - unlink(Pid3), - exit(Pid3, kill), - - %% Have to do some broadcasts to ensure that all members - %% find out about the death. - passed = (test_broadcast_fun(fun gm:confirmed_broadcast/2))( - Pid, Pid2), - - passed = receive_death(Pid, Pid3, timeout_waiting_for_death_3_1), - passed = receive_death(Pid2, Pid3, timeout_waiting_for_death_3_2), - - passed - end). 
- -test_receive_in_order() -> - with_two_members( - fun (Pid, Pid2) -> - Numbers = lists:seq(1,1000), - [begin ok = gm:broadcast(Pid, N), ok = gm:broadcast(Pid2, N) end - || N <- Numbers], - passed = receive_numbers( - Pid, Pid, {timeout_for_msgs, Pid, Pid}, Numbers), - passed = receive_numbers( - Pid, Pid2, {timeout_for_msgs, Pid, Pid2}, Numbers), - passed = receive_numbers( - Pid2, Pid, {timeout_for_msgs, Pid2, Pid}, Numbers), - passed = receive_numbers( - Pid2, Pid2, {timeout_for_msgs, Pid2, Pid2}, Numbers), - passed - end). - -test_broadcast(Fun) -> - with_two_members(test_broadcast_fun(Fun)). - -test_broadcast_fun(Fun) -> - fun (Pid, Pid2) -> - ok = Fun(Pid, magic_message), - passed = receive_or_throw({msg, Pid, Pid, magic_message}, - timeout_waiting_for_msg), - passed = receive_or_throw({msg, Pid2, Pid, magic_message}, - timeout_waiting_for_msg) - end. - -with_two_members(Fun) -> - ok = gm:create_tables(), - - {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(), - fun rabbit_misc:execute_mnesia_transaction/1), - passed = receive_joined(Pid, [Pid], timeout_joining_gm_group_1), - - {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self(), - fun rabbit_misc:execute_mnesia_transaction/1), - passed = receive_joined(Pid2, [Pid, Pid2], timeout_joining_gm_group_2), - passed = receive_birth(Pid, Pid2, timeout_waiting_for_birth_2), - - passed = Fun(Pid, Pid2), - - ok = gm:leave(Pid), - passed = receive_death(Pid2, Pid, timeout_waiting_for_death_1), - passed = - receive_termination(Pid, normal, timeout_waiting_for_termination_1), - - ok = gm:leave(Pid2), - passed = - receive_termination(Pid2, normal, timeout_waiting_for_termination_2), - - receive X -> throw({unexpected_message, X}) - after 0 -> passed - end. - -receive_or_throw(Pattern, Error) -> - ?RECEIVE_OR_THROW(Pattern, true, Error). - -receive_birth(From, Born, Error) -> - ?RECEIVE_OR_THROW({members_changed, From, Birth, Death}, - ([Born] == Birth) andalso ([] == Death), - Error). 
- -receive_death(From, Died, Error) -> - ?RECEIVE_OR_THROW({members_changed, From, Birth, Death}, - ([] == Birth) andalso ([Died] == Death), - Error). - -receive_joined(From, Members, Error) -> - ?RECEIVE_OR_THROW({joined, From, Members1}, - lists:usort(Members) == lists:usort(Members1), - Error). - -receive_termination(From, Reason, Error) -> - ?RECEIVE_OR_THROW({termination, From, Reason1}, - Reason == Reason1, - Error). - -receive_numbers(_Pid, _Sender, _Error, []) -> - passed; -receive_numbers(Pid, Sender, Error, [N | Numbers]) -> - ?RECEIVE_OR_THROW({msg, Pid, Sender, M}, - M == N, - Error), - receive_numbers(Pid, Sender, Error, Numbers). diff --git a/test/src/mirrored_supervisor_tests.erl b/test/src/mirrored_supervisor_tests.erl deleted file mode 100644 index 34411c2c62..0000000000 --- a/test/src/mirrored_supervisor_tests.erl +++ /dev/null @@ -1,307 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2011-2015 Pivotal Software, Inc. All rights reserved. -%% - --module(mirrored_supervisor_tests). - --export([all_tests/0]). - --export([init/1]). - --behaviour(mirrored_supervisor). - --define(MS, mirrored_supervisor). --define(SERVER, mirrored_supervisor_tests_gs). 
- -%% --------------------------------------------------------------------------- -%% Functional tests -%% --------------------------------------------------------------------------- - -all_tests() -> - passed = test_migrate(), - passed = test_migrate_twice(), - passed = test_already_there(), - passed = test_delete_restart(), - passed = test_which_children(), - passed = test_large_group(), - passed = test_childspecs_at_init(), - passed = test_anonymous_supervisors(), - passed = test_no_migration_on_shutdown(), - passed = test_start_idempotence(), - passed = test_unsupported(), - passed = test_ignore(), - passed = test_startup_failure(), - passed. - -%% Simplest test -test_migrate() -> - with_sups(fun([A, _]) -> - ?MS:start_child(a, childspec(worker)), - Pid1 = pid_of(worker), - kill_registered(A, Pid1), - Pid2 = pid_of(worker), - false = (Pid1 =:= Pid2) - end, [a, b]). - -%% Is migration transitive? -test_migrate_twice() -> - with_sups(fun([A, B]) -> - ?MS:start_child(a, childspec(worker)), - Pid1 = pid_of(worker), - kill_registered(A, Pid1), - {ok, C} = start_sup(c), - Pid2 = pid_of(worker), - kill_registered(B, Pid2), - Pid3 = pid_of(worker), - false = (Pid1 =:= Pid3), - kill(C) - end, [a, b]). - -%% Can't start the same child twice -test_already_there() -> - with_sups(fun([_, _]) -> - S = childspec(worker), - {ok, Pid} = ?MS:start_child(a, S), - {error, {already_started, Pid}} = ?MS:start_child(b, S) - end, [a, b]). 
- -%% Deleting and restarting should work as per a normal supervisor -test_delete_restart() -> - with_sups(fun([_, _]) -> - S = childspec(worker), - {ok, Pid1} = ?MS:start_child(a, S), - {error, running} = ?MS:delete_child(a, worker), - ok = ?MS:terminate_child(a, worker), - ok = ?MS:delete_child(a, worker), - {ok, Pid2} = ?MS:start_child(b, S), - false = (Pid1 =:= Pid2), - ok = ?MS:terminate_child(b, worker), - {ok, Pid3} = ?MS:restart_child(b, worker), - Pid3 = pid_of(worker), - false = (Pid2 =:= Pid3), - %% Not the same supervisor as the worker is on - ok = ?MS:terminate_child(a, worker), - ok = ?MS:delete_child(a, worker), - {ok, Pid4} = ?MS:start_child(a, S), - false = (Pid3 =:= Pid4) - end, [a, b]). - -test_which_children() -> - with_sups( - fun([A, B] = Both) -> - ?MS:start_child(A, childspec(worker)), - assert_wc(Both, fun ([C]) -> true = is_pid(wc_pid(C)) end), - ok = ?MS:terminate_child(a, worker), - assert_wc(Both, fun ([C]) -> undefined = wc_pid(C) end), - {ok, _} = ?MS:restart_child(a, worker), - assert_wc(Both, fun ([C]) -> true = is_pid(wc_pid(C)) end), - ?MS:start_child(B, childspec(worker2)), - assert_wc(Both, fun (C) -> 2 = length(C) end) - end, [a, b]). - -assert_wc(Sups, Fun) -> - [Fun(?MS:which_children(Sup)) || Sup <- Sups]. - -wc_pid(Child) -> - {worker, Pid, worker, [mirrored_supervisor_tests]} = Child, - Pid. - -%% Not all the members of the group should actually do the failover -test_large_group() -> - with_sups(fun([A, _, _, _]) -> - ?MS:start_child(a, childspec(worker)), - Pid1 = pid_of(worker), - kill_registered(A, Pid1), - Pid2 = pid_of(worker), - false = (Pid1 =:= Pid2) - end, [a, b, c, d]). - -%% Do childspecs work when returned from init? -test_childspecs_at_init() -> - S = childspec(worker), - with_sups(fun([A, _]) -> - Pid1 = pid_of(worker), - kill_registered(A, Pid1), - Pid2 = pid_of(worker), - false = (Pid1 =:= Pid2) - end, [{a, [S]}, {b, [S]}]). 
- -test_anonymous_supervisors() -> - with_sups(fun([A, _B]) -> - ?MS:start_child(A, childspec(worker)), - Pid1 = pid_of(worker), - kill_registered(A, Pid1), - Pid2 = pid_of(worker), - false = (Pid1 =:= Pid2) - end, [anon, anon]). - -%% When a mirrored_supervisor terminates, we should not migrate, but -%% the whole supervisor group should shut down. To test this we set up -%% a situation where the gen_server will only fail if it's running -%% under the supervisor called 'evil'. It should not migrate to -%% 'good' and survive, rather the whole group should go away. -test_no_migration_on_shutdown() -> - with_sups(fun([Evil, _]) -> - ?MS:start_child(Evil, childspec(worker)), - try - call(worker, ping, 1000, 100), - exit(worker_should_not_have_migrated) - catch exit:{timeout_waiting_for_server, _, _} -> - ok - end - end, [evil, good]). - -test_start_idempotence() -> - with_sups(fun([_]) -> - CS = childspec(worker), - {ok, Pid} = ?MS:start_child(a, CS), - {error, {already_started, Pid}} = ?MS:start_child(a, CS), - ?MS:terminate_child(a, worker), - {error, already_present} = ?MS:start_child(a, CS) - end, [a]). - -test_unsupported() -> - try - ?MS:start_link({global, foo}, get_group(group), fun tx_fun/1, ?MODULE, - {one_for_one, []}), - exit(no_global) - catch error:badarg -> - ok - end, - try - ?MS:start_link({local, foo}, get_group(group), fun tx_fun/1, ?MODULE, - {simple_one_for_one, []}), - exit(no_sofo) - catch error:badarg -> - ok - end, - passed. - -%% Just test we don't blow up -test_ignore() -> - ?MS:start_link({local, foo}, get_group(group), fun tx_fun/1, ?MODULE, - {fake_strategy_for_ignore, []}), - passed. - -test_startup_failure() -> - [test_startup_failure(F) || F <- [want_error, want_exit]], - passed. 
%% Start a supervisor whose initial childspec fails in mode Fail
%% (want_error | want_exit) and assert that, with exits trapped, the
%% caller receives an 'EXIT' with reason shutdown within a second.
test_startup_failure(Fail) ->
    process_flag(trap_exit, true),
    ?MS:start_link(get_group(group), fun tx_fun/1, ?MODULE,
                   {one_for_one, [childspec(Fail)]}),
    receive
        {'EXIT', _, shutdown} ->
            ok
    after 1000 ->
            exit({did_not_exit, Fail})
    end,
    process_flag(trap_exit, false).

%% ---------------------------------------------------------------------------

%% Run Fun over a freshly started group of supervisors described by
%% Sups, then kill any survivors and give the group time to settle.
with_sups(Fun, Sups) ->
    inc_group(),
    Pids = [begin {ok, Pid} = start_sup(Sup), Pid end || Sup <- Sups],
    Fun(Pids),
    [kill(Pid) || Pid <- Pids, is_process_alive(Pid)],
    timer:sleep(500),
    passed.

start_sup(Spec) ->
    start_sup(Spec, group).

%% Start a member of Group; Spec is either a bare name or
%% {Name, ChildSpecs} to seed children via init/1.
start_sup({Name, ChildSpecs}, Group) ->
    {ok, Pid} = start_sup0(Name, get_group(Group), ChildSpecs),
    %% We are not a supervisor, when we kill the supervisor we do not
    %% want to die!
    unlink(Pid),
    {ok, Pid};
start_sup(Name, Group) ->
    start_sup({Name, []}, Group).

%% 'anon' members get no registered name; everything else is registered
%% locally under its own name.
start_sup0(anon, Group, ChildSpecs) ->
    ?MS:start_link(Group, fun tx_fun/1, ?MODULE,
                   {one_for_one, ChildSpecs});
start_sup0(Name, Group, ChildSpecs) ->
    ?MS:start_link({local, Name}, Group, fun tx_fun/1, ?MODULE,
                   {one_for_one, ChildSpecs}).

%% Transient worker childspec running ?SERVER:start_link(Id).
childspec(Id) ->
    {Id, {?SERVER, start_link, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}.

%% Resolve the current pid behind a (registered) child by pinging it.
pid_of(Id) ->
    {received, Pid, ping} = call(Id, ping),
    Pid.

%% Transaction fun handed to mirrored_supervisor: run Fun in an mnesia
%% sync transaction, turning aborts into thrown errors.
tx_fun(Fun) ->
    case mnesia:sync_transaction(Fun) of
        {atomic,  Result} -> Result;
        {aborted, Reason} -> throw({error, Reason})
    end.

%% Bump the per-process group counter so each with_sups/2 run gets a
%% distinct group id (see get_group/1).
inc_group() ->
    Count = case get(counter) of
                undefined -> 0;
                C         -> C
            end + 1,
    put(counter, Count).

get_group(Group) ->
    {Group, get(counter)}.

call(Id, Msg) -> call(Id, Msg, 10*1000, 100).

%% Call Id, retrying every Decr ms until MaxDelay ms is exhausted (the
%% target may be mid-failover and briefly unregistered).
call(Id, Msg, 0, _Decr) ->
    %% erlang:get_stacktrace/0 (used here originally) was only defined
    %% inside a catch and has been removed from OTP; capture the current
    %% call stack instead, keeping the three-element
    %% {timeout_waiting_for_server, _, _} reason callers match on.
    {current_stacktrace, STrace} =
        erlang:process_info(self(), current_stacktrace),
    exit({timeout_waiting_for_server, {Id, Msg}, STrace});
call(Id, Msg, MaxDelay, Decr) ->
    try
        gen_server:call(Id, Msg, infinity)
    catch exit:_ -> timer:sleep(Decr),
                    call(Id, Msg, MaxDelay - Decr, Decr)
    end.

kill(Pid) -> kill(Pid, []).
-kill(Pid, Wait) when is_pid(Wait) -> kill(Pid, [Wait]); -kill(Pid, Waits) -> - erlang:monitor(process, Pid), - [erlang:monitor(process, P) || P <- Waits], - exit(Pid, bang), - kill_wait(Pid), - [kill_wait(P) || P <- Waits]. - -kill_registered(Pid, Child) -> - {registered_name, Name} = erlang:process_info(Child, registered_name), - kill(Pid, Child), - false = (Child =:= whereis(Name)), - ok. - -kill_wait(Pid) -> - receive - {'DOWN', _Ref, process, Pid, _Reason} -> - ok - end. - -%% --------------------------------------------------------------------------- - -init({fake_strategy_for_ignore, _ChildSpecs}) -> - ignore; - -init({Strategy, ChildSpecs}) -> - {ok, {{Strategy, 0, 1}, ChildSpecs}}. diff --git a/test/src/mirrored_supervisor_tests_gs.erl b/test/src/mirrored_supervisor_tests_gs.erl deleted file mode 100644 index beaf49a44d..0000000000 --- a/test/src/mirrored_supervisor_tests_gs.erl +++ /dev/null @@ -1,66 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2011-2015 Pivotal Software, Inc. All rights reserved. -%% - --module(mirrored_supervisor_tests_gs). - -%% Dumb gen_server we can supervise - --export([start_link/1]). - --export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3, - handle_cast/2]). - --behaviour(gen_server). - --define(MS, mirrored_supervisor). 
- -start_link(want_error) -> - {error, foo}; - -start_link(want_exit) -> - exit(foo); - -start_link(Id) -> - gen_server:start_link({local, Id}, ?MODULE, [], []). - -%% --------------------------------------------------------------------------- - -init([]) -> - {ok, state}. - -handle_call(Msg, _From, State) -> - die_if_my_supervisor_is_evil(), - {reply, {received, self(), Msg}, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -die_if_my_supervisor_is_evil() -> - try lists:keysearch(self(), 2, ?MS:which_children(evil)) of - false -> ok; - _ -> exit(doooom) - catch - exit:{noproc, _} -> ok - end. diff --git a/test/src/on_disk_store_tunable_parameter_validation_test.erl b/test/src/on_disk_store_tunable_parameter_validation_test.erl deleted file mode 100644 index 9db5425e6d..0000000000 --- a/test/src/on_disk_store_tunable_parameter_validation_test.erl +++ /dev/null @@ -1,47 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2011-2015 Pivotal Software, Inc. All rights reserved. -%% - --module(on_disk_store_tunable_parameter_validation_test). - --include("rabbit.hrl"). - --export([test_msg_store_parameter_validation/0]). - --define(T(Fun, Args), (catch apply(rabbit, Fun, Args))). 
- -test_msg_store_parameter_validation() -> - %% make sure it works with default values - ok = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [?CREDIT_DISC_BOUND, ?IO_BATCH_SIZE]), - - %% IO_BATCH_SIZE must be greater than CREDIT_DISC_BOUND initial credit - ok = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 500}, 3000]), - {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 500}, 1500]), - - %% All values must be integers - {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 500}, "1500"]), - {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{"2000", 500}, abc]), - {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, "500"}, 2048]), - - %% CREDIT_DISC_BOUND must be a tuple - {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [[2000, 500], 1500]), - {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [2000, 1500]), - - %% config values can't be smaller than default values - {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{1999, 500}, 2048]), - {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 499}, 2048]), - {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 500}, 2047]), - - passed. diff --git a/test/src/rabbit_backing_queue_qc.erl b/test/src/rabbit_backing_queue_qc.erl deleted file mode 100644 index 1469f61162..0000000000 --- a/test/src/rabbit_backing_queue_qc.erl +++ /dev/null @@ -1,473 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. 
See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2011-2015 Pivotal Software, Inc. All rights reserved. -%% - --module(rabbit_backing_queue_qc). --ifdef(use_proper_qc). --include("rabbit.hrl"). --include("rabbit_framing.hrl"). --include_lib("proper/include/proper.hrl"). - --behaviour(proper_statem). - --define(BQMOD, rabbit_variable_queue). --define(QUEUE_MAXLEN, 10000). --define(TIMEOUT_LIMIT, 100). - --define(RECORD_INDEX(Key, Record), - proplists:get_value( - Key, lists:zip(record_info(fields, Record), - lists:seq(2, record_info(size, Record))))). - --export([initial_state/0, command/1, precondition/2, postcondition/3, - next_state/3]). - --export([prop_backing_queue_test/0, publish_multiple/1, - timeout/2, bump_credit/1]). - --record(state, {bqstate, - len, %% int - next_seq_id, %% int - messages, %% gb_trees of seqid => {msg_props, basic_msg} - acks, %% [{acktag, {seqid, {msg_props, basic_msg}}}] - confirms, %% set of msgid - publishing}).%% int - -%% Initialise model - -initial_state() -> - #state{bqstate = qc_variable_queue_init(qc_test_queue()), - len = 0, - next_seq_id = 0, - messages = gb_trees:empty(), - acks = [], - confirms = gb_sets:new(), - publishing = 0}. - -%% Property - -prop_backing_queue_test() -> - ?FORALL(Cmds, commands(?MODULE, initial_state()), - backing_queue_test(Cmds)). 
- -backing_queue_test(Cmds) -> - {ok, FileSizeLimit} = - application:get_env(rabbit, msg_store_file_size_limit), - application:set_env(rabbit, msg_store_file_size_limit, 512, - infinity), - {ok, MaxJournal} = - application:get_env(rabbit, queue_index_max_journal_entries), - application:set_env(rabbit, queue_index_max_journal_entries, 128, - infinity), - - {_H, #state{bqstate = BQ}, Res} = run_commands(?MODULE, Cmds), - - application:set_env(rabbit, msg_store_file_size_limit, - FileSizeLimit, infinity), - application:set_env(rabbit, queue_index_max_journal_entries, - MaxJournal, infinity), - - ?BQMOD:delete_and_terminate(shutdown, BQ), - ?WHENFAIL( - io:format("Result: ~p~n", [Res]), - aggregate(command_names(Cmds), Res =:= ok)). - -%% Commands - -%% Command frequencies are tuned so that queues are normally -%% reasonably short, but they may sometimes exceed -%% ?QUEUE_MAXLEN. Publish-multiple and purging cause extreme queue -%% lengths, so these have lower probabilities. Fetches/drops are -%% sufficiently frequent so that commands that need acktags get decent -%% coverage. - -command(S) -> - frequency([{10, qc_publish(S)}, - {1, qc_publish_delivered(S)}, - {1, qc_publish_multiple(S)}, %% very slow - {9, qc_fetch(S)}, %% needed for ack and requeue - {6, qc_drop(S)}, %% - {15, qc_ack(S)}, - {15, qc_requeue(S)}, - {3, qc_set_ram_duration_target(S)}, - {1, qc_ram_duration(S)}, - {1, qc_drain_confirmed(S)}, - {1, qc_dropwhile(S)}, - {1, qc_is_empty(S)}, - {1, qc_timeout(S)}, - {1, qc_bump_credit(S)}, - {1, qc_purge(S)}, - {1, qc_fold(S)}]). - -qc_publish(#state{bqstate = BQ}) -> - {call, ?BQMOD, publish, - [qc_message(), - #message_properties{needs_confirming = frequency([{1, true}, - {20, false}]), - expiry = oneof([undefined | lists:seq(1, 10)]), - size = 10}, - false, self(), noflow, BQ]}. - -qc_publish_multiple(#state{}) -> - {call, ?MODULE, publish_multiple, [resize(?QUEUE_MAXLEN, pos_integer())]}. 
- -qc_publish_delivered(#state{bqstate = BQ}) -> - {call, ?BQMOD, publish_delivered, - [qc_message(), #message_properties{size = 10}, self(), noflow, BQ]}. - -qc_fetch(#state{bqstate = BQ}) -> - {call, ?BQMOD, fetch, [boolean(), BQ]}. - -qc_drop(#state{bqstate = BQ}) -> - {call, ?BQMOD, drop, [boolean(), BQ]}. - -qc_ack(#state{bqstate = BQ, acks = Acks}) -> - {call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}. - -qc_requeue(#state{bqstate = BQ, acks = Acks}) -> - {call, ?BQMOD, requeue, [rand_choice(proplists:get_keys(Acks)), BQ]}. - -qc_set_ram_duration_target(#state{bqstate = BQ}) -> - {call, ?BQMOD, set_ram_duration_target, - [oneof([0, 1, 2, resize(1000, pos_integer()), infinity]), BQ]}. - -qc_ram_duration(#state{bqstate = BQ}) -> - {call, ?BQMOD, ram_duration, [BQ]}. - -qc_drain_confirmed(#state{bqstate = BQ}) -> - {call, ?BQMOD, drain_confirmed, [BQ]}. - -qc_dropwhile(#state{bqstate = BQ}) -> - {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}. - -qc_is_empty(#state{bqstate = BQ}) -> - {call, ?BQMOD, is_empty, [BQ]}. - -qc_timeout(#state{bqstate = BQ}) -> - {call, ?MODULE, timeout, [BQ, ?TIMEOUT_LIMIT]}. - -qc_bump_credit(#state{bqstate = BQ}) -> - {call, ?MODULE, bump_credit, [BQ]}. - -qc_purge(#state{bqstate = BQ}) -> - {call, ?BQMOD, purge, [BQ]}. - -qc_fold(#state{bqstate = BQ}) -> - {call, ?BQMOD, fold, [makefoldfun(pos_integer()), foldacc(), BQ]}. 
- -%% Preconditions - -%% Create long queues by only allowing publishing -precondition(#state{publishing = Count}, {call, _Mod, Fun, _Arg}) - when Count > 0, Fun /= publish -> - false; -precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg}) - when Fun =:= ack; Fun =:= requeue -> - length(Acks) > 0; -precondition(#state{messages = Messages}, - {call, ?BQMOD, publish_delivered, _Arg}) -> - gb_trees:is_empty(Messages); -precondition(_S, {call, ?BQMOD, _Fun, _Arg}) -> - true; -precondition(_S, {call, ?MODULE, timeout, _Arg}) -> - true; -precondition(_S, {call, ?MODULE, bump_credit, _Arg}) -> - true; -precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) -> - Len < ?QUEUE_MAXLEN. - -%% Model updates - -next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Del, _Pid, _Flow, _BQ]}) -> - #state{len = Len, - messages = Messages, - confirms = Confirms, - publishing = PublishCount, - next_seq_id = NextSeq} = S, - MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]}, - NeedsConfirm = - {call, erlang, element, - [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]}, - S#state{bqstate = BQ, - len = Len + 1, - next_seq_id = NextSeq + 1, - messages = gb_trees:insert(NextSeq, {MsgProps, Msg}, Messages), - publishing = {call, erlang, max, [0, {call, erlang, '-', - [PublishCount, 1]}]}, - confirms = case eval(NeedsConfirm) of - true -> gb_sets:add(MsgId, Confirms); - _ -> Confirms - end}; - -next_state(S, _BQ, {call, ?MODULE, publish_multiple, [PublishCount]}) -> - S#state{publishing = PublishCount}; - -next_state(S, Res, - {call, ?BQMOD, publish_delivered, - [Msg, MsgProps, _Pid, _Flow, _BQ]}) -> - #state{confirms = Confirms, acks = Acks, next_seq_id = NextSeq} = S, - AckTag = {call, erlang, element, [1, Res]}, - BQ1 = {call, erlang, element, [2, Res]}, - MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]}, - NeedsConfirm = - {call, erlang, element, - [?RECORD_INDEX(needs_confirming, 
message_properties), MsgProps]}, - S#state{bqstate = BQ1, - next_seq_id = NextSeq + 1, - confirms = case eval(NeedsConfirm) of - true -> gb_sets:add(MsgId, Confirms); - _ -> Confirms - end, - acks = [{AckTag, {NextSeq, {MsgProps, Msg}}}|Acks] - }; - -next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) -> - next_state_fetch_and_drop(S, Res, AckReq, 3); - -next_state(S, Res, {call, ?BQMOD, drop, [AckReq, _BQ]}) -> - next_state_fetch_and_drop(S, Res, AckReq, 2); - -next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) -> - #state{acks = AcksState} = S, - BQ1 = {call, erlang, element, [2, Res]}, - S#state{bqstate = BQ1, - acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)}; - -next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _V]}) -> - #state{messages = Messages, acks = AcksState} = S, - BQ1 = {call, erlang, element, [2, Res]}, - Messages1 = lists:foldl(fun (AckTag, Msgs) -> - {SeqId, MsgPropsMsg} = - proplists:get_value(AckTag, AcksState), - gb_trees:insert(SeqId, MsgPropsMsg, Msgs) - end, Messages, AcksArg), - S#state{bqstate = BQ1, - len = gb_trees:size(Messages1), - messages = Messages1, - acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)}; - -next_state(S, BQ, {call, ?BQMOD, set_ram_duration_target, _Args}) -> - S#state{bqstate = BQ}; - -next_state(S, Res, {call, ?BQMOD, ram_duration, _Args}) -> - BQ1 = {call, erlang, element, [2, Res]}, - S#state{bqstate = BQ1}; - -next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) -> - BQ1 = {call, erlang, element, [2, Res]}, - S#state{bqstate = BQ1}; - -next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) -> - BQ = {call, erlang, element, [2, Res]}, - #state{messages = Messages} = S, - Msgs1 = drop_messages(Messages), - S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1}; - -next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) -> - S; - -next_state(S, BQ, {call, ?MODULE, timeout, _Args}) -> - S#state{bqstate = BQ}; -next_state(S, BQ, {call, ?MODULE, bump_credit, 
_Args}) -> - S#state{bqstate = BQ}; - -next_state(S, Res, {call, ?BQMOD, purge, _Args}) -> - BQ1 = {call, erlang, element, [2, Res]}, - S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}; - -next_state(S, Res, {call, ?BQMOD, fold, _Args}) -> - BQ1 = {call, erlang, element, [2, Res]}, - S#state{bqstate = BQ1}. - -%% Postconditions - -postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) -> - #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S, - case Res of - {{MsgFetched, _IsDelivered, AckTag}, _BQ} -> - {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages), - MsgFetched =:= Msg andalso - not proplists:is_defined(AckTag, Acks) andalso - not gb_sets:is_element(AckTag, Confrms) andalso - Len =/= 0; - {empty, _BQ} -> - Len =:= 0 - end; - -postcondition(S, {call, ?BQMOD, drop, _Args}, Res) -> - #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S, - case Res of - {{MsgIdFetched, AckTag}, _BQ} -> - {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages), - MsgId = eval({call, erlang, element, - [?RECORD_INDEX(id, basic_message), Msg]}), - MsgIdFetched =:= MsgId andalso - not proplists:is_defined(AckTag, Acks) andalso - not gb_sets:is_element(AckTag, Confrms) andalso - Len =/= 0; - {empty, _BQ} -> - Len =:= 0 - end; - -postcondition(S, {call, ?BQMOD, publish_delivered, _Args}, {AckTag, _BQ}) -> - #state{acks = Acks, confirms = Confrms} = S, - not proplists:is_defined(AckTag, Acks) andalso - not gb_sets:is_element(AckTag, Confrms); - -postcondition(#state{len = Len}, {call, ?BQMOD, purge, _Args}, Res) -> - {PurgeCount, _BQ} = Res, - Len =:= PurgeCount; - -postcondition(#state{len = Len}, {call, ?BQMOD, is_empty, _Args}, Res) -> - (Len =:= 0) =:= Res; - -postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) -> - #state{confirms = Confirms} = S, - {ReportedConfirmed, _BQ} = Res, - lists:all(fun (M) -> gb_sets:is_element(M, Confirms) end, - ReportedConfirmed); - -postcondition(S, {call, ?BQMOD, fold, 
[FoldFun, Acc0, _BQ0]}, {Res, _BQ1}) -> - #state{messages = Messages} = S, - {_, Model} = lists:foldl(fun ({_SeqId, {_MsgProps, _Msg}}, {stop, Acc}) -> - {stop, Acc}; - ({_SeqId, {MsgProps, Msg}}, {cont, Acc}) -> - FoldFun(Msg, MsgProps, false, Acc) - end, {cont, Acc0}, gb_trees:to_list(Messages)), - true = Model =:= Res; - -postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) -> - ?BQMOD:len(BQ) =:= Len. - -%% Helpers - -publish_multiple(_C) -> - ok. - -timeout(BQ, 0) -> - BQ; -timeout(BQ, AtMost) -> - case ?BQMOD:needs_timeout(BQ) of - false -> BQ; - _ -> timeout(?BQMOD:timeout(BQ), AtMost - 1) - end. - -bump_credit(BQ) -> - case credit_flow:blocked() of - false -> BQ; - true -> receive - {bump_credit, Msg} -> - credit_flow:handle_bump_msg(Msg), - ?BQMOD:resume(BQ) - end - end. - -qc_message_payload() -> ?SIZED(Size, resize(Size * Size, binary())). - -qc_routing_key() -> noshrink(binary(10)). - -qc_delivery_mode() -> oneof([1, 2]). - -qc_message() -> qc_message(qc_delivery_mode()). - -qc_message(DeliveryMode) -> - {call, rabbit_basic, message, [qc_default_exchange(), - qc_routing_key(), - #'P_basic'{delivery_mode = DeliveryMode}, - qc_message_payload()]}. - -qc_default_exchange() -> - {call, rabbit_misc, r, [<<>>, exchange, <<>>]}. - -qc_variable_queue_init(Q) -> - {call, ?BQMOD, init, - [Q, new, function(2, {ok, []})]}. - -qc_test_q() -> {call, rabbit_misc, r, [<<"/">>, queue, noshrink(binary(16))]}. - -qc_test_queue() -> qc_test_queue(boolean()). - -qc_test_queue(Durable) -> - #amqqueue{name = qc_test_q(), - durable = Durable, - auto_delete = false, - arguments = [], - pid = self()}. - -rand_choice([]) -> []; -rand_choice(List) -> rand_choice(List, [], random:uniform(length(List))). - -rand_choice(_List, Selection, 0) -> - Selection; -rand_choice(List, Selection, N) -> - Picked = lists:nth(random:uniform(length(List)), List), - rand_choice(List -- [Picked], [Picked | Selection], - N - 1). 
- -makefoldfun(Size) -> - fun (Msg, _MsgProps, Unacked, Acc) -> - case {length(Acc) > Size, Unacked} of - {false, false} -> {cont, [Msg | Acc]}; - {false, true} -> {cont, Acc}; - {true, _} -> {stop, Acc} - end - end. -foldacc() -> []. - -dropfun(Props) -> - Expiry = eval({call, erlang, element, - [?RECORD_INDEX(expiry, message_properties), Props]}), - Expiry =/= 1. - -drop_messages(Messages) -> - case gb_trees:is_empty(Messages) of - true -> - Messages; - false -> {_Seq, MsgProps_Msg, M2} = gb_trees:take_smallest(Messages), - MsgProps = {call, erlang, element, [1, MsgProps_Msg]}, - case dropfun(MsgProps) of - true -> drop_messages(M2); - false -> Messages - end - end. - -next_state_fetch_and_drop(S, Res, AckReq, AckTagIdx) -> - #state{len = Len, messages = Messages, acks = Acks} = S, - ResultInfo = {call, erlang, element, [1, Res]}, - BQ1 = {call, erlang, element, [2, Res]}, - AckTag = {call, erlang, element, [AckTagIdx, ResultInfo]}, - S1 = S#state{bqstate = BQ1}, - case gb_trees:is_empty(Messages) of - true -> S1; - false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages), - S2 = S1#state{len = Len - 1, messages = M2}, - case AckReq of - true -> - S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]}; - false -> - S2 - end - end. - --else. - --export([prop_disabled/0]). - -prop_disabled() -> - exit({compiled_without_proper, - "PropEr was not present during compilation of the test module. " - "Hence all tests are disabled."}). - --endif. diff --git a/test/src/rabbit_runtime_parameters_test.erl b/test/src/rabbit_runtime_parameters_test.erl deleted file mode 100644 index d88975b61e..0000000000 --- a/test/src/rabbit_runtime_parameters_test.erl +++ /dev/null @@ -1,72 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. 
You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. -%% - --module(rabbit_runtime_parameters_test). --behaviour(rabbit_runtime_parameter). --behaviour(rabbit_policy_validator). - --include("rabbit.hrl"). - --export([validate/5, notify/4, notify_clear/3]). --export([register/0, unregister/0]). --export([validate_policy/1]). --export([register_policy_validator/0, unregister_policy_validator/0]). - -%---------------------------------------------------------------------------- - -register() -> - rabbit_registry:register(runtime_parameter, <<"test">>, ?MODULE). - -unregister() -> - rabbit_registry:unregister(runtime_parameter, <<"test">>). - -validate(_, <<"test">>, <<"good">>, _Term, _User) -> ok; -validate(_, <<"test">>, <<"maybe">>, <<"good">>, _User) -> ok; -validate(_, <<"test">>, <<"admin">>, _Term, none) -> ok; -validate(_, <<"test">>, <<"admin">>, _Term, User) -> - case lists:member(administrator, User#user.tags) of - true -> ok; - false -> {error, "meh", []} - end; -validate(_, <<"test">>, _, _, _) -> {error, "meh", []}. - -notify(_, _, _, _) -> ok. -notify_clear(_, _, _) -> ok. - -%---------------------------------------------------------------------------- - -register_policy_validator() -> - rabbit_registry:register(policy_validator, <<"testeven">>, ?MODULE), - rabbit_registry:register(policy_validator, <<"testpos">>, ?MODULE). - -unregister_policy_validator() -> - rabbit_registry:unregister(policy_validator, <<"testeven">>), - rabbit_registry:unregister(policy_validator, <<"testpos">>). 
- -validate_policy([{<<"testeven">>, Terms}]) when is_list(Terms) -> - case length(Terms) rem 2 =:= 0 of - true -> ok; - false -> {error, "meh", []} - end; - -validate_policy([{<<"testpos">>, Terms}]) when is_list(Terms) -> - case lists:all(fun (N) -> is_integer(N) andalso N > 0 end, Terms) of - true -> ok; - false -> {error, "meh", []} - end; - -validate_policy(_) -> - {error, "meh", []}. diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl deleted file mode 100644 index 2235717ebf..0000000000 --- a/test/src/rabbit_tests.erl +++ /dev/null @@ -1,3214 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. -%% --module(rabbit_tests). - --compile([export_all]). - --export([all_tests/0]). - --import(rabbit_misc, [pget/2]). - --include("rabbit.hrl"). --include("rabbit_framing.hrl"). --include_lib("kernel/include/file.hrl"). - --define(PERSISTENT_MSG_STORE, msg_store_persistent). --define(TRANSIENT_MSG_STORE, msg_store_transient). --define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>). --define(TIMEOUT, 30000). - -all_tests() -> - try - all_tests0() - catch - Type:Error -> - rabbit_misc:format( - "Tests failed~nError: {~p, ~p}~nStack trace:~n~p~n", - [Type, Error, erlang:get_stacktrace()]) - end. 
%% The full single-node test sequence. Each step either returns its
%% expected token ('ok' or 'passed') or crashes via badmatch, which
%% all_tests/0 reports with a stacktrace.
%% Fix: credit_flow_test:test_credit_flow_settings() was invoked twice
%% (once mid-list and again as the final step); the duplicate is removed.
all_tests0() ->
    ok = setup_cluster(),
    ok = truncate:test(),
    ok = supervisor2_tests:test_all(),
    passed = gm_tests:all_tests(),
    passed = mirrored_supervisor_tests:all_tests(),
    %% Constrain the file handle cache so its eviction paths get
    %% exercised by the tests below.
    application:set_env(rabbit, file_handles_high_watermark, 10),
    ok = file_handle_cache:set_limit(10),
    passed = test_version_equivalance(),
    passed = test_file_handle_cache(),
    passed = test_backing_queue(),
    passed = test_rabbit_basic_header_handling(),
    passed = test_priority_queue(),
    passed = test_pg_local(),
    passed = test_unfold(),
    passed = test_pmerge(),
    passed = test_plmerge(),
    passed = test_supervisor_delayed_restart(),
    passed = test_table_codec(),
    passed = test_content_framing(),
    passed = test_content_transcoding(),
    passed = test_topic_matching(),
    passed = test_log_management(),
    passed = test_app_management(),
    passed = test_log_management_during_startup(),
    passed = test_ch_statistics(),
    passed = test_head_message_timestamp_statistic(),
    passed = test_arguments_parser(),
    passed = test_dynamic_mirroring(),
    passed = test_user_management(),
    passed = test_runtime_parameters(),
    passed = test_policy_validation(),
    passed = test_policy_opts_validation(),
    passed = test_ha_policy_validation(),
    passed = test_queue_master_location_policy_validation(),
    passed = test_server_status(),
    passed = test_amqp_connection_refusal(),
    passed = test_confirms(),
    passed = test_with_state(),
    passed = test_mcall(),
    passed =
        do_if_secondary_node(
          fun run_cluster_dependent_tests/1,
          fun (SecondaryNode) ->
                  io:format("Skipping cluster dependent tests with node ~p~n",
                            [SecondaryNode]),
                  passed
          end),
    passed = test_configurable_server_properties(),
    passed = vm_memory_monitor_tests:all_tests(),
    passed = credit_flow_test:test_credit_flow_settings(),
    passed = on_disk_store_tunable_parameter_validation_test:test_msg_store_parameter_validation(),
    passed = password_hashing_tests:test_password_hashing(),
    passed.

%% Run Up(SecondaryNode) if the conventional secondary node ("hare") is
%% reachable, otherwise Down(SecondaryNode).
do_if_secondary_node(Up, Down) ->
    SecondaryNode = rabbit_nodes:make("hare"),
    case net_adm:ping(SecondaryNode) of
        pong -> Up(SecondaryNode);
        pang -> Down(SecondaryNode)
    end.

%% Join this node to the secondary node's cluster when one is present;
%% a no-op otherwise.
setup_cluster() ->
    do_if_secondary_node(
      fun (SecondaryNode) ->
              ok = control_action(stop_app, []),
              ok = control_action(join_cluster,
                                  [atom_to_list(SecondaryNode)]),
              ok = control_action(start_app, []),
              ok = control_action(start_app, SecondaryNode, [], [])
      end,
      fun (_) -> ok end).

%% Entry point for running just the cluster-dependent subset, skipping
%% (with a message) when no secondary node is up.
maybe_run_cluster_dependent_tests() ->
    do_if_secondary_node(
      fun (SecondaryNode) ->
              passed = run_cluster_dependent_tests(SecondaryNode)
      end,
      fun (SecondaryNode) ->
              io:format("Skipping cluster dependent tests with node ~p~n",
                        [SecondaryNode])
      end).

%% Run the cluster tests locally against SecondaryNode, then once more
%% remotely (spawned on SecondaryNode, pointed back at us).
run_cluster_dependent_tests(SecondaryNode) ->
    io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
    passed = test_delegates_async(SecondaryNode),
    passed = test_delegates_sync(SecondaryNode),
    passed = test_queue_cleanup(SecondaryNode),
    passed = test_declare_on_dead_queue(SecondaryNode),
    passed = test_refresh_events(SecondaryNode),

    %% we now run the tests remotely, so that code coverage on the
    %% local node picks up more of the delegate
    Node = node(),
    Self = self(),
    Remote = spawn(SecondaryNode,
                   fun () -> Rs = [ test_delegates_async(Node),
                                    test_delegates_sync(Node),
                                    test_queue_cleanup(Node),
                                    test_declare_on_dead_queue(Node),
                                    test_refresh_events(Node) ],
                             Self ! {self(), Rs}
                   end),
    receive
        {Remote, Result} ->
            %% Assert every remote test returned 'passed' (Result is
            %% bound by the receive pattern; this re-match is a check).
            Result = lists:duplicate(length(Result), passed)
    after 30000 ->
            throw(timeout)
    end,

    passed.
%% version_minor_equivalent/2 should accept equal and patch-level
%% differences only; minor bumps, truncated versions, extra components
%% and malformed components all differ.
test_version_equivalance() ->
    true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.0"),
    true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.1"),
    true = rabbit_misc:version_minor_equivalent("%%VSN%%", "%%VSN%%"),
    false = rabbit_misc:version_minor_equivalent("3.0.0", "3.1.0"),
    false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0"),
    false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.0.1"),
    false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.foo"),
    passed.

%% Umbrella for the invalid-AMQP-header accumulation tests below.
test_rabbit_basic_header_handling() ->
    passed = write_table_with_invalid_existing_type_test(),
    passed = invalid_existing_headers_test(),
    passed = disparate_invalid_header_entries_accumulate_separately_test(),
    passed = corrupt_or_invalid_headers_are_overwritten_test(),
    passed = invalid_same_header_entry_accumulation_test(),
    passed.

%% Sample AMQP tables used as the "good" header values to prepend.
-define(XDEATH_TABLE,
        [{<<"reason">>, longstr, <<"blah">>},
         {<<"queue">>, longstr, <<"foo.bar.baz">>},
         {<<"exchange">>, longstr, <<"my-exchange">>},
         {<<"routing-keys">>, array, []}]).

-define(ROUTE_TABLE, [{<<"redelivered">>, bool, <<"true">>}]).

%% A header entry with the wrong type (longstr where a table/array is
%% expected), and the form it should take once moved into the
%% invalid-headers table.
-define(BAD_HEADER(K), {<<K>>, longstr, <<"bad ", K>>}).
-define(BAD_HEADER2(K, Suf), {<<K>>, longstr, <<"bad ", K, Suf>>}).
-define(FOUND_BAD_HEADER(K), {<<K>>, array, [{longstr, <<"bad ", K>>}]}).

%% Prepending over an existing entry of the wrong type must relocate
%% the bad entry rather than crash (assertions live in prepend_check/3).
write_table_with_invalid_existing_type_test() ->
    prepend_check(<<"header1">>, ?XDEATH_TABLE, [?BAD_HEADER("header1")]),
    passed.

%% After relocation of the bad value, the prepended good value should
%% be readable under the original key as a one-element array of tables.
invalid_existing_headers_test() ->
    Headers =
        prepend_check(<<"header2">>, ?ROUTE_TABLE, [?BAD_HEADER("header2")]),
    {array, [{table, ?ROUTE_TABLE}]} =
        rabbit_misc:table_lookup(Headers, <<"header2">>),
    passed.
%% Bad values for different header keys must accumulate under their own
%% keys inside the x-invalid-headers table.
disparate_invalid_header_entries_accumulate_separately_test() ->
    BadHeaders = [?BAD_HEADER("header2")],
    Headers = prepend_check(<<"header2">>, ?ROUTE_TABLE, BadHeaders),
    Headers2 = prepend_check(<<"header1">>, ?XDEATH_TABLE,
                             [?BAD_HEADER("header1") | Headers]),
    {table, [?FOUND_BAD_HEADER("header1"),
             ?FOUND_BAD_HEADER("header2")]} =
        rabbit_misc:table_lookup(Headers2, ?INVALID_HEADERS_KEY),
    passed.

%% An existing x-invalid-headers entry that is itself of the wrong type
%% is treated like any other bad header: relocated, not preserved.
corrupt_or_invalid_headers_are_overwritten_test() ->
    Headers0 = [?BAD_HEADER("header1"),
                ?BAD_HEADER("x-invalid-headers")],
    Headers1 = prepend_check(<<"header1">>, ?XDEATH_TABLE, Headers0),
    {table,[?FOUND_BAD_HEADER("header1"),
            ?FOUND_BAD_HEADER("x-invalid-headers")]} =
        rabbit_misc:table_lookup(Headers1, ?INVALID_HEADERS_KEY),
    passed.

%% Repeated bad values for the same key accumulate in one array,
%% newest first.
invalid_same_header_entry_accumulation_test() ->
    BadHeader1 = ?BAD_HEADER2("header1", "a"),
    Headers = prepend_check(<<"header1">>, ?ROUTE_TABLE, [BadHeader1]),
    Headers2 = prepend_check(<<"header1">>, ?ROUTE_TABLE,
                             [?BAD_HEADER2("header1", "b") | Headers]),
    {table, InvalidHeaders} =
        rabbit_misc:table_lookup(Headers2, ?INVALID_HEADERS_KEY),
    {array, [{longstr,<<"bad header1b">>},
             {longstr,<<"bad header1a">>}]} =
        rabbit_misc:table_lookup(InvalidHeaders, <<"header1">>),
    passed.

%% Prepend HeaderTable under HeaderKey onto Headers and assert that the
%% displaced (bad-typed) value for HeaderKey now heads the array stored
%% for that key in the invalid-headers table. Returns the new headers.
prepend_check(HeaderKey, HeaderTable, Headers) ->
    Headers1 = rabbit_basic:prepend_table_header(
                 HeaderKey, HeaderTable, Headers),
    {table, Invalid} =
        rabbit_misc:table_lookup(Headers1, ?INVALID_HEADERS_KEY),
    {Type, Value} = rabbit_misc:table_lookup(Headers, HeaderKey),
    {array, [{Type, Value} | _]} =
        rabbit_misc:table_lookup(Invalid, HeaderKey),
    Headers1.

%% Exhaustive unit test of the priority_queue module: emptiness, FIFO
%% order within a priority, ordering across priorities (including
%% negative and 'infinity'), and join/2 merge semantics.
test_priority_queue() ->

    false = priority_queue:is_queue(not_a_queue),

    %% empty Q
    Q = priority_queue:new(),
    {true, true, 0, [], []} = test_priority_queue(Q),

    %% 1-4 element no-priority Q
    [passed = test_simple_n_element_queue(N) || N <- lists:seq(1, 4)],

    %% 1-element priority Q
    Q1 = priority_queue:in(foo, 1, priority_queue:new()),
    {true, false, 1, [{1, foo}], [foo]} =
        test_priority_queue(Q1),

    %% 2-element same-priority Q
    Q2 = priority_queue:in(bar, 1, Q1),
    {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} =
        test_priority_queue(Q2),

    %% 2-element different-priority Q
    Q3 = priority_queue:in(bar, 2, Q1),
    {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
        test_priority_queue(Q3),

    %% 1-element negative priority Q
    Q4 = priority_queue:in(foo, -1, priority_queue:new()),
    {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4),

    %% merge 2 * 1-element no-priority Qs
    Q5 = priority_queue:join(priority_queue:in(foo, Q),
                             priority_queue:in(bar, Q)),
    {true, false, 2, [{0, foo}, {0, bar}], [foo, bar]} =
        test_priority_queue(Q5),

    %% merge 1-element no-priority Q with 1-element priority Q
    Q6 = priority_queue:join(priority_queue:in(foo, Q),
                             priority_queue:in(bar, 1, Q)),
    {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} =
        test_priority_queue(Q6),

    %% merge 1-element priority Q with 1-element no-priority Q
    Q7 = priority_queue:join(priority_queue:in(foo, 1, Q),
                             priority_queue:in(bar, Q)),
    {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} =
        test_priority_queue(Q7),

    %% merge 2 * 1-element same-priority Qs
    Q8 = priority_queue:join(priority_queue:in(foo, 1, Q),
                             priority_queue:in(bar, 1, Q)),
    {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} =
        test_priority_queue(Q8),

    %% merge 2 * 1-element different-priority Qs
    Q9 = priority_queue:join(priority_queue:in(foo, 1, Q),
                             priority_queue:in(bar, 2, Q)),
    {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
        test_priority_queue(Q9),

    %% merge 2 * 1-element different-priority Qs (other way around)
    Q10 = priority_queue:join(priority_queue:in(bar, 2, Q),
                              priority_queue:in(foo, 1, Q)),
    {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
        test_priority_queue(Q10),

    %% merge 2 * 2-element multi-different-priority Qs
    Q11 = priority_queue:join(Q6, Q5),
    {true, false, 4, [{1, bar}, {0, foo}, {0, foo}, {0, bar}],
     [bar, foo, foo, bar]} = test_priority_queue(Q11),

    %% and the other way around
    Q12 = priority_queue:join(Q5, Q6),
    {true, false, 4, [{1, bar}, {0, foo}, {0, bar}, {0, foo}],
     [bar, foo, bar, foo]} = test_priority_queue(Q12),

    %% merge with negative priorities
    Q13 = priority_queue:join(Q4, Q5),
    {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} =
        test_priority_queue(Q13),

    %% and the other way around
    Q14 = priority_queue:join(Q5, Q4),
    {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} =
        test_priority_queue(Q14),

    %% joins with empty queues:
    Q1 = priority_queue:join(Q, Q1),
    Q1 = priority_queue:join(Q1, Q),

    %% insert with priority into non-empty zero-priority queue
    Q15 = priority_queue:in(baz, 1, Q5),
    {true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} =
        test_priority_queue(Q15),

    %% 1-element infinity priority Q
    Q16 = priority_queue:in(foo, infinity, Q),
    {true, false, 1, [{infinity, foo}], [foo]} = test_priority_queue(Q16),

    %% add infinity to 0-priority Q
    Q17 = priority_queue:in(foo, infinity, priority_queue:in(bar, Q)),
    {true, false, 2, [{infinity, foo}, {0, bar}], [foo, bar]} =
        test_priority_queue(Q17),

    %% and the other way around
    Q18 = priority_queue:in(bar, priority_queue:in(foo, infinity, Q)),
    {true, false, 2, [{infinity, foo}, {0, bar}], [foo, bar]} =
        test_priority_queue(Q18),

    %% add infinity to mixed-priority Q
    Q19 = priority_queue:in(qux, infinity, Q3),
    {true, false, 3, [{infinity, qux}, {2, bar}, {1, foo}], [qux, bar, foo]} =
        test_priority_queue(Q19),

    %% merge the above with a negative priority Q
    Q20 = priority_queue:join(Q19, Q4),
    {true, false, 4, [{infinity, qux}, {2, bar}, {1, foo}, {-1, foo}],
     [qux, bar, foo, foo]} = test_priority_queue(Q20),

    %% merge two infinity priority queues
    Q21 = priority_queue:join(priority_queue:in(foo, infinity, Q),
                              priority_queue:in(bar, infinity, Q)),
    {true, false, 2, [{infinity, foo}, {infinity, bar}], [foo, bar]} =
        test_priority_queue(Q21),

    %% merge two mixed priority with infinity queues
    Q22 = priority_queue:join(Q18, Q20),
    {true, false, 6, [{infinity, foo}, {infinity, qux}, {2, bar}, {1, foo},
                      {0, bar}, {-1, foo}], [foo, qux, bar, foo, bar, foo]} =
        test_priority_queue(Q22),

    passed.

%% Insert every element of L (at default priority 0) into Q.
priority_queue_in_all(Q, L) ->
    lists:foldl(fun (X, Acc) -> priority_queue:in(X, Acc) end, Q, L).

%% Drain Q completely, returning its elements in dequeue order.
priority_queue_out_all(Q) ->
    case priority_queue:out(Q) of
        {empty, _}       -> [];
        {{value, V}, Q1} -> [V | priority_queue_out_all(Q1)]
    end.

%% Snapshot of a queue's observable state, matched by the assertions
%% above: {is_queue, is_empty, len, to_list, dequeue order}.
test_priority_queue(Q) ->
    {priority_queue:is_queue(Q),
     priority_queue:is_empty(Q),
     priority_queue:len(Q),
     priority_queue:to_list(Q),
     priority_queue_out_all(Q)}.

%% A queue of N no-priority elements behaves as a plain FIFO.
test_simple_n_element_queue(N) ->
    Items = lists:seq(1, N),
    Q = priority_queue_in_all(priority_queue:new(), Items),
    ToListRes = [{0, X} || X <- Items],
    {true, false, N, ToListRes, Items} = test_priority_queue(Q),
    passed.

%% Exercise pg_local join/leave/get_members bookkeeping, including
%% duplicate joins and automatic cleanup when members die.
test_pg_local() ->
    Spawn = fun () -> spawn(fun () -> receive Msg -> Msg end end) end,
    P = Spawn(),
    Q = Spawn(),
    check_pg_local(ok, [], []),
    check_pg_local(pg_local:join(a, P), [P], []),
    check_pg_local(pg_local:join(b, P), [P], [P]),
    check_pg_local(pg_local:join(a, P), [P, P], [P]),
    check_pg_local(pg_local:join(a, Q), [P, P, Q], [P]),
    check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q]),
    check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q, Q]),
    check_pg_local(pg_local:leave(a, P), [P, Q], [P, Q, Q]),
    check_pg_local(pg_local:leave(b, P), [P, Q], [Q, Q]),
    check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
    check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
    %% stop both members and wait until they are really down...
    lists:foreach(fun (Pid) ->
                          Pid ! done,
                          MRef = erlang:monitor(process, Pid),
                          receive {'DOWN', MRef, process, Pid, _Info} -> ok end
                  end, [P, Q]),
    %% ...after which both groups must be empty again
    check_pg_local(ok, [], []),
    passed.

%% The first argument asserts that the preceding pg_local call
%% returned ok; then compare each group's membership (order- and
%% duplicate-sensitively, after sorting).
check_pg_local(ok, APids, BPids) ->
    ok = pg_local:sync(),
    [true, true] = [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) ||
                       {Key, Pids} <- [{a, APids}, {b, BPids}]].

%% rabbit_misc:unfold/2 builds a list until the fun returns false.
test_unfold() ->
    {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test),
    Evens = lists:seq(2, 20, 2),
    {Evens, 0} = rabbit_misc:unfold(fun (0) -> false;
                                        (N) -> {true, N * 2, N - 1}
                                    end, 10),
    passed.

%% pmerge: add {Key, Val} only when Key is absent.
test_pmerge() ->
    P = [{a, 1}, {b, 2}],
    P = rabbit_misc:pmerge(a, 3, P),
    [{c, 3} | P] = rabbit_misc:pmerge(c, 3, P),
    passed.

%% plmerge: merge two proplists; values from the first list win.
test_plmerge() ->
    First  = [{a, 1}, {b, 2}, {c, 3}],
    Second = [{a, 2}, {d, 4}],
    [{a, 1}, {b, 2}, {c, 3}, {d, 4}] = rabbit_misc:plmerge(First, Second),
    passed.

%% Round-trip an AMQP field table through generate_table/parse_table
%% and compare against the hand-computed wire encoding.
%% FIXME: inexact numbers (double and float) are not covered yet,
%% because they would not survive the equality assertions.
test_table_codec() ->
    Table = [{<<"longstr">>,   longstr,   <<"Here is a long string">>},
             {<<"signedint">>, signedint, 12345},
             {<<"decimal">>,   decimal,   {3, 123456}},
             {<<"timestamp">>, timestamp, 109876543209876},
             {<<"table">>,     table,     [{<<"one">>, signedint, 54321},
                                           {<<"two">>, longstr,
                                            <<"A long string">>}]},
             {<<"byte">>,      byte,      -128},
             {<<"long">>,      long,      1234567890},
             {<<"short">>,     short,     655},
             {<<"bool">>,      bool,      true},
             {<<"binary">>,    binary,    <<"a binary string">>},
             {<<"void">>,      void,      undefined},
             {<<"array">>,     array,     [{signedint, 54321},
                                           {longstr, <<"A long string">>}]}
            ],
    %% Expected encoding: <name-len, name, type-tag, value> per entry.
    Binary = <<
               7, "longstr",   "S", 21:32, "Here is a long string",
               9, "signedint", "I", 12345:32/signed,
               7, "decimal",   "D", 3, 123456:32,
               9, "timestamp", "T", 109876543209876:64,
               5, "table",     "F", 31:32, % length of table
               3, "one", "I", 54321:32,
               3, "two", "S", 13:32, "A long string",
               4, "byte",      "b", -128:8/signed,
               4, "long",      "l", 1234567890:64,
               5, "short",     "s", 655:16,
               4, "bool",      "t", 1,
               6, "binary",    "x", 15:32, "a binary string",
               4, "void",      "V",
               5, "array",     "A", 23:32,
               "I", 54321:32,
               "S", 13:32, "A long string"
             >>,
    Binary = rabbit_binary_generator:generate_table(Table),
    Table = rabbit_binary_parser:parse_table(Binary),
    passed.

%% Check that build_simple_content_frames/4 never emits a content
%% frame larger than FrameMax, and that the header frame carries the
%% total body size.
test_content_framing(FrameMax, BodyBin) ->
    [Header | Frames] =
        rabbit_binary_generator:build_simple_content_frames(
          1,
          rabbit_binary_generator:ensure_content_encoded(
            rabbit_basic:build_content(#'P_basic'{}, BodyBin),
            rabbit_framing_amqp_0_9_1),
          FrameMax,
          rabbit_framing_amqp_0_9_1),
    %% header is formatted correctly and the size is the total of the
    %% fragments
    <<_FrameHeader:7/binary, _ClassAndWeight:4/binary,
      BodySize:64/unsigned, _Rest/binary>> = list_to_binary(Header),
    BodySize = size(BodyBin),
    true = lists:all(
             fun (ContentFrame) ->
                     FrameBinary = list_to_binary(ContentFrame),
                     %% assert well-formedness: payload length matches
                     %% the size field and the frame ends with 16#CE
                     <<_TypeAndChannel:3/binary,
                       Size:32/unsigned, _Payload:Size/binary, 16#CE>> =
                         FrameBinary,
                     size(FrameBinary) =< FrameMax
             end, Frames),
    passed.

%% Drive the framing check across the interesting boundary cases.
test_content_framing() ->
    %% no content
    passed = test_content_framing(4096, <<>>),
    %% easily fit in one frame
    passed = test_content_framing(4096, <<"Easy">>),
    %% exactly one frame (empty frame = 8 bytes)
    passed = test_content_framing(11, <<"One">>),
    %% more than one frame
    passed = test_content_framing(11, <<"More than one frame">>),
    passed.

test_content_transcoding() ->
    %% there are no guarantees provided by 'clear' - it's just a hint
    ClearDecoded = fun rabbit_binary_parser:clear_decoded_content/1,
    ClearEncoded = fun rabbit_binary_generator:clear_encoded_content/1,
    EnsureDecoded =
        fun (C0) ->
                C1 = rabbit_binary_parser:ensure_content_decoded(C0),
                true = C1#content.properties =/= none,
                C1
        end,
    EnsureEncoded =
        fun (Protocol) ->
                fun (C0) ->
                        C1 = rabbit_binary_generator:ensure_content_encoded(
                               C0, Protocol),
                        true = C1#content.properties_bin =/= none,
                        C1
                end
        end,
    %% Beyond the assertions in Ensure*, the only testable guarantee
    %% is that the operations should never fail.
    %%
    %% If we were using quickcheck we'd simply stuff all the above
    %% into a generator for sequences of operations. In the absence of
    %% quickcheck we pick particularly interesting sequences that:
    %%
    %% - execute every op twice since they are idempotent
    %% - invoke clear_decoded, clear_encoded, decode and transcode
    %%   with one or both of decoded and encoded content present
    [begin
         sequence_with_content([Op]),
         sequence_with_content([ClearEncoded, Op]),
         sequence_with_content([ClearDecoded, Op])
     end || Op <- [ClearDecoded, ClearEncoded, EnsureDecoded,
                   EnsureEncoded(rabbit_framing_amqp_0_9_1),
                   EnsureEncoded(rabbit_framing_amqp_0_8)]],
    passed.

%% Apply each op in Sequence twice (idempotency check) to a fresh,
%% encoded, empty content.
sequence_with_content(Sequence) ->
    lists:foldl(fun (F, V) -> F(F(V)) end,
                rabbit_binary_generator:ensure_content_encoded(
                  rabbit_basic:build_content(#'P_basic'{}, <<>>),
                  rabbit_framing_amqp_0_9_1),
                Sequence).

%% End-to-end test of the topic exchange: create, bind with a mix of
%% literal, '*' and '#' patterns, route, unbind, delete.
test_topic_matching() ->
    XName = #resource{virtual_host = <<"/">>,
                      kind = exchange,
                      name = <<"test_exchange">>},
    X0 = #exchange{name = XName, type = topic, durable = false,
                   auto_delete = false, arguments = []},
    X = rabbit_exchange_decorator:set(X0),
    %% create
    rabbit_exchange_type_topic:validate(X),
    exchange_op_callback(X, create, []),

    %% add some bindings
    Bindings = [#binding{source = XName,
                         key = list_to_binary(Key),
                         destination = #resource{virtual_host = <<"/">>,
                                                 kind = queue,
                                                 name = list_to_binary(Q)},
                         args = Args} ||
                   {Key, Q, Args} <- [{"a.b.c", "t1", []},
                                      {"a.*.c", "t2", []},
                                      {"a.#.b", "t3", []},
                                      {"a.b.b.c", "t4", []},
                                      {"#", "t5", []},
                                      {"#.#", "t6", []},
                                      {"#.b", "t7", []},
                                      {"*.*", "t8", []},
                                      {"a.*", "t9", []},
                                      {"*.b.c", "t10", []},
                                      {"a.#", "t11", []},
                                      {"a.#.#", "t12", []},
                                      {"b.b.c", "t13", []},
                                      {"a.b.b", "t14", []},
                                      {"a.b", "t15", []},
                                      {"b.c", "t16", []},
                                      {"", "t17", []},
                                      {"*.*.*", "t18", []},
                                      {"vodka.martini", "t19", []},
                                      {"a.b.c", "t20", []},
                                      {"*.#", "t21", []},
                                      {"#.*.#", "t22", []},
                                      {"*.#.#", "t23", []},
                                      {"#.#.#", "t24", []},
                                      {"*", "t25", []},
                                      {"#.b.#", "t26", []},
                                      {"args-test", "t27",
                                       [{<<"foo">>, longstr, <<"bar">>}]},
                                      {"args-test", "t27", %% Note aliasing
                                       [{<<"foo">>, longstr, <<"baz">>}]}]],
    lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end,
                  Bindings),

    %% test some matches
    test_topic_expect_match(
      X, [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12",
                     "t18", "t20", "t21", "t22", "t23", "t24",
                     "t26"]},
          {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11",
                   "t12", "t15", "t21", "t22", "t23", "t24",
                   "t26"]},
          {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14",
                     "t18", "t21", "t22", "t23", "t24", "t26"]},
          {"", ["t5", "t6", "t17", "t24"]},
          {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23",
                     "t24", "t26"]},
          {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22",
                         "t23", "t24"]},
          {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23",
                         "t24"]},
          {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23",
                             "t24"]},
          {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21",
                     "t22", "t23", "t24", "t26"]},
          {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]},
          {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24",
                       "t25"]},
          {"args-test", ["t5", "t6", "t21", "t22", "t23", "t24",
                         "t25", "t27"]}]),
    %% remove some bindings
    RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings),
                       lists:nth(11, Bindings), lists:nth(19, Bindings),
                       lists:nth(21, Bindings), lists:nth(28, Bindings)],
    exchange_op_callback(X, remove_bindings, [RemovedBindings]),
    RemainingBindings = ordsets:to_list(
                          ordsets:subtract(ordsets:from_list(Bindings),
                                           ordsets:from_list(RemovedBindings))),

    %% test some matches
    test_topic_expect_match(
      X,
      [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22",
                  "t23", "t24", "t26"]},
       {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15",
                "t22", "t23", "t24", "t26"]},
       {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22",
                  "t23", "t24", "t26"]},
       {"", ["t6", "t17", "t24"]},
       {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]},
       {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]},
       {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]},
       {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]},
       {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23",
                  "t24", "t26"]},
       {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]},
       {"oneword", ["t6", "t22", "t23", "t24", "t25"]},
       {"args-test", ["t6", "t22", "t23", "t24", "t25", "t27"]}]),

    %% remove the entire exchange
    exchange_op_callback(X, delete, [RemainingBindings]),
    %% none should match now
    test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]),
    passed.

%% Run an exchange callback both inside an mnesia transaction and
%% outside of one, as the real broker does.
exchange_op_callback(X, Fun, Args) ->
    rabbit_misc:execute_mnesia_transaction(
      fun () -> rabbit_exchange:callback(X, Fun, transaction, [X] ++ Args) end),
    rabbit_exchange:callback(X, Fun, none, [X] ++ Args).

%% Route each Key through X and compare (order-insensitively) with
%% the expected queue names.
test_topic_expect_match(X, List) ->
    lists:foreach(
      fun ({Key, Expected}) ->
              BinKey = list_to_binary(Key),
              Message = rabbit_basic:message(X#exchange.name, BinKey,
                                             #'P_basic'{}, <<>>),
              Res = rabbit_exchange_type_topic:route(
                      X, #delivery{mandatory = false,
                                   sender = self(),
                                   message = Message}),
              ExpectedRes = lists:map(
                              fun (Q) -> #resource{virtual_host = <<"/">>,
                                                   kind = queue,
                                                   name = list_to_binary(Q)}
                              end, Expected),
              true = (lists:usort(ExpectedRes) =:= lists:usort(Res))
      end, List).

%% Starting, stopping and diagnostics via rabbitmqctl actions. Note
%% that we don't try 'report' when the rabbit app is stopped and that
%% we enable tracing for the duration of this function.
test_app_management() ->
    control_action(wait, [rabbit_mnesia:dir() ++ ".pid"]),
    ok = control_action(trace_on, []),
    ok = control_action(stop_app, []),
    ok = control_action(stop_app, []),   %% stopping twice is harmless
    ok = control_action(status, []),
    ok = control_action(cluster_status, []),
    ok = control_action(environment, []),
    ok = control_action(start_app, []),
    ok = control_action(start_app, []),  %% starting twice is harmless
    ok = control_action(status, []),
    ok = control_action(report, []),
    ok = control_action(cluster_status, []),
    ok = control_action(environment, []),
    ok = control_action(trace_off, []),
    passed.

%% Log rotation behaviour: reopening, rotation with a suffix, rotation
%% of missing/empty/read-only files, and logging redirected to tty or
%% turned off entirely.
test_log_management() ->
    MainLog = rabbit:log_location(kernel),
    SaslLog = rabbit:log_location(sasl),
    Suffix = ".1",

    %% prepare basic logs
    file:delete([MainLog, Suffix]),
    file:delete([SaslLog, Suffix]),

    %% simple logs reopening
    ok = control_action(rotate_logs, []),
    [true, true] = empty_files([MainLog, SaslLog]),
    ok = test_logs_working(MainLog, SaslLog),

    %% simple log rotation
    ok = control_action(rotate_logs, [Suffix]),
    [true, true] = non_empty_files([[MainLog, Suffix], [SaslLog, Suffix]]),
    [true, true] = empty_files([MainLog, SaslLog]),
    ok = test_logs_working(MainLog, SaslLog),

    %% reopening logs with log rotation performed first
    ok = clean_logs([MainLog, SaslLog], Suffix),
    ok = control_action(rotate_logs, []),
    ok = file:rename(MainLog, [MainLog, Suffix]),
    ok = file:rename(SaslLog, [SaslLog, Suffix]),
    ok = test_logs_working([MainLog, Suffix], [SaslLog, Suffix]),
    ok = control_action(rotate_logs, []),
    ok = test_logs_working(MainLog, SaslLog),

    %% log rotation on empty files (the main log will have a ctl action logged)
    ok = clean_logs([MainLog, SaslLog], Suffix),
    ok = control_action(rotate_logs, []),
    ok = control_action(rotate_logs, [Suffix]),
    [false, true] = empty_files([[MainLog, Suffix], [SaslLog, Suffix]]),

    %% logs with suffix are not writable
    ok = control_action(rotate_logs, [Suffix]),
    ok = make_files_non_writable([[MainLog, Suffix], [SaslLog, Suffix]]),
    ok = control_action(rotate_logs, [Suffix]),
    ok = test_logs_working(MainLog, SaslLog),

    %% rotate when original log files are not writable
    ok = make_files_non_writable([MainLog, SaslLog]),
    ok = control_action(rotate_logs, []),

    %% logging directed to tty (first, remove handlers)
    ok = delete_log_handlers([rabbit_sasl_report_file_h,
                              rabbit_error_logger_file_h]),
    ok = clean_logs([MainLog, SaslLog], Suffix),
    ok = application:set_env(rabbit, sasl_error_logger, tty),
    ok = application:set_env(rabbit, error_logger, tty),
    ok = control_action(rotate_logs, []),
    [{error, enoent}, {error, enoent}] = empty_files([MainLog, SaslLog]),

    %% rotate logs when logging is turned off
    ok = application:set_env(rabbit, sasl_error_logger, false),
    ok = application:set_env(rabbit, error_logger, silent),
    ok = control_action(rotate_logs, []),
    [{error, enoent}, {error, enoent}] = empty_files([MainLog, SaslLog]),

    %% cleanup
    ok = application:set_env(rabbit, sasl_error_logger, {file, SaslLog}),
    ok = application:set_env(rabbit, error_logger, {file, MainLog}),
    ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog},
                           {rabbit_sasl_report_file_h, SaslLog}]),
    passed.

%% Start the rabbit application under various (mis)configured logging
%% setups and check that each failure mode is reported correctly.
test_log_management_during_startup() ->
    MainLog = rabbit:log_location(kernel),
    SaslLog = rabbit:log_location(sasl),

    %% start application with simple tty logging
    ok = control_action(stop_app, []),
    ok = application:set_env(rabbit, error_logger, tty),
    ok = application:set_env(rabbit, sasl_error_logger, tty),
    ok = add_log_handlers([{error_logger_tty_h, []},
                           {sasl_report_tty_h, []}]),
    ok = control_action(start_app, []),

    %% start application with tty logging and
    %% proper handlers not installed
    ok = control_action(stop_app, []),
    ok = error_logger:tty(false),
    ok = delete_log_handlers([sasl_report_tty_h]),
    ok = case catch control_action(start_app, []) of
             ok -> exit({got_success_but_expected_failure,
                         log_rotation_tty_no_handlers_test});
             {badrpc, {'EXIT', {error,
                                {cannot_log_to_tty, _, not_installed}}}} -> ok
         end,

    %% fix sasl logging
    ok = application:set_env(rabbit, sasl_error_logger, {file, SaslLog}),

    %% start application with logging to non-existing directory
    TmpLog = "/tmp/rabbit-tests/test.log",
    delete_file(TmpLog),
    ok = control_action(stop_app, []),
    ok = application:set_env(rabbit, error_logger, {file, TmpLog}),

    ok = delete_log_handlers([rabbit_error_logger_file_h]),
    ok = add_log_handlers([{error_logger_file_h, MainLog}]),
    ok = control_action(start_app, []),

    %% start application with logging to directory with no
    %% write permissions
    ok = control_action(stop_app, []),
    TmpDir = "/tmp/rabbit-tests",
    ok = set_permissions(TmpDir, 8#00400),
    ok = delete_log_handlers([rabbit_error_logger_file_h]),
    ok = add_log_handlers([{error_logger_file_h, MainLog}]),
    ok = case control_action(start_app, []) of
             ok -> exit({got_success_but_expected_failure,
                         log_rotation_no_write_permission_dir_test});
             {badrpc, {'EXIT',
                       {error, {cannot_log_to_file, _, _}}}} -> ok
         end,

    %% start application with logging to a subdirectory which
    %% parent directory has no write permissions
    ok = control_action(stop_app, []),
    TmpTestDir = "/tmp/rabbit-tests/no-permission/test/log",
    ok = application:set_env(rabbit, error_logger, {file, TmpTestDir}),
    ok = add_log_handlers([{error_logger_file_h, MainLog}]),
    ok = case control_action(start_app, []) of
             ok -> exit({got_success_but_expected_failure,
                         log_rotatation_parent_dirs_test});
             {badrpc,
              {'EXIT',
               {error, {cannot_log_to_file, _,
                        {error,
                         {cannot_create_parent_dirs, _, eacces}}}}}} -> ok
         end,
    %% restore permissions and clean up the temporary tree
    ok = set_permissions(TmpDir, 8#00700),
    ok = set_permissions(TmpLog, 8#00600),
    ok = delete_file(TmpLog),
    ok = file:del_dir(TmpDir),

    %% start application with standard error_logger_file_h
    %% handler not installed
    ok = control_action(stop_app, []),
    ok = application:set_env(rabbit, error_logger, {file, MainLog}),
    ok = control_action(start_app, []),

    %% start application with standard sasl handler not installed
    %% and rabbit main log handler installed correctly
    ok = control_action(stop_app, []),
    ok = delete_log_handlers([rabbit_sasl_report_file_h]),
    ok = control_action(start_app, []),
    passed.

%% Exercise rabbit_cli:parse_arguments/4 with global and
%% command-specific flags and options.
test_arguments_parser() ->
    GlobalOpts1 = [{"-f1", flag}, {"-o1", {option, "foo"}}],
    Commands1 = [command1, {command2, [{"-f2", flag}, {"-o2", {option, "bar"}}]}],

    GetOptions =
        fun (Args) ->
                rabbit_cli:parse_arguments(Commands1, GlobalOpts1, "-n", Args)
        end,

    check_parse_arguments(no_command, GetOptions, []),
    check_parse_arguments(no_command, GetOptions, ["foo", "bar"]),
    check_parse_arguments(
      {ok, {command1, [{"-f1", false}, {"-o1", "foo"}], []}},
      GetOptions, ["command1"]),
    check_parse_arguments(
      {ok, {command1, [{"-f1", false}, {"-o1", "blah"}], []}},
      GetOptions, ["command1", "-o1", "blah"]),
    check_parse_arguments(
      {ok, {command1, [{"-f1", true}, {"-o1", "foo"}], []}},
      GetOptions, ["command1", "-f1"]),
    check_parse_arguments(
      {ok, {command1, [{"-f1", false}, {"-o1", "blah"}], []}},
      GetOptions, ["-o1", "blah", "command1"]),
    check_parse_arguments(
      {ok, {command1, [{"-f1", false}, {"-o1", "blah"}], ["quux"]}},
      GetOptions, ["-o1", "blah", "command1", "quux"]),
    check_parse_arguments(
      {ok, {command1, [{"-f1", true}, {"-o1", "blah"}], ["quux", "baz"]}},
      GetOptions, ["command1", "quux", "-f1", "-o1", "blah", "baz"]),
    %% For duplicate flags, the last one counts
    check_parse_arguments(
      {ok, {command1, [{"-f1", false}, {"-o1", "second"}], []}},
      GetOptions, ["-o1", "first", "command1", "-o1", "second"]),
    %% If the flag "eats" the command, the command won't be recognised
    check_parse_arguments(no_command, GetOptions,
                          ["-o1", "command1", "quux"]),
    %% If a flag eats another flag, the eaten flag won't be recognised
    check_parse_arguments(
      {ok, {command1, [{"-f1", false}, {"-o1", "-f1"}], []}},
      GetOptions, ["command1", "-o1", "-f1"]),

    %% Now for some command-specific flags...
    check_parse_arguments(
      {ok, {command2, [{"-f1", false}, {"-f2", false},
                       {"-o1", "foo"}, {"-o2", "bar"}], []}},
      GetOptions, ["command2"]),

    check_parse_arguments(
      {ok, {command2, [{"-f1", false}, {"-f2", true},
                       {"-o1", "baz"}, {"-o2", "bar"}], ["quux", "foo"]}},
      GetOptions, ["-f2", "command2", "quux", "-o1", "baz", "foo"]),

    passed.

%% Unit tests of the mirror-node selection logic only; see the
%% multi-node suite for the rest.
test_dynamic_mirroring() ->
    Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params,
                {MNode, SNodes, SSNodes}, All) ->
                   {ok, M} = rabbit_mirror_queue_misc:module(Policy),
                   {NewM, NewSs0} = M:suggested_queue_nodes(
                                      Params, MNode, SNodes, SSNodes, All),
                   NewSs1 = lists:sort(NewSs0),
                   case dm_list_match(NewSs, NewSs1, ExtraSs) of
                       ok    -> ok;
                       error -> exit({no_match, NewSs, NewSs1, ExtraSs})
                   end
           end,

    Test({a,[b,c],0},<<"all">>,'_',{a,[],   []},   [a,b,c]),
    Test({a,[b,c],0},<<"all">>,'_',{a,[b,c],[b,c]},[a,b,c]),
    Test({a,[b,c],0},<<"all">>,'_',{a,[d],  [d]},  [a,b,c]),

    N = fun (Atoms) -> [list_to_binary(atom_to_list(A)) || A <- Atoms] end,

    %% Add a node
    Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[b],[b]},[a,b,c,d]),
    Test({b,[a,c],0},<<"nodes">>,N([a,b,c]),{b,[a],[a]},[a,b,c,d]),
    %% Add two nodes and drop one
    Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[d],[d]},[a,b,c,d]),
    %% Don't try to include nodes that are not running
    Test({a,[b],  0},<<"nodes">>,N([a,b,f]),{a,[b],[b]},[a,b,c,d]),
    %% If we can't find any of the nodes listed then just keep the master
    Test({a,[],   0},<<"nodes">>,N([f,g,h]),{a,[b],[b]},[a,b,c,d]),
    %% And once that's happened, still keep the master even when not listed,
    %% if nothing is synced
    Test({a,[b,c],0},<<"nodes">>,N([b,c]),  {a,[], []}, [a,b,c,d]),
    Test({a,[b,c],0},<<"nodes">>,N([b,c]),  {a,[b],[]}, [a,b,c,d]),
    %% But if something is synced we can lose the master - but make
    %% sure we pick the new master from the nodes which are synced!
    Test({b,[c],  0},<<"nodes">>,N([b,c]),  {a,[b],[b]},[a,b,c,d]),
    Test({b,[c],  0},<<"nodes">>,N([c,b]),  {a,[b],[b]},[a,b,c,d]),

    Test({a,[],   1},<<"exactly">>,2,{a,[],   []},   [a,b,c,d]),
    Test({a,[],   2},<<"exactly">>,3,{a,[],   []},   [a,b,c,d]),
    Test({a,[c],  0},<<"exactly">>,2,{a,[c],  [c]},  [a,b,c,d]),
    Test({a,[c],  1},<<"exactly">>,3,{a,[c],  [c]},  [a,b,c,d]),
    Test({a,[c],  0},<<"exactly">>,2,{a,[c,d],[c,d]},[a,b,c,d]),
    Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d],[c,d]},[a,b,c,d]),

    passed.

%% Does the first list match the second where the second is required
%% to have exactly Extra superfluous items?
dm_list_match([],     [],      0)      -> ok;
dm_list_match(_,      [],      _Extra) -> error;
dm_list_match([H|T1], [H |T2], Extra)  -> dm_list_match(T1, T2, Extra);
dm_list_match(L1,     [_H|T2], Extra)  -> dm_list_match(L1, T2, Extra - 1).

%% User, vhost and permission management via ctl actions.
test_user_management() ->

    %% lots of stuff that should fail
    {error, {no_such_user, _}} =
        control_action(delete_user, ["foo"]),
    {error, {no_such_user, _}} =
        control_action(change_password, ["foo", "baz"]),
    {error, {no_such_vhost, _}} =
        control_action(delete_vhost, ["/testhost"]),
    {error, {no_such_user, _}} =
        control_action(set_permissions, ["foo", ".*", ".*", ".*"]),
    {error, {no_such_user, _}} =
        control_action(clear_permissions, ["foo"]),
    {error, {no_such_user, _}} =
        control_action(list_user_permissions, ["foo"]),
    {error, {no_such_vhost, _}} =
        control_action(list_permissions, [], [{"-p", "/testhost"}]),
    {error, {invalid_regexp, _, _}} =
        control_action(set_permissions, ["guest", "+foo", ".*", ".*"]),
    {error, {no_such_user, _}} =
        control_action(set_user_tags, ["foo", "bar"]),

    %% user creation
    ok = control_action(add_user, ["foo", "bar"]),
    {error, {user_already_exists, _}} =
        control_action(add_user, ["foo", "bar"]),
    ok = control_action(clear_password, ["foo"]),
    ok = control_action(change_password, ["foo", "baz"]),

    TestTags = fun (Tags) ->
                       Args = ["foo" | [atom_to_list(T) || T <- Tags]],
                       ok = control_action(set_user_tags, Args),
                       {ok, #internal_user{tags = Tags}} =
                           rabbit_auth_backend_internal:lookup_user(<<"foo">>),
                       ok = control_action(list_users, [])
               end,
    TestTags([foo, bar, baz]),
    TestTags([administrator]),
    TestTags([]),

    %% vhost creation
    ok = control_action(add_vhost, ["/testhost"]),
    {error, {vhost_already_exists, _}} =
        control_action(add_vhost, ["/testhost"]),
    ok = control_action(list_vhosts, []),

    %% user/vhost mapping (setting twice more is idempotent)
    ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
                        [{"-p", "/testhost"}]),
    ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
                        [{"-p", "/testhost"}]),
    ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
                        [{"-p", "/testhost"}]),
    ok = control_action(list_permissions, [], [{"-p", "/testhost"}]),
    ok = control_action(list_permissions, [], [{"-p", "/testhost"}]),
    ok = control_action(list_user_permissions, ["foo"]),

    %% user/vhost unmapping (clearing twice is idempotent)
    ok = control_action(clear_permissions, ["foo"], [{"-p", "/testhost"}]),
    ok = control_action(clear_permissions, ["foo"], [{"-p", "/testhost"}]),

    %% vhost deletion
    ok = control_action(delete_vhost, ["/testhost"]),
    {error, {no_such_vhost, _}} =
        control_action(delete_vhost, ["/testhost"]),

    %% deleting a populated vhost
    ok = control_action(add_vhost, ["/testhost"]),
    ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
                        [{"-p", "/testhost"}]),
    {new, _} = rabbit_amqqueue:declare(
                 rabbit_misc:r(<<"/testhost">>, queue, <<"test">>),
                 true, false, [], none),
    ok = control_action(delete_vhost, ["/testhost"]),

    %% user deletion
    ok = control_action(delete_user, ["foo"]),
    {error, {no_such_user, _}} =
        control_action(delete_user, ["foo"]),

    passed.

%% rabbitmqctl set/clear/list_parameter against the registered test
%% runtime-parameter component.
test_runtime_parameters() ->
    rabbit_runtime_parameters_test:register(),
    Good = fun(L) -> ok = control_action(set_parameter, L) end,
    Bad  = fun(L) -> {error_string, _} = control_action(set_parameter, L) end,

    %% Acceptable for bijection
    Good(["test", "good", "\"ignore\""]),
    Good(["test", "good", "123"]),
    Good(["test", "good", "true"]),
    Good(["test", "good", "false"]),
    Good(["test", "good", "null"]),
    Good(["test", "good", "{\"key\": \"value\"}"]),

    %% Invalid json
    Bad(["test", "good", "atom"]),
    Bad(["test", "good", "{\"foo\": \"bar\""]),
    Bad(["test", "good", "{foo: \"bar\"}"]),

    %% Test actual validation hook
    Good(["test", "maybe", "\"good\""]),
    Bad(["test", "maybe", "\"bad\""]),
    Good(["test", "admin", "\"ignore\""]), %% ctl means 'user' -> none

    ok = control_action(list_parameters, []),

    ok = control_action(clear_parameter, ["test", "good"]),
    ok = control_action(clear_parameter, ["test", "maybe"]),
    ok = control_action(clear_parameter, ["test", "admin"]),
    {error_string, _} =
        control_action(clear_parameter, ["test", "neverexisted"]),

    %% We can delete for a component that no longer exists
    Good(["test", "good", "\"ignore\""]),
    rabbit_runtime_parameters_test:unregister(),
    ok = control_action(clear_parameter, ["test", "good"]),
    passed.

%% Policy definitions run through the pluggable test validator.
test_policy_validation() ->
    rabbit_runtime_parameters_test:register_policy_validator(),
    SetPol = fun (Key, Val) ->
                     control_action_opts(
                       ["set_policy", "name", ".*",
                        rabbit_misc:format("{\"~s\":~p}", [Key, Val])])
             end,

    ok = SetPol("testeven", []),
    ok = SetPol("testeven", [1, 2]),
    ok = SetPol("testeven", [1, 2, 3, 4]),
    ok = SetPol("testpos",  [2, 5, 5678]),

    error = SetPol("testpos",  [-1, 0, 1]),
    error = SetPol("testeven", [ 1, 2, 3]),

    ok = control_action(clear_policy, ["name"]),
    rabbit_runtime_parameters_test:unregister_policy_validator(),
    passed.

%% Validation of the generic set_policy command-line options
%% (--priority / --apply-to / --offline).
test_policy_opts_validation() ->
    Set  = fun (Extra) -> control_action_opts(
                            ["set_policy", "name", ".*", "{\"ha-mode\":\"all\"}"
                             | Extra]) end,
    OK   = fun (Extra) -> ok = Set(Extra) end,
    Fail = fun (Extra) -> error = Set(Extra) end,

    OK  ([]),

    OK  (["--priority", "0"]),
    OK  (["--priority", "3"]),
    Fail(["--priority", "banana"]),
    Fail(["--priority"]),

    OK  (["--apply-to", "all"]),
    OK  (["--apply-to", "queues"]),
    Fail(["--apply-to", "bananas"]),
    Fail(["--apply-to"]),

    OK  (["--priority", "3", "--apply-to", "queues"]),
    Fail(["--priority", "banana", "--apply-to", "queues"]),
    Fail(["--priority", "3", "--apply-to", "bananas"]),

    Fail(["--offline"]),

    ok = control_action(clear_policy, ["name"]),
    passed.

%% Validation of HA policy definitions: the legal combinations of
%% ha-mode, ha-params and ha-sync-mode.
test_ha_policy_validation() ->
    Set  = fun (JSON) -> control_action_opts(
                           ["set_policy", "name", ".*", JSON]) end,
    OK   = fun (JSON) -> ok = Set(JSON) end,
    Fail = fun (JSON) -> error = Set(JSON) end,

    OK  ("{\"ha-mode\":\"all\"}"),
    Fail("{\"ha-mode\":\"made_up\"}"),

    Fail("{\"ha-mode\":\"nodes\"}"),
    Fail("{\"ha-mode\":\"nodes\",\"ha-params\":2}"),
    Fail("{\"ha-mode\":\"nodes\",\"ha-params\":[\"a\",2]}"),
    OK  ("{\"ha-mode\":\"nodes\",\"ha-params\":[\"a\",\"b\"]}"),
    Fail("{\"ha-params\":[\"a\",\"b\"]}"),

    Fail("{\"ha-mode\":\"exactly\"}"),
    Fail("{\"ha-mode\":\"exactly\",\"ha-params\":[\"a\",\"b\"]}"),
    OK  ("{\"ha-mode\":\"exactly\",\"ha-params\":2}"),
    Fail("{\"ha-params\":2}"),

    OK  ("{\"ha-mode\":\"all\",\"ha-sync-mode\":\"manual\"}"),
    OK  ("{\"ha-mode\":\"all\",\"ha-sync-mode\":\"automatic\"}"),
    Fail("{\"ha-mode\":\"all\",\"ha-sync-mode\":\"made_up\"}"),
    Fail("{\"ha-sync-mode\":\"manual\"}"),
    Fail("{\"ha-sync-mode\":\"automatic\"}"),

    ok = control_action(clear_policy, ["name"]),
    passed.
%% Validate accepted values for the x-queue-master-locator policy key.
test_queue_master_location_policy_validation() ->
    Set = fun (JSON) ->
                  control_action_opts( ["set_policy", "name", ".*", JSON] )
          end,
    OK = fun (JSON) -> ok = Set(JSON) end,
    Fail = fun (JSON) -> error = Set(JSON) end,

    OK ("{\"x-queue-master-locator\":\"min-masters\"}"),
    OK ("{\"x-queue-master-locator\":\"client-local\"}"),
    OK ("{\"x-queue-master-locator\":\"random\"}"),
    Fail("{\"x-queue-master-locator\":\"made_up\"}"),

    ok = control_action(clear_policy, ["name"]),
    passed.

%% Smoke-test the various "list_*" ctl commands plus connection close,
%% vm_memory_high_watermark setting and 'eval', against a broker with a
%% couple of queues, a consumer and one raw TCP connection created here.
test_server_status() ->
    %% create a few things so there is some useful information to list
    {_Writer, Limiter, Ch} = test_channel(),
    [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
                        {new, Queue = #amqqueue{}} <-
                            [rabbit_amqqueue:declare(
                               rabbit_misc:r(<<"/">>, queue, Name),
                               false, false, [], none)]],
    ok = rabbit_amqqueue:basic_consume(
           Q, true, Ch, Limiter, false, 0, <<"ctag">>, true, [], undefined),

    %% list queues
    ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),

    %% list exchanges
    ok = info_action(list_exchanges, rabbit_exchange:info_keys(), true),

    %% list bindings
    ok = info_action(list_bindings, rabbit_binding:info_keys(), true),
    %% misc binding listing APIs
    [_|_] = rabbit_binding:list_for_source(
              rabbit_misc:r(<<"/">>, exchange, <<"">>)),
    [_] = rabbit_binding:list_for_destination(
            rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
    [_] = rabbit_binding:list_for_source_and_destination(
            rabbit_misc:r(<<"/">>, exchange, <<"">>),
            rabbit_misc:r(<<"/">>, queue, <<"foo">>)),

    %% list connections: open a raw socket and send a protocol header so
    %% the broker registers a connection we can list and then close
    {H, P} = find_listener(),
    {ok, C} = gen_tcp:connect(H, P, []),
    gen_tcp:send(C, <<"AMQP", 0, 0, 9, 1>>),
    timer:sleep(100),
    ok = info_action(list_connections,
                     rabbit_networking:connection_info_keys(), false),
    %% close_connection
    [ConnPid] = rabbit_networking:connections(),
    ok = control_action(close_connection, [rabbit_misc:pid_to_string(ConnPid),
                                           "go away"]),

    %% list channels
    ok = info_action(list_channels, rabbit_channel:info_keys(), false),

    %% list consumers
    ok = control_action(list_consumers, []),

    %% set vm memory high watermark
    HWM = vm_memory_monitor:get_vm_memory_high_watermark(),
    ok = control_action(set_vm_memory_high_watermark, ["1"]),
    ok = control_action(set_vm_memory_high_watermark, ["1.0"]),
    %% this will trigger an alarm
    ok = control_action(set_vm_memory_high_watermark, ["0.0"]),
    %% reset
    ok = control_action(set_vm_memory_high_watermark, [float_to_list(HWM)]),

    %% eval: malformed expressions are reported, a valid one succeeds
    {error_string, _} = control_action(eval, ["\""]),
    {error_string, _} = control_action(eval, ["a("]),
    ok = control_action(eval, ["a."]),

    %% cleanup
    [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]],

    unlink(Ch),
    ok = rabbit_channel:shutdown(Ch),

    passed.

%% A connection presenting an unsupported or garbage protocol header
%% must be refused with the server's supported header (checked below).
test_amqp_connection_refusal() ->
    [passed = test_amqp_connection_refusal(V) ||
        V <- [<<"AMQP",9,9,9,9>>, <<"AMQP",0,1,0,0>>, <<"XXXX",0,0,9,1>>]],
    passed.

%% Send Header on a fresh socket and expect the AMQP 0-9-1 header back
%% (the server's way of refusing and advertising what it speaks).
test_amqp_connection_refusal(Header) ->
    {H, P} = find_listener(),
    {ok, C} = gen_tcp:connect(H, P, [binary, {active, false}]),
    ok = gen_tcp:send(C, Header),
    {ok, <<"AMQP",0,0,9,1>>} = gen_tcp:recv(C, 8, 100),
    ok = gen_tcp:close(C),
    passed.

%% Return {Host, Port} of the first active AMQP listener on this node.
find_listener() ->
    [#listener{host = H, port = P} | _] =
        [L || L = #listener{node = N, protocol = amqp}
                  <- rabbit_networking:active_listeners(),
              N =:= node()],
    {H, P}.

%% Minimal stand-in for a channel writer process: acknowledges 'flush'
%% calls and forwards sent commands to Pid so the test can assert on them.
test_writer(Pid) ->
    receive
        {'$gen_call', From, flush} -> gen_server:reply(From, ok),
                                      test_writer(Pid);
        {send_command, Method}     -> Pid ! Method,
                                      test_writer(Pid);
        shutdown                   -> ok
    end.

%% Start a rabbit_channel wired to a fake writer (above) and a fresh
%% limiter; returns {WriterPid, LimiterPid, ChannelPid}.
test_channel() ->
    Me = self(),
    Writer = spawn(fun () -> test_writer(Me) end),
    {ok, Limiter} = rabbit_limiter:start_link(no_id),
    {ok, Ch} = rabbit_channel:start_link(
                 1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1,
                 user(<<"guest">>), <<"/">>, [], Me, Limiter),
    {Writer, Limiter, Ch}.
%% Open a test channel (via test_channel/0) and wait for channel.open_ok.
%% Returns {WriterPid, ChannelPid}.
test_spawn() ->
    {Writer, _Limiter, Ch} = test_channel(),
    ok = rabbit_channel:do(Ch, #'channel.open'{}),
    receive #'channel.open_ok'{} -> ok
    after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok)
    end,
    {Writer, Ch}.

%% As test_spawn/0 but on a remote node, via RPC.
test_spawn(Node) ->
    rpc:call(Node, ?MODULE, test_spawn_remote, []).

%% Spawn an arbitrary long lived process, so we don't end up linking
%% the channel to the short-lived process (RPC, here) spun up by the
%% RPC server.
test_spawn_remote() ->
    RPC = self(),
    spawn(fun () ->
                  {Writer, Ch} = test_spawn(),
                  RPC ! {Writer, Ch},
                  link(Ch),
                  receive
                      _ -> ok
                  end
          end),
    receive Res -> Res
    after ?TIMEOUT -> throw(failed_to_receive_result)
    end.

%% Build an administrator #user{} record for channel authentication.
user(Username) ->
    #user{username = Username,
          tags = [administrator],
          authz_backends = [{rabbit_auth_backend_internal, none}]},

%% NOTE(review): the clause above ends with '.', not ','; kept verbatim.
%% Exercise publisher confirms: a publish routed to two queues must be
%% nacked (not acked) when one of the queues crashes before confirming.
test_confirms() ->
    {_Writer, Ch} = test_spawn(),
    DeclareBindDurableQueue =
        fun() ->
                rabbit_channel:do(Ch, #'queue.declare'{durable = true}),
                receive #'queue.declare_ok'{queue = Q0} ->
                        rabbit_channel:do(Ch, #'queue.bind'{
                                                 queue = Q0,
                                                 exchange = <<"amq.direct">>,
                                                 routing_key = "magic" }),
                        receive #'queue.bind_ok'{} -> Q0
                        after ?TIMEOUT -> throw(failed_to_bind_queue)
                        end
                after ?TIMEOUT -> throw(failed_to_declare_queue)
                end
        end,
    %% Declare and bind two queues
    QName1 = DeclareBindDurableQueue(),
    QName2 = DeclareBindDurableQueue(),
    %% Get the first one's pid (we'll crash it later)
    {ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName1)),
    QPid1 = Q1#amqqueue.pid,
    %% Enable confirms
    rabbit_channel:do(Ch, #'confirm.select'{}),
    receive
        #'confirm.select_ok'{} -> ok
    after ?TIMEOUT -> throw(failed_to_enable_confirms)
    end,
    %% Publish a message
    rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>,
                                           routing_key = "magic"
                                          },
                      rabbit_basic:build_content(
                        #'P_basic'{delivery_mode = 2}, <<"">>)),
    %% We must not kill the queue before the channel has processed the
    %% 'publish'.
    ok = rabbit_channel:flush(Ch),
    %% Crash the queue
    QPid1 ! boom,
    %% Wait for a nack
    receive
        #'basic.nack'{} -> ok;
        #'basic.ack'{}  -> throw(received_ack_instead_of_nack)
    after ?TIMEOUT-> throw(did_not_receive_nack)
    end,
    %% ...and make sure no spurious ack follows
    receive
        #'basic.ack'{} -> throw(received_ack_when_none_expected)
    after 1000 -> ok
    end,
    %% Cleanup
    rabbit_channel:do(Ch, #'queue.delete'{queue = QName2}),
    receive
        #'queue.delete_ok'{} -> ok
    after ?TIMEOUT -> throw(failed_to_cleanup_queue)
    end,
    unlink(Ch),
    ok = rabbit_channel:shutdown(Ch),

    passed.

%% gen_server2:with_state/2 must expose the fhc's state record (whose
%% first element is the record tag fhc_state).
test_with_state() ->
    fhc_state = gen_server2:with_state(file_handle_cache,
                                       fun (S) -> element(1, S) end),
    passed.

%% Exercise gen_server2:mcall/1 across every addressing form (pid,
%% registered name, {Name, Node}, {global, Name}) and every failure
%% mode (dead before use, dies on first use, unregistered, bad node).
test_mcall() ->
    P1 = spawn(fun gs2_test_listener/0),
    register(foo, P1),
    global:register_name(gfoo, P1),

    P2 = spawn(fun() -> exit(bang) end),
    %% ensure P2 is dead (ignore the race setting up the monitor)
    await_exit(P2),

    P3 = spawn(fun gs2_test_crasher/0),

    %% since P2 crashes almost immediately and P3 after receiving its first
    %% message, we have to spawn a few more processes to handle the additional
    %% cases we're interested in here
    register(baz, spawn(fun gs2_test_crasher/0)),
    register(bog, spawn(fun gs2_test_crasher/0)),
    global:register_name(gbaz, spawn(fun gs2_test_crasher/0)),

    NoNode = rabbit_nodes:make("nonode"),

    Targets =
        %% pids
        [P1, P2, P3]
        ++
        %% registered names
        [foo, bar, baz]
        ++
        %% {Name, Node} pairs
        [{foo, node()}, {bar, node()}, {bog, node()}, {foo, NoNode}]
        ++
        %% {global, Name}
        [{global, gfoo}, {global, gbar}, {global, gbaz}],

    GoodResults = [{D, goodbye} || D <- [P1, foo,
                                         {foo, node()},
                                         {global, gfoo}]],

    BadResults  = [{P2, noproc},   % died before use
                   {P3, boom},     % died on first use
                   {bar, noproc},  % never registered
                   {baz, boom},    % died on first use
                   {{bar, node()}, noproc}, % never registered
                   {{bog, node()}, boom},   % died on first use
                   {{foo, NoNode}, nodedown}, % invalid node
                   {{global, gbar}, noproc}, % never registered globally
                   {{global, gbaz}, boom}],  % died on first use

    {Replies, Errors} = gen_server2:mcall([{T, hello} || T <- Targets]),
    true = lists:sort(Replies) == lists:sort(GoodResults),
    true = lists:sort(Errors)  == lists:sort(BadResults),

    %% cleanup (ignore the race setting up the monitor)
    P1 ! stop,
    await_exit(P1),
    passed.

%% Block until Pid terminates (monitor-based; works even if already dead).
await_exit(Pid) ->
    MRef = erlang:monitor(process, Pid),
    receive
        {'DOWN', MRef, _, _, _} -> ok
    end.

%% Helper process: crashes with 'boom' on its first gen_server call.
gs2_test_crasher() ->
    receive
        {'$gen_call', _From, hello} -> exit(boom)
    end.

%% Helper process: replies 'goodbye' to gen_server 'hello' calls until
%% told to stop.
gs2_test_listener() ->
    receive
        {'$gen_call', From, hello} ->
            gen_server2:reply(From, goodbye),
            gs2_test_listener();
        stop ->
            ok
    end.

%% Relay every received message to Pid, forever.
test_statistics_event_receiver(Pid) ->
    receive
        Foo -> Pid ! Foo, test_statistics_event_receiver(Pid)
    end.

%% Force the channel to emit stats and wait for a channel_stats event
%% satisfying Matcher.
test_ch_statistics_receive_event(Ch, Matcher) ->
    rabbit_channel:flush(Ch),
    Ch ! emit_stats,
    test_ch_statistics_receive_event1(Ch, Matcher).

%% Drain channel_stats events until Matcher accepts one; throw on timeout.
test_ch_statistics_receive_event1(Ch, Matcher) ->
    receive #event{type = channel_stats, props = Props} ->
            case Matcher(Props) of
                true -> Props;
                _    -> test_ch_statistics_receive_event1(Ch, Matcher)
            end
    after ?TIMEOUT -> throw(failed_to_receive_event)
    end.

%% Check fine-grained per-queue/per-exchange stats emitted by a channel:
%% empty at first, reflecting a publish+get, and pruned on queue delete.
test_ch_statistics() ->
    application:set_env(rabbit, collect_statistics, fine),

    %% ATM this just tests the queue / exchange stats in channels. That's
    %% by far the most complex code though.

    %% Set up a channel and queue
    {_Writer, Ch} = test_spawn(),
    rabbit_channel:do(Ch, #'queue.declare'{}),
    QName = receive #'queue.declare_ok'{queue = Q0} -> Q0
            after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
            end,
    QRes = rabbit_misc:r(<<"/">>, queue, QName),
    X = rabbit_misc:r(<<"/">>, exchange, <<"">>),

    rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]),

    %% Check stats empty
    Event = test_ch_statistics_receive_event(Ch, fun (_) -> true end),
    [] = proplists:get_value(channel_queue_stats, Event),
    [] = proplists:get_value(channel_exchange_stats, Event),
    [] = proplists:get_value(channel_queue_exchange_stats, Event),

    %% Publish and get a message
    rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>,
                                           routing_key = QName},
                      rabbit_basic:build_content(#'P_basic'{}, <<"">>)),
    rabbit_channel:do(Ch, #'basic.get'{queue = QName}),

    %% Check the stats reflect that
    Event2 = test_ch_statistics_receive_event(
               Ch,
               fun (E) ->
                       length(proplists:get_value(
                                channel_queue_exchange_stats, E)) > 0
               end),
    [{QRes, [{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
    [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2),
    [{{QRes,X},[{publish,1}]}] =
        proplists:get_value(channel_queue_exchange_stats, Event2),

    %% Check the stats remove stuff on queue deletion
    rabbit_channel:do(Ch, #'queue.delete'{queue = QName}),
    Event3 = test_ch_statistics_receive_event(
               Ch,
               fun (E) ->
                       length(proplists:get_value(
                                channel_queue_exchange_stats, E)) == 0
               end),

    [] = proplists:get_value(channel_queue_stats, Event3),
    [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event3),
    [] = proplists:get_value(channel_queue_exchange_stats, Event3),

    rabbit_channel:shutdown(Ch),
    rabbit_tests_event_receiver:stop(),
    passed.

%% Wait for a queue_stats event satisfying Matcher (queues emit stats
%% on their own timer, hence the commented-out explicit trigger).
test_queue_statistics_receive_event(Q, Matcher) ->
    %% Q ! emit_stats,
    test_queue_statistics_receive_event1(Q, Matcher).

%% Drain queue_stats events until Matcher accepts one; throw on timeout.
test_queue_statistics_receive_event1(Q, Matcher) ->
    receive #event{type = queue_stats, props = Props} ->
            case Matcher(Props) of
                true -> Props;
                _    -> test_queue_statistics_receive_event1(Q, Matcher)
            end
    after ?TIMEOUT -> throw(failed_to_receive_event)
    end.

%% The head_message_timestamp queue statistic must track the timestamp
%% of the message at the head of the queue: '' when empty, then the
%% first message's, then the second's, then '' again after both are got.
test_head_message_timestamp_statistic() ->
    %% Can't find a way to receive the ack here so can't test pending acks status

    application:set_env(rabbit, collect_statistics, fine),

    %% Set up a channel and queue
    {_Writer, Ch} = test_spawn(),
    rabbit_channel:do(Ch, #'queue.declare'{}),
    QName = receive #'queue.declare_ok'{queue = Q0} -> Q0
            after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
            end,
    QRes = rabbit_misc:r(<<"/">>, queue, QName),

    {ok, Q1} = rabbit_amqqueue:lookup(QRes),
    QPid = Q1#amqqueue.pid,

    %% Set up event receiver for queue
    rabbit_tests_event_receiver:start(self(), [node()], [queue_stats]),

    %% Check timestamp is empty when queue is empty
    Event1 = test_queue_statistics_receive_event(
               QPid, fun (E) -> proplists:get_value(name, E) == QRes end),
    '' = proplists:get_value(head_message_timestamp, Event1),

    %% Publish two messages and check timestamp is that of first message
    rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>,
                                           routing_key = QName},
                      rabbit_basic:build_content(
                        #'P_basic'{timestamp = 1}, <<"">>)),
    rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>,
                                           routing_key = QName},
                      rabbit_basic:build_content(
                        #'P_basic'{timestamp = 2}, <<"">>)),
    Event2 = test_queue_statistics_receive_event(
               QPid, fun (E) -> proplists:get_value(name, E) == QRes end),
    1 = proplists:get_value(head_message_timestamp, Event2),

    %% Get first message and check timestamp is that of second message
    rabbit_channel:do(Ch, #'basic.get'{queue = QName, no_ack = true}),
    Event3 = test_queue_statistics_receive_event(
               QPid, fun (E) -> proplists:get_value(name, E) == QRes end),
    2 = proplists:get_value(head_message_timestamp, Event3),

    %% Get second message and check timestamp is empty again
    rabbit_channel:do(Ch, #'basic.get'{queue = QName, no_ack = true}),
    Event4 = test_queue_statistics_receive_event(
               QPid, fun (E) -> proplists:get_value(name, E) == QRes end),
    '' = proplists:get_value(head_message_timestamp, Event4),

    %% Teardown
    rabbit_channel:do(Ch, #'queue.delete'{queue = QName}),
    rabbit_channel:shutdown(Ch),
    rabbit_tests_event_receiver:stop(),

    passed.

%% force_event_refresh must re-emit created events for existing channels
%% (local and remote) and queues.
test_refresh_events(SecondaryNode) ->
    rabbit_tests_event_receiver:start(self(), [node(), SecondaryNode],
                                      [channel_created, queue_created]),

    {_Writer, Ch} = test_spawn(),
    expect_events(pid, Ch, channel_created),
    rabbit_channel:shutdown(Ch),

    {_Writer2, Ch2} = test_spawn(SecondaryNode),
    expect_events(pid, Ch2, channel_created),
    rabbit_channel:shutdown(Ch2),

    {new, #amqqueue{name = QName} = Q} =
        rabbit_amqqueue:declare(test_queue(), false, false, [], none),
    expect_events(name, QName, queue_created),
    rabbit_amqqueue:delete(Q, false, false),

    rabbit_tests_event_receiver:stop(),
    passed.

%% Expect the event once normally and once again after a forced refresh.
expect_events(Tag, Key, Type) ->
    expect_event(Tag, Key, Type),
    rabbit:force_event_refresh(make_ref()),
    expect_event(Tag, Key, Type).

%% Skip events of the right Type until one carries Key under Tag.
expect_event(Tag, Key, Type) ->
    receive #event{type = Type, props = Props} ->
            case pget(Tag, Props) of
                Key -> ok;
                _   -> expect_event(Tag, Key, Type)
            end
    after ?TIMEOUT -> throw({failed_to_receive_event, Type})
    end.

%% delegate:invoke_no_result/2 must fire-and-forget to single pids and
%% to mixed local/remote pid lists.
test_delegates_async(SecondaryNode) ->
    Self = self(),
    Sender = fun (Pid) -> Pid ! {invoked, Self} end,

    Responder = make_responder(fun ({invoked, Pid}) -> Pid ! response end),

    ok = delegate:invoke_no_result(spawn(Responder), Sender),
    ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender),
    await_response(2),

    LocalPids = spawn_responders(node(), Responder, 10),
    RemotePids = spawn_responders(SecondaryNode, Responder, 10),
    ok = delegate:invoke_no_result(LocalPids ++ RemotePids, Sender),
    await_response(20),

    passed.

%% Build a fun suitable for spawn/1: handle one message with FMsg or
%% throw Throw after ?TIMEOUT.
make_responder(FMsg) -> make_responder(FMsg, timeout).
make_responder(FMsg, Throw) ->
    fun () ->
            receive Msg -> FMsg(Msg)
            after ?TIMEOUT -> throw(Throw)
            end
    end.

%% Spawn Count copies of Responder on Node.
spawn_responders(Node, Responder, Count) ->
    [spawn(Node, Responder) || _ <- lists:seq(1, Count)].

%% Wait for Count 'response' messages, throwing on timeout.
await_response(0) ->
    ok;
await_response(Count) ->
    receive
        response -> ok,
                    await_response(Count - 1)
    after ?TIMEOUT -> throw(timeout)
    end.

%% Assert that Fun exits (any exit reason); throw if it returns normally.
must_exit(Fun) ->
    try
        Fun(),
        throw(exit_not_thrown)
    catch
        exit:_ -> ok
    end.

%% delegate:invoke/2 must collect good replies, per-pid failures, and
%% nodedown errors for pids on unreachable nodes.
test_delegates_sync(SecondaryNode) ->
    Sender = fun (Pid) -> gen_server:call(Pid, invoked, infinity) end,
    BadSender = fun (_Pid) -> exit(exception) end,

    Responder = make_responder(fun ({'$gen_call', From, invoked}) ->
                                       gen_server:reply(From, response)
                               end),

    BadResponder = make_responder(fun ({'$gen_call', From, invoked}) ->
                                          gen_server:reply(From, response)
                                  end, bad_responder_died),

    response = delegate:invoke(spawn(Responder), Sender),
    response = delegate:invoke(spawn(SecondaryNode, Responder), Sender),

    must_exit(fun () -> delegate:invoke(spawn(BadResponder), BadSender) end),
    must_exit(fun () ->
                      delegate:invoke(
                        spawn(SecondaryNode, BadResponder), BadSender) end),

    LocalGoodPids = spawn_responders(node(), Responder, 2),
    RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2),
    LocalBadPids = spawn_responders(node(), BadResponder, 2),
    RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2),

    {GoodRes, []} = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender),
    true = lists:all(fun ({_, response}) -> true end, GoodRes),
    GoodResPids = [Pid || {Pid, _} <- GoodRes],

    Good = lists:usort(LocalGoodPids ++ RemoteGoodPids),
    Good = lists:usort(GoodResPids),

    {[], BadRes} = delegate:invoke(LocalBadPids ++ RemoteBadPids, BadSender),
    true = lists:all(fun ({_, {exit, exception, _}}) -> true end, BadRes),
    BadResPids = [Pid || {Pid, _} <- BadRes],

    Bad = lists:usort(LocalBadPids ++ RemoteBadPids),
    Bad = lists:usort(BadResPids),

    %% pids naming a node that does not exist -> nodedown errors
    MagicalPids = [rabbit_misc:string_to_pid(Str) ||
                      Str <- ["<nonode@nohost.0.1.0>", "<nonode@nohost.0.2.0>"]],
    {[], BadNodes} = delegate:invoke(MagicalPids, Sender),
    true = lists:all(
             fun ({_, {exit, {nodedown, nonode@nohost}, _Stack}}) -> true end,
             BadNodes),
    BadNodesPids = [Pid || {Pid, _} <- BadNodes],

    Magical = lists:usort(MagicalPids),
    Magical = lists:usort(BadNodesPids),

    passed.

%% A transient queue must be gone (NOT_FOUND on passive declare) after
%% a broker stop/start cycle.
test_queue_cleanup(_SecondaryNode) ->
    {_Writer, Ch} = test_spawn(),
    rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }),
    receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} ->
            ok
    after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
    end,
    rabbit_channel:shutdown(Ch),
    rabbit:stop(),
    rabbit:start(),
    {_Writer2, Ch2} = test_spawn(),
    rabbit_channel:do(Ch2, #'queue.declare'{ passive = true,
                                             queue = ?CLEANUP_QUEUE_NAME }),
    receive
        #'channel.close'{reply_code = ?NOT_FOUND} ->
            ok
    after ?TIMEOUT -> throw(failed_to_receive_channel_exit)
    end,
    rabbit_channel:shutdown(Ch2),
    passed.

%% Declare a queue on the secondary node, kill its process, then keep
%% re-declaring until a fresh live queue process takes over.
test_declare_on_dead_queue(SecondaryNode) ->
    QueueName = rabbit_misc:r(<<"/">>, queue, ?CLEANUP_QUEUE_NAME),
    Self = self(),
    Pid = spawn(SecondaryNode,
                fun () ->
                        {new, #amqqueue{name = QueueName, pid = QPid}} =
                            rabbit_amqqueue:declare(QueueName, false, false, [],
                                                    none),
                        exit(QPid, kill),
                        Self ! {self(), killed, QPid}
                end),
    receive
        {Pid, killed, OldPid} ->
            Q = dead_queue_loop(QueueName, OldPid),
            {ok, 0} = rabbit_amqqueue:delete(Q, false, false),
            passed
    after ?TIMEOUT -> throw(failed_to_create_and_kill_queue)
    end.

%% Poll (25ms apart) until the queue record no longer points at OldPid,
%% then assert the replacement process is alive and return the queue.
dead_queue_loop(QueueName, OldPid) ->
    {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none),
    case Q#amqqueue.pid of
        OldPid -> timer:sleep(25),
                  dead_queue_loop(QueueName, OldPid);
        _      -> true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
                  Q
    end.

%%---------------------------------------------------------------------

%% Run a ctl command on this node with default options.
control_action(Command, Args) ->
    control_action(Command, node(), Args, default_options()).

%% Run a ctl command on this node, merging NewOpts over the defaults.
control_action(Command, Args, NewOpts) ->
    control_action(Command, node(), Args,
                   expand_options(default_options(), NewOpts)).

%% Dispatch to rabbit_control_main:action/5, printing progress; returns
%% ok on success or the raw failure term otherwise.
control_action(Command, Node, Args, Opts) ->
    case catch rabbit_control_main:action(
                 Command, Node, Args, Opts,
                 fun (Format, Args1) ->
                         io:format(Format ++ " ...~n", Args1)
                 end) of
        ok ->
            io:format("done.~n"),
            ok;
        Other ->
            io:format("failed.~n"),
            Other
    end.

%% Parse a raw argv-style command line and run it; collapse every
%% failure (parse or execution) to the atom 'error'.
control_action_opts(Raw) ->
    NodeStr = atom_to_list(node()),
    case rabbit_control_main:parse_arguments(Raw, NodeStr) of
        {ok, {Cmd, Opts, Args}} ->
            case control_action(Cmd, node(), Args, Opts) of
                ok -> ok;
                _  -> error
            end;
        _ ->
            error
    end.

%% Run an info-listing command four ways: bare, (optionally) with an
%% explicit vhost, with explicit column names, and with a bogus column
%% which must produce {bad_argument, dummy}.
info_action(Command, Args, CheckVHost) ->
    ok = control_action(Command, []),
    if CheckVHost -> ok = control_action(Command, [], ["-p", "/"]);
       true       -> ok
    end,
    ok = control_action(Command, lists:map(fun atom_to_list/1, Args)),
    {bad_argument, dummy} = control_action(Command, ["dummy"]),
    ok.

%% Default ctl options: vhost "/" and non-quiet output.
default_options() -> [{"-p", "/"}, {"-q", "false"}].

%% Merge defaults As into option list Bs, keeping Bs's value on clash.
expand_options(As, Bs) ->
    lists:foldl(fun({K, _}=A, R) ->
                        case proplists:is_defined(K, R) of
                            true  -> R;
                            false -> [A | R]
                        end
                end, Bs, As).
%% Assert Fun(As) parses to ExpRes, comparing option lists order-insensitively.
check_parse_arguments(ExpRes, Fun, As) ->
    SortRes =
        fun (no_command)          -> no_command;
            ({ok, {C, KVs, As1}}) -> {ok, {C, lists:sort(KVs), As1}}
        end,

    true = SortRes(ExpRes) =:= SortRes(Fun(As)).

%% For each file: true if zero-length, false if not, or the file:* error.
empty_files(Files) ->
    [case file:read_file_info(File) of
         {ok, FInfo} -> FInfo#file_info.size == 0;
         Error       -> Error
     end || File <- Files].

%% Inverse of empty_files/1, passing errors through unchanged.
non_empty_files(Files) ->
    [case EmptyFile of
         {error, Reason} -> {error, Reason};
         _               -> not(EmptyFile)
     end || EmptyFile <- empty_files(Files)].

%% Emit one rabbit_log error and one SASL crash report, then check both
%% log files actually received data.
test_logs_working(MainLogFile, SaslLogFile) ->
    ok = rabbit_log:error("foo bar"),
    ok = error_logger:error_report(crash_report, [foo, bar]),
    %% give the error loggers some time to catch up
    timer:sleep(100),
    [true, true] = non_empty_files([MainLogFile, SaslLogFile]),
    ok.

%% chmod Path to Mode, preserving the rest of its file_info.
set_permissions(Path, Mode) ->
    case file:read_file_info(Path) of
        {ok, FInfo} -> file:write_file_info(
                         Path,
                         FInfo#file_info{mode=Mode});
        Error       -> Error
    end.

%% Delete each log file and its rotated sibling (File ++ Suffix).
clean_logs(Files, Suffix) ->
    [begin
         ok = delete_file(File),
         ok = delete_file([File, Suffix])
     end || File <- Files],
    ok.

%% Exit unless this mnesia node is a RAM node.
assert_ram_node() ->
    case rabbit_mnesia:node_type() of
        disc -> exit('not_ram_node');
        ram  -> ok
    end.

%% Exit unless this mnesia node is a disc node.
assert_disc_node() ->
    case rabbit_mnesia:node_type() of
        disc -> ok;
        ram  -> exit('not_disc_node')
    end.

%% Delete File, treating "already gone" (enoent) as success.
delete_file(File) ->
    case file:delete(File) of
        ok              -> ok;
        {error, enoent} -> ok;
        Error           -> Error
    end.

%% Strip all permission bits from each file (mode 0).
make_files_non_writable(Files) ->
    [ok = file:write_file_info(File, #file_info{mode=0}) ||
        File <- Files],
    ok.

%% Install each {Handler, Args} as an error_logger report handler.
add_log_handlers(Handlers) ->
    [ok = error_logger:add_report_handler(Handler, Args) ||
        {Handler, Args} <- Handlers],
    ok.
%% sasl_report_file_h returns [] during terminate
%% see: https://github.com/erlang/otp/blob/maint/lib/stdlib/src/error_logger_file_h.erl#L98
%%
%% error_logger_file_h returns ok since OTP 18.1
%% see: https://github.com/erlang/otp/blob/maint/lib/stdlib/src/error_logger_file_h.erl#L98
delete_log_handlers(Handlers) ->
    [ok_or_empty_list(error_logger:delete_report_handler(Handler))
     || Handler <- Handlers],
    ok.

%% Accept only the two legal delete_report_handler results (see above);
%% anything else crashes with function_clause, which is the assertion.
ok_or_empty_list([]) ->
    [];
ok_or_empty_list(ok) ->
    ok.

%% Delegate to the dedicated supervisor-delayed-restart test module.
test_supervisor_delayed_restart() ->
    test_sup:test_supervisor_delayed_restart().

%% Exercise the file_handle_cache under a tiny handle limit: copying
%% with one spare handle, blocked copies queueing behind the limit, and
%% the fhc reclaiming pending requests when their owners are killed.
test_file_handle_cache() ->
    %% test copying when there is just one spare handle
    Limit = file_handle_cache:get_limit(),
    ok = file_handle_cache:set_limit(5), %% 1 or 2 sockets, 2 msg_stores
    TmpDir = filename:join(rabbit_mnesia:dir(), "tmp"),
    ok = filelib:ensure_dir(filename:join(TmpDir, "nothing")),
    [Src1, Dst1, Src2, Dst2] = Files =
        [filename:join(TmpDir, Str) ||
            Str <- ["file1", "file2", "file3", "file4"]],
    Content = <<"foo">>,
    %% Write Content to Src with prim_file (bypassing the fhc), then
    %% copy Src -> Dst through fhc handles and delete both.
    CopyFun = fun (Src, Dst) ->
                      {ok, Hdl} = prim_file:open(Src, [binary, write]),
                      ok = prim_file:write(Hdl, Content),
                      ok = prim_file:sync(Hdl),
                      prim_file:close(Hdl),

                      {ok, SrcHdl} = file_handle_cache:open(Src, [read], []),
                      {ok, DstHdl} = file_handle_cache:open(Dst, [write], []),
                      Size = size(Content),
                      {ok, Size} = file_handle_cache:copy(SrcHdl, DstHdl, Size),
                      ok = file_handle_cache:delete(SrcHdl),
                      ok = file_handle_cache:delete(DstHdl)
              end,
    Pid = spawn(fun () -> {ok, Hdl} = file_handle_cache:open(
                                        filename:join(TmpDir, "file5"),
                                        [write], []),
                          receive {next, Pid1} -> Pid1 ! {next, self()} end,
                          file_handle_cache:delete(Hdl),
                          %% This will block and never return, so we
                          %% exercise the fhc tidying up the pending
                          %% queue on the death of a process.
                          ok = CopyFun(Src1, Dst1)
                end),
    ok = CopyFun(Src1, Dst1),
    ok = file_handle_cache:set_limit(2),
    Pid ! {next, self()},
    receive {next, Pid} -> ok end,
    timer:sleep(100),
    Pid1 = spawn(fun () -> CopyFun(Src2, Dst2) end),
    timer:sleep(100),
    erlang:monitor(process, Pid),
    erlang:monitor(process, Pid1),
    exit(Pid, kill),
    exit(Pid1, kill),
    receive {'DOWN', _MRef, process, Pid, _Reason} -> ok end,
    receive {'DOWN', _MRef1, process, Pid1, _Reason1} -> ok end,
    [file:delete(File) || File <- Files],
    ok = file_handle_cache:set_limit(Limit),
    passed.

%% Run the msg_store / queue_index / variable_queue suites under small
%% store/journal limits, for both embed thresholds 0 and 1024 bytes.
%% Only runs when the default backing queue (priority queue) is in use.
test_backing_queue() ->
    case application:get_env(rabbit, backing_queue_module) of
        {ok, rabbit_priority_queue} ->
            {ok, FileSizeLimit} =
                application:get_env(rabbit, msg_store_file_size_limit),
            application:set_env(rabbit, msg_store_file_size_limit, 512),
            {ok, MaxJournal} =
                application:get_env(rabbit, queue_index_max_journal_entries),
            application:set_env(rabbit, queue_index_max_journal_entries, 128),
            passed = test_msg_store(),
            application:set_env(rabbit, msg_store_file_size_limit,
                                FileSizeLimit),
            [begin
                 application:set_env(
                   rabbit, queue_index_embed_msgs_below, Bytes),
                 passed = test_queue_index(),
                 passed = test_queue_index_props(),
                 passed = test_variable_queue(),
                 passed = test_variable_queue_delete_msg_store_files_callback(),
                 passed = test_queue_recover()
             end || Bytes <- [0, 1024]],
            application:set_env(rabbit, queue_index_max_journal_entries,
                                MaxJournal),
            %% We will have restarted the message store, and thus changed
            %% the order of the children of rabbit_sup. This will cause
            %% problems if there are subsequent failures - see bug 24262.
            ok = restart_app(),
            passed;
        _ ->
            passed
    end.

%% Stop the message store and restart it with no queues to recover.
restart_msg_store_empty() ->
    ok = rabbit_variable_queue:stop_msg_store(),
    ok = rabbit_variable_queue:start_msg_store(
           undefined, {fun (ok) -> finished end, ok}).

%% Deterministic 16-byte message id derived from any term.
msg_id_bin(X) ->
    erlang:md5(term_to_binary(X)).

%% Open a msg_store client with no confirm callback.
msg_store_client_init(MsgStore, Ref) ->
    rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined).
%% Process body that collects msg_store on-disk confirms. Idles until it
%% receives an {await, MsgIds, Pid} request, then switches to the
%% tracking loop below.
on_disk_capture() ->
    receive
        {await, MsgIds, Pid} -> on_disk_capture([], MsgIds, Pid);
        stop                 -> done
    end.

%% Any confirm for a message we were not awaiting is a failure.
on_disk_capture([_|_], _Awaiting, Pid) ->
    Pid ! {self(), surplus};
%% Accumulate confirms until the awaited set is empty, then report
%% 'arrived' and go back to idling; report 'timeout' if confirms stall.
on_disk_capture(OnDisk, Awaiting, Pid) ->
    receive
        {on_disk, MsgIdsS} ->
            MsgIds = gb_sets:to_list(MsgIdsS),
            %% unexpected ids stay in OnDisk (triggering 'surplus' above)
            on_disk_capture(OnDisk ++ (MsgIds -- Awaiting), Awaiting -- MsgIds,
                            Pid);
        stop ->
            done
    after (case Awaiting of [] -> 200; _ -> ?TIMEOUT end) ->
            case Awaiting of
                [] -> Pid ! {self(), arrived}, on_disk_capture();
                _  -> Pid ! {self(), timeout}
            end
    end.

%% Ask the capture process to wait for the given confirms; ok or the
%% failure atom it reports (surplus / timeout).
on_disk_await(Pid, MsgIds) when is_list(MsgIds) ->
    Pid ! {await, MsgIds, self()},
    receive
        {Pid, arrived} -> ok;
        {Pid, Error}   -> Error
    end.

%% Stop the capture process and wait for it to die.
on_disk_stop(Pid) ->
    MRef = erlang:monitor(process, Pid),
    Pid ! stop,
    receive {'DOWN', MRef, process, Pid, _Reason} ->
            ok
    end.

%% Open a msg_store client whose confirm callback feeds a fresh capture
%% process; returns {CapturePid, ClientState}.
msg_store_client_init_capture(MsgStore, Ref) ->
    Pid = spawn(fun on_disk_capture/0),
    {Pid, rabbit_msg_store:client_init(
            MsgStore, Ref, fun (MsgIds, _ActionTaken) ->
                                   Pid ! {on_disk, MsgIds}
                           end, undefined)}.

%% Assert every MsgId's contains/2 result equals Atom (true or false).
msg_store_contains(Atom, MsgIds, MSCState) ->
    Atom = lists:foldl(
             fun (MsgId, Atom1) when Atom1 =:= Atom ->
                     rabbit_msg_store:contains(MsgId, MSCState) end,
             Atom, MsgIds).

%% Read each MsgId back, asserting the stored payload equals the id
%% (messages are written with payload == id in these tests).
msg_store_read(MsgIds, MSCState) ->
    lists:foldl(fun (MsgId, MSCStateM) ->
                        {{ok, MsgId}, MSCStateN} = rabbit_msg_store:read(
                                                     MsgId, MSCStateM),
                        MSCStateN
                end, MSCState, MsgIds).

%% Write each MsgId with itself as the payload.
msg_store_write(MsgIds, MSCState) ->
    ok = lists:foldl(fun (MsgId, ok) ->
                             rabbit_msg_store:write(MsgId, MsgId, MSCState)
                     end, ok, MsgIds).

%% As msg_store_write/2 but through the credit-flow-aware write path.
msg_store_write_flow(MsgIds, MSCState) ->
    ok = lists:foldl(fun (MsgId, ok) ->
                             rabbit_msg_store:write_flow(MsgId, MsgId, MSCState)
                     end, ok, MsgIds).

%% Remove the given messages via an existing client state.
msg_store_remove(MsgIds, MSCState) ->
    rabbit_msg_store:remove(MsgIds, MSCState).
%% Remove messages using a short-lived client for the given store/ref.
msg_store_remove(MsgStore, Ref, MsgIds) ->
    with_msg_store_client(MsgStore, Ref,
                          fun (MSCStateM) ->
                                  ok = msg_store_remove(MsgIds, MSCStateM),
                                  MSCStateM
                          end).

%% Run Fun with a fresh client state and terminate the client afterwards.
with_msg_store_client(MsgStore, Ref, Fun) ->
    rabbit_msg_store:client_terminate(
      Fun(msg_store_client_init(MsgStore, Ref))).

%% Fold Fun over L with a fresh client state threaded through, then
%% terminate the client.
foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
    rabbit_msg_store:client_terminate(
      lists:foldl(fun (MsgId, MSCState) -> Fun(MsgId, MSCState) end,
                  msg_store_client_init(MsgStore, Ref), L)).

%% End-to-end msg_store test: confirms, ref-counted duplicate writes,
%% reads (cache and disk), selective recovery across restart, and bulk
%% write/remove patterns chosen to drive file GC.
test_msg_store() ->
    restart_msg_store_empty(),
    MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)],
    {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds),
    Ref = rabbit_guid:gen(),
    {Cap, MSCState} = msg_store_client_init_capture(
                        ?PERSISTENT_MSG_STORE, Ref),
    Ref2 = rabbit_guid:gen(),
    {Cap2, MSC2State} = msg_store_client_init_capture(
                          ?PERSISTENT_MSG_STORE, Ref2),
    %% check we don't contain any of the msgs we're about to publish
    false = msg_store_contains(false, MsgIds, MSCState),
    %% test confirm logic
    passed = test_msg_store_confirms([hd(MsgIds)], Cap, MSCState),
    %% check we don't contain any of the msgs we're about to publish
    false = msg_store_contains(false, MsgIds, MSCState),
    %% publish the first half
    ok = msg_store_write(MsgIds1stHalf, MSCState),
    %% sync on the first half
    ok = on_disk_await(Cap, MsgIds1stHalf),
    %% publish the second half
    ok = msg_store_write(MsgIds2ndHalf, MSCState),
    %% check they're all in there
    true = msg_store_contains(true, MsgIds, MSCState),
    %% publish the latter half twice so we hit the caching and ref
    %% count code. We need to do this through a 2nd client since a
    %% single client is not supposed to write the same message more
    %% than once without first removing it.
    ok = msg_store_write(MsgIds2ndHalf, MSC2State),
    %% check they're still all in there
    true = msg_store_contains(true, MsgIds, MSCState),
    %% sync on the 2nd half
    ok = on_disk_await(Cap2, MsgIds2ndHalf),
    %% cleanup
    ok = on_disk_stop(Cap2),
    ok = rabbit_msg_store:client_delete_and_terminate(MSC2State),
    ok = on_disk_stop(Cap),
    %% read them all
    MSCState1 = msg_store_read(MsgIds, MSCState),
    %% read them all again - this will hit the cache, not disk
    MSCState2 = msg_store_read(MsgIds, MSCState1),
    %% remove them all
    ok = msg_store_remove(MsgIds, MSCState2),
    %% check first half doesn't exist
    false = msg_store_contains(false, MsgIds1stHalf, MSCState2),
    %% check second half does exist (still ref-counted by the 2nd write)
    true = msg_store_contains(true, MsgIds2ndHalf, MSCState2),
    %% read the second half again
    MSCState3 = msg_store_read(MsgIds2ndHalf, MSCState2),
    %% read the second half again, just for fun (aka code coverage)
    MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3),
    ok = rabbit_msg_store:client_terminate(MSCState4),
    %% stop and restart, preserving every other msg in 2nd half
    ok = rabbit_variable_queue:stop_msg_store(),
    ok = rabbit_variable_queue:start_msg_store(
           [], {fun ([]) -> finished;
                    ([MsgId|MsgIdsTail])
                      when length(MsgIdsTail) rem 2 == 0 ->
                        {MsgId, 1, MsgIdsTail};
                    ([MsgId|MsgIdsTail]) ->
                        {MsgId, 0, MsgIdsTail}
                end, MsgIds2ndHalf}),
    MSCState5 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
    %% check we have the right msgs left: contains/2 must alternate
    lists:foldl(
      fun (MsgId, Bool) ->
              not(Bool = rabbit_msg_store:contains(MsgId, MSCState5))
      end, false, MsgIds2ndHalf),
    ok = rabbit_msg_store:client_terminate(MSCState5),
    %% restart empty
    restart_msg_store_empty(),
    MSCState6 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
    %% check we don't contain any of the msgs
    false = msg_store_contains(false, MsgIds, MSCState6),
    %% publish the first half again
    ok = msg_store_write(MsgIds1stHalf, MSCState6),
    %% this should force some sort of sync internally otherwise misread
    ok = rabbit_msg_store:client_terminate(
           msg_store_read(MsgIds1stHalf, MSCState6)),
    MSCState7 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
    ok = msg_store_remove(MsgIds1stHalf, MSCState7),
    ok = rabbit_msg_store:client_terminate(MSCState7),
    %% restart empty
    restart_msg_store_empty(), %% now safe to reuse msg_ids
    %% push a lot of msgs in... at least 100 files worth
    {ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit),
    PayloadSizeBits = 65536,
    BigCount = trunc(100 * FileSize / (PayloadSizeBits div 8)),
    MsgIdsBig = [msg_id_bin(X) || X <- lists:seq(1, BigCount)],
    Payload = << 0:PayloadSizeBits >>,
    ok = with_msg_store_client(
           ?PERSISTENT_MSG_STORE, Ref,
           fun (MSCStateM) ->
                   [ok = rabbit_msg_store:write(MsgId, Payload, MSCStateM) ||
                       MsgId <- MsgIdsBig],
                   MSCStateM
           end),
    %% now read them to ensure we hit the fast client-side reading
    ok = foreach_with_msg_store_client(
           ?PERSISTENT_MSG_STORE, Ref,
           fun (MsgId, MSCStateM) ->
                   {{ok, Payload}, MSCStateN} = rabbit_msg_store:read(
                                                  MsgId, MSCStateM),
                   MSCStateN
           end, MsgIdsBig),
    %% .., then 3s by 1...
    ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
                          [msg_id_bin(X) || X <- lists:seq(BigCount, 1, -3)]),
    %% .., then remove 3s by 2, from the young end first. This hits
    %% GC (under 50% good data left, but no empty files. Must GC).
    ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
                          [msg_id_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]),
    %% .., then remove 3s by 3, from the young end first. This hits
    %% GC...
    ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
                          [msg_id_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]),
    %% ensure empty
    ok = with_msg_store_client(
           ?PERSISTENT_MSG_STORE, Ref,
           fun (MSCStateM) ->
                   false = msg_store_contains(false, MsgIdsBig, MSCStateM),
                   MSCStateM
           end),
    %%
    passed = test_msg_store_client_delete_and_terminate(),
    %% restart empty
    restart_msg_store_empty(),
    passed.
- -test_msg_store_confirms(MsgIds, Cap, MSCState) -> - %% write -> confirmed - ok = msg_store_write(MsgIds, MSCState), - ok = on_disk_await(Cap, MsgIds), - %% remove -> _ - ok = msg_store_remove(MsgIds, MSCState), - ok = on_disk_await(Cap, []), - %% write, remove -> confirmed - ok = msg_store_write(MsgIds, MSCState), - ok = msg_store_remove(MsgIds, MSCState), - ok = on_disk_await(Cap, MsgIds), - %% write, remove, write -> confirmed, confirmed - ok = msg_store_write(MsgIds, MSCState), - ok = msg_store_remove(MsgIds, MSCState), - ok = msg_store_write(MsgIds, MSCState), - ok = on_disk_await(Cap, MsgIds ++ MsgIds), - %% remove, write -> confirmed - ok = msg_store_remove(MsgIds, MSCState), - ok = msg_store_write(MsgIds, MSCState), - ok = on_disk_await(Cap, MsgIds), - %% remove, write, remove -> confirmed - ok = msg_store_remove(MsgIds, MSCState), - ok = msg_store_write(MsgIds, MSCState), - ok = msg_store_remove(MsgIds, MSCState), - ok = on_disk_await(Cap, MsgIds), - %% confirmation on timer-based sync - passed = test_msg_store_confirm_timer(), - passed. - -test_msg_store_confirm_timer() -> - Ref = rabbit_guid:gen(), - MsgId = msg_id_bin(1), - Self = self(), - MSCState = rabbit_msg_store:client_init( - ?PERSISTENT_MSG_STORE, Ref, - fun (MsgIds, _ActionTaken) -> - case gb_sets:is_member(MsgId, MsgIds) of - true -> Self ! on_disk; - false -> ok - end - end, undefined), - ok = msg_store_write([MsgId], MSCState), - ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false), - ok = msg_store_remove([MsgId], MSCState), - ok = rabbit_msg_store:client_delete_and_terminate(MSCState), - passed. 
- -msg_store_keep_busy_until_confirm(MsgIds, MSCState, Blocked) -> - After = case Blocked of - false -> 0; - true -> ?MAX_WAIT - end, - Recurse = fun () -> msg_store_keep_busy_until_confirm( - MsgIds, MSCState, credit_flow:blocked()) end, - receive - on_disk -> ok; - {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg), - Recurse() - after After -> - ok = msg_store_write_flow(MsgIds, MSCState), - ok = msg_store_remove(MsgIds, MSCState), - Recurse() - end. - -test_msg_store_client_delete_and_terminate() -> - restart_msg_store_empty(), - MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)], - Ref = rabbit_guid:gen(), - MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), - ok = msg_store_write(MsgIds, MSCState), - %% test the 'dying client' fast path for writes - ok = rabbit_msg_store:client_delete_and_terminate(MSCState), - passed. - -queue_name(Name) -> - rabbit_misc:r(<<"/">>, queue, Name). - -test_queue() -> - queue_name(<<"test">>). - -init_test_queue() -> - TestQueue = test_queue(), - PRef = rabbit_guid:gen(), - PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef), - Res = rabbit_queue_index:recover( - TestQueue, [], false, - fun (MsgId) -> - rabbit_msg_store:contains(MsgId, PersistentClient) - end, - fun nop/1, fun nop/1), - ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient), - Res. - -restart_test_queue(Qi) -> - _ = rabbit_queue_index:terminate([], Qi), - ok = rabbit_variable_queue:stop(), - {ok, _} = rabbit_variable_queue:start([test_queue()]), - init_test_queue(). - -empty_test_queue() -> - ok = rabbit_variable_queue:stop(), - {ok, _} = rabbit_variable_queue:start([]), - {0, 0, Qi} = init_test_queue(), - _ = rabbit_queue_index:delete_and_terminate(Qi), - ok. - -with_empty_test_queue(Fun) -> - ok = empty_test_queue(), - {0, 0, Qi} = init_test_queue(), - rabbit_queue_index:delete_and_terminate(Fun(Qi)). - -restart_app() -> - rabbit:stop(), - rabbit:start(). 
- -queue_index_publish(SeqIds, Persistent, Qi) -> - Ref = rabbit_guid:gen(), - MsgStore = case Persistent of - true -> ?PERSISTENT_MSG_STORE; - false -> ?TRANSIENT_MSG_STORE - end, - MSCState = msg_store_client_init(MsgStore, Ref), - {A, B = [{_SeqId, LastMsgIdWritten} | _]} = - lists:foldl( - fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) -> - MsgId = rabbit_guid:gen(), - QiM = rabbit_queue_index:publish( - MsgId, SeqId, #message_properties{size = 10}, - Persistent, infinity, QiN), - ok = rabbit_msg_store:write(MsgId, MsgId, MSCState), - {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]} - end, {Qi, []}, SeqIds), - %% do this just to force all of the publishes through to the msg_store: - true = rabbit_msg_store:contains(LastMsgIdWritten, MSCState), - ok = rabbit_msg_store:client_delete_and_terminate(MSCState), - {A, B}. - -verify_read_with_published(_Delivered, _Persistent, [], _) -> - ok; -verify_read_with_published(Delivered, Persistent, - [{MsgId, SeqId, _Props, Persistent, Delivered}|Read], - [{SeqId, MsgId}|Published]) -> - verify_read_with_published(Delivered, Persistent, Read, Published); -verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> - ko. - -test_queue_index_props() -> - with_empty_test_queue( - fun(Qi0) -> - MsgId = rabbit_guid:gen(), - Props = #message_properties{expiry=12345, size = 10}, - Qi1 = rabbit_queue_index:publish( - MsgId, 1, Props, true, infinity, Qi0), - {[{MsgId, 1, Props, _, _}], Qi2} = - rabbit_queue_index:read(1, 2, Qi1), - Qi2 - end), - - ok = rabbit_variable_queue:stop(), - {ok, _} = rabbit_variable_queue:start([]), - - passed. 
- -test_queue_index() -> - SegmentSize = rabbit_queue_index:next_segment_boundary(0), - TwoSegs = SegmentSize + SegmentSize, - MostOfASegment = trunc(SegmentSize*0.75), - SeqIdsA = lists:seq(0, MostOfASegment-1), - SeqIdsB = lists:seq(MostOfASegment, 2*MostOfASegment), - SeqIdsC = lists:seq(0, trunc(SegmentSize/2)), - SeqIdsD = lists:seq(0, SegmentSize*4), - - with_empty_test_queue( - fun (Qi0) -> - {0, 0, Qi1} = rabbit_queue_index:bounds(Qi0), - {Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1), - {0, SegmentSize, Qi3} = rabbit_queue_index:bounds(Qi2), - {ReadA, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3), - ok = verify_read_with_published(false, false, ReadA, - lists:reverse(SeqIdsMsgIdsA)), - %% should get length back as 0, as all the msgs were transient - {0, 0, Qi6} = restart_test_queue(Qi4), - {0, 0, Qi7} = rabbit_queue_index:bounds(Qi6), - {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), - {0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8), - {ReadB, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9), - ok = verify_read_with_published(false, true, ReadB, - lists:reverse(SeqIdsMsgIdsB)), - %% should get length back as MostOfASegment - LenB = length(SeqIdsB), - BytesB = LenB * 10, - {LenB, BytesB, Qi12} = restart_test_queue(Qi10), - {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12), - Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13), - {ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), - ok = verify_read_with_published(true, true, ReadC, - lists:reverse(SeqIdsMsgIdsB)), - Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15), - Qi17 = rabbit_queue_index:flush(Qi16), - %% Everything will have gone now because #pubs == #acks - {0, 0, Qi18} = rabbit_queue_index:bounds(Qi17), - %% should get length back as 0 because all persistent - %% msgs have been acked - {0, 0, Qi19} = restart_test_queue(Qi18), - Qi19 - end), - - %% These next bits are just to hit the auto deletion of segment files. 
- %% First, partials: - %% a) partial pub+del+ack, then move to new segment - with_empty_test_queue( - fun (Qi0) -> - {Qi1, _SeqIdsMsgIdsC} = queue_index_publish(SeqIdsC, - false, Qi0), - Qi2 = rabbit_queue_index:deliver(SeqIdsC, Qi1), - Qi3 = rabbit_queue_index:ack(SeqIdsC, Qi2), - Qi4 = rabbit_queue_index:flush(Qi3), - {Qi5, _SeqIdsMsgIdsC1} = queue_index_publish([SegmentSize], - false, Qi4), - Qi5 - end), - - %% b) partial pub+del, then move to new segment, then ack all in old segment - with_empty_test_queue( - fun (Qi0) -> - {Qi1, _SeqIdsMsgIdsC2} = queue_index_publish(SeqIdsC, - false, Qi0), - Qi2 = rabbit_queue_index:deliver(SeqIdsC, Qi1), - {Qi3, _SeqIdsMsgIdsC3} = queue_index_publish([SegmentSize], - false, Qi2), - Qi4 = rabbit_queue_index:ack(SeqIdsC, Qi3), - rabbit_queue_index:flush(Qi4) - end), - - %% c) just fill up several segments of all pubs, then +dels, then +acks - with_empty_test_queue( - fun (Qi0) -> - {Qi1, _SeqIdsMsgIdsD} = queue_index_publish(SeqIdsD, - false, Qi0), - Qi2 = rabbit_queue_index:deliver(SeqIdsD, Qi1), - Qi3 = rabbit_queue_index:ack(SeqIdsD, Qi2), - rabbit_queue_index:flush(Qi3) - end), - - %% d) get messages in all states to a segment, then flush, then do - %% the same again, don't flush and read. This will hit all - %% possibilities in combining the segment with the journal. 
- with_empty_test_queue( - fun (Qi0) -> - {Qi1, [Seven,Five,Four|_]} = queue_index_publish([0,1,2,4,5,7], - false, Qi0), - Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1), - Qi3 = rabbit_queue_index:ack([0], Qi2), - Qi4 = rabbit_queue_index:flush(Qi3), - {Qi5, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi4), - Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5), - Qi7 = rabbit_queue_index:ack([1,2,3], Qi6), - {[], Qi8} = rabbit_queue_index:read(0, 4, Qi7), - {ReadD, Qi9} = rabbit_queue_index:read(4, 7, Qi8), - ok = verify_read_with_published(true, false, ReadD, - [Four, Five, Six]), - {ReadE, Qi10} = rabbit_queue_index:read(7, 9, Qi9), - ok = verify_read_with_published(false, false, ReadE, - [Seven, Eight]), - Qi10 - end), - - %% e) as for (d), but use terminate instead of read, which will - %% exercise journal_minus_segment, not segment_plus_journal. - with_empty_test_queue( - fun (Qi0) -> - {Qi1, _SeqIdsMsgIdsE} = queue_index_publish([0,1,2,4,5,7], - true, Qi0), - Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1), - Qi3 = rabbit_queue_index:ack([0], Qi2), - {5, 50, Qi4} = restart_test_queue(Qi3), - {Qi5, _SeqIdsMsgIdsF} = queue_index_publish([3,6,8], true, Qi4), - Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5), - Qi7 = rabbit_queue_index:ack([1,2,3], Qi6), - {5, 50, Qi8} = restart_test_queue(Qi7), - Qi8 - end), - - ok = rabbit_variable_queue:stop(), - {ok, _} = rabbit_variable_queue:start([]), - - passed. - -variable_queue_init(Q, Recover) -> - rabbit_variable_queue:init( - Q, case Recover of - true -> non_clean_shutdown; - false -> new - end, fun nop/2, fun nop/2, fun nop/1, fun nop/1). - -variable_queue_publish(IsPersistent, Count, VQ) -> - variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). - -variable_queue_publish(IsPersistent, Count, PropFun, VQ) -> - variable_queue_publish(IsPersistent, 1, Count, PropFun, - fun (_N) -> <<>> end, VQ). 
- -variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> - variable_queue_wait_for_shuffling_end( - lists:foldl( - fun (N, VQN) -> - rabbit_variable_queue:publish( - rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{delivery_mode = case IsPersistent of - true -> 2; - false -> 1 - end}, - PayloadFun(N)), - PropFun(N, #message_properties{size = 10}), - false, self(), noflow, VQN) - end, VQ, lists:seq(Start, Start + Count - 1))). - -variable_queue_batch_publish(IsPersistent, Count, VQ) -> - variable_queue_batch_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). - -variable_queue_batch_publish(IsPersistent, Count, PropFun, VQ) -> - variable_queue_batch_publish(IsPersistent, 1, Count, PropFun, - fun (_N) -> <<>> end, VQ). - -variable_queue_batch_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> - variable_queue_batch_publish0(IsPersistent, Start, Count, PropFun, - PayloadFun, fun make_publish/4, - fun rabbit_variable_queue:batch_publish/4, - VQ). - -variable_queue_batch_publish_delivered(IsPersistent, Count, VQ) -> - variable_queue_batch_publish_delivered(IsPersistent, Count, fun (_N, P) -> P end, VQ). - -variable_queue_batch_publish_delivered(IsPersistent, Count, PropFun, VQ) -> - variable_queue_batch_publish_delivered(IsPersistent, 1, Count, PropFun, - fun (_N) -> <<>> end, VQ). - -variable_queue_batch_publish_delivered(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> - variable_queue_batch_publish0(IsPersistent, Start, Count, PropFun, - PayloadFun, fun make_publish_delivered/4, - fun rabbit_variable_queue:batch_publish_delivered/4, - VQ). - -variable_queue_batch_publish0(IsPersistent, Start, Count, PropFun, PayloadFun, - MakePubFun, PubFun, VQ) -> - Publishes = - [MakePubFun(IsPersistent, PayloadFun, PropFun, N) - || N <- lists:seq(Start, Start + Count - 1)], - Res = PubFun(Publishes, self(), noflow, VQ), - VQ1 = pub_res(Res), - variable_queue_wait_for_shuffling_end(VQ1). 
- -pub_res({_, VQS}) -> - VQS; -pub_res(VQS) -> - VQS. - -make_publish(IsPersistent, PayloadFun, PropFun, N) -> - {rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{delivery_mode = case IsPersistent of - true -> 2; - false -> 1 - end}, - PayloadFun(N)), - PropFun(N, #message_properties{size = 10}), - false}. - -make_publish_delivered(IsPersistent, PayloadFun, PropFun, N) -> - {rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{delivery_mode = case IsPersistent of - true -> 2; - false -> 1 - end}, - PayloadFun(N)), - PropFun(N, #message_properties{size = 10})}. - -variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> - lists:foldl(fun (N, {VQN, AckTagsAcc}) -> - Rem = Len - N, - {{#basic_message { is_persistent = IsPersistent }, - IsDelivered, AckTagN}, VQM} = - rabbit_variable_queue:fetch(true, VQN), - Rem = rabbit_variable_queue:len(VQM), - {VQM, [AckTagN | AckTagsAcc]} - end, {VQ, []}, lists:seq(1, Count)). - -variable_queue_set_ram_duration_target(Duration, VQ) -> - variable_queue_wait_for_shuffling_end( - rabbit_variable_queue:set_ram_duration_target(Duration, VQ)). - -assert_prop(List, Prop, Value) -> - case proplists:get_value(Prop, List)of - Value -> ok; - _ -> {exit, Prop, exp, Value, List} - end. - -assert_props(List, PropVals) -> - [assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals]. - -test_amqqueue(Durable) -> - (rabbit_amqqueue:pseudo_queue(test_queue(), self())) - #amqqueue { durable = Durable }. 
- -with_fresh_variable_queue(Fun) -> - Ref = make_ref(), - Me = self(), - %% Run in a separate process since rabbit_msg_store will send - %% bump_credit messages and we want to ignore them - spawn_link(fun() -> - ok = empty_test_queue(), - VQ = variable_queue_init(test_amqqueue(true), false), - S0 = variable_queue_status(VQ), - assert_props(S0, [{q1, 0}, {q2, 0}, - {delta, - {delta, undefined, 0, undefined}}, - {q3, 0}, {q4, 0}, - {len, 0}]), - try - _ = rabbit_variable_queue:delete_and_terminate( - shutdown, Fun(VQ)), - Me ! Ref - catch - Type:Error -> - Me ! {Ref, Type, Error, erlang:get_stacktrace()} - end - end), - receive - Ref -> ok; - {Ref, Type, Error, ST} -> exit({Type, Error, ST}) - end, - passed. - -publish_and_confirm(Q, Payload, Count) -> - Seqs = lists:seq(1, Count), - [begin - Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{delivery_mode = 2}, - Payload), - Delivery = #delivery{mandatory = false, sender = self(), - confirm = true, message = Msg, msg_seq_no = Seq, - flow = noflow}, - _QPids = rabbit_amqqueue:deliver([Q], Delivery) - end || Seq <- Seqs], - wait_for_confirms(gb_sets:from_list(Seqs)). - -wait_for_confirms(Unconfirmed) -> - case gb_sets:is_empty(Unconfirmed) of - true -> ok; - false -> receive {'$gen_cast', {confirm, Confirmed, _}} -> - wait_for_confirms( - rabbit_misc:gb_sets_difference( - Unconfirmed, gb_sets:from_list(Confirmed))) - after ?TIMEOUT -> exit(timeout_waiting_for_confirm) - end - end. 
- -test_variable_queue() -> - [passed = with_fresh_variable_queue(F) || - F <- [fun test_variable_queue_dynamic_duration_change/1, - fun test_variable_queue_partial_segments_delta_thing/1, - fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, - fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, - fun test_drop/1, - fun test_variable_queue_fold_msg_on_disk/1, - fun test_dropfetchwhile/1, - fun test_dropwhile_varying_ram_duration/1, - fun test_fetchwhile_varying_ram_duration/1, - fun test_variable_queue_ack_limiting/1, - fun test_variable_queue_purge/1, - fun test_variable_queue_requeue/1, - fun test_variable_queue_requeue_ram_beta/1, - fun test_variable_queue_fold/1, - fun test_variable_queue_batch_publish/1, - fun test_variable_queue_batch_publish_delivered/1]], - passed. - -test_variable_queue_batch_publish(VQ) -> - Count = 10, - VQ1 = variable_queue_batch_publish(true, Count, VQ), - Count = rabbit_variable_queue:len(VQ1), - VQ1. - -test_variable_queue_batch_publish_delivered(VQ) -> - Count = 10, - VQ1 = variable_queue_batch_publish_delivered(true, Count, VQ), - Count = rabbit_variable_queue:depth(VQ1), - VQ1. - -test_variable_queue_fold(VQ0) -> - {PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = - variable_queue_with_holes(VQ0), - Count = rabbit_variable_queue:depth(VQ1), - Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs), - lists:foldl(fun (Cut, VQ2) -> - test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2) - end, VQ1, [0, 1, 2, Count div 2, - Count - 1, Count, Count + 1, Count * 2]). - -test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) -> - {Acc, VQ1} = rabbit_variable_queue:fold( - fun (M, _, Pending, A) -> - MInt = msg2int(M), - Pending = lists:member(MInt, PendingMsgs), %% assert - case MInt =< Cut of - true -> {cont, [MInt | A]}; - false -> {stop, A} - end - end, [], VQ0), - Expected = lists:takewhile(fun (I) -> I =< Cut end, Msgs), - Expected = lists:reverse(Acc), %% assertion - VQ1. 
- -msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) -> - binary_to_term(list_to_binary(lists:reverse(P))). - -ack_subset(AckSeqs, Interval, Rem) -> - lists:filter(fun ({_Ack, N}) -> (N + Rem) rem Interval == 0 end, AckSeqs). - -requeue_one_by_one(Acks, VQ) -> - lists:foldl(fun (AckTag, VQN) -> - {_MsgId, VQM} = rabbit_variable_queue:requeue( - [AckTag], VQN), - VQM - end, VQ, Acks). - -%% Create a vq with messages in q1, delta, and q3, and holes (in the -%% form of pending acks) in the latter two. -variable_queue_with_holes(VQ0) -> - Interval = 2048, %% should match vq:IO_BATCH_SIZE - Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2 * Interval, - Seq = lists:seq(1, Count), - VQ1 = variable_queue_set_ram_duration_target(0, VQ0), - VQ2 = variable_queue_publish( - false, 1, Count, - fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), - {VQ3, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ2), - Acks = lists:reverse(AcksR), - AckSeqs = lists:zip(Acks, Seq), - [{Subset1, _Seq1}, {Subset2, _Seq2}, {Subset3, Seq3}] = - [lists:unzip(ack_subset(AckSeqs, Interval, I)) || I <- [0, 1, 2]], - %% we requeue in three phases in order to exercise requeuing logic - %% in various vq states - {_MsgIds, VQ4} = rabbit_variable_queue:requeue( - Acks -- (Subset1 ++ Subset2 ++ Subset3), VQ3), - VQ5 = requeue_one_by_one(Subset1, VQ4), - %% by now we have some messages (and holes) in delta - VQ6 = requeue_one_by_one(Subset2, VQ5), - VQ7 = variable_queue_set_ram_duration_target(infinity, VQ6), - %% add the q1 tail - VQ8 = variable_queue_publish( - true, Count + 1, Interval, - fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7), - %% assertions - [false = case V of - {delta, _, 0, _} -> true; - 0 -> true; - _ -> false - end || {K, V} <- variable_queue_status(VQ8), - lists:member(K, [q1, delta, q3])], - Depth = Count + Interval, - Depth = rabbit_variable_queue:depth(VQ8), - Len = Depth - length(Subset3), - Len = rabbit_variable_queue:len(VQ8), 
- {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + Interval), VQ8}. - -test_variable_queue_requeue(VQ0) -> - {_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = - variable_queue_with_holes(VQ0), - Msgs = - lists:zip(RequeuedMsgs, - lists:duplicate(length(RequeuedMsgs), true)) ++ - lists:zip(FreshMsgs, - lists:duplicate(length(FreshMsgs), false)), - VQ2 = lists:foldl(fun ({I, Requeued}, VQa) -> - {{M, MRequeued, _}, VQb} = - rabbit_variable_queue:fetch(true, VQa), - Requeued = MRequeued, %% assertion - I = msg2int(M), %% assertion - VQb - end, VQ1, Msgs), - {empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2), - VQ3. - -%% requeue from ram_pending_ack into q3, move to delta and then empty queue -test_variable_queue_requeue_ram_beta(VQ0) -> - Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2, - VQ1 = variable_queue_publish(false, Count, VQ0), - {VQ2, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ1), - {Back, Front} = lists:split(Count div 2, AcksR), - {_, VQ3} = rabbit_variable_queue:requeue(erlang:tl(Back), VQ2), - VQ4 = variable_queue_set_ram_duration_target(0, VQ3), - {_, VQ5} = rabbit_variable_queue:requeue([erlang:hd(Back)], VQ4), - VQ6 = requeue_one_by_one(Front, VQ5), - {VQ7, AcksAll} = variable_queue_fetch(Count, false, true, Count, VQ6), - {_, VQ8} = rabbit_variable_queue:ack(AcksAll, VQ7), - VQ8. - -test_variable_queue_purge(VQ0) -> - LenDepth = fun (VQ) -> - {rabbit_variable_queue:len(VQ), - rabbit_variable_queue:depth(VQ)} - end, - VQ1 = variable_queue_publish(false, 10, VQ0), - {VQ2, Acks} = variable_queue_fetch(6, false, false, 10, VQ1), - {4, VQ3} = rabbit_variable_queue:purge(VQ2), - {0, 6} = LenDepth(VQ3), - {_, VQ4} = rabbit_variable_queue:requeue(lists:sublist(Acks, 2), VQ3), - {2, 6} = LenDepth(VQ4), - VQ5 = rabbit_variable_queue:purge_acks(VQ4), - {2, 2} = LenDepth(VQ5), - VQ5. 
- -test_variable_queue_ack_limiting(VQ0) -> - %% start by sending in a bunch of messages - Len = 1024, - VQ1 = variable_queue_publish(false, Len, VQ0), - - %% squeeze and relax queue - Churn = Len div 32, - VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), - - %% update stats for duration - {_Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), - - %% fetch half the messages - {VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3), - - VQ5 = check_variable_queue_status( - VQ4, [{len, Len div 2}, - {messages_unacknowledged_ram, Len div 2}, - {messages_ready_ram, Len div 2}, - {messages_ram, Len}]), - - %% ensure all acks go to disk on 0 duration target - VQ6 = check_variable_queue_status( - variable_queue_set_ram_duration_target(0, VQ5), - [{len, Len div 2}, - {target_ram_count, 0}, - {messages_unacknowledged_ram, 0}, - {messages_ready_ram, 0}, - {messages_ram, 0}]), - - VQ6. - -test_drop(VQ0) -> - %% start by sending a messages - VQ1 = variable_queue_publish(false, 1, VQ0), - %% drop message with AckRequired = true - {{MsgId, AckTag}, VQ2} = rabbit_variable_queue:drop(true, VQ1), - true = rabbit_variable_queue:is_empty(VQ2), - true = AckTag =/= undefinded, - %% drop again -> empty - {empty, VQ3} = rabbit_variable_queue:drop(false, VQ2), - %% requeue - {[MsgId], VQ4} = rabbit_variable_queue:requeue([AckTag], VQ3), - %% drop message with AckRequired = false - {{MsgId, undefined}, VQ5} = rabbit_variable_queue:drop(false, VQ4), - true = rabbit_variable_queue:is_empty(VQ5), - VQ5. 
- -test_dropfetchwhile(VQ0) -> - Count = 10, - - %% add messages with sequential expiry - VQ1 = variable_queue_publish( - false, 1, Count, - fun (N, Props) -> Props#message_properties{expiry = N} end, - fun erlang:term_to_binary/1, VQ0), - - %% fetch the first 5 messages - {#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} = - rabbit_variable_queue:fetchwhile( - fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end, - fun (Msg, AckTag, {MsgAcc, AckAcc}) -> - {[Msg | MsgAcc], [AckTag | AckAcc]} - end, {[], []}, VQ1), - true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)], - - %% requeue them - {_MsgIds, VQ3} = rabbit_variable_queue:requeue(AckTags, VQ2), - - %% drop the first 5 messages - {#message_properties{expiry = 6}, VQ4} = - rabbit_variable_queue:dropwhile( - fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ3), - - %% fetch 5 - VQ5 = lists:foldl(fun (N, VQN) -> - {{Msg, _, _}, VQM} = - rabbit_variable_queue:fetch(false, VQN), - true = msg2int(Msg) == N, - VQM - end, VQ4, lists:seq(6, Count)), - - %% should be empty now - true = rabbit_variable_queue:is_empty(VQ5), - - VQ5. - -test_dropwhile_varying_ram_duration(VQ0) -> - test_dropfetchwhile_varying_ram_duration( - fun (VQ1) -> - {_, VQ2} = rabbit_variable_queue:dropwhile( - fun (_) -> false end, VQ1), - VQ2 - end, VQ0). - -test_fetchwhile_varying_ram_duration(VQ0) -> - test_dropfetchwhile_varying_ram_duration( - fun (VQ1) -> - {_, ok, VQ2} = rabbit_variable_queue:fetchwhile( - fun (_) -> false end, - fun (_, _, A) -> A end, - ok, VQ1), - VQ2 - end, VQ0). - -test_dropfetchwhile_varying_ram_duration(Fun, VQ0) -> - VQ1 = variable_queue_publish(false, 1, VQ0), - VQ2 = variable_queue_set_ram_duration_target(0, VQ1), - VQ3 = Fun(VQ2), - VQ4 = variable_queue_set_ram_duration_target(infinity, VQ3), - VQ5 = variable_queue_publish(false, 1, VQ4), - VQ6 = Fun(VQ5), - VQ6. 
- -test_variable_queue_dynamic_duration_change(VQ0) -> - SegmentSize = rabbit_queue_index:next_segment_boundary(0), - - %% start by sending in a couple of segments worth - Len = 2*SegmentSize, - VQ1 = variable_queue_publish(false, Len, VQ0), - %% squeeze and relax queue - Churn = Len div 32, - VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), - - {Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), - VQ7 = lists:foldl( - fun (Duration1, VQ4) -> - {_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4), - VQ6 = variable_queue_set_ram_duration_target( - Duration1, VQ5), - publish_fetch_and_ack(Churn, Len, VQ6) - end, VQ3, [Duration / 4, 0, Duration / 4, infinity]), - - %% drain - {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), - {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), - - VQ10. - -publish_fetch_and_ack(0, _Len, VQ0) -> - VQ0; -publish_fetch_and_ack(N, Len, VQ0) -> - VQ1 = variable_queue_publish(false, 1, VQ0), - {{_Msg, false, AckTag}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - Len = rabbit_variable_queue:len(VQ2), - {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), - publish_fetch_and_ack(N-1, Len, VQ3). 
- -test_variable_queue_partial_segments_delta_thing(VQ0) -> - SegmentSize = rabbit_queue_index:next_segment_boundary(0), - HalfSegment = SegmentSize div 2, - OneAndAHalfSegment = SegmentSize + HalfSegment, - VQ1 = variable_queue_publish(true, OneAndAHalfSegment, VQ0), - {_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1), - VQ3 = check_variable_queue_status( - variable_queue_set_ram_duration_target(0, VQ2), - %% one segment in q3, and half a segment in delta - [{delta, {delta, SegmentSize, HalfSegment, OneAndAHalfSegment}}, - {q3, SegmentSize}, - {len, SegmentSize + HalfSegment}]), - VQ4 = variable_queue_set_ram_duration_target(infinity, VQ3), - VQ5 = check_variable_queue_status( - variable_queue_publish(true, 1, VQ4), - %% one alpha, but it's in the same segment as the deltas - [{q1, 1}, - {delta, {delta, SegmentSize, HalfSegment, OneAndAHalfSegment}}, - {q3, SegmentSize}, - {len, SegmentSize + HalfSegment + 1}]), - {VQ6, AckTags} = variable_queue_fetch(SegmentSize, true, false, - SegmentSize + HalfSegment + 1, VQ5), - VQ7 = check_variable_queue_status( - VQ6, - %% the half segment should now be in q3 - [{q1, 1}, - {delta, {delta, undefined, 0, undefined}}, - {q3, HalfSegment}, - {len, HalfSegment + 1}]), - {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, - HalfSegment + 1, VQ7), - {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), - %% should be empty now - {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), - VQ10. - -check_variable_queue_status(VQ0, Props) -> - VQ1 = variable_queue_wait_for_shuffling_end(VQ0), - S = variable_queue_status(VQ1), - assert_props(S, Props), - VQ1. - -variable_queue_status(VQ) -> - Keys = rabbit_backing_queue:info_keys() -- [backing_queue_status], - [{K, rabbit_variable_queue:info(K, VQ)} || K <- Keys] ++ - rabbit_variable_queue:info(backing_queue_status, VQ). 
- -variable_queue_wait_for_shuffling_end(VQ) -> - case credit_flow:blocked() of - false -> VQ; - true -> receive - {bump_credit, Msg} -> - credit_flow:handle_bump_msg(Msg), - variable_queue_wait_for_shuffling_end( - rabbit_variable_queue:resume(VQ)) - end - end. - -test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> - Count = 2 * rabbit_queue_index:next_segment_boundary(0), - VQ1 = variable_queue_publish(true, Count, VQ0), - VQ2 = variable_queue_publish(false, Count, VQ1), - VQ3 = variable_queue_set_ram_duration_target(0, VQ2), - {VQ4, _AckTags} = variable_queue_fetch(Count, true, false, - Count + Count, VQ3), - {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, - Count, VQ4), - _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), - VQ7 = variable_queue_init(test_amqqueue(true), true), - {{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), - Count1 = rabbit_variable_queue:len(VQ8), - VQ9 = variable_queue_publish(false, 1, VQ8), - VQ10 = variable_queue_set_ram_duration_target(0, VQ9), - {VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10), - {VQ12, _AckTags3} = variable_queue_fetch(1, false, false, 1, VQ11), - VQ12. - -test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> - VQ1 = variable_queue_set_ram_duration_target(0, VQ0), - VQ2 = variable_queue_publish(false, 4, VQ1), - {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), - {_Guids, VQ4} = - rabbit_variable_queue:requeue(AckTags, VQ3), - VQ5 = rabbit_variable_queue:timeout(VQ4), - _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), - VQ7 = variable_queue_init(test_amqqueue(true), true), - {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), - VQ8. - -test_variable_queue_fold_msg_on_disk(VQ0) -> - VQ1 = variable_queue_publish(true, 1, VQ0), - {VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1), - {ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _A, ok) -> ok end, - ok, VQ2, AckTags), - VQ3. 
- -test_queue_recover() -> - Count = 2 * rabbit_queue_index:next_segment_boundary(0), - {new, #amqqueue { pid = QPid, name = QName } = Q} = - rabbit_amqqueue:declare(test_queue(), true, false, [], none), - publish_and_confirm(Q, <<>>, Count), - - [{_, SupPid, _, _}] = supervisor:which_children(rabbit_amqqueue_sup_sup), - exit(SupPid, kill), - exit(QPid, kill), - MRef = erlang:monitor(process, QPid), - receive {'DOWN', MRef, process, QPid, _Info} -> ok - after 10000 -> exit(timeout_waiting_for_queue_death) - end, - rabbit_amqqueue:stop(), - rabbit_amqqueue:start(rabbit_amqqueue:recover()), - {ok, Limiter} = rabbit_limiter:start_link(no_id), - rabbit_amqqueue:with_or_die( - QName, - fun (Q1 = #amqqueue { pid = QPid1 }) -> - CountMinusOne = Count - 1, - {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = - rabbit_amqqueue:basic_get(Q1, self(), false, Limiter), - exit(QPid1, shutdown), - VQ1 = variable_queue_init(Q, true), - {{_Msg1, true, _AckTag1}, VQ2} = - rabbit_variable_queue:fetch(true, VQ1), - CountMinusOne = rabbit_variable_queue:len(VQ2), - _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2), - rabbit_amqqueue:internal_delete(QName) - end), - passed. - -test_variable_queue_delete_msg_store_files_callback() -> - ok = restart_msg_store_empty(), - {new, #amqqueue { pid = QPid, name = QName } = Q} = - rabbit_amqqueue:declare(test_queue(), true, false, [], none), - Payload = <<0:8388608>>, %% 1MB - Count = 30, - publish_and_confirm(Q, Payload, Count), - - rabbit_amqqueue:set_ram_duration_target(QPid, 0), - - {ok, Limiter} = rabbit_limiter:start_link(no_id), - - CountMinusOne = Count - 1, - {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} = - rabbit_amqqueue:basic_get(Q, self(), true, Limiter), - {ok, CountMinusOne} = rabbit_amqqueue:purge(Q), - - %% give the queue a second to receive the close_fds callback msg - timer:sleep(1000), - - rabbit_amqqueue:delete(Q, false, false), - passed. 
- -test_configurable_server_properties() -> - %% List of the names of the built-in properties do we expect to find - BuiltInPropNames = [<<"product">>, <<"version">>, <<"platform">>, - <<"copyright">>, <<"information">>], - - Protocol = rabbit_framing_amqp_0_9_1, - - %% Verify that the built-in properties are initially present - ActualPropNames = [Key || {Key, longstr, _} <- - rabbit_reader:server_properties(Protocol)], - true = lists:all(fun (X) -> lists:member(X, ActualPropNames) end, - BuiltInPropNames), - - %% Get the initial server properties configured in the environment - {ok, ServerProperties} = application:get_env(rabbit, server_properties), - - %% Helper functions - ConsProp = fun (X) -> application:set_env(rabbit, - server_properties, - [X | ServerProperties]) end, - IsPropPresent = - fun (X) -> - lists:member(X, rabbit_reader:server_properties(Protocol)) - end, - - %% Add a wholly new property of the simplified {KeyAtom, StringValue} form - NewSimplifiedProperty = {NewHareKey, NewHareVal} = {hare, "soup"}, - ConsProp(NewSimplifiedProperty), - %% Do we find hare soup, appropriately formatted in the generated properties? - ExpectedHareImage = {list_to_binary(atom_to_list(NewHareKey)), - longstr, - list_to_binary(NewHareVal)}, - true = IsPropPresent(ExpectedHareImage), - - %% Add a wholly new property of the {BinaryKey, Type, Value} form - %% and check for it - NewProperty = {<<"new-bin-key">>, signedint, -1}, - ConsProp(NewProperty), - %% Do we find the new property? - true = IsPropPresent(NewProperty), - - %% Add a property that clobbers a built-in, and verify correct clobbering - {NewVerKey, NewVerVal} = NewVersion = {version, "X.Y.Z."}, - {BinNewVerKey, BinNewVerVal} = {list_to_binary(atom_to_list(NewVerKey)), - list_to_binary(NewVerVal)}, - ConsProp(NewVersion), - ClobberedServerProps = rabbit_reader:server_properties(Protocol), - %% Is the clobbering insert present? 
- true = IsPropPresent({BinNewVerKey, longstr, BinNewVerVal}), - %% Is the clobbering insert the only thing with the clobbering key? - [{BinNewVerKey, longstr, BinNewVerVal}] = - [E || {K, longstr, _V} = E <- ClobberedServerProps, K =:= BinNewVerKey], - - application:set_env(rabbit, server_properties, ServerProperties), - passed. - -nop(_) -> ok. -nop(_, _) -> ok. diff --git a/test/src/rabbit_tests_event_receiver.erl b/test/src/rabbit_tests_event_receiver.erl deleted file mode 100644 index 610496b60c..0000000000 --- a/test/src/rabbit_tests_event_receiver.erl +++ /dev/null @@ -1,58 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. -%% - --module(rabbit_tests_event_receiver). - --export([start/3, stop/0]). - --export([init/1, handle_call/2, handle_event/2, handle_info/2, - terminate/2, code_change/3]). - --include("rabbit.hrl"). - -start(Pid, Nodes, Types) -> - Oks = [ok || _ <- Nodes], - {Oks, _} = rpc:multicall(Nodes, gen_event, add_handler, - [rabbit_event, ?MODULE, [Pid, Types]]). - -stop() -> - gen_event:delete_handler(rabbit_event, ?MODULE, []). - -%%---------------------------------------------------------------------------- - -init([Pid, Types]) -> - {ok, {Pid, Types}}. - -handle_call(_Request, State) -> - {ok, not_understood, State}. 
- -handle_event(Event = #event{type = Type}, State = {Pid, Types}) -> - case lists:member(Type, Types) of - true -> Pid ! Event; - false -> ok - end, - {ok, State}. - -handle_info(_Info, State) -> - {ok, State}. - -terminate(_Arg, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%---------------------------------------------------------------------------- diff --git a/test/src/supervisor2_tests.erl b/test/src/supervisor2_tests.erl deleted file mode 100644 index 199c66eca0..0000000000 --- a/test/src/supervisor2_tests.erl +++ /dev/null @@ -1,75 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2011-2015 Pivotal Software, Inc. All rights reserved. -%% - --module(supervisor2_tests). --behaviour(supervisor2). - --export([test_all/0, start_link/0]). --export([init/1]). - -test_all() -> - ok = check_shutdown(stop, 200, 200, 2000), - ok = check_shutdown(ignored, 1, 2, 2000). - -check_shutdown(SigStop, Iterations, ChildCount, SupTimeout) -> - {ok, Sup} = supervisor2:start_link(?MODULE, [SupTimeout]), - Res = lists:foldl( - fun (I, ok) -> - TestSupPid = erlang:whereis(?MODULE), - ChildPids = - [begin - {ok, ChildPid} = - supervisor2:start_child(TestSupPid, []), - ChildPid - end || _ <- lists:seq(1, ChildCount)], - MRef = erlang:monitor(process, TestSupPid), - [P ! 
SigStop || P <- ChildPids], - ok = supervisor2:terminate_child(Sup, test_sup), - {ok, _} = supervisor2:restart_child(Sup, test_sup), - receive - {'DOWN', MRef, process, TestSupPid, shutdown} -> - ok; - {'DOWN', MRef, process, TestSupPid, Reason} -> - {error, {I, Reason}} - end; - (_, R) -> - R - end, ok, lists:seq(1, Iterations)), - unlink(Sup), - MSupRef = erlang:monitor(process, Sup), - exit(Sup, shutdown), - receive - {'DOWN', MSupRef, process, Sup, _Reason} -> - ok - end, - Res. - -start_link() -> - Pid = spawn_link(fun () -> - process_flag(trap_exit, true), - receive stop -> ok end - end), - {ok, Pid}. - -init([Timeout]) -> - {ok, {{one_for_one, 0, 1}, - [{test_sup, {supervisor2, start_link, - [{local, ?MODULE}, ?MODULE, []]}, - transient, Timeout, supervisor, [?MODULE]}]}}; -init([]) -> - {ok, {{simple_one_for_one, 0, 1}, - [{test_worker, {?MODULE, start_link, []}, - temporary, 1000, worker, [?MODULE]}]}}. diff --git a/test/src/test_sup.erl b/test/src/test_sup.erl deleted file mode 100644 index 84d14f725d..0000000000 --- a/test/src/test_sup.erl +++ /dev/null @@ -1,93 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. -%% - --module(test_sup). - --behaviour(supervisor2). - --export([test_supervisor_delayed_restart/0, - init/1, start_child/0]). 
- -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --spec(test_supervisor_delayed_restart/0 :: () -> 'passed'). - --endif. - -%%---------------------------------------------------------------------------- -%% Public API -%%---------------------------------------------------------------------------- - -test_supervisor_delayed_restart() -> - passed = with_sup(simple_one_for_one, - fun (SupPid) -> - {ok, _ChildPid} = - supervisor2:start_child(SupPid, []), - test_supervisor_delayed_restart(SupPid) - end), - passed = with_sup(one_for_one, fun test_supervisor_delayed_restart/1). - -test_supervisor_delayed_restart(SupPid) -> - ok = ping_child(SupPid), - ok = exit_child(SupPid), - timer:sleep(100), - ok = ping_child(SupPid), - ok = exit_child(SupPid), - timer:sleep(100), - timeout = ping_child(SupPid), - timer:sleep(1010), - ok = ping_child(SupPid), - passed. - -with_sup(RestartStrategy, Fun) -> - {ok, SupPid} = supervisor2:start_link(?MODULE, [RestartStrategy]), - Res = Fun(SupPid), - unlink(SupPid), - exit(SupPid, shutdown), - Res. - -init([RestartStrategy]) -> - {ok, {{RestartStrategy, 1, 1}, - [{test, {test_sup, start_child, []}, {permanent, 1}, - 16#ffffffff, worker, [test_sup]}]}}. - -start_child() -> - {ok, proc_lib:spawn_link(fun run_child/0)}. - -ping_child(SupPid) -> - Ref = make_ref(), - with_child_pid(SupPid, fun(ChildPid) -> ChildPid ! {ping, Ref, self()} end), - receive {pong, Ref} -> ok - after 1000 -> timeout - end. - -exit_child(SupPid) -> - with_child_pid(SupPid, fun(ChildPid) -> exit(ChildPid, abnormal) end), - ok. - -with_child_pid(SupPid, Fun) -> - case supervisor2:which_children(SupPid) of - [{_Id, undefined, worker, [test_sup]}] -> ok; - [{_Id, ChildPid, worker, [test_sup]}] -> Fun(ChildPid); - [] -> ok - end. - -run_child() -> - receive {ping, Ref, Pid} -> Pid ! {pong, Ref}, - run_child() - end. 
diff --git a/test/src/vm_memory_monitor_tests.erl b/test/src/vm_memory_monitor_tests.erl deleted file mode 100644 index 61d62f862d..0000000000 --- a/test/src/vm_memory_monitor_tests.erl +++ /dev/null @@ -1,35 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. -%% - --module(vm_memory_monitor_tests). - --export([all_tests/0]). - -%% --------------------------------------------------------------------------- -%% Tests -%% --------------------------------------------------------------------------- - -all_tests() -> - lists:foreach(fun ({S, {K, V}}) -> - {K, V} = vm_memory_monitor:parse_line_linux(S) - end, - [{"MemTotal: 0 kB", {'MemTotal', 0}}, - {"MemTotal: 502968 kB ", {'MemTotal', 515039232}}, - {"MemFree: 178232 kB", {'MemFree', 182509568}}, - {"MemTotal: 50296888", {'MemTotal', 50296888}}, - {"MemTotal 502968 kB", {'MemTotal', 515039232}}, - {"MemTotal 50296866 ", {'MemTotal', 50296866}}]), - passed. |
