diff options
| author | kjnilsson <knilsson@pivotal.io> | 2018-11-30 14:07:22 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2018-12-04 17:16:08 +0000 |
| commit | ea0e192dc193add8bf4c1250fbe1c9b15b999d29 (patch) | |
| tree | f7fbe97129f96ab514a51db1d0abab94e716df04 | |
| parent | a4c00475eb6cf261ac7775244e518fc63ea8fa94 (diff) | |
| download | rabbitmq-server-git-ea0e192dc193add8bf4c1250fbe1c9b15b999d29.tar.gz | |
Better handle changing quorum queue leaders
All maps that track queues inside the channel should use the queue name
atom rather than the server id as they key so that leader changes don't
impact this tracking.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 80 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 42 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 25 |
5 files changed, 139 insertions, 64 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index e19914d5ab..d5bfbbb5e3 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -1094,7 +1094,8 @@ notify_down_all(QPids, ChPid, Timeout) -> Error -> {error, Error} end. -activate_limit_all(QPids, ChPid) -> +activate_limit_all(QRefs, ChPid) -> + QPids = [P || P <- QRefs, ?IS_CLASSIC(P)], delegate:invoke_no_result(QPids, {gen_server2, cast, [{activate_limit, ChPid}]}). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 56b43c1415..1fafd3a6f8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -110,7 +110,7 @@ %% when queue.bind's queue field is empty, %% this name will be used instead most_recently_declared_queue, - %% a map of queue pid to queue name + %% a map of queue ref to queue name queue_names, %% queue processes are monitored to update %% queue names @@ -210,7 +210,7 @@ end). -define(IS_CLASSIC(QPid), is_pid(QPid)). --define(IS_QUORUM(QPid), is_tuple(QPid)). +-define(IS_QUORUM(QPid), is_tuple(QPid) orelse is_atom(QPid)). %%---------------------------------------------------------------------------- @@ -672,18 +672,18 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, end, Actions), noreply_coalesce(confirm(MsgSeqNos, Name, State)); eol -> - State1 = handle_consuming_queue_down_or_eol(From, State0), - State2 = handle_delivering_queue_down(From, State1), + State1 = handle_consuming_queue_down_or_eol(Name, State0), + State2 = handle_delivering_queue_down(Name, State1), %% TODO: this should use dtree:take/3 {MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed), State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}), - case maps:find(From, QNames) of + case maps:find(Name, QNames) of {ok, QName} -> erase_queue_stats(QName); error -> ok end, noreply_coalesce( State3#ch{queue_states = maps:remove(Name, QueueStates), - queue_names = maps:remove(From, QNames)}) + queue_names = maps:remove(Name, QNames)}) end; _ -> %% the assumption here is that the queue state has been cleaned up and @@ -1336,13 +1336,14 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, return_ok(State, NoWait, OkMsg); {ok, {Q = #amqqueue{pid = QPid}, _CParams}} -> ConsumerMapping1 = maps:remove(ConsumerTag, ConsumerMapping), + QRef = qpid_to_ref(QPid), QCons1 = - case maps:find(QPid, QCons) of + case maps:find(QRef, QCons) of error -> QCons; {ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags), case gb_sets:is_empty(CTags1) of - true -> maps:remove(QPid, QCons); - false -> maps:put(QPid, CTags1, QCons) + true -> maps:remove(QRef, QCons); + false -> maps:put(QRef, CTags1, QCons) end end, NewState = State#ch{consumer_mapping = ConsumerMapping1, @@ -1395,7 +1396,7 @@ handle_method(#'basic.qos'{global = true, case ((not rabbit_limiter:is_active(Limiter)) andalso rabbit_limiter:is_active(Limiter1)) of true -> rabbit_amqqueue:activate_limit_all( - consumer_queues(State#ch.consumer_mapping), self()); + consumer_queue_refs(State#ch.consumer_mapping), self()); false -> ok end, {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; @@ -1641,25 +1642,26 @@ consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, queue_monitors = QMons, queue_consumers = QCons}) -> - {#amqqueue{pid = QPid}, _} = - maps:get(ConsumerTag, ConsumerMapping), - CTags1 = case maps:find(QPid, QCons) of + {#amqqueue{pid = QPid}, _} = maps:get(ConsumerTag, ConsumerMapping), + QRef = qpid_to_ref(QPid), + CTags1 = case maps:find(QRef, QCons) of {ok, CTags} -> gb_sets:insert(ConsumerTag, CTags); error -> gb_sets:singleton(ConsumerTag) end, - QCons1 = maps:put(QPid, CTags1, QCons), - State#ch{queue_monitors = maybe_monitor(QPid, QMons), + QCons1 = maps:put(QRef, CTags1, QCons), + State#ch{queue_monitors = maybe_monitor(QRef, QMons), queue_consumers = QCons1}. track_delivering_queue(NoAck, QPid, QName, State = #ch{queue_names = QNames, queue_monitors = QMons, delivering_queues = DQ}) -> - State#ch{queue_names = maps:put(QPid, QName, QNames), - queue_monitors = maybe_monitor(QPid, QMons), + QRef = qpid_to_ref(QPid), + State#ch{queue_names = maps:put(QRef, QName, QNames), + queue_monitors = maybe_monitor(QRef, QMons), delivering_queues = case NoAck of true -> DQ; - false -> sets:add_element(QPid, DQ) + false -> sets:add_element(QRef, DQ) end}. handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, @@ -1678,16 +1680,16 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) -> error(quorum_queues_should_never_be_monitored). -handle_consuming_queue_down_or_eol(QPid, - State = #ch{queue_consumers = QCons, - queue_names = QNames}) -> - ConsumerTags = case maps:find(QPid, QCons) of +handle_consuming_queue_down_or_eol(QRef, + State = #ch{queue_consumers = QCons, + queue_names = QNames}) -> + ConsumerTags = case maps:find(QRef, QCons) of error -> gb_sets:new(); {ok, CTags} -> CTags end, gb_sets:fold( fun (CTag, StateN = #ch{consumer_mapping = CMap}) -> - QName = maps:get(QPid, QNames), + QName = maps:get(QRef, QNames), case queue_down_consumer_action(CTag, CMap) of remove -> cancel_consumer(CTag, QName, StateN); @@ -1699,7 +1701,7 @@ handle_consuming_queue_down_or_eol(QPid, _ -> cancel_consumer(CTag, QName, StateN) end end - end, State#ch{queue_consumers = maps:remove(QPid, QCons)}, ConsumerTags). + end, State#ch{queue_consumers = maps:remove(QRef, QCons)}, ConsumerTags). %% [0] There is a slight danger here that if a queue is deleted and %% then recreated again the reconsume will succeed even though it was @@ -1726,8 +1728,8 @@ queue_down_consumer_action(CTag, CMap) -> _ -> {recover, ConsumeSpec} end. -handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> - State#ch{delivering_queues = sets:del_element(QPid, DQ)}. +handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) -> + State#ch{delivering_queues = sets:del_element(QRef, DQ)}. binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, RoutingKey, Arguments, VHostPath, ConnPid, @@ -1882,7 +1884,7 @@ ack(Acked, State = #ch{queue_names = QNames, State#ch{queue_states = QueueStates}. incr_queue_stats(QPid, QNames, MsgIds, State) -> - case maps:find(QPid, QNames) of + case maps:find(qpid_to_ref(QPid), QNames) of {ok, QName} -> Count = length(MsgIds), ?INCR_STATS(queue_stats, QName, Count, ack, State); error -> ok @@ -1906,10 +1908,10 @@ notify_queues(State = #ch{state = closing}) -> {ok, State}; notify_queues(State = #ch{consumer_mapping = Consumers, delivering_queues = DQ }) -> - QPids0 = sets:to_list( - sets:union(sets:from_list(consumer_queues(Consumers)), DQ)), + QRefs0 = sets:to_list( + sets:union(sets:from_list(consumer_queue_refs(Consumers)), DQ)), %% filter to only include pids to avoid trying to notify quorum queues - QPids = [P || P <- QPids0, ?IS_CLASSIC(P)], + QPids = [P || P <- QRefs0, ?IS_CLASSIC(P)], Timeout = get_operation_timeout(), {rabbit_amqqueue:notify_down_all(QPids, self(), Timeout), State#ch{state = closing}}. @@ -1925,8 +1927,8 @@ foreach_per_queue(F, UAL, Acc) -> end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T). -consumer_queues(Consumers) -> - lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _CParams}} +consumer_queue_refs(Consumers) -> + lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}} <- maps:to_list(Consumers)]). %% tell the limiter about the number of acks that have been received @@ -1983,9 +1985,10 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ {QNames1, QMons1} = lists:foldl(fun (#amqqueue{pid = QPid, name = QName}, {QNames0, QMons0}) -> - {case maps:is_key(QPid, QNames0) of + QRef = qpid_to_ref(QPid), + {case maps:is_key(QRef, QNames0) of true -> QNames0; - false -> maps:put(QPid, QName, QNames0) + false -> maps:put(QRef, QName, QNames0) end, maybe_monitor(QPid, QMons0)} end, {QNames, maybe_monitor_all(DeliveredQPids, QMons)}, Qs), State1 = State#ch{queue_names = QNames1, @@ -2000,8 +2003,8 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ fine -> ?INCR_STATS(exchange_stats, XName, 1, publish), [?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) || - QPid <- AllDeliveredQRefs, - {ok, QName} <- [maps:find(QPid, QNames1)]]; + QRef <- AllDeliveredQRefs, + {ok, QName} <- [maps:find(QRef, QNames1)]]; _ -> ok end, @@ -2494,3 +2497,8 @@ maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items). add_delivery_count_header(MsgHeader, Msg) -> Count = maps:get(delivery_count, MsgHeader, 0), rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg). + +qpid_to_ref(Pid) when is_pid(Pid) -> Pid; +qpid_to_ref({Name, _}) -> Name; +%% assume it already is a ref +qpid_to_ref(Ref) -> Ref. diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index a455264560..740c6e202c 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -1,3 +1,19 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + -module(rabbit_fifo). -behaviour(ra_machine). @@ -228,9 +244,9 @@ update_state(Conf, State) -> {state(), ra_machine:effects(), Reply :: term()}. apply(#{index := RaftIdx}, {enqueue, From, Seq, RawMsg}, Effects0, State00) -> case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of - {ok, State0, Effects} -> - State = append_to_master_index(RaftIdx, State0), - checkout(State, Effects); + {ok, State0, Effects1} -> + {State, Effects, ok} = checkout(State0, Effects1), + {append_to_master_index(RaftIdx, State), Effects, ok}; {duplicate, State, Effects} -> {State, Effects, ok} end; @@ -314,6 +330,7 @@ apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0, apply(_, {checkout, {dequeue, _}, {_Tag, _Pid}}, Effects0, #state{messages = M, prefix_msg_count = 0} = State0) when map_size(M) == 0 -> + %% FIX: also check if there are returned messages %% TODO do we need metric visibility of empty get requests? {State0, Effects0, {dequeue, empty}}; apply(Meta, {checkout, {dequeue, settled}, ConsumerId}, @@ -780,6 +797,7 @@ cancel_consumer_effects(Pid, Name, update_smallest_raft_index(IncomingRaftIdx, OldIndexes, #state{ra_indexes = Indexes, + % prefix_msg_count = 0, messages = Messages} = State, Effects) -> case rabbit_fifo_index:size(Indexes) of 0 when map_size(Messages) =:= 0 -> @@ -1023,12 +1041,17 @@ dehydrate_state(#state{messages = Messages0, ra_indexes = rabbit_fifo_index:empty(), low_msg_num = undefined, consumers = maps:map(fun (_, C) -> - C#consumer{checked_out = #{}} + dehydrate_consumer(C) + % C#consumer{checked_out = #{}} end, Consumers), returns = queue:new(), %% messages include returns prefix_msg_count = maps:size(Messages0) + MsgCount}. +dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> + Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0), + Con#consumer{checked_out = Checked}. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -1406,7 +1429,7 @@ tick_test() -> ok. enq_deq_snapshot_recover_test() -> - Tag = <<"release_cursor_snapshot_state_test">>, + Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, % OthPid = spawn(fun () -> ok end), % Oth = {<<"oth">>, OthPid}, @@ -1539,6 +1562,23 @@ enq_check_settle_snapshot_purge_test() -> % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), run_snapshot_test(?FUNCTION_NAME, Commands). +enq_check_settle_duplicate_test() -> + %% duplicate settle commands are likely + Tag = atom_to_binary(?FUNCTION_NAME, utf8), + Cid = {Tag, self()}, + Commands = [ + {checkout, {auto, 2, simple_prefetch}, Cid}, + {enqueue, self(), 1, one}, %% 0 + {enqueue, self(), 2, two}, %% 0 + {settle, [0], Cid}, + {settle, [1], Cid}, + {settle, [1], Cid}, + {enqueue, self(), 3, three}, + {settle, [2], Cid} + ], + % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), + run_snapshot_test(?FUNCTION_NAME, Commands). + run_snapshot_test(Name, Commands) -> %% create every incremental permuation of the commands lists %% and run the snapshot tests against that @@ -1556,6 +1596,7 @@ run_snapshot_test0(Name, Commands) -> Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true; (_) -> false end, Entries), + ?debugFmt("running from snapshot: ~b", [SnapIdx]), {S, _} = run_log(SnapState, Filtered), % assert log can be restored from any release cursor index % ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n", @@ -1571,7 +1612,7 @@ prefixes(Source, N, Acc) -> prefixes(Source, N+1, [X | Acc]). delivery_query_returns_deliveries_test() -> - Tag = <<"release_cursor_snapshot_state_test">>, + Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ {checkout, {auto, 5, simple_prefetch}, Cid}, diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index fc35d26e0f..635d85be4a 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -1,3 +1,19 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + %% @doc Provides an easy to consume API for interacting with the {@link rabbit_fifo.} %% state machine implementation running inside a `ra' raft system. %% @@ -432,19 +448,10 @@ update_machine_state(Node, Conf) -> {rabbit_fifo:client_msg(), state()} | eol. handle_ra_event(From, {applied, Seqs}, #state{soft_limit = SftLmt, - leader = CurLeader, - last_applied = _Last, unblock_handler = UnblockFun} = State0) -> {Corrs, Actions, State1} = lists:foldl(fun seq_applied/2, {[], [], State0#state{leader = From}}, Seqs), - case From of - CurLeader -> ok; - _ -> - ?INFO("rabbit_fifo_client: leader change from ~w to ~w~n" - "applying ~w last ~w~n", - [CurLeader, From, Seqs, _Last]) - end, case maps:size(State1#state.pending) < SftLmt of true when State1#state.slow == true -> % we have exited soft limit state @@ -479,10 +486,7 @@ handle_ra_event(Leader, {machine, leader_change}, #state{leader = Leader} = State) -> %% leader already known {internal, [], [], State}; -handle_ra_event(Leader, {machine, leader_change}, - #state{leader = OldLeader} = State0) -> - ?INFO("rabbit_fifo_client: leader changed from ~w to ~w~n", - [OldLeader, Leader]), +handle_ra_event(Leader, {machine, leader_change}, State0) -> %% we need to update leader %% and resend any pending commands State = resend_all_pending(State0#state{leader = Leader}), @@ -490,9 +494,9 @@ handle_ra_event(Leader, {machine, leader_change}, handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) -> % TODO: how should these be handled? re-sent on timer or try random {internal, [], [], State0}; -handle_ra_event(From, {rejected, {not_leader, Leader, Seq}}, State0) -> - ?INFO("rabbit_fifo_client: rejected ~b not leader ~w leader: ~w~n", - [Seq, From, Leader]), +handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) -> + % ?INFO("rabbit_fifo_client: rejected ~b not leader ~w leader: ~w~n", + % [Seq, From, Leader]), State1 = State0#state{leader = Leader}, State = resend(Seq, State1), {internal, [], [], State}; @@ -535,7 +539,6 @@ try_process_command([Server | Rem], Cmd, State) -> seq_applied({Seq, MaybeAction}, {Corrs, Actions0, #state{last_applied = Last} = State0}) when Seq > Last orelse Last =:= undefined -> - % ?INFO("rabbit_fifo_client: applying seq ~b last ~w", [Seq, Last]), State1 = case Last of undefined -> State0; _ -> @@ -550,14 +553,12 @@ seq_applied({Seq, MaybeAction}, {[Corr | Corrs], Actions, State#state{pending = Pending, last_applied = Seq}}; error -> - ?INFO("rabbit_fifo_client: pending not found ~w", [Seq]), % must have already been resent or removed for some other reason % still need to update last_applied or we may inadvertently resend % stuff later {Corrs, Actions, State#state{last_applied = Seq}} end; seq_applied(_Seq, Acc) -> - ?INFO("rabbit_fifo_client: dropping seq ~b", [_Seq]), Acc. maybe_add_action(ok, Acc, State) -> @@ -579,7 +580,7 @@ maybe_add_action(Action, Acc, State) -> {[Action | Acc], State}. do_resends(From, To, State) when From =< To -> - ?INFO("doing resends From ~w To ~w~n", [From, To]), + % ?INFO("rabbit_fifo_client: doing resends From ~w To ~w~n", [From, To]), lists:foldl(fun resend/2, State, lists:seq(From, To)); do_resends(_, _, State) -> State. @@ -596,7 +597,6 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) -> resend_all_pending(#state{pending = Pend} = State) -> Seqs = lists:sort(maps:keys(Pend)), - ?INFO ("rabbit_fifo_client: resending all ~w~n", [Seqs]), lists:foldl(fun resend/2, State, Seqs). handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 380b8ab59e..61fd7a1b2c 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -48,6 +48,7 @@ groups() -> ++ all_tests()}, {cluster_size_3, [], [ declare_during_node_down, + simple_confirm_availability_on_leader_change, confirm_availability_on_leader_change, recover_from_single_failure, recover_from_multiple_failures, @@ -1563,6 +1564,30 @@ declare_during_node_down(Config) -> wait_for_messages_ready(Servers, RaName, 1), ok. +simple_confirm_availability_on_leader_change(Config) -> + [Node1, Node2, _Node3] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + %% declare a queue on node2 - this _should_ host the leader on node 2 + DCh = rabbit_ct_client_helpers:open_channel(Config, Node2), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(DCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + erlang:process_flag(trap_exit, true), + %% open a channel to another node + Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + publish_confirm(Ch, QQ), + + %% stop the node hosting the leader + stop_node(Config, Node2), + %% this should not fail as the channel should detect the new leader and + %% resend to that + publish_confirm(Ch, QQ), + ok = rabbit_ct_broker_helpers:start_node(Config, Node2), + ok. + confirm_availability_on_leader_change(Config) -> [Node1, Node2, _Node3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
