summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2018-11-28 13:25:20 +0000
committerkjnilsson <knilsson@pivotal.io>2018-12-03 09:28:44 +0000
commit697d23a473fff3097d50a6398bb865ec17ea2a89 (patch)
tree0e1d539b751e320bb23aa444d579800e17f9edd7
parentc318bb9d8685e66b9dbbaa2171b4a8ae3c95290b (diff)
downloadrabbitmq-server-git-697d23a473fff3097d50a6398bb865ec17ea2a89.tar.gz
Ensure delivery doesn't happen when consumers are suspected down
Testcases for unack messages returned to queue on noconnection [#161679638]
-rw-r--r--src/rabbit_fifo.erl23
-rw-r--r--test/quorum_queue_SUITE.erl169
2 files changed, 184 insertions, 8 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 9f24a677b0..38c86cd3ba 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -395,7 +395,7 @@ apply(_, {down, ConsumerPid, noconnection},
[{monitor, node, Node} | Effects0]
end,
{State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
-apply(_, {down, Pid, _Info}, Effects0,
+apply(_, {down, Pid, _Info} = D, Effects0,
#state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
@@ -417,7 +417,8 @@ apply(_, {down, Pid, _Info}, Effects0,
checkout(State2, Effects1);
apply(_, {nodeup, Node}, Effects0,
#state{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+ enqueuers = Enqs0,
+ service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
@@ -433,8 +434,22 @@ apply(_, {nodeup, Node}, Effects0,
(_, _, Acc) -> Acc
end, [], Enqs0),
Monitors = [{monitor, process, P} || P <- Cons ++ Enqs],
+ Enqs1 = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{suspected_down = false};
+ (_, E) -> E
+ end, Enqs0),
+ {Cons1, SQ, Effects} =
+ maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
+ when node(P) =:= Node ->
+ update_or_remove_sub(
+ ConsumerId, C#consumer{suspected_down = false},
+ CAcc, SQAcc, EAcc);
+ (_, _, Acc) ->
+ Acc
+ end, {Cons0, SQ0, Effects0}, Cons0),
% TODO: avoid list concat
- {State0, Monitors ++ Effects0, ok};
+ checkout(State0#state{consumers = Cons1, enqueuers = Enqs1,
+ service_queue = SQ}, Monitors ++ Effects);
apply(_, {nodedown, _Node}, Effects, State) ->
{State, Effects, ok}.
@@ -879,6 +894,8 @@ checkout_one(#state{service_queue = SQ0,
%% can happen when draining
%% recurse without consumer on queue
checkout_one(InitState#state{service_queue = SQ1});
+ {ok, #consumer{suspected_down = true}} ->
+ checkout_one(InitState#state{service_queue = SQ1});
{ok, #consumer{checked_out = Checked0,
next_msg_id = Next,
credit = Credit,
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 19f3a66a1d..5d70a9b61c 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -54,12 +54,16 @@ groups() ->
delete_declare,
metrics_cleanup_on_leadership_takeover,
metrics_cleanup_on_leader_crash,
- consume_in_minority
- ]},
+ consume_in_minority]},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
quorum_cluster_size_3,
quorum_cluster_size_7
+ ]},
+ {clustered_with_partitions, [], [
+ reconnect_consumer_and_publish,
+ reconnect_consumer_and_wait,
+ reconnect_consumer_and_wait_channel_down
]}
]}
].
@@ -117,7 +121,9 @@ all_tests() ->
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
- rabbit_ct_helpers:run_setup_steps(Config).
+ rabbit_ct_helpers:run_setup_steps(
+ Config,
+ [fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1]).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
@@ -126,6 +132,8 @@ init_per_group(clustered, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]);
init_per_group(unclustered, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]);
+init_per_group(clustered_with_partitions, Config) ->
+ Config;
init_per_group(Group, Config) ->
ClusterSize = case Group of
single_node -> 1;
@@ -157,10 +165,28 @@ end_per_group(clustered, Config) ->
Config;
end_per_group(unclustered, Config) ->
Config;
+end_per_group(clustered_with_partitions, Config) ->
+ Config;
end_per_group(_, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).
+init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
+ Testcase == reconnect_consumer_and_wait;
+ Testcase == reconnect_consumer_and_wait_channel_down ->
+ Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Q = rabbit_data_coercion:to_binary(Testcase),
+ Config2 = rabbit_ct_helpers:set_config(Config1,
+ [{rmq_nodes_count, 3},
+ {rmq_nodename_suffix, Testcase},
+ {tcp_ports_base},
+ {queue_name, Q}
+ ]),
+ rabbit_ct_helpers:run_steps(Config2,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++
+ [fun rabbit_ct_broker_helpers:enable_dist_proxy/1,
+ fun rabbit_ct_broker_helpers:cluster_nodes/1]);
init_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
@@ -168,12 +194,18 @@ init_per_testcase(Testcase, Config) ->
Config2 = rabbit_ct_helpers:set_config(Config1,
[{queue_name, Q}
]),
- rabbit_ct_helpers:run_steps(Config2,
- rabbit_ct_client_helpers:setup_steps()).
+ rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
merge_app_env(Config) ->
rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{core_metrics_gc_interval, 100}]}).
+end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
+ Testcase == reconnect_consumer_and_wait;
+ Testcase == reconnect_consumer_and_wait_channel_down ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase);
end_per_testcase(Testcase, Config) ->
catch delete_queues(),
Config1 = rabbit_ct_helpers:run_steps(
@@ -1587,6 +1619,7 @@ delete_member(Config) ->
rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, Server])).
+
cleanup_data_dir(Config) ->
%% This test is slow, but also checks that we handle properly errors when
%% trying to delete a queue in minority. A case clause there had gone
@@ -1615,6 +1648,132 @@ cleanup_data_dir(Config) ->
[])),
?assert(not filelib:is_dir(DataDir)).
+reconnect_consumer_and_publish(Config) ->
+ [Server | _] = Servers =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ [F1, F2] = lists:delete(Leader, Servers),
+ ChF = rabbit_ct_client_helpers:open_channel(Config, F1),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(ChF, QQ, false),
+ receive
+ {#'basic.deliver'{redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1)
+ end,
+ Up = [Leader, F2],
+ rabbit_ct_broker_helpers:block_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:block_traffic_between(F1, F2),
+ wait_for_messages_ready(Up, RaName, 1),
+ wait_for_messages_pending_ack(Up, RaName, 0),
+ wait_for_messages_ready([F1], RaName, 0),
+ wait_for_messages_pending_ack([F1], RaName, 1),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, F2),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 2),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1)
+ end,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag2,
+ redelivered = true}, _} ->
+ amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag2,
+ multiple = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+reconnect_consumer_and_wait(Config) ->
+ [Server | _] = Servers =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ [F1, F2] = lists:delete(Leader, Servers),
+ ChF = rabbit_ct_client_helpers:open_channel(Config, F1),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(ChF, QQ, false),
+ receive
+ {#'basic.deliver'{redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1)
+ end,
+ Up = [Leader, F2],
+ rabbit_ct_broker_helpers:block_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:block_traffic_between(F1, F2),
+ wait_for_messages_ready(Up, RaName, 1),
+ wait_for_messages_pending_ack(Up, RaName, 0),
+ wait_for_messages_ready([F1], RaName, 0),
+ wait_for_messages_pending_ack([F1], RaName, 1),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, F2),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = true}, _} ->
+ amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+reconnect_consumer_and_wait_channel_down(Config) ->
+ [Server | _] = Servers =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ [F1, F2] = lists:delete(Leader, Servers),
+ ChF = rabbit_ct_client_helpers:open_channel(Config, F1),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(ChF, QQ, false),
+ receive
+ {#'basic.deliver'{redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1)
+ end,
+ Up = [Leader, F2],
+ rabbit_ct_broker_helpers:block_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:block_traffic_between(F1, F2),
+ wait_for_messages_ready(Up, RaName, 1),
+ wait_for_messages_pending_ack(Up, RaName, 0),
+ wait_for_messages_ready([F1], RaName, 0),
+ wait_for_messages_pending_ack([F1], RaName, 1),
+ rabbit_ct_client_helpers:close_channel(ChF),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, F2),
+ %% Let's give it a few seconds to ensure it doesn't attempt to
+ %% deliver to the down channel - it shouldn't be monitored
+ %% at this time!
+ timer:sleep(5000),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
basic_recover(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),