diff options
| author | Karl Nilsson <kjnilsson@gmail.com> | 2018-12-03 09:54:25 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-12-03 09:54:25 +0000 |
| commit | fc3ff9b0d71b50ebf63f7fb1fbc6c09b8d97dc7f (patch) | |
| tree | bb65c12a7b7adbd40fdf2554a5a43ad560ee3c62 | |
| parent | b247f2ab1e478ff71c1d63c49de3474148ba5019 (diff) | |
| parent | bedface2ffbc20d3b20a41f1f8232b43458b7dcb (diff) | |
| download | rabbitmq-server-git-fc3ff9b0d71b50ebf63f7fb1fbc6c09b8d97dc7f.tar.gz | |
Merge pull request #1771 from rabbitmq/return-unack-any-down-reason
Return delivered but unack messages to the queue for noconnection reason
| -rw-r--r-- | src/rabbit_fifo.erl | 59 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 169 |
2 files changed, 213 insertions, 15 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 79d4a3effc..bb2600dd73 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -374,10 +374,16 @@ apply(_, {down, ConsumerPid, noconnection}, Node = node(ConsumerPid), % mark all consumers and enqueuers as suspect % and monitor the node - Cons = maps:map(fun({_, P}, C) when node(P) =:= Node -> - C#consumer{suspected_down = true}; - (_, C) -> C - end, Cons0), + {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C, + {Co, St0}) when node(P) =:= Node -> + St = return_all(St0, Checked0), + {maps:put(K, C#consumer{suspected_down = true, + checked_out = #{}}, + Co), + St}; + (K, C, {Co, St}) -> + {maps:put(K, C, Co), St} + end, {#{}, State0}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{suspected_down = true}; (_, E) -> E @@ -388,7 +394,7 @@ apply(_, {down, ConsumerPid, noconnection}, _ -> [{monitor, node, Node} | Effects0] end, - {State0#state{consumers = Cons, enqueuers = Enqs}, Effects, ok}; + {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok}; apply(_, {down, Pid, _Info}, Effects0, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> @@ -411,7 +417,8 @@ apply(_, {down, Pid, _Info}, Effects0, checkout(State2, Effects1); apply(_, {nodeup, Node}, Effects0, #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + enqueuers = Enqs0, + service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -427,8 +434,22 @@ apply(_, {nodeup, Node}, Effects0, (_, _, Acc) -> Acc end, [], Enqs0), Monitors = [{monitor, process, P} || P <- Cons ++ Enqs], + Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> + E#enqueuer{suspected_down = false}; + (_, E) -> E + end, Enqs0), + {Cons1, SQ, Effects} = + maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) + when node(P) =:= Node -> + update_or_remove_sub( + ConsumerId, C#consumer{suspected_down = false}, + CAcc, SQAcc, EAcc); + (_, _, Acc) -> + Acc + end, {Cons0, SQ0, Effects0}, Cons0), % TODO: avoid list concat - {State0, Monitors ++ Effects0, ok}; + checkout(State0#state{consumers = Cons1, enqueuers = Enqs1, + service_queue = SQ}, Monitors ++ Effects); apply(_, {nodedown, _Node}, Effects, State) -> {State, Effects, ok}. @@ -583,9 +604,7 @@ cancel_consumer(ConsumerId, {Effects0, #state{consumers = C0, name = Name} = S0}) -> case maps:take(ConsumerId, C0) of {#consumer{checked_out = Checked0}, Cons} -> - S = maps:fold(fun (_, {MsgNum, Msg}, S) -> - return_one(MsgNum, Msg, S) - end, S0, Checked0), + S = return_all(S0, Checked0), Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0), case maps:size(Cons) of 0 -> @@ -788,6 +807,10 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, State0#state{messages = maps:put(MsgNum, Msg, Messages), returns = queue:in(MsgNum, Returns)}. +return_all(State, Checked) -> + maps:fold(fun (_, {MsgNum, Msg}, S) -> + return_one(MsgNum, Msg, S) + end, State, Checked). checkout(State, Effects) -> checkout0(checkout_one(State), Effects, #{}). @@ -871,6 +894,8 @@ checkout_one(#state{service_queue = SQ0, %% can happen when draining %% recurse without consumer on queue checkout_one(InitState#state{service_queue = SQ1}); + {ok, #consumer{suspected_down = true}} -> + checkout_one(InitState#state{service_queue = SQ1}); {ok, #consumer{checked_out = Checked0, next_msg_id = Next, credit = Credit, @@ -1289,6 +1314,20 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() -> ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3), ok. +down_with_noconnection_returns_unack_test() -> + Pid = spawn(fun() -> ok end), + Cid = {<<"down_with_noconnect">>, Pid}, + {State0, _} = enq(1, 1, second, test_init(test)), + ?assertEqual(1, maps:size(State0#state.messages)), + ?assertEqual(0, queue:len(State0#state.returns)), + {State1, {_, _}} = deq(2, Cid, unsettled, State0), + ?assertEqual(0, maps:size(State1#state.messages)), + ?assertEqual(0, queue:len(State1#state.returns)), + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1), + ?assertEqual(1, maps:size(State2a#state.messages)), + ?assertEqual(1, queue:len(State2a#state.returns)), + ok. + down_with_noproc_enqueuer_is_cleaned_up_test() -> State00 = test_init(test), Pid = spawn(fun() -> ok end), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 19f3a66a1d..5d70a9b61c 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -54,12 +54,16 @@ groups() -> delete_declare, metrics_cleanup_on_leadership_takeover, metrics_cleanup_on_leader_crash, - consume_in_minority - ]}, + consume_in_minority]}, {cluster_size_5, [], [start_queue, start_queue_concurrent, quorum_cluster_size_3, quorum_cluster_size_7 + ]}, + {clustered_with_partitions, [], [ + reconnect_consumer_and_publish, + reconnect_consumer_and_wait, + reconnect_consumer_and_wait_channel_down ]} ]} ]. @@ -117,7 +121,9 @@ all_tests() -> init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config). + rabbit_ct_helpers:run_setup_steps( + Config, + [fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1]). end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). @@ -126,6 +132,8 @@ init_per_group(clustered, Config) -> rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]); init_per_group(unclustered, Config) -> rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]); +init_per_group(clustered_with_partitions, Config) -> + Config; init_per_group(Group, Config) -> ClusterSize = case Group of single_node -> 1; @@ -157,10 +165,28 @@ end_per_group(clustered, Config) -> Config; end_per_group(unclustered, Config) -> Config; +end_per_group(clustered_with_partitions, Config) -> + Config; end_per_group(_, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:teardown_steps()). +init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish; + Testcase == reconnect_consumer_and_wait; + Testcase == reconnect_consumer_and_wait_channel_down -> + Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), + Q = rabbit_data_coercion:to_binary(Testcase), + Config2 = rabbit_ct_helpers:set_config(Config1, + [{rmq_nodes_count, 3}, + {rmq_nodename_suffix, Testcase}, + {tcp_ports_base}, + {queue_name, Q} + ]), + rabbit_ct_helpers:run_steps(Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + [fun rabbit_ct_broker_helpers:enable_dist_proxy/1, + fun rabbit_ct_broker_helpers:cluster_nodes/1]); init_per_testcase(Testcase, Config) -> Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), @@ -168,12 +194,18 @@ init_per_testcase(Testcase, Config) -> Config2 = rabbit_ct_helpers:set_config(Config1, [{queue_name, Q} ]), - rabbit_ct_helpers:run_steps(Config2, - rabbit_ct_client_helpers:setup_steps()). + rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()). merge_app_env(Config) -> rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{core_metrics_gc_interval, 100}]}). +end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish; + Testcase == reconnect_consumer_and_wait; + Testcase == reconnect_consumer_and_wait_channel_down -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase); end_per_testcase(Testcase, Config) -> catch delete_queues(), Config1 = rabbit_ct_helpers:run_steps( @@ -1587,6 +1619,7 @@ delete_member(Config) -> rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server])). + cleanup_data_dir(Config) -> %% This test is slow, but also checks that we handle properly errors when %% trying to delete a queue in minority. A case clause there had gone @@ -1615,6 +1648,132 @@ cleanup_data_dir(Config) -> [])), ?assert(not filelib:is_dir(DataDir)). +reconnect_consumer_and_publish(Config) -> + [Server | _] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + [F1, F2] = lists:delete(Leader, Servers), + ChF = rabbit_ct_client_helpers:open_channel(Config, F1), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(ChF, QQ, false), + receive + {#'basic.deliver'{redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + Up = [Leader, F2], + rabbit_ct_broker_helpers:block_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:block_traffic_between(F1, F2), + wait_for_messages_ready(Up, RaName, 1), + wait_for_messages_pending_ack(Up, RaName, 0), + wait_for_messages_ready([F1], RaName, 0), + wait_for_messages_pending_ack([F1], RaName, 1), + rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:allow_traffic_between(F1, F2), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 2), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag2, + redelivered = true}, _} -> + amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag2, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +reconnect_consumer_and_wait(Config) -> + [Server | _] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + [F1, F2] = lists:delete(Leader, Servers), + ChF = rabbit_ct_client_helpers:open_channel(Config, F1), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(ChF, QQ, false), + receive + {#'basic.deliver'{redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + Up = [Leader, F2], + rabbit_ct_broker_helpers:block_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:block_traffic_between(F1, F2), + wait_for_messages_ready(Up, RaName, 1), + wait_for_messages_pending_ack(Up, RaName, 0), + wait_for_messages_ready([F1], RaName, 0), + wait_for_messages_pending_ack([F1], RaName, 1), + rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:allow_traffic_between(F1, F2), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = true}, _} -> + amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +reconnect_consumer_and_wait_channel_down(Config) -> + [Server | _] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + [F1, F2] = lists:delete(Leader, Servers), + ChF = rabbit_ct_client_helpers:open_channel(Config, F1), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(ChF, QQ, false), + receive + {#'basic.deliver'{redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + Up = [Leader, F2], + rabbit_ct_broker_helpers:block_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:block_traffic_between(F1, F2), + wait_for_messages_ready(Up, RaName, 1), + wait_for_messages_pending_ack(Up, RaName, 0), + wait_for_messages_ready([F1], RaName, 0), + wait_for_messages_pending_ack([F1], RaName, 1), + rabbit_ct_client_helpers:close_channel(ChF), + rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:allow_traffic_between(F1, F2), + %% Let's give it a few seconds to ensure it doesn't attempt to + %% deliver to the down channel - it shouldn't be monitored + %% at this time! + timer:sleep(5000), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0). + basic_recover(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
