summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--  src/rabbit_amqqueue.erl      7
-rw-r--r--  src/rabbit_fifo.erl         79
-rw-r--r--  src/rabbit_fifo_client.erl  11
-rw-r--r--  src/rabbit_quorum_queue.erl 27
-rw-r--r--  src/rabbit_vm.erl           16
5 files changed, 105 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5127280871..f4876356ee 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -913,8 +913,11 @@ list_local(VHostPath) ->
[ Q || #amqqueue{state = State, pid = QPid} = Q <- list(VHostPath),
State =/= crashed, is_local_to_node(QPid, node()) ].
-notify_policy_changed(#amqqueue{pid = QPid}) ->
- gen_server2:cast(QPid, policy_changed).
+notify_policy_changed(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) ->
+ gen_server2:cast(QPid, policy_changed);
+notify_policy_changed(#amqqueue{pid = QPid,
+ name = QName}) when ?IS_QUORUM(QPid) ->
+ rabbit_quorum_queue:policy_changed(QName, QPid).
consumers(#amqqueue{ pid = QPid }) ->
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}).
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 79d4a3effc..c12a6ec464 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -207,19 +207,19 @@
-spec init(config()) -> {state(), ra_machine:effects()}.
init(#{name := Name} = Conf) ->
+ update_state(Conf, #state{name = Name}).
+
+update_state(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
CCH = maps:get(cancel_consumer_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
MH = maps:get(metrics_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
- #state{name = Name,
- dead_letter_handler = DLH,
- cancel_consumer_handler = CCH,
- become_leader_handler = BLH,
- metrics_handler = MH,
- shadow_copy_interval = SHI}.
-
-
+ State#state{dead_letter_handler = DLH,
+ cancel_consumer_handler = CCH,
+ become_leader_handler = BLH,
+ metrics_handler = MH,
+ shadow_copy_interval = SHI}.
% msg_ids are scoped per consumer
% ra_indexes holds all raft indexes for enqueues currently on queue
@@ -374,10 +374,16 @@ apply(_, {down, ConsumerPid, noconnection},
Node = node(ConsumerPid),
% mark all consumers and enqueuers as suspect
% and monitor the node
- Cons = maps:map(fun({_, P}, C) when node(P) =:= Node ->
- C#consumer{suspected_down = true};
- (_, C) -> C
- end, Cons0),
+ {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C,
+ {Co, St0}) when node(P) =:= Node ->
+ St = return_all(St0, Checked0),
+ {maps:put(K, C#consumer{suspected_down = true,
+ checked_out = #{}},
+ Co),
+ St};
+ (K, C, {Co, St}) ->
+ {maps:put(K, C, Co), St}
+ end, {#{}, State0}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{suspected_down = true};
(_, E) -> E
@@ -388,7 +394,7 @@ apply(_, {down, ConsumerPid, noconnection},
_ ->
[{monitor, node, Node} | Effects0]
end,
- {State0#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
+ {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
apply(_, {down, Pid, _Info}, Effects0,
#state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
@@ -411,7 +417,8 @@ apply(_, {down, Pid, _Info}, Effects0,
checkout(State2, Effects1);
apply(_, {nodeup, Node}, Effects0,
#state{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+ enqueuers = Enqs0,
+ service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
@@ -427,10 +434,26 @@ apply(_, {nodeup, Node}, Effects0,
(_, _, Acc) -> Acc
end, [], Enqs0),
Monitors = [{monitor, process, P} || P <- Cons ++ Enqs],
+ Enqs1 = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{suspected_down = false};
+ (_, E) -> E
+ end, Enqs0),
+ {Cons1, SQ, Effects} =
+ maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
+ when node(P) =:= Node ->
+ update_or_remove_sub(
+ ConsumerId, C#consumer{suspected_down = false},
+ CAcc, SQAcc, EAcc);
+ (_, _, Acc) ->
+ Acc
+ end, {Cons0, SQ0, Effects0}, Cons0),
% TODO: avoid list concat
- {State0, Monitors ++ Effects0, ok};
+ checkout(State0#state{consumers = Cons1, enqueuers = Enqs1,
+ service_queue = SQ}, Monitors ++ Effects);
apply(_, {nodedown, _Node}, Effects, State) ->
- {State, Effects, ok}.
+ {State, Effects, ok};
+apply(_, {update_state, Conf}, Effects, State) ->
+ {update_state(Conf, State), Effects, ok}.
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Custs,
@@ -583,9 +606,7 @@ cancel_consumer(ConsumerId,
{Effects0, #state{consumers = C0, name = Name} = S0}) ->
case maps:take(ConsumerId, C0) of
{#consumer{checked_out = Checked0}, Cons} ->
- S = maps:fold(fun (_, {MsgNum, Msg}, S) ->
- return_one(MsgNum, Msg, S)
- end, S0, Checked0),
+ S = return_all(S0, Checked0),
Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0),
case maps:size(Cons) of
0 ->
@@ -788,6 +809,10 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
State0#state{messages = maps:put(MsgNum, Msg, Messages),
returns = queue:in(MsgNum, Returns)}.
+return_all(State, Checked) ->
+ maps:fold(fun (_, {MsgNum, Msg}, S) ->
+ return_one(MsgNum, Msg, S)
+ end, State, Checked).
checkout(State, Effects) ->
checkout0(checkout_one(State), Effects, #{}).
@@ -871,6 +896,8 @@ checkout_one(#state{service_queue = SQ0,
%% can happen when draining
%% recurse without consumer on queue
checkout_one(InitState#state{service_queue = SQ1});
+ {ok, #consumer{suspected_down = true}} ->
+ checkout_one(InitState#state{service_queue = SQ1});
{ok, #consumer{checked_out = Checked0,
next_msg_id = Next,
credit = Credit,
@@ -1289,6 +1316,20 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() ->
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
ok.
+down_with_noconnection_returns_unack_test() ->
+ Pid = spawn(fun() -> ok end),
+ Cid = {<<"down_with_noconnect">>, Pid},
+ {State0, _} = enq(1, 1, second, test_init(test)),
+ ?assertEqual(1, maps:size(State0#state.messages)),
+ ?assertEqual(0, queue:len(State0#state.returns)),
+ {State1, {_, _}} = deq(2, Cid, unsettled, State0),
+ ?assertEqual(0, maps:size(State1#state.messages)),
+ ?assertEqual(0, queue:len(State1#state.returns)),
+ {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1),
+ ?assertEqual(1, maps:size(State2a#state.messages)),
+ ?assertEqual(1, queue:len(State2a#state.returns)),
+ ok.
+
down_with_noproc_enqueuer_is_cleaned_up_test() ->
State00 = test_init(test),
Pid = spawn(fun() -> ok end),
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index c087e35fb2..c063ef9a17 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -21,7 +21,8 @@
handle_ra_event/3,
untracked_enqueue/2,
purge/1,
- cluster_name/1
+ cluster_name/1,
+ update_machine_state/2
]).
-include_lib("ra/include/ra.hrl").
@@ -375,6 +376,14 @@ purge(Node) ->
cluster_name(#state{cluster_name = ClusterName}) ->
ClusterName.
+update_machine_state(Node, Conf) ->
+ case ra:process_command(Node, {update_state, Conf}) of
+ {ok, ok, _} ->
+ ok;
+ Err ->
+ Err
+ end.
+
%% @doc Handles incoming `ra_events'. Events carry both internal "bookeeping"
%% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such
%% as message deliveries. All ra events need to be handled by {@module}
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 70c3116c33..61c4858f40 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -34,6 +34,7 @@
-export([add_member/3]).
-export([delete_member/3]).
-export([requeue/3]).
+-export([policy_changed/2]).
-export([cleanup_data_dir/0]).
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -101,8 +102,9 @@ init_state({Name, _}, QName) ->
{ok, #amqqueue{pid = Leader, quorum_nodes = Nodes0}} =
rabbit_amqqueue:lookup(QName),
%% Ensure the leader is listed first
- Nodes = [Leader | lists:delete(Leader, [{Name, N} || N <- Nodes0])],
- rabbit_fifo_client:init(qname_to_rname(QName), Nodes, SoftLimit,
+    Servers0 = [{Name, N} || N <- Nodes0],
+ Servers = [Leader | lists:delete(Leader, Servers0)],
+ rabbit_fifo_client:init(qname_to_rname(QName), Servers, SoftLimit,
fun() -> credit_flow:block(Name), ok end,
fun() -> credit_flow:unblock(Name), ok end).
@@ -150,12 +152,14 @@ declare(#amqqueue{name = QName,
-ra_machine(Q = #amqqueue{name = QName}) ->
- {module, rabbit_fifo,
- #{dead_letter_handler => dlx_mfa(Q),
- cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]},
- become_leader_handler => {?MODULE, become_leader, [QName]},
- metrics_handler => {?MODULE, update_metrics, [QName]}}}.
+ra_machine(Q) ->
+ {module, rabbit_fifo, ra_machine_config(Q)}.
+
+ra_machine_config(Q = #amqqueue{name = QName}) ->
+ #{dead_letter_handler => dlx_mfa(Q),
+ cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]},
+ become_leader_handler => {?MODULE, become_leader, [QName]},
+ metrics_handler => {?MODULE, update_metrics, [QName]}}.
cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) ->
Node = node(ChPid),
@@ -274,9 +278,10 @@ stop(VHost) ->
delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = QNodes},
_IfUnused, _IfEmpty, ActingUser) ->
%% TODO Quorum queue needs to support consumer tracking for IfUnused
+ Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
Msgs = quorum_messages(Name),
_ = rabbit_amqqueue:internal_delete(QName, ActingUser),
- case ra:delete_cluster([{Name, Node} || Node <- QNodes], 120000) of
+ case ra:delete_cluster([{Name, Node} || Node <- QNodes], Timeout) of
{ok, {_, LeaderNode} = Leader} ->
MRef = erlang:monitor(process, Leader),
receive
@@ -412,6 +417,10 @@ maybe_delete_data_dir(UId) ->
ok
end.
+policy_changed(QName, Node) ->
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ rabbit_fifo_client:update_machine_state(Node, ra_machine_config(Q)).
+
cluster_state(Name) ->
case whereis(Name) of
undefined -> down;
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 265bcb45e3..e495ab8677 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -35,7 +35,7 @@ memory() ->
{Sums, _Other} = sum_processes(
lists:append(All), distinguishers(), [memory]),
- [Qs, QsSlave, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther,
+ [Qs, QsSlave, Qqs, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther,
MsgIndexProc, MgmtDbProc, Plugins] =
[aggregate(Names, Sums, memory, fun (X) -> X end)
|| Names <- distinguished_interesting_sups()],
@@ -69,7 +69,7 @@ memory() ->
OtherProc = Processes
- ConnsReader - ConnsWriter - ConnsChannel - ConnsOther
- - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc,
+ - Qs - QsSlave - Qqs - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc,
[
%% Connections
@@ -81,6 +81,7 @@ memory() ->
%% Queues
{queue_procs, Qs},
{queue_slave_procs, QsSlave},
+ {quorum_queue_procs, Qqs},
%% Processes
{plugins, Plugins},
@@ -124,7 +125,7 @@ binary() ->
sets:add_element({Ptr, Sz}, Acc0)
end, Acc, Info)
end, distinguishers(), [{binary, sets:new()}]),
- [Other, Qs, QsSlave, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther,
+ [Other, Qs, QsSlave, Qqs, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther,
MsgIndexProc, MgmtDbProc, Plugins] =
[aggregate(Names, [{other, Rest} | Sums], binary, fun sum_binary/1)
|| Names <- [[other] | distinguished_interesting_sups()]],
@@ -134,6 +135,7 @@ binary() ->
{connection_other, ConnsOther},
{queue_procs, Qs},
{queue_slave_procs, QsSlave},
+ {quorum_queue_procs, Qqs},
{plugins, Plugins},
{mgmt_db, MgmtDbProc},
{msg_index, MsgIndexProc},
@@ -173,11 +175,16 @@ bytes(Words) -> try
end.
interesting_sups() ->
- [queue_sups(), conn_sups() | interesting_sups0()].
+ [queue_sups(), quorum_sups(), conn_sups() | interesting_sups0()].
queue_sups() ->
all_vhosts_children(rabbit_amqqueue_sup_sup).
+quorum_sups() ->
+    %% TODO: in the future not all ra servers may be queues and we need
+ %% some way to filter this
+ [ra_server_sup].
+
msg_stores() ->
all_vhosts_children(msg_store_transient)
++
@@ -229,6 +236,7 @@ distinguished_interesting_sups() ->
[
with(queue_sups(), master),
with(queue_sups(), slave),
+ quorum_sups(),
with(conn_sups(), reader),
with(conn_sups(), writer),
with(conn_sups(), channel),