diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2018-11-07 19:14:50 +0000 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2018-11-07 19:14:50 +0000 |
| commit | b56db0e487c6e45aa3fed7a2cb11a9ecc042db9f (patch) | |
| tree | 7b750ae4dc729eb3c400e2fe6c824780bbadd7ee /src | |
| parent | 84fb288e45e440a170686dd8acfb0730c6911af8 (diff) | |
| download | rabbitmq-server-git-b56db0e487c6e45aa3fed7a2cb11a9ecc042db9f.tar.gz | |
Support dlx policy changes in quorum queues
[#161247406]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 19 |
4 files changed, 39 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7c4386ba57..1be5637437 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -894,8 +894,11 @@ list_local(VHostPath) -> [ Q || #amqqueue{state = State, pid = QPid} = Q <- list(VHostPath), State =/= crashed, is_local_to_node(QPid, node()) ]. -notify_policy_changed(#amqqueue{pid = QPid}) -> - gen_server2:cast(QPid, policy_changed). +notify_policy_changed(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) -> + gen_server2:cast(QPid, policy_changed); +notify_policy_changed(#amqqueue{pid = QPid, + name = QName}) when ?IS_QUORUM(QPid) -> + rabbit_quorum_queue:policy_changed(QName, QPid). consumers(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}). diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 3fceb93654..21f77fc872 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -207,19 +207,19 @@ -spec init(config()) -> {state(), ra_machine:effects()}. init(#{name := Name} = Conf) -> + update_state(Conf, #state{name = Name}). + +update_state(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), CCH = maps:get(cancel_consumer_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), MH = maps:get(metrics_handler, Conf, undefined), SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL), - #state{name = Name, - dead_letter_handler = DLH, - cancel_consumer_handler = CCH, - become_leader_handler = BLH, - metrics_handler = MH, - shadow_copy_interval = SHI}. - - + State#state{dead_letter_handler = DLH, + cancel_consumer_handler = CCH, + become_leader_handler = BLH, + metrics_handler = MH, + shadow_copy_interval = SHI}. % msg_ids are scoped per consumer % ra_indexes holds all raft indexes for enqueues currently on queue @@ -431,7 +431,9 @@ apply(_, {nodeup, Node}, Effects0, % TODO: avoid list concat {State0, Monitors ++ Effects0, ok}; apply(_, {nodedown, _Node}, Effects, State) -> - {State, Effects, ok}. + {State, Effects, ok}; +apply(_, {update_state, Conf}, Effects, State) -> + {update_state(Conf, State), Effects, ok}. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Custs, diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index c087e35fb2..c063ef9a17 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -21,7 +21,8 @@ handle_ra_event/3, untracked_enqueue/2, purge/1, - cluster_name/1 + cluster_name/1, + update_machine_state/2 ]). -include_lib("ra/include/ra.hrl"). @@ -375,6 +376,14 @@ purge(Node) -> cluster_name(#state{cluster_name = ClusterName}) -> ClusterName. +update_machine_state(Node, Conf) -> + case ra:process_command(Node, {update_state, Conf}) of + {ok, ok, _} -> + ok; + Err -> + Err + end. + %% @doc Handles incoming `ra_events'. Events carry both internal "bookeeping" %% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such %% as message deliveries. All ra events need to be handled by {@module} diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 795465855b..acf002748f 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -34,6 +34,7 @@ -export([add_member/3]). -export([delete_member/3]). -export([requeue/3]). +-export([policy_changed/2]). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -147,12 +148,14 @@ declare(#amqqueue{name = QName, -ra_machine(Q = #amqqueue{name = QName}) -> - {module, rabbit_fifo, - #{dead_letter_handler => dlx_mfa(Q), - cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]}, - become_leader_handler => {?MODULE, become_leader, [QName]}, - metrics_handler => {?MODULE, update_metrics, [QName]}}}. +ra_machine(Q) -> + {module, rabbit_fifo, ra_machine_state(Q)}. + +ra_machine_state(Q = #amqqueue{name = QName}) -> + #{dead_letter_handler => dlx_mfa(Q), + cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]}, + become_leader_handler => {?MODULE, become_leader, [QName]}, + metrics_handler => {?MODULE, update_metrics, [QName]}}. cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) -> Node = node(ChPid), @@ -386,6 +389,10 @@ purge(Node) -> requeue(ConsumerTag, MsgIds, FState) -> rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, FState). +policy_changed(QName, Node) -> + {ok, Q} = rabbit_amqqueue:lookup(QName), + rabbit_fifo_client:update_machine_state(Node, ra_machine_state(Q)). + cluster_state(Name) -> case whereis(Name) of undefined -> down; |
