summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2018-12-05 15:04:39 +0300
committerMichael Klishin <michael@clojurewerkz.org>2018-12-05 15:04:39 +0300
commit1111ceeb7a0c85b25dd2e433435feb1d107fc811 (patch)
treeb4b3a0aff80ee3d1049141a3ac0c15335d8cd0a0 /test
parent0962eee105023d4e737ed76907df9ced8707da8b (diff)
parente0ad5b09f8aae9c84822018716d0ef32bd9cccce (diff)
downloadrabbitmq-server-git-1111ceeb7a0c85b25dd2e433435feb1d107fc811.tar.gz
Merge branch 'master' into ranch_proxy_header
Diffstat (limited to 'test')
-rw-r--r--test/quorum_queue_SUITE.erl190
-rw-r--r--test/rabbit_fifo_SUITE.erl14
2 files changed, 198 insertions, 6 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 26357c3fee..61fd7a1b2c 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -48,6 +48,8 @@ groups() ->
++ all_tests()},
{cluster_size_3, [], [
declare_during_node_down,
+ simple_confirm_availability_on_leader_change,
+ confirm_availability_on_leader_change,
recover_from_single_failure,
recover_from_multiple_failures,
leadership_takeover,
@@ -113,7 +115,9 @@ all_tests() ->
cancel_sync_queue,
basic_recover,
idempotent_recover,
- vhost_with_quorum_queue_is_deleted
+ vhost_with_quorum_queue_is_deleted,
+ consume_redelivery_count,
+ subscribe_redelivery_count
].
%% -------------------------------------------------------------------
@@ -573,6 +577,19 @@ publish(Config) ->
wait_for_messages_ready(Servers, Name, 1),
wait_for_messages_pending_ack(Servers, Name, 0).
+publish_confirm(Ch, QName) ->
+ publish(Ch, QName),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ ct:pal("waiting for confirms from ~s", [QName]),
+ ok = receive
+ #'basic.ack'{} -> ok;
+ #'basic.nack'{} -> fail
+ after 2500 ->
+ exit(confirm_timeout)
+ end,
+ ct:pal("CONFIRMED! ~s", [QName]),
+ ok.
+
ra_name(Q) ->
binary_to_atom(<<"%2F_", Q/binary>>, utf8).
@@ -1547,6 +1564,82 @@ declare_during_node_down(Config) ->
wait_for_messages_ready(Servers, RaName, 1),
ok.
+simple_confirm_availability_on_leader_change(Config) ->
+ [Node1, Node2, _Node3] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ %% declare a queue on node2 - this _should_ host the leader on node 2
+ DCh = rabbit_ct_client_helpers:open_channel(Config, Node2),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(DCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ erlang:process_flag(trap_exit, true),
+ %% open a channel to another node
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ publish_confirm(Ch, QQ),
+
+ %% stop the node hosting the leader
+ stop_node(Config, Node2),
+ %% this should not fail as the channel should detect the new leader and
+ %% resend to that
+ publish_confirm(Ch, QQ),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
+ ok.
+
+confirm_availability_on_leader_change(Config) ->
+ [Node1, Node2, _Node3] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ %% declare a queue on node2 - this _should_ host the leader on node 2
+ DCh = rabbit_ct_client_helpers:open_channel(Config, Node2),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(DCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ erlang:process_flag(trap_exit, true),
+ Pid = spawn_link(fun () ->
+ %% open a channel to another node
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ ConfirmLoop = fun Loop() ->
+ publish_confirm(Ch, QQ),
+ receive {done, P} ->
+ P ! done,
+ ok
+ after 0 -> Loop() end
+ end,
+ ConfirmLoop()
+ end),
+
+ timer:sleep(500),
+ %% stop the node hosting the leader
+ stop_node(Config, Node2),
+ %% this should not fail as the channel should detect the new leader and
+ %% resend to that
+ timer:sleep(500),
+ Pid ! {done, self()},
+ receive
+ done -> ok;
+ {'EXIT', Pid, Err} ->
+ exit(Err)
+ after 5500 ->
+ flush(100),
+ exit(bah)
+ end,
+ ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
+ ok.
+
+flush(T) ->
+ receive X ->
+ ct:pal("flushed ~w", [X]),
+ flush(T)
+ after T ->
+ ok
+ end.
+
+
add_member_not_running(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1820,6 +1913,99 @@ basic_recover(Config) ->
amqp_channel:cast(Ch, #'basic.recover'{requeue = true}),
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0).
+
+subscribe_redelivery_count(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+
+ DTag = <<"x-delivery-count">>,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} ->
+ ?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true})
+ end,
+
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} ->
+ ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true})
+ end,
+
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag2,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H2}}} ->
+ ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2,
+ multiple = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+consume_redelivery_count(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+
+ DTag = <<"x-delivery-count">>,
+
+ {#'basic.get_ok'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false}),
+ ?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+
+ {#'basic.get_ok'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false}),
+ ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true}),
+
+ {#'basic.get_ok'{delivery_tag = DeliveryTag2,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H2}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false}),
+ ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2,
+ multiple = false,
+ requeue = true}).
+
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->
@@ -1871,7 +2057,7 @@ filter_queues(Expected, Got) ->
end, Got).
publish(Ch, Queue) ->
- ok = amqp_channel:call(Ch,
+ ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = Queue},
#amqp_msg{props = #'P_basic'{delivery_mode = 2},
payload = <<"msg">>}).
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index a2e22afc2e..56608e9af3 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -99,8 +99,13 @@ basics(Config) ->
_ = ra:stop_server(ServerId),
_ = ra:restart_server(ServerId),
- % give time to become leader
- timer:sleep(500),
+ %% wait for leader change to notice server is up again
+ receive
+ {ra_event, _, {machine, leader_change}} -> ok
+ after 5000 ->
+ exit(leader_change_timeout)
+ end,
+
{ok, FState6} = rabbit_fifo_client:enqueue(two, FState5b),
% process applied event
FState6b = process_ra_event(FState6, 250),
@@ -523,8 +528,9 @@ conf(ClusterName, UId, ServerId, _, Peers) ->
process_ra_event(State, Wait) ->
receive
{ra_event, From, Evt} ->
- % ct:pal("processed ra event ~p~n", [Evt]),
- {internal, _, _, S} = rabbit_fifo_client:handle_ra_event(From, Evt, State),
+ ct:pal("processed ra event ~p~n", [Evt]),
+ {internal, _, _, S} =
+ rabbit_fifo_client:handle_ra_event(From, Evt, State),
S
after Wait ->
exit(ra_event_timeout)