diff options
| author | D Corbacho <diana@rabbitmq.com> | 2018-12-05 10:07:56 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-12-05 10:07:56 +0000 |
| commit | 105204861abfd270b4ba8c1b6e5878e00c6049f5 (patch) | |
| tree | 8cad648011789345fdf87db888efdbec804f50b7 | |
| parent | db888df2a9156fefda626f57cbb9d9591e27d41f (diff) | |
| parent | 26c7dfba5b31118010256ebe79b1043e470ce452 (diff) | |
| download | rabbitmq-server-git-105204861abfd270b4ba8c1b6e5878e00c6049f5.tar.gz | |
Merge pull request #1782 from rabbitmq/qq-confirm-availability
Quorum queue confirm availability
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | src/rabbit_amqqueue.erl | 3 |
| -rw-r--r-- | src/rabbit_channel.erl | 111 |
| -rw-r--r-- | src/rabbit_fifo.erl | 182 |
| -rw-r--r-- | src/rabbit_fifo_client.erl | 41 |
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 93 |
5 files changed, 337 insertions, 93 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index e19914d5ab..d5bfbbb5e3 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -1094,7 +1094,8 @@ notify_down_all(QPids, ChPid, Timeout) -> Error -> {error, Error} end. -activate_limit_all(QPids, ChPid) -> +activate_limit_all(QRefs, ChPid) -> + QPids = [P || P <- QRefs, ?IS_CLASSIC(P)], delegate:invoke_no_result(QPids, {gen_server2, cast, [{activate_limit, ChPid}]}). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index bc273bf100..8fb73d6156 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -110,7 +110,7 @@ %% when queue.bind's queue field is empty, %% this name will be used instead most_recently_declared_queue, - %% a map of queue pid to queue name + %% a map of queue ref to queue name queue_names, %% queue processes are monitored to update %% queue names @@ -670,19 +670,20 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, #'basic.credit_drained'{consumer_tag = CTag, credit_drained = Credit}) end, Actions), - noreply_coalesce(confirm(MsgSeqNos, From, State)); + noreply_coalesce(confirm(MsgSeqNos, Name, State)); eol -> - State1 = handle_consuming_queue_down_or_eol(From, State0), - State2 = handle_delivering_queue_down(From, State1), - {MXs, UC1} = dtree:take(From, State2#ch.unconfirmed), + State1 = handle_consuming_queue_down_or_eol(Name, State0), + State2 = handle_delivering_queue_down(Name, State1), + %% TODO: this should use dtree:take/3 + {MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed), State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}), - case maps:find(From, QNames) of + case maps:find(Name, QNames) of {ok, QName} -> erase_queue_stats(QName); error -> ok end, noreply_coalesce( State3#ch{queue_states = maps:remove(Name, QueueStates), - queue_names = maps:remove(From, QNames)}) + queue_names = maps:remove(Name, QNames)}) end; _ -> %% the assumption here is that the queue state has been cleaned up and @@ -1335,13 +1336,14 @@ 
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, return_ok(State, NoWait, OkMsg); {ok, {Q = #amqqueue{pid = QPid}, _CParams}} -> ConsumerMapping1 = maps:remove(ConsumerTag, ConsumerMapping), + QRef = qpid_to_ref(QPid), QCons1 = - case maps:find(QPid, QCons) of + case maps:find(QRef, QCons) of error -> QCons; {ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags), case gb_sets:is_empty(CTags1) of - true -> maps:remove(QPid, QCons); - false -> maps:put(QPid, CTags1, QCons) + true -> maps:remove(QRef, QCons); + false -> maps:put(QRef, CTags1, QCons) end end, NewState = State#ch{consumer_mapping = ConsumerMapping1, @@ -1394,7 +1396,7 @@ handle_method(#'basic.qos'{global = true, case ((not rabbit_limiter:is_active(Limiter)) andalso rabbit_limiter:is_active(Limiter1)) of true -> rabbit_amqqueue:activate_limit_all( - consumer_queues(State#ch.consumer_mapping), self()); + consumer_queue_refs(State#ch.consumer_mapping), self()); false -> ok end, {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; @@ -1640,25 +1642,26 @@ consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, queue_monitors = QMons, queue_consumers = QCons}) -> - {#amqqueue{pid = QPid}, _} = - maps:get(ConsumerTag, ConsumerMapping), - CTags1 = case maps:find(QPid, QCons) of + {#amqqueue{pid = QPid}, _} = maps:get(ConsumerTag, ConsumerMapping), + QRef = qpid_to_ref(QPid), + CTags1 = case maps:find(QRef, QCons) of {ok, CTags} -> gb_sets:insert(ConsumerTag, CTags); error -> gb_sets:singleton(ConsumerTag) end, - QCons1 = maps:put(QPid, CTags1, QCons), - State#ch{queue_monitors = maybe_monitor(QPid, QMons), + QCons1 = maps:put(QRef, CTags1, QCons), + State#ch{queue_monitors = maybe_monitor(QRef, QMons), queue_consumers = QCons1}. 
track_delivering_queue(NoAck, QPid, QName, State = #ch{queue_names = QNames, queue_monitors = QMons, delivering_queues = DQ}) -> - State#ch{queue_names = maps:put(QPid, QName, QNames), - queue_monitors = maybe_monitor(QPid, QMons), + QRef = qpid_to_ref(QPid), + State#ch{queue_names = maps:put(QRef, QName, QNames), + queue_monitors = maybe_monitor(QRef, QMons), delivering_queues = case NoAck of true -> DQ; - false -> sets:add_element(QPid, DQ) + false -> sets:add_element(QRef, DQ) end}. handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, @@ -1677,16 +1680,16 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) -> error(quorum_queues_should_never_be_monitored). -handle_consuming_queue_down_or_eol(QPid, - State = #ch{queue_consumers = QCons, - queue_names = QNames}) -> - ConsumerTags = case maps:find(QPid, QCons) of +handle_consuming_queue_down_or_eol(QRef, + State = #ch{queue_consumers = QCons, + queue_names = QNames}) -> + ConsumerTags = case maps:find(QRef, QCons) of error -> gb_sets:new(); {ok, CTags} -> CTags end, gb_sets:fold( fun (CTag, StateN = #ch{consumer_mapping = CMap}) -> - QName = maps:get(QPid, QNames), + QName = maps:get(QRef, QNames), case queue_down_consumer_action(CTag, CMap) of remove -> cancel_consumer(CTag, QName, StateN); @@ -1698,7 +1701,7 @@ handle_consuming_queue_down_or_eol(QPid, _ -> cancel_consumer(CTag, QName, StateN) end end - end, State#ch{queue_consumers = maps:remove(QPid, QCons)}, ConsumerTags). + end, State#ch{queue_consumers = maps:remove(QRef, QCons)}, ConsumerTags). %% [0] There is a slight danger here that if a queue is deleted and %% then recreated again the reconsume will succeed even though it was @@ -1725,8 +1728,8 @@ queue_down_consumer_action(CTag, CMap) -> _ -> {recover, ConsumeSpec} end. 
-handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> - State#ch{delivering_queues = sets:del_element(QPid, DQ)}. +handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) -> + State#ch{delivering_queues = sets:del_element(QRef, DQ)}. binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, RoutingKey, Arguments, VHostPath, ConnPid, @@ -1881,7 +1884,7 @@ ack(Acked, State = #ch{queue_names = QNames, State#ch{queue_states = QueueStates}. incr_queue_stats(QPid, QNames, MsgIds, State) -> - case maps:find(QPid, QNames) of + case maps:find(qpid_to_ref(QPid), QNames) of {ok, QName} -> Count = length(MsgIds), ?INCR_STATS(queue_stats, QName, Count, ack, State); error -> ok @@ -1905,10 +1908,10 @@ notify_queues(State = #ch{state = closing}) -> {ok, State}; notify_queues(State = #ch{consumer_mapping = Consumers, delivering_queues = DQ }) -> - QPids0 = sets:to_list( - sets:union(sets:from_list(consumer_queues(Consumers)), DQ)), + QRefs0 = sets:to_list( + sets:union(sets:from_list(consumer_queue_refs(Consumers)), DQ)), %% filter to only include pids to avoid trying to notify quorum queues - QPids = [P || P <- QPids0, ?IS_CLASSIC(P)], + QPids = [P || P <- QRefs0, ?IS_CLASSIC(P)], Timeout = get_operation_timeout(), {rabbit_amqqueue:notify_down_all(QPids, self(), Timeout), State#ch{state = closing}}. @@ -1924,8 +1927,8 @@ foreach_per_queue(F, UAL, Acc) -> end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T). -consumer_queues(Consumers) -> - lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _CParams}} +consumer_queue_refs(Consumers) -> + lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}} <- maps:to_list(Consumers)]). 
%% tell the limiter about the number of acks that have been received @@ -1968,7 +1971,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ Qs = rabbit_amqqueue:lookup(DelQNames), {DeliveredQPids, DeliveredQQPids, QueueStates} = rabbit_amqqueue:deliver(Qs, Delivery, QueueStates0), - AllDeliveredQPids = DeliveredQPids ++ DeliveredQQPids, + AllDeliveredQRefs = DeliveredQPids ++ [N || {N, _} <- DeliveredQQPids], %% The maybe_monitor_all/2 monitors all queues to which we %% delivered. But we want to monitor even queues we didn't deliver %% to, since we need their 'DOWN' messages to clean @@ -1982,49 +1985,50 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ {QNames1, QMons1} = lists:foldl(fun (#amqqueue{pid = QPid, name = QName}, {QNames0, QMons0}) -> - {case maps:is_key(QPid, QNames0) of + QRef = qpid_to_ref(QPid), + {case maps:is_key(QRef, QNames0) of true -> QNames0; - false -> maps:put(QPid, QName, QNames0) + false -> maps:put(QRef, QName, QNames0) end, maybe_monitor(QPid, QMons0)} end, {QNames, maybe_monitor_all(DeliveredQPids, QMons)}, Qs), State1 = State#ch{queue_names = QNames1, queue_monitors = QMons1}, %% NB: the order here is important since basic.returns must be %% sent before confirms. 
- State2 = process_routing_mandatory(Mandatory, AllDeliveredQPids, MsgSeqNo, + State2 = process_routing_mandatory(Mandatory, AllDeliveredQRefs , MsgSeqNo, Message, State1), - State3 = process_routing_confirm( Confirm, AllDeliveredQPids, MsgSeqNo, - XName, State2), + State3 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo, + XName, State2), case rabbit_event:stats_level(State3, #ch.stats_timer) of fine -> ?INCR_STATS(exchange_stats, XName, 1, publish), [?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) || - QPid <- AllDeliveredQPids, - {ok, QName} <- [maps:find(QPid, QNames1)]]; + QRef <- AllDeliveredQRefs, + {ok, QName} <- [maps:find(QRef, QNames1)]]; _ -> ok end, State3#ch{queue_states = QueueStates}. -process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) -> +process_routing_mandatory(false, _, _, _, State) -> State; -process_routing_mandatory(true, [], _MsgSeqNo, Msg, State) -> +process_routing_mandatory(true, [], _, Msg, State) -> ok = basic_return(Msg, State, no_route), State; -process_routing_mandatory(true, QPids, MsgSeqNo, Msg, State) -> - State#ch{mandatory = dtree:insert(MsgSeqNo, QPids, Msg, +process_routing_mandatory(true, QRefs, MsgSeqNo, Msg, State) -> + State#ch{mandatory = dtree:insert(MsgSeqNo, QRefs, Msg, State#ch.mandatory)}. -process_routing_confirm(false, _, _MsgSeqNo, _XName, State) -> +process_routing_confirm(false, _, _, _, State) -> State; -process_routing_confirm(true, [], MsgSeqNo, XName, State) -> +process_routing_confirm(true, [], MsgSeqNo, XName, State) -> record_confirms([{MsgSeqNo, XName}], State); -process_routing_confirm(true, QPids, MsgSeqNo, XName, State) -> - State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, +process_routing_confirm(true, QRefs, MsgSeqNo, XName, State) -> + State#ch{unconfirmed = dtree:insert(MsgSeqNo, QRefs, XName, State#ch.unconfirmed)}. 
-confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), +confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) -> + {MXs, UC1} = dtree:take(MsgSeqNos, QRef, UC), %% NB: don't call noreply/1 since we don't want to send confirms. record_confirms(MXs, State#ch{unconfirmed = UC1}). @@ -2493,3 +2497,8 @@ maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items). add_delivery_count_header(MsgHeader, Msg) -> Count = maps:get(delivery_count, MsgHeader, 0), rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg). + +qpid_to_ref(Pid) when is_pid(Pid) -> Pid; +qpid_to_ref({Name, _}) -> Name; +%% assume it already is a ref +qpid_to_ref(Ref) -> Ref. diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index c12a6ec464..740c6e202c 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -1,3 +1,19 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + -module(rabbit_fifo). -behaviour(ra_machine). @@ -106,7 +122,7 @@ -type applied_mfa() :: {module(), atom(), list()}. % represents a partially applied module call --define(SHADOW_COPY_INTERVAL, 4096). +-define(SHADOW_COPY_INTERVAL, 4096 * 4). -define(USE_AVG_HALF_LIFE, 10000.0). -record(consumer, @@ -228,9 +244,9 @@ update_state(Conf, State) -> {state(), ra_machine:effects(), Reply :: term()}. 
apply(#{index := RaftIdx}, {enqueue, From, Seq, RawMsg}, Effects0, State00) -> case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of - {ok, State0, Effects} -> - State = append_to_master_index(RaftIdx, State0), - checkout(State, Effects); + {ok, State0, Effects1} -> + {State, Effects, ok} = checkout(State0, Effects1), + {append_to_master_index(RaftIdx, State), Effects, ok}; {duplicate, State, Effects} -> {State, Effects, ok} end; @@ -314,6 +330,7 @@ apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0, apply(_, {checkout, {dequeue, _}, {_Tag, _Pid}}, Effects0, #state{messages = M, prefix_msg_count = 0} = State0) when map_size(M) == 0 -> + %% FIX: also check if there are returned messages %% TODO do we need metric visibility of empty get requests? {State0, Effects0, {dequeue, empty}}; apply(Meta, {checkout, {dequeue, settled}, ConsumerId}, @@ -357,7 +374,7 @@ apply(#{index := RaftIdx}, purge, Effects0, {StateAcc0, EffectsAcc0, ok}) -> MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}} <- maps:values(Checked0)], - complete(ConsumerId, MsgRaftIdxs, C, + complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C, #{}, EffectsAcc0, StateAcc0) end, {State0, Effects0, ok}, Cons0), {State, Effects, _} = @@ -456,20 +473,26 @@ apply(_, {update_state, Conf}, Effects, State) -> {update_state(Conf, State), Effects, ok}. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). 
-state_enter(leader, #state{consumers = Custs, +state_enter(leader, #state{consumers = Cons, enqueuers = Enqs, name = Name, + prefix_msg_count = 0, become_leader_handler = BLH}) -> % return effects to monitor all current consumers and enqueuers - ConMons = [{monitor, process, P} || {_, P} <- maps:keys(Custs)], - EnqMons = [{monitor, process, P} || P <- maps:keys(Enqs)], - Effects = ConMons ++ EnqMons, + Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]), + Mons = [{monitor, process, P} || P <- Pids], + Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], + Effects = Mons ++ Nots, case BLH of undefined -> Effects; {Mod, Fun, Args} -> [{mod_call, Mod, Fun, Args ++ [Name]} | Effects] end; +state_enter(recovered, #state{prefix_msg_count = PrefixMsgCount}) + when PrefixMsgCount =/= 0 -> + %% TODO: remove assertion? + exit({rabbit_fifo, unexpected_prefix_msg_count, PrefixMsgCount}); state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) -> Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))]; @@ -699,7 +722,8 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked, end, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), - State1 = lists:foldl(fun('$prefix_msg', #state{prefix_msg_count = MsgCount} = S0) -> + State1 = lists:foldl(fun('$prefix_msg', + #state{prefix_msg_count = MsgCount} = S0) -> S0#state{prefix_msg_count = MsgCount + 1}; ({MsgNum, Msg}, S0) -> return_one(MsgNum, Msg, S0) @@ -709,14 +733,14 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked, Effects). 
% used to processes messages that are finished -complete(ConsumerId, MsgRaftIdxs, +complete(ConsumerId, MsgRaftIdxs, NumDiscarded, Con0, Checked, Effects0, #state{consumers = Cons0, service_queue = SQ0, ra_indexes = Indexes0} = State0) -> - %% credit_mode = simple_prefetch should automatically top-up credit as messages - %% are simple_prefetch or otherwise returned + %% credit_mode = simple_prefetch should automatically top-up credit + %% as messages are simple_prefetch or otherwise returned Con = Con0#consumer{checked_out = Checked, - credit = increase_credit(Con0, length(MsgRaftIdxs))}, + credit = increase_credit(Con0, NumDiscarded)}, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs), @@ -742,7 +766,10 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId, Checked = maps:without(MsgIds, Checked0), Discarded = maps:with(MsgIds, Checked0), MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], + %% need to pass the length of discarded as $prefix_msgs would be filtered + %% by the above list comprehension {State1, Effects1, _} = complete(ConsumerId, MsgRaftIdxs, + maps:size(Discarded), Con0, Checked, Effects0, State0), {State, Effects, _} = checkout(State1, Effects1), % settle metrics are incremented separately @@ -770,6 +797,7 @@ cancel_consumer_effects(Pid, Name, update_smallest_raft_index(IncomingRaftIdx, OldIndexes, #state{ra_indexes = Indexes, + % prefix_msg_count = 0, messages = Messages} = State, Effects) -> case rabbit_fifo_index:size(Indexes) of 0 when map_size(Messages) =:= 0 -> @@ -810,7 +838,10 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, returns = queue:in(MsgNum, Returns)}. 
return_all(State, Checked) -> - maps:fold(fun (_, {MsgNum, Msg}, S) -> + maps:fold(fun (_, '$prefix_msg', + #state{prefix_msg_count = MsgCount} = S) -> + S#state{prefix_msg_count = MsgCount + 1}; + (_, {MsgNum, Msg}, S) -> return_one(MsgNum, Msg, S) end, State, Checked). @@ -1010,11 +1041,17 @@ dehydrate_state(#state{messages = Messages0, ra_indexes = rabbit_fifo_index:empty(), low_msg_num = undefined, consumers = maps:map(fun (_, C) -> - C#consumer{checked_out = #{}} + dehydrate_consumer(C) + % C#consumer{checked_out = #{}} end, Consumers), returns = queue:new(), + %% messages include returns prefix_msg_count = maps:size(Messages0) + MsgCount}. +dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> + Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0), + Con#consumer{checked_out = Checked}. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -1392,7 +1429,7 @@ tick_test() -> ok. enq_deq_snapshot_recover_test() -> - Tag = <<"release_cursor_snapshot_state_test">>, + Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, % OthPid = spawn(fun () -> ok end), % Oth = {<<"oth">>, OthPid}, @@ -1449,20 +1486,49 @@ snapshot_recover_test() -> ], run_snapshot_test(?FUNCTION_NAME, Commands). -enq_deq_return_snapshot_recover_test() -> +enq_deq_return_settle_snapshot_test() -> + Tag = atom_to_binary(?FUNCTION_NAME, utf8), + Cid = {Tag, self()}, + Commands = [ + {enqueue, self(), 1, one}, %% to Cid + {checkout, {auto, 1, simple_prefetch}, Cid}, + {return, [0], Cid}, %% should be re-delivered to Cid + {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2 + {settle, [1], Cid}, + {settle, [2], Cid} + ], + run_snapshot_test(?FUNCTION_NAME, Commands). 
+ +return_prefix_msg_count_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, - OthPid = spawn(fun () -> ok end), - Oth = {<<"oth">>, OthPid}, Commands = [ {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {checkout, {dequeue, unsettled}, Oth}, - {checkout, {dequeue, unsettled}, Cid}, - {settle, [0], Oth}, - {return, [0], Cid}, + {checkout, {auto, 1, simple_prefetch}, Cid}, + {checkout, cancel, Cid}, + {enqueue, self(), 2, two} %% Cid prefix_msg_count: 2 + ], + Indexes = lists:seq(1, length(Commands)), + Entries = lists:zip(Indexes, Commands), + {State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries), + ?debugFmt("return_prefix_msg_count_test state ~n~p~n", [State]), + ok. + + +return_settle_snapshot_test() -> + Tag = atom_to_binary(?FUNCTION_NAME, utf8), + Cid = {Tag, self()}, + Commands = [ + {enqueue, self(), 1, one}, %% to Cid + {checkout, {auto, 1, simple_prefetch}, Cid}, + {return, [0], Cid}, %% should be re-delivered to Oth + {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2 + {settle, [1], Cid}, + {return, [2], Cid}, + {settle, [3], Cid}, {enqueue, self(), 3, three}, - purge + purge, + {enqueue, self(), 4, four} ], run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1477,17 +1543,47 @@ enq_check_settle_snapshot_recover_test() -> {settle, [0], Cid}, {enqueue, self(), 3, three}, {settle, [2], Cid} + ], + % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), + run_snapshot_test(?FUNCTION_NAME, Commands). +enq_check_settle_snapshot_purge_test() -> + Tag = atom_to_binary(?FUNCTION_NAME, utf8), + Cid = {Tag, self()}, + Commands = [ + {checkout, {auto, 2, simple_prefetch}, Cid}, + {enqueue, self(), 1, one}, + {enqueue, self(), 2, two}, + {settle, [1], Cid}, + {settle, [0], Cid}, + {enqueue, self(), 3, three}, + purge ], % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), run_snapshot_test(?FUNCTION_NAME, Commands). 
+enq_check_settle_duplicate_test() -> + %% duplicate settle commands are likely + Tag = atom_to_binary(?FUNCTION_NAME, utf8), + Cid = {Tag, self()}, + Commands = [ + {checkout, {auto, 2, simple_prefetch}, Cid}, + {enqueue, self(), 1, one}, %% 0 + {enqueue, self(), 2, two}, %% 0 + {settle, [0], Cid}, + {settle, [1], Cid}, + {settle, [1], Cid}, + {enqueue, self(), 3, three}, + {settle, [2], Cid} + ], + % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), + run_snapshot_test(?FUNCTION_NAME, Commands). run_snapshot_test(Name, Commands) -> %% create every incremental permuation of the commands lists %% and run the snapshot tests against that [begin - % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), + ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]), run_snapshot_test0(Name, C) end || C <- prefixes(Commands, 1, [])]. @@ -1500,6 +1596,7 @@ run_snapshot_test0(Name, Commands) -> Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true; (_) -> false end, Entries), + ?debugFmt("running from snapshot: ~b", [SnapIdx]), {S, _} = run_log(SnapState, Filtered), % assert log can be restored from any release cursor index % ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n", @@ -1515,7 +1612,7 @@ prefixes(Source, N, Acc) -> prefixes(Source, N+1, [X | Acc]). delivery_query_returns_deliveries_test() -> - Tag = <<"release_cursor_snapshot_state_test">>, + Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ {checkout, {auto, 5, simple_prefetch}, Cid}, @@ -1555,19 +1652,30 @@ state_enter_test() -> [{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0), ok. 
-leader_monitors_on_state_enter_test() -> - Cid = {<<"cid">>, self()}, - {State0, [_, _]} = enq(1, 1, first, test_init(test)), - {State1, _} = check_auto(Cid, 2, State0), +state_enter_montors_and_notifications_test() -> + Oth = spawn(fun () -> ok end), + {State0, _} = enq(1, 1, first, test_init(test)), + Cid = {<<"adf">>, self()}, + OthCid = {<<"oth">>, Oth}, + {State1, _} = check(Cid, 2, State0), + {State, _} = check(OthCid, 3, State1), Self = self(), - %% as we have an enqueuer _and_ a consumer we chould - %% get two monitor effects in total, even if they are for the same - %% processs + Effects = state_enter(leader, State), + + %% monitor all enqueuers and consumers [{monitor, process, Self}, - {monitor, process, Self}] = state_enter(leader, State1), + {monitor, process, Oth}] = + lists:filter(fun ({monitor, process, _}) -> true; + (_) -> false + end, Effects), + [{send_msg, Self, leader_change, ra_event}, + {send_msg, Oth, leader_change, ra_event}] = + lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true; + (_) -> false + end, Effects), + ?ASSERT_EFF({monitor, process, _}, Effects), ok. - purge_test() -> Cid = {<<"purge_test">>, self()}, {State1, _} = enq(1, 1, first, test_init(test)), diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index c063ef9a17..635d85be4a 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -1,3 +1,19 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. 
+%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + %% @doc Provides an easy to consume API for interacting with the {@link rabbit_fifo.} %% state machine implementation running inside a `ra' raft system. %% @@ -447,7 +463,8 @@ handle_ra_event(From, {applied, Seqs}, fun (Cid, {Settled, Returns, Discards}, Acc) -> add_command(Cid, settle, Settled, add_command(Cid, return, Returns, - add_command(Cid, discard, Discards, Acc))) + add_command(Cid, discard, + Discards, Acc))) end, [], State1#state.unsent_commands), Node = pick_node(State2), %% send all the settlements and returns @@ -465,10 +482,21 @@ handle_ra_event(From, {applied, Seqs}, end; handle_ra_event(Leader, {machine, {delivery, _ConsumerTag, _} = Del}, State0) -> handle_delivery(Leader, Del, State0); +handle_ra_event(Leader, {machine, leader_change}, + #state{leader = Leader} = State) -> + %% leader already known + {internal, [], [], State}; +handle_ra_event(Leader, {machine, leader_change}, State0) -> + %% we need to update leader + %% and resend any pending commands + State = resend_all_pending(State0#state{leader = Leader}), + {internal, [], [], State}; handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) -> % TODO: how should these be handled? 
re-sent on timer or try random {internal, [], [], State0}; handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) -> + % ?INFO("rabbit_fifo_client: rejected ~b not leader ~w leader: ~w~n", + % [Seq, From, Leader]), State1 = State0#state{leader = Leader}, State = resend(Seq, State1), {internal, [], [], State}; @@ -526,7 +554,9 @@ seq_applied({Seq, MaybeAction}, last_applied = Seq}}; error -> % must have already been resent or removed for some other reason - {Corrs, Actions, State} + % still need to update last_applied or we may inadvertently resend + % stuff later + {Corrs, Actions, State#state{last_applied = Seq}} end; seq_applied(_Seq, Acc) -> Acc. @@ -550,7 +580,7 @@ maybe_add_action(Action, Acc, State) -> {[Action | Acc], State}. do_resends(From, To, State) when From =< To -> - ?INFO("doing resends From ~w To ~w~n", [From, To]), + % ?INFO("rabbit_fifo_client: doing resends From ~w To ~w~n", [From, To]), lists:foldl(fun resend/2, State, lists:seq(From, To)); do_resends(_, _, State) -> State. @@ -565,6 +595,10 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) -> State end. +resend_all_pending(#state{pending = Pend} = State) -> + Seqs = lists:sort(maps:keys(Pend)), + lists:foldl(fun resend/2, State, Seqs). + handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, #state{consumer_deliveries = CDels0} = State0) -> {LastId, _} = lists:last(IdMsgs), @@ -619,6 +653,7 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) -> Missing. pick_node(#state{leader = undefined, servers = [N | _]}) -> + %% TODO: pick random rather that first? N; pick_node(#state{leader = Leader}) -> Leader. 
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 88c79acf79..61fd7a1b2c 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -48,6 +48,8 @@ groups() -> ++ all_tests()}, {cluster_size_3, [], [ declare_during_node_down, + simple_confirm_availability_on_leader_change, + confirm_availability_on_leader_change, recover_from_single_failure, recover_from_multiple_failures, leadership_takeover, @@ -575,6 +577,19 @@ publish(Config) -> wait_for_messages_ready(Servers, Name, 1), wait_for_messages_pending_ack(Servers, Name, 0). +publish_confirm(Ch, QName) -> + publish(Ch, QName), + amqp_channel:register_confirm_handler(Ch, self()), + ct:pal("waiting for confirms from ~s", [QName]), + ok = receive + #'basic.ack'{} -> ok; + #'basic.nack'{} -> fail + after 2500 -> + exit(confirm_timeout) + end, + ct:pal("CONFIRMED! ~s", [QName]), + ok. + ra_name(Q) -> binary_to_atom(<<"%2F_", Q/binary>>, utf8). @@ -1549,6 +1564,82 @@ declare_during_node_down(Config) -> wait_for_messages_ready(Servers, RaName, 1), ok. +simple_confirm_availability_on_leader_change(Config) -> + [Node1, Node2, _Node3] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + %% declare a queue on node2 - this _should_ host the leader on node 2 + DCh = rabbit_ct_client_helpers:open_channel(Config, Node2), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(DCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + erlang:process_flag(trap_exit, true), + %% open a channel to another node + Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + publish_confirm(Ch, QQ), + + %% stop the node hosting the leader + stop_node(Config, Node2), + %% this should not fail as the channel should detect the new leader and + %% resend to that + publish_confirm(Ch, QQ), + ok = rabbit_ct_broker_helpers:start_node(Config, Node2), + ok. 
+ +confirm_availability_on_leader_change(Config) -> + [Node1, Node2, _Node3] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + %% declare a queue on node2 - this _should_ host the leader on node 2 + DCh = rabbit_ct_client_helpers:open_channel(Config, Node2), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(DCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + erlang:process_flag(trap_exit, true), + Pid = spawn_link(fun () -> + %% open a channel to another node + Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + ConfirmLoop = fun Loop() -> + publish_confirm(Ch, QQ), + receive {done, P} -> + P ! done, + ok + after 0 -> Loop() end + end, + ConfirmLoop() + end), + + timer:sleep(500), + %% stop the node hosting the leader + stop_node(Config, Node2), + %% this should not fail as the channel should detect the new leader and + %% resend to that + timer:sleep(500), + Pid ! {done, self()}, + receive + done -> ok; + {'EXIT', Pid, Err} -> + exit(Err) + after 5500 -> + flush(100), + exit(bah) + end, + ok = rabbit_ct_broker_helpers:start_node(Config, Node2), + ok. + +flush(T) -> + receive X -> + ct:pal("flushed ~w", [X]), + flush(T) + after T -> + ok + end. + + add_member_not_running(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1966,7 +2057,7 @@ filter_queues(Expected, Got) -> end, Got). publish(Ch, Queue) -> - ok = amqp_channel:call(Ch, + ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = Queue}, #amqp_msg{props = #'P_basic'{delivery_mode = 2}, payload = <<"msg">>}). |
