diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2018-11-28 13:25:20 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2018-12-03 09:28:44 +0000 |
| commit | 697d23a473fff3097d50a6398bb865ec17ea2a89 (patch) | |
| tree | 0e1d539b751e320bb23aa444d579800e17f9edd7 | |
| parent | c318bb9d8685e66b9dbbaa2171b4a8ae3c95290b (diff) | |
| download | rabbitmq-server-git-697d23a473fff3097d50a6398bb865ec17ea2a89.tar.gz | |
Ensure delivery doesn't happen when consumers are suspected down
Test cases for unacked messages returned to the queue on noconnection
[#161679638]
| -rw-r--r-- | src/rabbit_fifo.erl | 23 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 169 |
2 files changed, 184 insertions, 8 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 9f24a677b0..38c86cd3ba 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -395,7 +395,7 @@ apply(_, {down, ConsumerPid, noconnection}, [{monitor, node, Node} | Effects0] end, {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok}; -apply(_, {down, Pid, _Info}, Effects0, +apply(_, {down, Pid, _Info} = D, Effects0, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages @@ -417,7 +417,8 @@ apply(_, {down, Pid, _Info}, Effects0, checkout(State2, Effects1); apply(_, {nodeup, Node}, Effects0, #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + enqueuers = Enqs0, + service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -433,8 +434,22 @@ apply(_, {nodeup, Node}, Effects0, (_, _, Acc) -> Acc end, [], Enqs0), Monitors = [{monitor, process, P} || P <- Cons ++ Enqs], + Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> + E#enqueuer{suspected_down = false}; + (_, E) -> E + end, Enqs0), + {Cons1, SQ, Effects} = + maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) + when node(P) =:= Node -> + update_or_remove_sub( + ConsumerId, C#consumer{suspected_down = false}, + CAcc, SQAcc, EAcc); + (_, _, Acc) -> + Acc + end, {Cons0, SQ0, Effects0}, Cons0), % TODO: avoid list concat - {State0, Monitors ++ Effects0, ok}; + checkout(State0#state{consumers = Cons1, enqueuers = Enqs1, + service_queue = SQ}, Monitors ++ Effects); apply(_, {nodedown, _Node}, Effects, State) -> {State, Effects, ok}. 
@@ -879,6 +894,8 @@ checkout_one(#state{service_queue = SQ0, %% can happen when draining %% recurse without consumer on queue checkout_one(InitState#state{service_queue = SQ1}); + {ok, #consumer{suspected_down = true}} -> + checkout_one(InitState#state{service_queue = SQ1}); {ok, #consumer{checked_out = Checked0, next_msg_id = Next, credit = Credit, diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 19f3a66a1d..5d70a9b61c 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -54,12 +54,16 @@ groups() -> delete_declare, metrics_cleanup_on_leadership_takeover, metrics_cleanup_on_leader_crash, - consume_in_minority - ]}, + consume_in_minority]}, {cluster_size_5, [], [start_queue, start_queue_concurrent, quorum_cluster_size_3, quorum_cluster_size_7 + ]}, + {clustered_with_partitions, [], [ + reconnect_consumer_and_publish, + reconnect_consumer_and_wait, + reconnect_consumer_and_wait_channel_down ]} ]} ]. @@ -117,7 +121,9 @@ all_tests() -> init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config). + rabbit_ct_helpers:run_setup_steps( + Config, + [fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1]). end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). @@ -126,6 +132,8 @@ init_per_group(clustered, Config) -> rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]); init_per_group(unclustered, Config) -> rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]); +init_per_group(clustered_with_partitions, Config) -> + Config; init_per_group(Group, Config) -> ClusterSize = case Group of single_node -> 1; @@ -157,10 +165,28 @@ end_per_group(clustered, Config) -> Config; end_per_group(unclustered, Config) -> Config; +end_per_group(clustered_with_partitions, Config) -> + Config; end_per_group(_, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:teardown_steps()). 
+init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish; + Testcase == reconnect_consumer_and_wait; + Testcase == reconnect_consumer_and_wait_channel_down -> + Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), + Q = rabbit_data_coercion:to_binary(Testcase), + Config2 = rabbit_ct_helpers:set_config(Config1, + [{rmq_nodes_count, 3}, + {rmq_nodename_suffix, Testcase}, + {tcp_ports_base}, + {queue_name, Q} + ]), + rabbit_ct_helpers:run_steps(Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + [fun rabbit_ct_broker_helpers:enable_dist_proxy/1, + fun rabbit_ct_broker_helpers:cluster_nodes/1]); init_per_testcase(Testcase, Config) -> Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), @@ -168,12 +194,18 @@ init_per_testcase(Testcase, Config) -> Config2 = rabbit_ct_helpers:set_config(Config1, [{queue_name, Q} ]), - rabbit_ct_helpers:run_steps(Config2, - rabbit_ct_client_helpers:setup_steps()). + rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()). merge_app_env(Config) -> rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{core_metrics_gc_interval, 100}]}). +end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish; + Testcase == reconnect_consumer_and_wait; + Testcase == reconnect_consumer_and_wait_channel_down -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase); end_per_testcase(Testcase, Config) -> catch delete_queues(), Config1 = rabbit_ct_helpers:run_steps( @@ -1587,6 +1619,7 @@ delete_member(Config) -> rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server])). 
+ cleanup_data_dir(Config) -> %% This test is slow, but also checks that we handle properly errors when %% trying to delete a queue in minority. A case clause there had gone @@ -1615,6 +1648,132 @@ cleanup_data_dir(Config) -> [])), ?assert(not filelib:is_dir(DataDir)). +reconnect_consumer_and_publish(Config) -> + [Server | _] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + [F1, F2] = lists:delete(Leader, Servers), + ChF = rabbit_ct_client_helpers:open_channel(Config, F1), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(ChF, QQ, false), + receive + {#'basic.deliver'{redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + Up = [Leader, F2], + rabbit_ct_broker_helpers:block_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:block_traffic_between(F1, F2), + wait_for_messages_ready(Up, RaName, 1), + wait_for_messages_pending_ack(Up, RaName, 0), + wait_for_messages_ready([F1], RaName, 0), + wait_for_messages_pending_ack([F1], RaName, 1), + rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:allow_traffic_between(F1, F2), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 2), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + receive + {#'basic.deliver'{delivery_tag = 
DeliveryTag2, + redelivered = true}, _} -> + amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag2, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +reconnect_consumer_and_wait(Config) -> + [Server | _] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + [F1, F2] = lists:delete(Leader, Servers), + ChF = rabbit_ct_client_helpers:open_channel(Config, F1), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(ChF, QQ, false), + receive + {#'basic.deliver'{redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + Up = [Leader, F2], + rabbit_ct_broker_helpers:block_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:block_traffic_between(F1, F2), + wait_for_messages_ready(Up, RaName, 1), + wait_for_messages_pending_ack(Up, RaName, 0), + wait_for_messages_ready([F1], RaName, 0), + wait_for_messages_pending_ack([F1], RaName, 1), + rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:allow_traffic_between(F1, F2), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = true}, _} -> + amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. 
+ +reconnect_consumer_and_wait_channel_down(Config) -> + [Server | _] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + [F1, F2] = lists:delete(Leader, Servers), + ChF = rabbit_ct_client_helpers:open_channel(Config, F1), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(ChF, QQ, false), + receive + {#'basic.deliver'{redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + Up = [Leader, F2], + rabbit_ct_broker_helpers:block_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:block_traffic_between(F1, F2), + wait_for_messages_ready(Up, RaName, 1), + wait_for_messages_pending_ack(Up, RaName, 0), + wait_for_messages_ready([F1], RaName, 0), + wait_for_messages_pending_ack([F1], RaName, 1), + rabbit_ct_client_helpers:close_channel(ChF), + rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:allow_traffic_between(F1, F2), + %% Let's give it a few seconds to ensure it doesn't attempt to + %% deliver to the down channel - it shouldn't be monitored + %% at this time! + timer:sleep(5000), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0). + basic_recover(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
