diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 79 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_vm.erl | 16 |
5 files changed, 105 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5127280871..f4876356ee 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -913,8 +913,11 @@ list_local(VHostPath) -> [ Q || #amqqueue{state = State, pid = QPid} = Q <- list(VHostPath), State =/= crashed, is_local_to_node(QPid, node()) ]. -notify_policy_changed(#amqqueue{pid = QPid}) -> - gen_server2:cast(QPid, policy_changed). +notify_policy_changed(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) -> + gen_server2:cast(QPid, policy_changed); +notify_policy_changed(#amqqueue{pid = QPid, + name = QName}) when ?IS_QUORUM(QPid) -> + rabbit_quorum_queue:policy_changed(QName, QPid). consumers(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}). diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 79d4a3effc..c12a6ec464 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -207,19 +207,19 @@ -spec init(config()) -> {state(), ra_machine:effects()}. init(#{name := Name} = Conf) -> + update_state(Conf, #state{name = Name}). + +update_state(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), CCH = maps:get(cancel_consumer_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), MH = maps:get(metrics_handler, Conf, undefined), SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL), - #state{name = Name, - dead_letter_handler = DLH, - cancel_consumer_handler = CCH, - become_leader_handler = BLH, - metrics_handler = MH, - shadow_copy_interval = SHI}. - - + State#state{dead_letter_handler = DLH, + cancel_consumer_handler = CCH, + become_leader_handler = BLH, + metrics_handler = MH, + shadow_copy_interval = SHI}. % msg_ids are scoped per consumer % ra_indexes holds all raft indexes for enqueues currently on queue @@ -374,10 +374,16 @@ apply(_, {down, ConsumerPid, noconnection}, Node = node(ConsumerPid), % mark all consumers and enqueuers as suspect % and monitor the node - Cons = maps:map(fun({_, P}, C) when node(P) =:= Node -> - C#consumer{suspected_down = true}; - (_, C) -> C - end, Cons0), + {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C, + {Co, St0}) when node(P) =:= Node -> + St = return_all(St0, Checked0), + {maps:put(K, C#consumer{suspected_down = true, + checked_out = #{}}, + Co), + St}; + (K, C, {Co, St}) -> + {maps:put(K, C, Co), St} + end, {#{}, State0}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{suspected_down = true}; (_, E) -> E @@ -388,7 +394,7 @@ apply(_, {down, ConsumerPid, noconnection}, _ -> [{monitor, node, Node} | Effects0] end, - {State0#state{consumers = Cons, enqueuers = Enqs}, Effects, ok}; + {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok}; apply(_, {down, Pid, _Info}, Effects0, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> @@ -411,7 +417,8 @@ apply(_, {down, Pid, _Info}, Effects0, checkout(State2, Effects1); apply(_, {nodeup, Node}, Effects0, #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + enqueuers = Enqs0, + service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -427,10 +434,26 @@ apply(_, {nodeup, Node}, Effects0, (_, _, Acc) -> Acc end, [], Enqs0), Monitors = [{monitor, process, P} || P <- Cons ++ Enqs], + Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> + E#enqueuer{suspected_down = false}; + (_, E) -> E + end, Enqs0), + {Cons1, SQ, Effects} = + maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) + when node(P) =:= Node -> + update_or_remove_sub( + ConsumerId, C#consumer{suspected_down = false}, + CAcc, SQAcc, EAcc); + (_, _, Acc) -> + Acc + end, {Cons0, SQ0, Effects0}, Cons0), % TODO: avoid list concat - {State0, Monitors ++ Effects0, ok}; + checkout(State0#state{consumers = Cons1, enqueuers = Enqs1, + service_queue = SQ}, Monitors ++ Effects); apply(_, {nodedown, _Node}, Effects, State) -> - {State, Effects, ok}. + {State, Effects, ok}; +apply(_, {update_state, Conf}, Effects, State) -> + {update_state(Conf, State), Effects, ok}. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Custs, @@ -583,9 +606,7 @@ cancel_consumer(ConsumerId, {Effects0, #state{consumers = C0, name = Name} = S0}) -> case maps:take(ConsumerId, C0) of {#consumer{checked_out = Checked0}, Cons} -> - S = maps:fold(fun (_, {MsgNum, Msg}, S) -> - return_one(MsgNum, Msg, S) - end, S0, Checked0), + S = return_all(S0, Checked0), Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0), case maps:size(Cons) of 0 -> @@ -788,6 +809,10 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, State0#state{messages = maps:put(MsgNum, Msg, Messages), returns = queue:in(MsgNum, Returns)}. +return_all(State, Checked) -> + maps:fold(fun (_, {MsgNum, Msg}, S) -> + return_one(MsgNum, Msg, S) + end, State, Checked). checkout(State, Effects) -> checkout0(checkout_one(State), Effects, #{}). @@ -871,6 +896,8 @@ checkout_one(#state{service_queue = SQ0, %% can happen when draining %% recurse without consumer on queue checkout_one(InitState#state{service_queue = SQ1}); + {ok, #consumer{suspected_down = true}} -> + checkout_one(InitState#state{service_queue = SQ1}); {ok, #consumer{checked_out = Checked0, next_msg_id = Next, credit = Credit, @@ -1289,6 +1316,20 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() -> ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3), ok. +down_with_noconnection_returns_unack_test() -> + Pid = spawn(fun() -> ok end), + Cid = {<<"down_with_noconnect">>, Pid}, + {State0, _} = enq(1, 1, second, test_init(test)), + ?assertEqual(1, maps:size(State0#state.messages)), + ?assertEqual(0, queue:len(State0#state.returns)), + {State1, {_, _}} = deq(2, Cid, unsettled, State0), + ?assertEqual(0, maps:size(State1#state.messages)), + ?assertEqual(0, queue:len(State1#state.returns)), + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1), + ?assertEqual(1, maps:size(State2a#state.messages)), + ?assertEqual(1, queue:len(State2a#state.returns)), + ok. + down_with_noproc_enqueuer_is_cleaned_up_test() -> State00 = test_init(test), Pid = spawn(fun() -> ok end), diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index c087e35fb2..c063ef9a17 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -21,7 +21,8 @@ handle_ra_event/3, untracked_enqueue/2, purge/1, - cluster_name/1 + cluster_name/1, + update_machine_state/2 ]). -include_lib("ra/include/ra.hrl"). @@ -375,6 +376,14 @@ purge(Node) -> cluster_name(#state{cluster_name = ClusterName}) -> ClusterName. +update_machine_state(Node, Conf) -> + case ra:process_command(Node, {update_state, Conf}) of + {ok, ok, _} -> + ok; + Err -> + Err + end. + %% @doc Handles incoming `ra_events'. Events carry both internal "bookeeping" %% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such %% as message deliveries. All ra events need to be handled by {@module} diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 70c3116c33..61c4858f40 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -34,6 +34,7 @@ -export([add_member/3]). -export([delete_member/3]). -export([requeue/3]). +-export([policy_changed/2]). -export([cleanup_data_dir/0]). -include_lib("rabbit_common/include/rabbit.hrl"). @@ -101,8 +102,9 @@ init_state({Name, _}, QName) -> {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes0}} = rabbit_amqqueue:lookup(QName), %% Ensure the leader is listed first - Nodes = [Leader | lists:delete(Leader, [{Name, N} || N <- Nodes0])], - rabbit_fifo_client:init(qname_to_rname(QName), Nodes, SoftLimit, + Servers0 = [{Name, N} || N <- Nodes], + Servers = [Leader | lists:delete(Leader, Servers0)], + rabbit_fifo_client:init(qname_to_rname(QName), Servers, SoftLimit, fun() -> credit_flow:block(Name), ok end, fun() -> credit_flow:unblock(Name), ok end). @@ -150,12 +152,14 @@ declare(#amqqueue{name = QName, -ra_machine(Q = #amqqueue{name = QName}) -> - {module, rabbit_fifo, - #{dead_letter_handler => dlx_mfa(Q), - cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]}, - become_leader_handler => {?MODULE, become_leader, [QName]}, - metrics_handler => {?MODULE, update_metrics, [QName]}}}. +ra_machine(Q) -> + {module, rabbit_fifo, ra_machine_config(Q)}. + +ra_machine_config(Q = #amqqueue{name = QName}) -> + #{dead_letter_handler => dlx_mfa(Q), + cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]}, + become_leader_handler => {?MODULE, become_leader, [QName]}, + metrics_handler => {?MODULE, update_metrics, [QName]}}. cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) -> Node = node(ChPid), @@ -274,9 +278,10 @@ stop(VHost) -> delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = QNodes}, _IfUnused, _IfEmpty, ActingUser) -> %% TODO Quorum queue needs to support consumer tracking for IfUnused + Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, Msgs = quorum_messages(Name), _ = rabbit_amqqueue:internal_delete(QName, ActingUser), - case ra:delete_cluster([{Name, Node} || Node <- QNodes], 120000) of + case ra:delete_cluster([{Name, Node} || Node <- QNodes], Timeout) of {ok, {_, LeaderNode} = Leader} -> MRef = erlang:monitor(process, Leader), receive @@ -412,6 +417,10 @@ maybe_delete_data_dir(UId) -> ok end. +policy_changed(QName, Node) -> + {ok, Q} = rabbit_amqqueue:lookup(QName), + rabbit_fifo_client:update_machine_state(Node, ra_machine_config(Q)). + cluster_state(Name) -> case whereis(Name) of undefined -> down; diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 265bcb45e3..e495ab8677 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -35,7 +35,7 @@ memory() -> {Sums, _Other} = sum_processes( lists:append(All), distinguishers(), [memory]), - [Qs, QsSlave, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther, + [Qs, QsSlave, Qqs, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther, MsgIndexProc, MgmtDbProc, Plugins] = [aggregate(Names, Sums, memory, fun (X) -> X end) || Names <- distinguished_interesting_sups()], @@ -69,7 +69,7 @@ memory() -> OtherProc = Processes - ConnsReader - ConnsWriter - ConnsChannel - ConnsOther - - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc, + - Qs - QsSlave - Qqs - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc, [ %% Connections @@ -81,6 +81,7 @@ memory() -> %% Queues {queue_procs, Qs}, {queue_slave_procs, QsSlave}, + {quorum_queue_procs, Qqs}, %% Processes {plugins, Plugins}, @@ -124,7 +125,7 @@ binary() -> sets:add_element({Ptr, Sz}, Acc0) end, Acc, Info) end, distinguishers(), [{binary, sets:new()}]), - [Other, Qs, QsSlave, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther, + [Other, Qs, QsSlave, Qqs, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther, MsgIndexProc, MgmtDbProc, Plugins] = [aggregate(Names, [{other, Rest} | Sums], binary, fun sum_binary/1) || Names <- [[other] | distinguished_interesting_sups()]], @@ -134,6 +135,7 @@ binary() -> {connection_other, ConnsOther}, {queue_procs, Qs}, {queue_slave_procs, QsSlave}, + {quorum_queue_procs, Qqs}, {plugins, Plugins}, {mgmt_db, MgmtDbProc}, {msg_index, MsgIndexProc}, @@ -173,11 +175,16 @@ bytes(Words) -> try end. interesting_sups() -> - [queue_sups(), conn_sups() | interesting_sups0()]. + [queue_sups(), quorum_sups(), conn_sups() | interesting_sups0()]. queue_sups() -> all_vhosts_children(rabbit_amqqueue_sup_sup). +quorum_sups() -> + %% TODO: in the future not all ra servers may be queues and we needs + %% some way to filter this + [ra_server_sup]. + msg_stores() -> all_vhosts_children(msg_store_transient) ++ @@ -229,6 +236,7 @@ distinguished_interesting_sups() -> [ with(queue_sups(), master), with(queue_sups(), slave), + quorum_sups(), with(conn_sups(), reader), with(conn_sups(), writer), with(conn_sups(), channel), |
