diff options
Diffstat (limited to 'test')
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 93 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 14 |
2 files changed, 102 insertions, 5 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index e949f7cc1a..bef943da0c 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -48,6 +48,8 @@ groups() -> ++ all_tests()}, {cluster_size_3, [], [ declare_during_node_down, + simple_confirm_availability_on_leader_change, + confirm_availability_on_leader_change, recover_from_single_failure, recover_from_multiple_failures, leadership_takeover, @@ -577,6 +579,19 @@ publish(Config) -> wait_for_messages_ready(Servers, Name, 1), wait_for_messages_pending_ack(Servers, Name, 0). +publish_confirm(Ch, QName) -> + publish(Ch, QName), + amqp_channel:register_confirm_handler(Ch, self()), + ct:pal("waiting for confirms from ~s", [QName]), + ok = receive + #'basic.ack'{} -> ok; + #'basic.nack'{} -> fail + after 2500 -> + exit(confirm_timeout) + end, + ct:pal("CONFIRMED! ~s", [QName]), + ok. + ra_name(Q) -> binary_to_atom(<<"%2F_", Q/binary>>, utf8). @@ -1551,6 +1566,82 @@ declare_during_node_down(Config) -> wait_for_messages_ready(Servers, RaName, 1), ok. +simple_confirm_availability_on_leader_change(Config) -> + [Node1, Node2, _Node3] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + %% declare a queue on node2 - this _should_ host the leader on node 2 + DCh = rabbit_ct_client_helpers:open_channel(Config, Node2), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(DCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + erlang:process_flag(trap_exit, true), + %% open a channel to another node + Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + publish_confirm(Ch, QQ), + + %% stop the node hosting the leader + stop_node(Config, Node2), + %% this should not fail as the channel should detect the new leader and + %% resend to that + publish_confirm(Ch, QQ), + ok = rabbit_ct_broker_helpers:start_node(Config, Node2), + ok. + +confirm_availability_on_leader_change(Config) -> + [Node1, Node2, _Node3] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + %% declare a queue on node2 - this _should_ host the leader on node 2 + DCh = rabbit_ct_client_helpers:open_channel(Config, Node2), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(DCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + erlang:process_flag(trap_exit, true), + Pid = spawn_link(fun () -> + %% open a channel to another node + Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + ConfirmLoop = fun Loop() -> + publish_confirm(Ch, QQ), + receive {done, P} -> + P ! done, + ok + after 0 -> Loop() end + end, + ConfirmLoop() + end), + + timer:sleep(500), + %% stop the node hosting the leader + stop_node(Config, Node2), + %% this should not fail as the channel should detect the new leader and + %% resend to that + timer:sleep(500), + Pid ! {done, self()}, + receive + done -> ok; + {'EXIT', Pid, Err} -> + exit(Err) + after 5500 -> + flush(100), + exit(bah) + end, + ok = rabbit_ct_broker_helpers:start_node(Config, Node2), + ok. + +flush(T) -> + receive X -> + ct:pal("flushed ~w", [X]), + flush(T) + after T -> + ok + end. + + add_member_not_running(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -2002,7 +2093,7 @@ filter_queues(Expected, Got) -> end, Got). publish(Ch, Queue) -> - ok = amqp_channel:call(Ch, + ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = Queue}, #amqp_msg{props = #'P_basic'{delivery_mode = 2}, payload = <<"msg">>}). diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index a2e22afc2e..56608e9af3 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -99,8 +99,13 @@ basics(Config) -> _ = ra:stop_server(ServerId), _ = ra:restart_server(ServerId), - % give time to become leader - timer:sleep(500), + %% wait for leader change to notice server is up again + receive + {ra_event, _, {machine, leader_change}} -> ok + after 5000 -> + exit(leader_change_timeout) + end, + {ok, FState6} = rabbit_fifo_client:enqueue(two, FState5b), % process applied event FState6b = process_ra_event(FState6, 250), @@ -523,8 +528,9 @@ conf(ClusterName, UId, ServerId, _, Peers) -> process_ra_event(State, Wait) -> receive {ra_event, From, Evt} -> - % ct:pal("processed ra event ~p~n", [Evt]), - {internal, _, _, S} = rabbit_fifo_client:handle_ra_event(From, Evt, State), + ct:pal("processed ra event ~p~n", [Evt]), + {internal, _, _, S} = + rabbit_fifo_client:handle_ra_event(From, Evt, State), S after Wait -> exit(ra_event_timeout) |
