diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2019-05-13 16:00:23 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-05-13 16:00:23 +0300 |
| commit | 18552f70b8552e0fb9c656eb262a4e8018af02ce (patch) | |
| tree | fccf8d861e7e698eae5213ee0ba7c73bf1a27b0d | |
| parent | 5c80bea709f4c89db3d8652f4a3e7d8421efb76e (diff) | |
| parent | 4d24352033d6778a2d5b9c20546280cb59686490 (diff) | |
| download | rabbitmq-server-git-18552f70b8552e0fb9c656eb262a4e8018af02ce.tar.gz | |
Merge pull request #1893 from rabbitmq/do_not_confirm_on_unreachable_queue
Make sure that publishes to dead or unaccessible queues return nack
| -rw-r--r-- | src/dtree.erl | 205 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 119 | ||||
| -rw-r--r-- | src/unconfirmed_messages.erl | 280 | ||||
| -rw-r--r-- | test/confirms_rejects_SUITE.erl | 118 |
4 files changed, 478 insertions, 244 deletions
diff --git a/src/dtree.erl b/src/dtree.erl deleted file mode 100644 index 6f4d102c10..0000000000 --- a/src/dtree.erl +++ /dev/null @@ -1,205 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at https://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved. -%% - -%% A dual-index tree. -%% -%% Entries have the following shape: -%% -%% +----+--------------------+---+ -%% | PK | SK1, SK2, ..., SKN | V | -%% +----+--------------------+---+ -%% -%% i.e. a primary key, set of secondary keys, and a value. -%% -%% There can be only one entry per primary key, but secondary keys may -%% appear in multiple entries. -%% -%% The set of secondary keys must be non-empty. Or, to put it another -%% way, entries only exist while their secondary key set is non-empty. - --module(dtree). - --export([empty/0, insert/4, take/3, take/2, take_one/2, take_all/2, drop/2, - is_defined/2, is_empty/1, smallest/1, size/1]). - -%%---------------------------------------------------------------------------- - --export_type([?MODULE/0]). - --opaque ?MODULE() :: {gb_trees:tree(), gb_trees:tree()}. - --type pk() :: any(). --type sk() :: any(). --type val() :: any(). --type kv() :: {pk(), val()}. - -%%---------------------------------------------------------------------------- - --spec empty() -> ?MODULE(). - -empty() -> {gb_trees:empty(), gb_trees:empty()}. - -%% Insert an entry. Fails if there already is an entry with the given -%% primary key. - --spec insert(pk(), [sk()], val(), ?MODULE()) -> ?MODULE(). - -insert(PK, [], V, {P, S}) -> - %% dummy insert to force error if PK exists - _ = gb_trees:insert(PK, {gb_sets:empty(), V}, P), - {P, S}; -insert(PK, SKs, V, {P, S}) -> - {gb_trees:insert(PK, {gb_sets:from_list(SKs), V}, P), - lists:foldl(fun (SK, S0) -> - case gb_trees:lookup(SK, S0) of - {value, PKS} -> PKS1 = gb_sets:insert(PK, PKS), - gb_trees:update(SK, PKS1, S0); - none -> PKS = gb_sets:singleton(PK), - gb_trees:insert(SK, PKS, S0) - end - end, S, SKs)}. - -%% Remove the given secondary key from the entries of the given -%% primary keys, returning the primary-key/value pairs of any entries -%% that were dropped as the result (i.e. due to their secondary key -%% set becoming empty). It is ok for the given primary keys and/or -%% secondary key to not exist. - --spec take([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}. - -take(PKs, SK, {P, S}) -> - case gb_trees:lookup(SK, S) of - none -> {[], {P, S}}; - {value, PKS} -> TakenPKS = gb_sets:from_list(PKs), - PKSInter = gb_sets:intersection(PKS, TakenPKS), - PKSDiff = gb_sets_difference (PKS, PKSInter), - {KVs, P1} = take2(PKSInter, SK, P), - {KVs, {P1, case gb_sets:is_empty(PKSDiff) of - true -> gb_trees:delete(SK, S); - false -> gb_trees:update(SK, PKSDiff, S) - end}} - end. - -%% Remove the given secondary key from all entries, returning the -%% primary-key/value pairs of any entries that were dropped as the -%% result (i.e. due to their secondary key set becoming empty). It is -%% ok for the given secondary key to not exist. - --spec take(sk(), ?MODULE()) -> {[kv()], ?MODULE()}. - -take(SK, {P, S}) -> - case gb_trees:lookup(SK, S) of - none -> {[], {P, S}}; - {value, PKS} -> {KVs, P1} = take2(PKS, SK, P), - {KVs, {P1, gb_trees:delete(SK, S)}} - end. - -%% Drop an entry with the primary key and clears secondary keys for this key, -%% returning a list with a key-value pair as a result. -%% If the primary key does not exist, returns an empty list. - --spec take_one(pk(), ?MODULE()) -> {[{pk(), val()}], ?MODULE()}. - -take_one(PK, {P, S}) -> - case gb_trees:lookup(PK, P) of - {value, {SKS, Value}} -> - P1 = gb_trees:delete(PK, P), - S1 = gb_sets:fold( - fun(SK, Acc) -> - {value, PKS} = gb_trees:lookup(SK, Acc), - PKS1 = gb_sets:delete(PK, PKS), - case gb_sets:is_empty(PKS1) of - true -> gb_trees:delete(SK, Acc); - false -> gb_trees:update(SK, PKS1, Acc) - end - end, S, SKS), - {[{PK, Value}], {P1, S1}}; - none -> {[], {P, S}} - end. - -%% Drop all entries which contain the given secondary key, returning -%% the primary-key/value pairs of these entries. It is ok for the -%% given secondary key to not exist. - --spec take_all(sk(), ?MODULE()) -> {[kv()], ?MODULE()}. - -take_all(SK, {P, S}) -> - case gb_trees:lookup(SK, S) of - none -> {[], {P, S}}; - {value, PKS} -> {KVs, SKS, P1} = take_all2(PKS, P), - {KVs, {P1, prune(SKS, PKS, S)}} - end. - -%% Drop all entries for the given primary key (which does not have to exist). - --spec drop(pk(), ?MODULE()) -> ?MODULE(). - -drop(PK, {P, S}) -> - case gb_trees:lookup(PK, P) of - none -> {P, S}; - {value, {SKS, _V}} -> {gb_trees:delete(PK, P), - prune(SKS, gb_sets:singleton(PK), S)} - end. - --spec is_defined(sk(), ?MODULE()) -> boolean(). - -is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S). - --spec is_empty(?MODULE()) -> boolean(). - -is_empty({P, _S}) -> gb_trees:is_empty(P). - --spec smallest(?MODULE()) -> kv(). - -smallest({P, _S}) -> {K, {_SKS, V}} = gb_trees:smallest(P), - {K, V}. - --spec size(?MODULE()) -> non_neg_integer(). - -size({P, _S}) -> gb_trees:size(P). - -%%---------------------------------------------------------------------------- - -take2(PKS, SK, P) -> - gb_sets:fold(fun (PK, {KVs, P0}) -> - {SKS, V} = gb_trees:get(PK, P0), - SKS1 = gb_sets:delete(SK, SKS), - case gb_sets:is_empty(SKS1) of - true -> KVs1 = [{PK, V} | KVs], - {KVs1, gb_trees:delete(PK, P0)}; - false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)} - end - end, {[], P}, PKS). - -take_all2(PKS, P) -> - gb_sets:fold(fun (PK, {KVs, SKS0, P0}) -> - {SKS, V} = gb_trees:get(PK, P0), - {[{PK, V} | KVs], gb_sets:union(SKS, SKS0), - gb_trees:delete(PK, P0)} - end, {[], gb_sets:empty(), P}, PKS). - -prune(SKS, PKS, S) -> - gb_sets:fold(fun (SK0, S0) -> - PKS1 = gb_trees:get(SK0, S0), - PKS2 = gb_sets_difference(PKS1, PKS), - case gb_sets:is_empty(PKS2) of - true -> gb_trees:delete(SK0, S0); - false -> gb_trees:update(SK0, PKS2, S0) - end - end, S, SKS). - -gb_sets_difference(S1, S2) -> - gb_sets:fold(fun gb_sets:delete_any/2, S1, S2). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 19d4449e21..7d4b7bde4a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -155,7 +155,7 @@ confirm_enabled, %% publisher confirm delivery tag sequence publish_seqno, - %% a dtree used to track unconfirmed + %% an unconfirmed_messages data structure used to track unconfirmed %% (to publishers) messages unconfirmed, %% a list of tags for published messages that were @@ -518,7 +518,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, delivering_queues = sets:new(), confirm_enabled = false, publish_seqno = 1, - unconfirmed = dtree:empty(), + unconfirmed = unconfirmed_messages:new(), rejected = [], confirmed = [], reply_consumer = none, @@ -707,10 +707,15 @@ handle_cast({mandatory_received, _MsgSeqNo}, State) -> handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) -> %% It does not matter which queue rejected the message, - %% if any queue rejected it - it should not be confirmed. - {MXs, UC1} = dtree:take_one(MsgSeqNo, UC), + %% if any queue did, it should not be confirmed. + {MaybeRejected, UC1} = unconfirmed_messages:reject_msg(MsgSeqNo, UC), %% NB: don't call noreply/1 since we don't want to send confirms. - noreply_coalesce(record_rejects(MXs, State#ch{unconfirmed = UC1})); + case MaybeRejected of + not_confirmed -> + noreply_coalesce(State#ch{unconfirmed = UC1}); + {rejected, MX} -> + noreply_coalesce(record_rejects([MX], State#ch{unconfirmed = UC1})) + end; handle_cast({confirm, MsgSeqNos, QPid}, State) -> noreply_coalesce(confirm(MsgSeqNos, QPid, State)). @@ -766,9 +771,12 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, eol -> State1 = handle_consuming_queue_down_or_eol(Name, State0), State2 = handle_delivering_queue_down(Name, State1), - %% TODO: this should use dtree:take/3 - {MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed), - State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}), + {ConfirmMXs, RejectMXs, UC1} = + unconfirmed_messages:forget_ref(Name, State2#ch.unconfirmed), + %% Deleted queue is a special case. + %% Do not nack the "rejected" messages. + State3 = record_confirms(ConfirmMXs ++ RejectMXs, + State2#ch{unconfirmed = UC1}), erase_queue_stats(QName), noreply_coalesce( State3#ch{queue_states = maps:remove(Name, QueueStates), @@ -776,7 +784,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, end; _ -> %% the assumption here is that the queue state has been cleaned up and - %% this is a residual ra notification + %% this is a residual Ra notification noreply_coalesce(State0) end; @@ -1818,14 +1826,33 @@ track_delivering_queue(NoAck, QPid, QName, false -> sets:add_element(QRef, DQ) end}. -handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) +handle_publishing_queue_down(QPid, Reason, + State = #ch{unconfirmed = UC, + queue_names = QNames}) when ?IS_CLASSIC(QPid) -> - case rabbit_misc:is_abnormal_exit(Reason) of - true -> {MXs, UC1} = dtree:take_all(QPid, UC), - record_rejects(MXs, State#ch{unconfirmed = UC1}); - false -> {MXs, UC1} = dtree:take(QPid, UC), - record_confirms(MXs, State#ch{unconfirmed = UC1}) - + case maps:get(QPid, QNames, none) of + %% The queue is unknown, the confirm must have been processed already + none -> State; + _QName -> + case {rabbit_misc:is_abnormal_exit(Reason), Reason} of + {true, _} -> + {RejectMXs, UC1} = + unconfirmed_messages:reject_all_for_queue(QPid, UC), + record_rejects(RejectMXs, State#ch{unconfirmed = UC1}); + {false, normal} -> + {ConfirmMXs, RejectMXs, UC1} = + unconfirmed_messages:forget_ref(QPid, UC), + %% Deleted queue is a special case. + %% Do not nack the "rejected" messages. + record_confirms(ConfirmMXs ++ RejectMXs, + State#ch{unconfirmed = UC1}); + {false, _} -> + {ConfirmMXs, RejectMXs, UC1} = + unconfirmed_messages:forget_ref(QPid, UC), + State1 = record_confirms(ConfirmMXs, + State#ch{unconfirmed = UC1}), + record_rejects(RejectMXs, State1) + end end; handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) -> error(quorum_queues_should_never_be_monitored). @@ -2139,21 +2166,33 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ %% frequently would in fact be more expensive in the common case. {QNames1, QMons1} = lists:foldl(fun (Q, {QNames0, QMons0}) when ?is_amqqueue(Q) -> - QPid = amqqueue:get_pid(Q), - QRef = qpid_to_ref(QPid), - QName = amqqueue:get_name(Q), - {case maps:is_key(QRef, QNames0) of - true -> QNames0; - false -> maps:put(QRef, QName, QNames0) - end, maybe_monitor(QPid, QMons0)} - end, {QNames, maybe_monitor_all(DeliveredQPids, QMons)}, Qs), + QPid = amqqueue:get_pid(Q), + QRef = qpid_to_ref(QPid), + QName = amqqueue:get_name(Q), + case ?IS_CLASSIC(QRef) of + true -> + SPids = amqqueue:get_slave_pids(Q), + NewQNames = + maps:from_list([{Ref, QName} || Ref <- [QRef | SPids]]), + {maps:merge(NewQNames, QNames0), + maybe_monitor_all([QPid | SPids], QMons0)}; + false -> + {maps:put(QRef, QName, QNames0), QMons0} + end + end, + {QNames, QMons}, Qs), State1 = State#ch{queue_names = QNames1, queue_monitors = QMons1}, %% NB: the order here is important since basic.returns must be %% sent before confirms. ok = process_routing_mandatory(Mandatory, AllDeliveredQRefs, Message, State1), - State2 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo, + AllDeliveredQNames = [ QName || QRef <- AllDeliveredQRefs, + {ok, QName} <- [maps:find(QRef, QNames1)]], + State2 = process_routing_confirm(Confirm, + AllDeliveredQRefs, + AllDeliveredQNames, + MsgSeqNo, XName, State1), case rabbit_event:stats_level(State, #ch.stats_timer) of fine -> @@ -2179,16 +2218,22 @@ process_routing_mandatory(_Mandatory = false, process_routing_mandatory(_, _, _, _) -> ok. -process_routing_confirm(false, _, _, _, State) -> +process_routing_confirm(false, _, _, _, _, State) -> State; -process_routing_confirm(true, [], MsgSeqNo, XName, State) -> +process_routing_confirm(true, [], _, MsgSeqNo, XName, State) -> record_confirms([{MsgSeqNo, XName}], State); -process_routing_confirm(true, QRefs, MsgSeqNo, XName, State) -> - State#ch{unconfirmed = dtree:insert(MsgSeqNo, QRefs, XName, - State#ch.unconfirmed)}. - -confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) -> - {MXs, UC1} = dtree:take(MsgSeqNos, QRef, UC), +process_routing_confirm(true, QRefs, QNames, MsgSeqNo, XName, State) -> + State#ch{unconfirmed = + unconfirmed_messages:insert(MsgSeqNo, QNames, QRefs, XName, + State#ch.unconfirmed)}. + +confirm(MsgSeqNos, QRef, State = #ch{queue_names = QNames, unconfirmed = UC}) -> + %% NOTE: if queue name does not exist here it's likely that the ref also + %% does not exist in unconfirmed messages. + %% Neither does the 'ignore' atom, so it's a reasonable fallback. + QName = maps:get(QRef, QNames, ignore), + {MXs, UC1} = + unconfirmed_messages:confirm_multiple_msg_ref(MsgSeqNos, QName, QRef, UC), %% NB: don't call noreply/1 since we don't want to send confirms. record_confirms(MXs, State#ch{unconfirmed = UC1}). @@ -2250,9 +2295,9 @@ send_confirms(Cs, Rs, State) -> coalesce_and_send(MsgSeqNos, NegativeMsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) -> SMsgSeqNos = lists:usort(MsgSeqNos), - UnconfirmedCutoff = case dtree:is_empty(UC) of + UnconfirmedCutoff = case unconfirmed_messages:is_empty(UC) of true -> lists:last(SMsgSeqNos) + 1; - false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo + false -> unconfirmed_messages:smallest(UC) end, Cutoff = lists:min([UnconfirmedCutoff | NegativeMsgSeqNos]), {Ms, Ss} = lists:splitwith(fun(X) -> X < Cutoff end, SMsgSeqNos), @@ -2271,7 +2316,7 @@ ack_len(Acks) -> lists:sum([length(L) || {ack, L} <- Acks]). maybe_complete_tx(State = #ch{tx = {_, _}}) -> State; maybe_complete_tx(State = #ch{unconfirmed = UC}) -> - case dtree:is_empty(UC) of + case unconfirmed_messages:is_empty(UC) of false -> State; true -> complete_tx(State#ch{confirmed = []}) end. @@ -2311,7 +2356,7 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE; i(source, #ch{cfg = #conf{source = ChSrc}}) -> ChSrc; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM); -i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); +i(messages_unconfirmed, #ch{unconfirmed = UC}) -> unconfirmed_messages:size(UC); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> ?QUEUE:len(UAMQ); i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs); i(messages_uncommitted, #ch{}) -> 0; diff --git a/src/unconfirmed_messages.erl b/src/unconfirmed_messages.erl new file mode 100644 index 0000000000..63a504a239 --- /dev/null +++ b/src/unconfirmed_messages.erl @@ -0,0 +1,280 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved. +%% + +%% Unconfirmed messages tracking. +%% +%% A message should be confirmed to the publisher only when all queues confirm. +%% +%% Messages are published to multiple queues while each queue may be +%% represented by several processes (queue refs). +%% +%% Queue refs return confirmations, rejections, can fail or disconnect. +%% If a queue ref fails, messgae should be rejected. +%% If all queue refs for a queue disconnect (not fail) without confirmation, +%% messge should be rejected. +%% +%% For simplicity, disconnects do not return a reject until all message refs +%% confirm or disconnect. + +-module(unconfirmed_messages). + +-export([new/0, + insert/5, + confirm_msg_ref/4, + confirm_multiple_msg_ref/4, + forget_ref/2, + + reject_msg/2, + reject_all_for_queue/2, + + smallest/1, + size/1, + is_empty/1]). + +%%---------------------------------------------------------------------------- + +-export_type([?MODULE/0]). +-define(SET_VALUE, []). + +-type queue_ref() :: term(). +-type msg_id() :: term(). +-type queue_name() :: rabbit_amqqueue:name(). +-type exchange_name() :: rabbit_exchange:name(). +-type map_set(Type) :: #{Type => ?SET_VALUE}. + +-record(msg_status, { + %% a set of refs waiting for confirm + refs = #{} :: map_set(queue_ref()), + %% shows which queues had at least one confirmation + queue_status = #{} :: #{queue_name() => confirmed | rejected}, + exchange :: exchange_name() +}). + +-record(unconfirmed, { + %% needed to get unconfirmed cutoff + ordered = gb_sets:new() :: gb_sets:set(msg_id()), + %% contains message statuses of all message IDs + index = #{} :: #{msg_id() => #msg_status{}}, + %% needed to look up message IDs for a queue ref + reverse = #{} :: #{queue_ref() => #{msg_id() => ?SET_VALUE}} +}). + +-opaque ?MODULE() :: #unconfirmed{}. + +%%---------------------------------------------------------------------------- + +-spec new() -> ?MODULE(). +new() -> #unconfirmed{}. + +%% Insert an entry for the message ID. Fails if there already is +%% an entry with the given ID. +-spec insert(msg_id(), [queue_name()], [queue_ref()], exchange_name(), ?MODULE()) -> ?MODULE(). +insert(MsgId, QueueNames, QueueRefs, XName, + #unconfirmed{ordered = Ordered, + index = Index, + reverse = Reverse} = UC) -> + case maps:get(MsgId, Index, none) of + none -> + UC#unconfirmed{ + ordered = gb_sets:add(MsgId, Ordered), + index = + Index#{MsgId => + #msg_status{ + refs = maps:from_list([{QR, ?SET_VALUE} || QR <- QueueRefs]), + queue_status = maps:from_list([{QN, rejected} || QN <- QueueNames]), + exchange = XName}}, + reverse = lists:foldl( + fun + (Ref, R) -> + case R of + #{Ref := MsgIdsSet} -> + R#{Ref => MsgIdsSet#{MsgId => ?SET_VALUE}}; + _ -> + R#{Ref => #{MsgId => ?SET_VALUE}} + end + end, + Reverse, QueueRefs) + }; + _ -> + error({message_already_exists, MsgId, QueueNames, QueueRefs, XName, UC}) + end. + +%% Confirms a message on behalf of the given queue. If it was the last queue (ref) +%% on the waiting list, returns 'confirmed' and performs the necessary cleanup. +-spec confirm_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) -> + {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}. +confirm_msg_ref(MsgId, QueueName, QueueRef, + #unconfirmed{reverse = Reverse} = UC) -> + remove_msg_ref(confirm, MsgId, QueueName, QueueRef, + UC#unconfirmed{reverse = remove_from_reverse(QueueRef, [MsgId], Reverse)}). + +-spec confirm_multiple_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) -> + {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}. +confirm_multiple_msg_ref(MsgIds, QueueName, QueueRef, + #unconfirmed{reverse = Reverse} = UC0) -> + lists:foldl( + fun(MsgId, {C, UC}) -> + case remove_msg_ref(confirm, MsgId, QueueName, QueueRef, UC) of + {{confirmed, V}, UC1} -> {[V | C], UC1}; + {not_confirmed, UC1} -> {C, UC1} + end + end, + {[], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}}, + MsgIds). + +%% Removes all messages for a queue. +%% Returns lists of confirmed and rejected messages. +%% +%% If there are no more refs left for the message, either +%% 'confirmed' or 'rejected'. +%% 'confirmed' is returned if all queues have confirmed the message. +-spec forget_ref(queue_ref(), ?MODULE()) -> + {Confirmed :: [{msg_id(), exchange_name()}], + Rejected :: [{msg_id(), exchange_name()}], + ?MODULE()}. +forget_ref(QueueRef, #unconfirmed{reverse = Reverse0} = UC0) -> + MsgIds = maps:keys(maps:get(QueueRef, Reverse0, #{})), + lists:foldl(fun(MsgId, {C, R, UC}) -> + case remove_msg_ref(no_confirm, MsgId, ignore, QueueRef, UC) of + {not_confirmed, UC1} -> {C, R, UC1}; + {{confirmed, V}, UC1} -> {[V | C], R, UC1}; + {{rejected, V}, UC1} -> {C, [V | R], UC1} + end + end, + {[], [], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}}, + MsgIds). + +%% Rejects a single message with the given ID. +%% Returns 'rejected' if there was a message with +%% such ID. +-spec reject_msg(msg_id(), ?MODULE()) -> + {{rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}. +reject_msg(MsgId, #unconfirmed{ordered = Ordered, index = Index, reverse = Reverse} = UC) -> + case maps:get(MsgId, Index, none) of + none -> + {not_confirmed, UC}; + #msg_status{exchange = XName, + refs = Refs} -> + {{rejected, {MsgId, XName}}, + UC#unconfirmed{ordered = gb_sets:del_element(MsgId, Ordered), + index = maps:remove(MsgId, Index), + reverse = remove_multiple_from_reverse(maps:keys(Refs), [MsgId], Reverse)}} + end. + +%% Rejects all pending messages for a queue. +-spec reject_all_for_queue(queue_ref(), ?MODULE()) -> + {Rejected :: [{msg_id(), exchange_name()}], ?MODULE()}. +reject_all_for_queue(QueueRef, #unconfirmed{reverse = Reverse0} = UC0) -> + MsgIds = maps:keys(maps:get(QueueRef, Reverse0, #{})), + lists:foldl(fun(MsgId, {R, UC}) -> + case reject_msg(MsgId, UC) of + {not_confirmed, UC1} -> {R, UC1}; + {{rejected, V}, UC1} -> {[V | R], UC1} + end + end, + {[], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}}, + MsgIds). + +%% Returns a smallest message id. +-spec smallest(?MODULE()) -> msg_id(). +smallest(#unconfirmed{ordered = Ordered}) -> + gb_sets:smallest(Ordered). + +-spec size(?MODULE()) -> msg_id(). +size(#unconfirmed{index = Index}) -> maps:size(Index). + +-spec is_empty(?MODULE()) -> boolean(). +is_empty(#unconfirmed{index = Index, reverse = Reverse, ordered = Ordered} = UC) -> + case maps:size(Index) == 0 of + true -> + %% Assertion + case maps:size(Reverse) == gb_sets:size(Ordered) + andalso + maps:size(Reverse) == 0 of + true -> ok; + false -> error({size_mismatch, UC}) + end, + true; + _ -> + false + end. + +-spec remove_from_reverse(queue_ref(), [msg_id()], + #{queue_ref() => #{msg_id() => ?SET_VALUE}}) -> + #{queue_ref() => #{msg_id() => ?SET_VALUE}}. +remove_from_reverse(QueueRef, MsgIds, Reverse) when is_list(MsgIds) -> + case maps:get(QueueRef, Reverse, none) of + none -> + Reverse; + MsgIdsSet -> + NewMsgIdsSet = maps:without(MsgIds, MsgIdsSet), + case maps:size(NewMsgIdsSet) > 0 of + true -> Reverse#{QueueRef => NewMsgIdsSet}; + false -> maps:remove(QueueRef, Reverse) + end + end. + +-spec remove_multiple_from_reverse([queue_ref()], [msg_id()], + #{queue_ref() => #{msg_id() => ?SET_VALUE}}) -> + #{queue_ref() => #{msg_id() => ?SET_VALUE}}. +remove_multiple_from_reverse(Refs, MsgIds, Reverse0) -> + lists:foldl( + fun(Ref, Reverse) -> + remove_from_reverse(Ref, MsgIds, Reverse) + end, + Reverse0, + Refs). + +-spec remove_msg_ref(confirm | no_confirm, msg_id(), queue_name(), queue_ref(), ?MODULE()) -> + {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, + ?MODULE()}. +remove_msg_ref(Confirm, MsgId, QueueName, QueueRef, + #unconfirmed{ordered = Ordered, index = Index} = UC) -> + case maps:get(MsgId, Index, none) of + none -> + {not_confirmed, UC}; + #msg_status{refs = #{QueueRef := ?SET_VALUE} = Refs, + queue_status = QStatus, + exchange = XName} = MsgStatus -> + QStatus1 = case {Confirm, QueueName} of + {no_confirm, _} -> QStatus; + {_, ignore} -> QStatus; + {confirm, _} -> QStatus#{QueueName => confirmed} + end, + case maps:size(Refs) == 1 of + true -> + {{confirm_status(QStatus1), {MsgId, XName}}, + UC#unconfirmed{ + ordered = gb_sets:del_element(MsgId, Ordered), + index = maps:remove(MsgId, Index)}}; + false -> + {not_confirmed, + UC#unconfirmed{ + index = Index#{MsgId => + MsgStatus#msg_status{ + refs = maps:remove(QueueRef, Refs), + queue_status = QStatus1}}}} + end; + _ -> {not_confirmed, UC} + end. + +-spec confirm_status(#{queue_name() => confirmed | rejected}) -> confirmed | rejected. +confirm_status(QueueStatus) -> + case lists:all(fun(confirmed) -> true; (_) -> false end, + maps:values(QueueStatus)) of + true -> confirmed; + false -> rejected + end. diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl index 722b5240fc..8805b71ffe 100644 --- a/test/confirms_rejects_SUITE.erl +++ b/test/confirms_rejects_SUITE.erl @@ -14,7 +14,9 @@ groups() -> [ {parallel_tests, [parallel], [ confirms_rejects_conflict, - policy_resets_to_default + policy_resets_to_default, + dead_queue_rejects, + mixed_dead_alive_queues_reject ]} ]. @@ -45,7 +47,10 @@ init_per_testcase(policy_resets_to_default = Testcase, Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), rabbit_ct_helpers:testcase_started( rabbit_ct_helpers:set_config(Config, [{conn, Conn}]), Testcase); -init_per_testcase(confirms_rejects_conflict = Testcase, Config) -> +init_per_testcase(Testcase, Config) + when Testcase == confirms_rejects_conflict; + Testcase == dead_queue_rejects; + Testcase == mixed_dead_alive_queues_reject -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config), @@ -66,6 +71,19 @@ end_per_testcase(policy_resets_to_default = Testcase, Config) -> end_per_testcase(confirms_rejects_conflict = Testcase, Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), amqp_channel:call(Ch, #'queue.delete'{queue = <<"confirms_rejects_conflict">>}), + end_per_testcase0(Testcase, Config); +end_per_testcase(dead_queue_rejects = Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"dead_queue_rejects">>}), + end_per_testcase0(Testcase, Config); +end_per_testcase(mixed_dead_alive_queues_reject = Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"mixed_dead_alive_queues_reject_dead">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"mixed_dead_alive_queues_reject_alive">>}), + amqp_channel:call(Ch, #'exchange.delete'{exchange = <<"mixed_dead_alive_queues_reject">>}), + end_per_testcase0(Testcase, Config). + +end_per_testcase0(Testcase, Config) -> rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), Conn = ?config(conn, Config), @@ -74,9 +92,89 @@ end_per_testcase(confirms_rejects_conflict = Testcase, Config) -> rabbit_ct_client_helpers:close_connection(Conn), rabbit_ct_client_helpers:close_connection(Conn1), + clean_acks_mailbox(), + rabbit_ct_helpers:testcase_finished(Config, Testcase). +dead_queue_rejects(Config) -> + Conn = ?config(conn, Config), + {ok, Ch} = amqp_connection:open_channel(Conn), + QueueName = <<"dead_queue_rejects">>, + amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + + amqp_channel:call(Ch, #'queue.declare'{queue = QueueName, + durable = true}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName}, + #amqp_msg{payload = <<"HI">>}), + + receive + {'basic.ack',_,_} -> ok + after 10000 -> + error(timeout_waiting_for_initial_ack) + end, + + kill_the_queue(QueueName, Config), + + amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName}, + #amqp_msg{payload = <<"HI">>}), + + receive + {'basic.ack',_,_} -> error(expecting_nack_got_ack); + {'basic.nack',_,_,_} -> ok + after 10000 -> + error(timeout_waiting_for_nack) + end. + +mixed_dead_alive_queues_reject(Config) -> + Conn = ?config(conn, Config), + {ok, Ch} = amqp_connection:open_channel(Conn), + QueueNameDead = <<"mixed_dead_alive_queues_reject_dead">>, + QueueNameAlive = <<"mixed_dead_alive_queues_reject_alive">>, + ExchangeName = <<"mixed_dead_alive_queues_reject">>, + + amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + + amqp_channel:call(Ch, #'queue.declare'{queue = QueueNameDead, + durable = true}), + amqp_channel:call(Ch, #'queue.declare'{queue = QueueNameAlive, + durable = true}), + + amqp_channel:call(Ch, #'exchange.declare'{exchange = ExchangeName, + durable = true}), + + amqp_channel:call(Ch, #'queue.bind'{exchange = ExchangeName, + queue = QueueNameAlive, + routing_key = <<"route">>}), + + amqp_channel:call(Ch, #'queue.bind'{exchange = ExchangeName, + queue = QueueNameDead, + routing_key = <<"route">>}), + + amqp_channel:call(Ch, #'basic.publish'{exchange = ExchangeName, + routing_key = <<"route">>}, + #amqp_msg{payload = <<"HI">>}), + + receive + {'basic.ack',_,_} -> ok + after 10000 -> + error(timeout_waiting_for_initial_ack) + end, + + kill_the_queue(QueueNameDead, Config), + + amqp_channel:call(Ch, #'basic.publish'{exchange = ExchangeName, + routing_key = <<"route">>}, + #amqp_msg{payload = <<"HI">>}), + + receive + {'basic.nack',_,_,_} -> ok; + {'basic.ack',_,_} -> error(expecting_nack_got_ack) + after 10000 -> + error(timeout_waiting_for_ack) + end. confirms_rejects_conflict(Config) -> Conn = ?config(conn, Config), @@ -256,3 +354,19 @@ clean_acks_mailbox() -> after 1000 -> done end. + +kill_the_queue(QueueName, Config) -> + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, kill_the_queue, [QueueName]). + +kill_the_queue(QueueName) -> + [begin + {ok, Q} = rabbit_amqqueue:lookup({resource, <<"/">>, queue, QueueName}), + Pid = amqqueue:get_pid(Q), + exit(Pid, kill) + end + || _ <- lists:seq(1, 11)]. + + + + + |
