summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2018-12-03 10:43:35 +0000
committerGitHub <noreply@github.com>2018-12-03 10:43:35 +0000
commitd675f6854356a330e6d6ab7f6c58376cd8971a91 (patch)
treee7326f7a8f2bdad15ab83fbb2945dbd7d912d3df /src
parent9b2ee1e47d005d31e60e501fb8b1b1c707c8eabe (diff)
parent612ed35994feb626e1fbc28c15bc09158dd5d19f (diff)
downloadrabbitmq-server-git-d675f6854356a330e6d6ab7f6c58376cd8971a91.tar.gz
Merge pull request #1764 from rabbitmq/qq-dlx-policy
Support dlx policy changes in quorum queues
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_fifo.erl20
-rw-r--r--src/rabbit_fifo_client.erl11
-rw-r--r--src/rabbit_quorum_queue.erl19
4 files changed, 39 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 89913ba408..e19914d5ab 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -905,8 +905,11 @@ list_local(VHostPath) ->
[ Q || #amqqueue{state = State, pid = QPid} = Q <- list(VHostPath),
State =/= crashed, is_local_to_node(QPid, node()) ].
-notify_policy_changed(#amqqueue{pid = QPid}) ->
- gen_server2:cast(QPid, policy_changed).
+notify_policy_changed(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) ->
+ gen_server2:cast(QPid, policy_changed);
+notify_policy_changed(#amqqueue{pid = QPid,
+ name = QName}) when ?IS_QUORUM(QPid) ->
+ rabbit_quorum_queue:policy_changed(QName, QPid).
consumers(#amqqueue{ pid = QPid }) ->
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}).
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index bb2600dd73..c12a6ec464 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -207,19 +207,19 @@
-spec init(config()) -> {state(), ra_machine:effects()}.
init(#{name := Name} = Conf) ->
+ update_state(Conf, #state{name = Name}).
+
+update_state(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
CCH = maps:get(cancel_consumer_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
MH = maps:get(metrics_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
- #state{name = Name,
- dead_letter_handler = DLH,
- cancel_consumer_handler = CCH,
- become_leader_handler = BLH,
- metrics_handler = MH,
- shadow_copy_interval = SHI}.
-
-
+ State#state{dead_letter_handler = DLH,
+ cancel_consumer_handler = CCH,
+ become_leader_handler = BLH,
+ metrics_handler = MH,
+ shadow_copy_interval = SHI}.
% msg_ids are scoped per consumer
% ra_indexes holds all raft indexes for enqueues currently on queue
@@ -451,7 +451,9 @@ apply(_, {nodeup, Node}, Effects0,
checkout(State0#state{consumers = Cons1, enqueuers = Enqs1,
service_queue = SQ}, Monitors ++ Effects);
apply(_, {nodedown, _Node}, Effects, State) ->
- {State, Effects, ok}.
+ {State, Effects, ok};
+apply(_, {update_state, Conf}, Effects, State) ->
+ {update_state(Conf, State), Effects, ok}.
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Custs,
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index c087e35fb2..c063ef9a17 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -21,7 +21,8 @@
handle_ra_event/3,
untracked_enqueue/2,
purge/1,
- cluster_name/1
+ cluster_name/1,
+ update_machine_state/2
]).
-include_lib("ra/include/ra.hrl").
@@ -375,6 +376,14 @@ purge(Node) ->
cluster_name(#state{cluster_name = ClusterName}) ->
ClusterName.
+update_machine_state(Node, Conf) ->
+ case ra:process_command(Node, {update_state, Conf}) of
+ {ok, ok, _} ->
+ ok;
+ Err ->
+ Err
+ end.
+
%% @doc Handles incoming `ra_events'. Events carry both internal "bookeeping"
%% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such
%% as message deliveries. All ra events need to be handled by {@module}
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index e24b907600..385df48c86 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -34,6 +34,7 @@
-export([add_member/3]).
-export([delete_member/3]).
-export([requeue/3]).
+-export([policy_changed/2]).
-export([cleanup_data_dir/0]).
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -148,12 +149,14 @@ declare(#amqqueue{name = QName,
-ra_machine(Q = #amqqueue{name = QName}) ->
- {module, rabbit_fifo,
- #{dead_letter_handler => dlx_mfa(Q),
- cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]},
- become_leader_handler => {?MODULE, become_leader, [QName]},
- metrics_handler => {?MODULE, update_metrics, [QName]}}}.
+ra_machine(Q) ->
+ {module, rabbit_fifo, ra_machine_config(Q)}.
+
+ra_machine_config(Q = #amqqueue{name = QName}) ->
+ #{dead_letter_handler => dlx_mfa(Q),
+ cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]},
+ become_leader_handler => {?MODULE, become_leader, [QName]},
+ metrics_handler => {?MODULE, update_metrics, [QName]}}.
cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) ->
Node = node(ChPid),
@@ -411,6 +414,10 @@ maybe_delete_data_dir(UId) ->
ok
end.
+policy_changed(QName, Node) ->
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ rabbit_fifo_client:update_machine_state(Node, ra_machine_config(Q)).
+
cluster_state(Name) ->
case whereis(Name) of
undefined -> down;