diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2018-12-04 12:31:50 +0000 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2018-12-04 12:31:50 +0000 |
| commit | 36e5a0fa2164b810ee11edd5a2d078083c4d0575 (patch) | |
| tree | d2118f8f9925fdc1ac88f0577d73ab964e0f6c18 | |
| parent | adee8467f4e925ae472b17cf2fa0a0df0f2978ca (diff) | |
| parent | 812706707f300b735f22a56b7ff713cd20f4c7b9 (diff) | |
| download | rabbitmq-server-git-36e5a0fa2164b810ee11edd5a2d078083c4d0575.tar.gz | |
Merge branch 'master' into dialyze-qq
| -rw-r--r-- | rabbitmq-components.mk | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 79 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_vm.erl | 16 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 213 |
7 files changed, 307 insertions, 50 deletions
diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 709d155040..5307809c8b 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -110,12 +110,12 @@ dep_rabbitmq_public_umbrella = git_rmq rabbitmq-public-umbrella $(curre # all projects use the same versions. It avoids conflicts and makes it # possible to work with rabbitmq-public-umbrella. -dep_cowboy = hex 2.6.0 +dep_cowboy = hex 2.6.1 dep_cowlib = hex 2.7.0 dep_jsx = hex 2.9.0 dep_lager = hex 3.6.5 dep_ra = git https://github.com/rabbitmq/ra.git master -dep_ranch = hex 1.7.0 +dep_ranch = hex 1.7.1 dep_ranch_proxy_protocol = hex 2.1.1 dep_recon = hex 2.3.6 diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5127280871..f4876356ee 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -913,8 +913,11 @@ list_local(VHostPath) -> [ Q || #amqqueue{state = State, pid = QPid} = Q <- list(VHostPath), State =/= crashed, is_local_to_node(QPid, node()) ]. -notify_policy_changed(#amqqueue{pid = QPid}) -> - gen_server2:cast(QPid, policy_changed). +notify_policy_changed(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) -> + gen_server2:cast(QPid, policy_changed); +notify_policy_changed(#amqqueue{pid = QPid, + name = QName}) when ?IS_QUORUM(QPid) -> + rabbit_quorum_queue:policy_changed(QName, QPid). consumers(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}). diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 79d4a3effc..c12a6ec464 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -207,19 +207,19 @@ -spec init(config()) -> {state(), ra_machine:effects()}. init(#{name := Name} = Conf) -> + update_state(Conf, #state{name = Name}). + +update_state(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), CCH = maps:get(cancel_consumer_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), MH = maps:get(metrics_handler, Conf, undefined), SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL), - #state{name = Name, - dead_letter_handler = DLH, - cancel_consumer_handler = CCH, - become_leader_handler = BLH, - metrics_handler = MH, - shadow_copy_interval = SHI}. - - + State#state{dead_letter_handler = DLH, + cancel_consumer_handler = CCH, + become_leader_handler = BLH, + metrics_handler = MH, + shadow_copy_interval = SHI}. % msg_ids are scoped per consumer % ra_indexes holds all raft indexes for enqueues currently on queue @@ -374,10 +374,16 @@ apply(_, {down, ConsumerPid, noconnection}, Node = node(ConsumerPid), % mark all consumers and enqueuers as suspect % and monitor the node - Cons = maps:map(fun({_, P}, C) when node(P) =:= Node -> - C#consumer{suspected_down = true}; - (_, C) -> C - end, Cons0), + {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C, + {Co, St0}) when node(P) =:= Node -> + St = return_all(St0, Checked0), + {maps:put(K, C#consumer{suspected_down = true, + checked_out = #{}}, + Co), + St}; + (K, C, {Co, St}) -> + {maps:put(K, C, Co), St} + end, {#{}, State0}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{suspected_down = true}; (_, E) -> E @@ -388,7 +394,7 @@ apply(_, {down, ConsumerPid, noconnection}, _ -> [{monitor, node, Node} | Effects0] end, - {State0#state{consumers = Cons, enqueuers = Enqs}, Effects, ok}; + {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok}; apply(_, {down, Pid, _Info}, Effects0, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> @@ -411,7 +417,8 @@ apply(_, {down, Pid, _Info}, Effects0, checkout(State2, Effects1); apply(_, {nodeup, Node}, Effects0, #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + enqueuers = Enqs0, + service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -427,10 +434,26 @@ apply(_, {nodeup, Node}, Effects0, (_, _, Acc) -> Acc end, [], Enqs0), Monitors = [{monitor, process, P} || P <- Cons ++ Enqs], + Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> + E#enqueuer{suspected_down = false}; + (_, E) -> E + end, Enqs0), + {Cons1, SQ, Effects} = + maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) + when node(P) =:= Node -> + update_or_remove_sub( + ConsumerId, C#consumer{suspected_down = false}, + CAcc, SQAcc, EAcc); + (_, _, Acc) -> + Acc + end, {Cons0, SQ0, Effects0}, Cons0), % TODO: avoid list concat - {State0, Monitors ++ Effects0, ok}; + checkout(State0#state{consumers = Cons1, enqueuers = Enqs1, + service_queue = SQ}, Monitors ++ Effects); apply(_, {nodedown, _Node}, Effects, State) -> - {State, Effects, ok}. + {State, Effects, ok}; +apply(_, {update_state, Conf}, Effects, State) -> + {update_state(Conf, State), Effects, ok}. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Custs, @@ -583,9 +606,7 @@ cancel_consumer(ConsumerId, {Effects0, #state{consumers = C0, name = Name} = S0}) -> case maps:take(ConsumerId, C0) of {#consumer{checked_out = Checked0}, Cons} -> - S = maps:fold(fun (_, {MsgNum, Msg}, S) -> - return_one(MsgNum, Msg, S) - end, S0, Checked0), + S = return_all(S0, Checked0), Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0), case maps:size(Cons) of 0 -> @@ -788,6 +809,10 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, State0#state{messages = maps:put(MsgNum, Msg, Messages), returns = queue:in(MsgNum, Returns)}. +return_all(State, Checked) -> + maps:fold(fun (_, {MsgNum, Msg}, S) -> + return_one(MsgNum, Msg, S) + end, State, Checked). checkout(State, Effects) -> checkout0(checkout_one(State), Effects, #{}). @@ -871,6 +896,8 @@ checkout_one(#state{service_queue = SQ0, %% can happen when draining %% recurse without consumer on queue checkout_one(InitState#state{service_queue = SQ1}); + {ok, #consumer{suspected_down = true}} -> + checkout_one(InitState#state{service_queue = SQ1}); {ok, #consumer{checked_out = Checked0, next_msg_id = Next, credit = Credit, @@ -1289,6 +1316,20 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() -> ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3), ok. +down_with_noconnection_returns_unack_test() -> + Pid = spawn(fun() -> ok end), + Cid = {<<"down_with_noconnect">>, Pid}, + {State0, _} = enq(1, 1, second, test_init(test)), + ?assertEqual(1, maps:size(State0#state.messages)), + ?assertEqual(0, queue:len(State0#state.returns)), + {State1, {_, _}} = deq(2, Cid, unsettled, State0), + ?assertEqual(0, maps:size(State1#state.messages)), + ?assertEqual(0, queue:len(State1#state.returns)), + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1), + ?assertEqual(1, maps:size(State2a#state.messages)), + ?assertEqual(1, queue:len(State2a#state.returns)), + ok. + down_with_noproc_enqueuer_is_cleaned_up_test() -> State00 = test_init(test), Pid = spawn(fun() -> ok end), diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index c087e35fb2..c063ef9a17 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -21,7 +21,8 @@ handle_ra_event/3, untracked_enqueue/2, purge/1, - cluster_name/1 + cluster_name/1, + update_machine_state/2 ]). -include_lib("ra/include/ra.hrl"). @@ -375,6 +376,14 @@ purge(Node) -> cluster_name(#state{cluster_name = ClusterName}) -> ClusterName. +update_machine_state(Node, Conf) -> + case ra:process_command(Node, {update_state, Conf}) of + {ok, ok, _} -> + ok; + Err -> + Err + end. + %% @doc Handles incoming `ra_events'. Events carry both internal "bookeeping" %% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such %% as message deliveries. All ra events need to be handled by {@module} diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 70c3116c33..61c4858f40 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -34,6 +34,7 @@ -export([add_member/3]). -export([delete_member/3]). -export([requeue/3]). +-export([policy_changed/2]). -export([cleanup_data_dir/0]). -include_lib("rabbit_common/include/rabbit.hrl"). @@ -101,8 +102,9 @@ init_state({Name, _}, QName) -> {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes0}} = rabbit_amqqueue:lookup(QName), %% Ensure the leader is listed first - Nodes = [Leader | lists:delete(Leader, [{Name, N} || N <- Nodes0])], - rabbit_fifo_client:init(qname_to_rname(QName), Nodes, SoftLimit, + Servers0 = [{Name, N} || N <- Nodes], + Servers = [Leader | lists:delete(Leader, Servers0)], + rabbit_fifo_client:init(qname_to_rname(QName), Servers, SoftLimit, fun() -> credit_flow:block(Name), ok end, fun() -> credit_flow:unblock(Name), ok end). @@ -150,12 +152,14 @@ declare(#amqqueue{name = QName, -ra_machine(Q = #amqqueue{name = QName}) -> - {module, rabbit_fifo, - #{dead_letter_handler => dlx_mfa(Q), - cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]}, - become_leader_handler => {?MODULE, become_leader, [QName]}, - metrics_handler => {?MODULE, update_metrics, [QName]}}}. +ra_machine(Q) -> + {module, rabbit_fifo, ra_machine_config(Q)}. + +ra_machine_config(Q = #amqqueue{name = QName}) -> + #{dead_letter_handler => dlx_mfa(Q), + cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]}, + become_leader_handler => {?MODULE, become_leader, [QName]}, + metrics_handler => {?MODULE, update_metrics, [QName]}}. cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) -> Node = node(ChPid), @@ -274,9 +278,10 @@ stop(VHost) -> delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = QNodes}, _IfUnused, _IfEmpty, ActingUser) -> %% TODO Quorum queue needs to support consumer tracking for IfUnused + Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, Msgs = quorum_messages(Name), _ = rabbit_amqqueue:internal_delete(QName, ActingUser), - case ra:delete_cluster([{Name, Node} || Node <- QNodes], 120000) of + case ra:delete_cluster([{Name, Node} || Node <- QNodes], Timeout) of {ok, {_, LeaderNode} = Leader} -> MRef = erlang:monitor(process, Leader), receive @@ -412,6 +417,10 @@ maybe_delete_data_dir(UId) -> ok end. +policy_changed(QName, Node) -> + {ok, Q} = rabbit_amqqueue:lookup(QName), + rabbit_fifo_client:update_machine_state(Node, ra_machine_config(Q)). + cluster_state(Name) -> case whereis(Name) of undefined -> down; diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 265bcb45e3..e495ab8677 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -35,7 +35,7 @@ memory() -> {Sums, _Other} = sum_processes( lists:append(All), distinguishers(), [memory]), - [Qs, QsSlave, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther, + [Qs, QsSlave, Qqs, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther, MsgIndexProc, MgmtDbProc, Plugins] = [aggregate(Names, Sums, memory, fun (X) -> X end) || Names <- distinguished_interesting_sups()], @@ -69,7 +69,7 @@ memory() -> OtherProc = Processes - ConnsReader - ConnsWriter - ConnsChannel - ConnsOther - - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc, + - Qs - QsSlave - Qqs - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc, [ %% Connections @@ -81,6 +81,7 @@ memory() -> %% Queues {queue_procs, Qs}, {queue_slave_procs, QsSlave}, + {quorum_queue_procs, Qqs}, %% Processes {plugins, Plugins}, @@ -124,7 +125,7 @@ binary() -> sets:add_element({Ptr, Sz}, Acc0) end, Acc, Info) end, distinguishers(), [{binary, sets:new()}]), - [Other, Qs, QsSlave, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther, + [Other, Qs, QsSlave, Qqs, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther, MsgIndexProc, MgmtDbProc, Plugins] = [aggregate(Names, [{other, Rest} | Sums], binary, fun sum_binary/1) || Names <- [[other] | distinguished_interesting_sups()]], @@ -134,6 +135,7 @@ binary() -> {connection_other, ConnsOther}, {queue_procs, Qs}, {queue_slave_procs, QsSlave}, + {quorum_queue_procs, Qqs}, {plugins, Plugins}, {mgmt_db, MgmtDbProc}, {msg_index, MsgIndexProc}, @@ -173,11 +175,16 @@ bytes(Words) -> try end. interesting_sups() -> - [queue_sups(), conn_sups() | interesting_sups0()]. + [queue_sups(), quorum_sups(), conn_sups() | interesting_sups0()]. queue_sups() -> all_vhosts_children(rabbit_amqqueue_sup_sup). +quorum_sups() -> + %% TODO: in the future not all ra servers may be queues and we needs + %% some way to filter this + [ra_server_sup]. + msg_stores() -> all_vhosts_children(msg_store_transient) ++ @@ -229,6 +236,7 @@ distinguished_interesting_sups() -> [ with(queue_sups(), master), with(queue_sups(), slave), + quorum_sups(), with(conn_sups(), reader), with(conn_sups(), writer), with(conn_sups(), channel), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 531b645774..14a3650a87 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -54,12 +54,16 @@ groups() -> delete_declare, metrics_cleanup_on_leadership_takeover, metrics_cleanup_on_leader_crash, - consume_in_minority - ]}, + consume_in_minority]}, {cluster_size_5, [], [start_queue, start_queue_concurrent, quorum_cluster_size_3, quorum_cluster_size_7 + ]}, + {clustered_with_partitions, [], [ + reconnect_consumer_and_publish, + reconnect_consumer_and_wait, + reconnect_consumer_and_wait_channel_down ]} ]} ]. @@ -100,6 +104,7 @@ all_tests() -> dead_letter_to_classic_queue, dead_letter_to_quorum_queue, dead_letter_from_classic_to_quorum_queue, + dead_letter_policy, cleanup_queue_state_on_channel_after_publish, cleanup_queue_state_on_channel_after_subscribe, basic_cancel, @@ -119,7 +124,9 @@ all_tests() -> init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config). + rabbit_ct_helpers:run_setup_steps( + Config, + [fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1]). end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). @@ -128,6 +135,8 @@ init_per_group(clustered, Config) -> rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]); init_per_group(unclustered, Config) -> rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]); +init_per_group(clustered_with_partitions, Config) -> + rabbit_ct_helpers:set_config(Config, [{net_ticktime, 10}]); init_per_group(Group, Config) -> ClusterSize = case Group of single_node -> 1; @@ -139,7 +148,8 @@ init_per_group(Group, Config) -> [{rmq_nodes_count, ClusterSize}, {rmq_nodename_suffix, Group}, {tcp_ports_base}]), - Config2 = rabbit_ct_helpers:run_steps(Config1, + Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]), + Config2 = rabbit_ct_helpers:run_steps(Config1b, [fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps()), ok = rabbit_ct_broker_helpers:rpc( @@ -159,10 +169,28 @@ end_per_group(clustered, Config) -> Config; end_per_group(unclustered, Config) -> Config; +end_per_group(clustered_with_partitions, Config) -> + Config; end_per_group(_, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:teardown_steps()). +init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish; + Testcase == reconnect_consumer_and_wait; + Testcase == reconnect_consumer_and_wait_channel_down -> + Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), + Q = rabbit_data_coercion:to_binary(Testcase), + Config2 = rabbit_ct_helpers:set_config(Config1, + [{rmq_nodes_count, 3}, + {rmq_nodename_suffix, Testcase}, + {tcp_ports_base}, + {queue_name, Q} + ]), + rabbit_ct_helpers:run_steps(Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + [fun rabbit_ct_broker_helpers:enable_dist_proxy/1, + fun rabbit_ct_broker_helpers:cluster_nodes/1]); init_per_testcase(Testcase, Config) -> Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), @@ -170,12 +198,18 @@ init_per_testcase(Testcase, Config) -> Config2 = rabbit_ct_helpers:set_config(Config1, [{queue_name, Q} ]), - rabbit_ct_helpers:run_steps(Config2, - rabbit_ct_client_helpers:setup_steps()). + rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()). merge_app_env(Config) -> rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{core_metrics_gc_interval, 100}]}). +end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish; + Testcase == reconnect_consumer_and_wait; + Testcase == reconnect_consumer_and_wait_channel_down -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase); end_per_testcase(Testcase, Config) -> catch delete_queues(), Config1 = rabbit_ct_helpers:run_steps( @@ -1066,22 +1100,47 @@ dead_letter_to_classic_queue(Config) -> {<<"x-dead-letter-routing-key">>, longstr, CQ} ])), ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), - RaName = ra_name(QQ), - publish(Ch, QQ), + test_dead_lettering(true, Config, Ch, Servers, ra_name(QQ), QQ, CQ). + +test_dead_lettering(PolicySet, Config, Ch, Servers, RaName, Source, Destination) -> + publish(Ch, Source), wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 0), - wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]), - DeliveryTag = consume(Ch, QQ, false), + wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]), + DeliveryTag = consume(Ch, Source, false), wait_for_messages_ready(Servers, RaName, 0), wait_for_messages_pending_ack(Servers, RaName, 1), - wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = false}), wait_for_messages_ready(Servers, RaName, 0), wait_for_messages_pending_ack(Servers, RaName, 0), - wait_for_messages(Config, [[CQ, <<"1">>, <<"1">>, <<"0">>]]), - _ = consume(Ch, CQ, false). + case PolicySet of + true -> + wait_for_messages(Config, [[Destination, <<"1">>, <<"1">>, <<"0">>]]), + _ = consume(Ch, Destination, true); + false -> + wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]) + end. + +dead_letter_policy(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + CQ = <<"classic-dead_letter_policy">>, + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"dlx">>, <<"dead_letter.*">>, <<"queues">>, + [{<<"dead-letter-exchange">>, <<"">>}, + {<<"dead-letter-routing-key">>, CQ}]), + RaName = ra_name(QQ), + test_dead_lettering(true, Config, Ch, Servers, RaName, QQ, CQ), + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"dlx">>), + test_dead_lettering(false, Config, Ch, Servers, RaName, QQ, CQ). dead_letter_to_quorum_queue(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1589,6 +1648,7 @@ delete_member(Config) -> rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server])). + cleanup_data_dir(Config) -> %% This test is slow, but also checks that we handle properly errors when %% trying to delete a queue in minority. A case clause there had gone @@ -1610,6 +1670,7 @@ cleanup_data_dir(Config) -> ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _}, amqp_channel:call(Ch, #'queue.delete'{queue = QQ})), + catch amqp_channel:call(Ch, #'queue.delete'{queue = QQ}), ?assert(filelib:is_dir(DataDir)), ?assertEqual(ok, @@ -1617,6 +1678,132 @@ cleanup_data_dir(Config) -> [])), ?assert(not filelib:is_dir(DataDir)). +reconnect_consumer_and_publish(Config) -> + [Server | _] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + [F1, F2] = lists:delete(Leader, Servers), + ChF = rabbit_ct_client_helpers:open_channel(Config, F1), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(ChF, QQ, false), + receive + {#'basic.deliver'{redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + Up = [Leader, F2], + rabbit_ct_broker_helpers:block_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:block_traffic_between(F1, F2), + wait_for_messages_ready(Up, RaName, 1), + wait_for_messages_pending_ack(Up, RaName, 0), + wait_for_messages_ready([F1], RaName, 0), + wait_for_messages_pending_ack([F1], RaName, 1), + rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:allow_traffic_between(F1, F2), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 2), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag2, + redelivered = true}, _} -> + amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag2, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +reconnect_consumer_and_wait(Config) -> + [Server | _] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + [F1, F2] = lists:delete(Leader, Servers), + ChF = rabbit_ct_client_helpers:open_channel(Config, F1), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(ChF, QQ, false), + receive + {#'basic.deliver'{redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + Up = [Leader, F2], + rabbit_ct_broker_helpers:block_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:block_traffic_between(F1, F2), + wait_for_messages_ready(Up, RaName, 1), + wait_for_messages_pending_ack(Up, RaName, 0), + wait_for_messages_ready([F1], RaName, 0), + wait_for_messages_pending_ack([F1], RaName, 1), + rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:allow_traffic_between(F1, F2), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = true}, _} -> + amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +reconnect_consumer_and_wait_channel_down(Config) -> + [Server | _] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + [F1, F2] = lists:delete(Leader, Servers), + ChF = rabbit_ct_client_helpers:open_channel(Config, F1), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(ChF, QQ, false), + receive + {#'basic.deliver'{redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + Up = [Leader, F2], + rabbit_ct_broker_helpers:block_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:block_traffic_between(F1, F2), + wait_for_messages_ready(Up, RaName, 1), + wait_for_messages_pending_ack(Up, RaName, 0), + wait_for_messages_ready([F1], RaName, 0), + wait_for_messages_pending_ack([F1], RaName, 1), + rabbit_ct_client_helpers:close_channel(ChF), + rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:allow_traffic_between(F1, F2), + %% Let's give it a few seconds to ensure it doesn't attempt to + %% deliver to the down channel - it shouldn't be monitored + %% at this time! + timer:sleep(5000), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0). + basic_recover(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
