diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 19 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 40 |
5 files changed, 72 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7c4386ba57..1be5637437 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -894,8 +894,11 @@ list_local(VHostPath) -> [ Q || #amqqueue{state = State, pid = QPid} = Q <- list(VHostPath), State =/= crashed, is_local_to_node(QPid, node()) ]. -notify_policy_changed(#amqqueue{pid = QPid}) -> - gen_server2:cast(QPid, policy_changed). +notify_policy_changed(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) -> + gen_server2:cast(QPid, policy_changed); +notify_policy_changed(#amqqueue{pid = QPid, + name = QName}) when ?IS_QUORUM(QPid) -> + rabbit_quorum_queue:policy_changed(QName, QPid). consumers(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}). diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 3fceb93654..21f77fc872 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -207,19 +207,19 @@ -spec init(config()) -> {state(), ra_machine:effects()}. init(#{name := Name} = Conf) -> + update_state(Conf, #state{name = Name}). + +update_state(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), CCH = maps:get(cancel_consumer_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), MH = maps:get(metrics_handler, Conf, undefined), SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL), - #state{name = Name, - dead_letter_handler = DLH, - cancel_consumer_handler = CCH, - become_leader_handler = BLH, - metrics_handler = MH, - shadow_copy_interval = SHI}. - - + State#state{dead_letter_handler = DLH, + cancel_consumer_handler = CCH, + become_leader_handler = BLH, + metrics_handler = MH, + shadow_copy_interval = SHI}. % msg_ids are scoped per consumer % ra_indexes holds all raft indexes for enqueues currently on queue @@ -431,7 +431,9 @@ apply(_, {nodeup, Node}, Effects0, % TODO: avoid list concat {State0, Monitors ++ Effects0, ok}; apply(_, {nodedown, _Node}, Effects, State) -> - {State, Effects, ok}. + {State, Effects, ok}; +apply(_, {update_state, Conf}, Effects, State) -> + {update_state(Conf, State), Effects, ok}. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Custs, diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index c087e35fb2..c063ef9a17 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -21,7 +21,8 @@ handle_ra_event/3, untracked_enqueue/2, purge/1, - cluster_name/1 + cluster_name/1, + update_machine_state/2 ]). -include_lib("ra/include/ra.hrl"). @@ -375,6 +376,14 @@ purge(Node) -> cluster_name(#state{cluster_name = ClusterName}) -> ClusterName. +update_machine_state(Node, Conf) -> + case ra:process_command(Node, {update_state, Conf}) of + {ok, ok, _} -> + ok; + Err -> + Err + end. + %% @doc Handles incoming `ra_events'. Events carry both internal "bookeeping" %% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such %% as message deliveries. All ra events need to be handled by {@module} diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 795465855b..acf002748f 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -34,6 +34,7 @@ -export([add_member/3]). -export([delete_member/3]). -export([requeue/3]). +-export([policy_changed/2]). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -147,12 +148,14 @@ declare(#amqqueue{name = QName, -ra_machine(Q = #amqqueue{name = QName}) -> - {module, rabbit_fifo, - #{dead_letter_handler => dlx_mfa(Q), - cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]}, - become_leader_handler => {?MODULE, become_leader, [QName]}, - metrics_handler => {?MODULE, update_metrics, [QName]}}}. +ra_machine(Q) -> + {module, rabbit_fifo, ra_machine_state(Q)}. + +ra_machine_state(Q = #amqqueue{name = QName}) -> + #{dead_letter_handler => dlx_mfa(Q), + cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]}, + become_leader_handler => {?MODULE, become_leader, [QName]}, + metrics_handler => {?MODULE, update_metrics, [QName]}}. cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) -> Node = node(ChPid), @@ -386,6 +389,10 @@ purge(Node) -> requeue(ConsumerTag, MsgIds, FState) -> rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, FState). +policy_changed(QName, Node) -> + {ok, Q} = rabbit_amqqueue:lookup(QName), + rabbit_fifo_client:update_machine_state(Node, ra_machine_state(Q)). + cluster_state(Name) -> case whereis(Name) of undefined -> down; diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index d983a9d396..4a175af6e2 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -99,6 +99,7 @@ all_tests() -> dead_letter_to_classic_queue, dead_letter_to_quorum_queue, dead_letter_from_classic_to_quorum_queue, + dead_letter_policy, cleanup_queue_state_on_channel_after_publish, cleanup_queue_state_on_channel_after_subscribe, basic_cancel, @@ -1063,22 +1064,47 @@ dead_letter_to_classic_queue(Config) -> {<<"x-dead-letter-routing-key">>, longstr, CQ} ])), ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), - RaName = ra_name(QQ), - publish(Ch, QQ), + test_dead_lettering(true, Config, Ch, Servers, ra_name(QQ), QQ, CQ). + +test_dead_lettering(PolicySet, Config, Ch, Servers, RaName, Source, Destination) -> + publish(Ch, Source), wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 0), - wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]), - DeliveryTag = consume(Ch, QQ, false), + wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]), + DeliveryTag = consume(Ch, Source, false), wait_for_messages_ready(Servers, RaName, 0), wait_for_messages_pending_ack(Servers, RaName, 1), - wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = false}), wait_for_messages_ready(Servers, RaName, 0), wait_for_messages_pending_ack(Servers, RaName, 0), - wait_for_messages(Config, [[CQ, <<"1">>, <<"1">>, <<"0">>]]), - _ = consume(Ch, CQ, false). + case PolicySet of + true -> + wait_for_messages(Config, [[Destination, <<"1">>, <<"1">>, <<"0">>]]), + _ = consume(Ch, Destination, true); + false -> + wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]) + end. + +dead_letter_policy(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + CQ = <<"classic-dead_letter_policy">>, + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"dlx">>, <<"dead_letter.*">>, <<"queues">>, + [{<<"dead-letter-exchange">>, <<"">>}, + {<<"dead-letter-routing-key">>, CQ}]), + RaName = ra_name(QQ), + test_dead_lettering(true, Config, Ch, Servers, RaName, QQ, CQ), + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"dlx">>), + test_dead_lettering(false, Config, Ch, Servers, RaName, QQ, CQ). dead_letter_to_quorum_queue(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
