diff options
| author | D Corbacho <diana@rabbitmq.com> | 2018-12-05 10:07:56 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-12-05 10:07:56 +0000 |
| commit | 105204861abfd270b4ba8c1b6e5878e00c6049f5 (patch) | |
| tree | 8cad648011789345fdf87db888efdbec804f50b7 /test | |
| parent | db888df2a9156fefda626f57cbb9d9591e27d41f (diff) | |
| parent | 26c7dfba5b31118010256ebe79b1043e470ce452 (diff) | |
| download | rabbitmq-server-git-105204861abfd270b4ba8c1b6e5878e00c6049f5.tar.gz | |
Merge pull request #1782 from rabbitmq/qq-confirm-availability
Quorum queue confirm availability
Diffstat (limited to 'test')
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 93 |
1 files changed, 92 insertions, 1 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 88c79acf79..61fd7a1b2c 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -48,6 +48,8 @@ groups() -> ++ all_tests()}, {cluster_size_3, [], [ declare_during_node_down, + simple_confirm_availability_on_leader_change, + confirm_availability_on_leader_change, recover_from_single_failure, recover_from_multiple_failures, leadership_takeover, @@ -575,6 +577,19 @@ publish(Config) -> wait_for_messages_ready(Servers, Name, 1), wait_for_messages_pending_ack(Servers, Name, 0). +publish_confirm(Ch, QName) -> + publish(Ch, QName), + amqp_channel:register_confirm_handler(Ch, self()), + ct:pal("waiting for confirms from ~s", [QName]), + ok = receive + #'basic.ack'{} -> ok; + #'basic.nack'{} -> fail + after 2500 -> + exit(confirm_timeout) + end, + ct:pal("CONFIRMED! ~s", [QName]), + ok. + ra_name(Q) -> binary_to_atom(<<"%2F_", Q/binary>>, utf8). @@ -1549,6 +1564,82 @@ declare_during_node_down(Config) -> wait_for_messages_ready(Servers, RaName, 1), ok. +simple_confirm_availability_on_leader_change(Config) -> + [Node1, Node2, _Node3] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + %% declare a queue on node2 - this _should_ host the leader on node 2 + DCh = rabbit_ct_client_helpers:open_channel(Config, Node2), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(DCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + erlang:process_flag(trap_exit, true), + %% open a channel to another node + Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + publish_confirm(Ch, QQ), + + %% stop the node hosting the leader + stop_node(Config, Node2), + %% this should not fail as the channel should detect the new leader and + %% resend to that + publish_confirm(Ch, QQ), + ok = rabbit_ct_broker_helpers:start_node(Config, Node2), + ok. + +confirm_availability_on_leader_change(Config) -> + [Node1, Node2, _Node3] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + %% declare a queue on node2 - this _should_ host the leader on node 2 + DCh = rabbit_ct_client_helpers:open_channel(Config, Node2), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(DCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + erlang:process_flag(trap_exit, true), + Pid = spawn_link(fun () -> + %% open a channel to another node + Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + ConfirmLoop = fun Loop() -> + publish_confirm(Ch, QQ), + receive {done, P} -> + P ! done, + ok + after 0 -> Loop() end + end, + ConfirmLoop() + end), + + timer:sleep(500), + %% stop the node hosting the leader + stop_node(Config, Node2), + %% this should not fail as the channel should detect the new leader and + %% resend to that + timer:sleep(500), + Pid ! {done, self()}, + receive + done -> ok; + {'EXIT', Pid, Err} -> + exit(Err) + after 5500 -> + flush(100), + exit(bah) + end, + ok = rabbit_ct_broker_helpers:start_node(Config, Node2), + ok. + +flush(T) -> + receive X -> + ct:pal("flushed ~w", [X]), + flush(T) + after T -> + ok + end. + + add_member_not_running(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1966,7 +2057,7 @@ filter_queues(Expected, Got) -> end, Got). publish(Ch, Queue) -> - ok = amqp_channel:call(Ch, + ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = Queue}, #amqp_msg{props = #'P_basic'{delivery_mode = 2}, payload = <<"msg">>}). |
